
Questions tagged with Data Lakes



Connecting Users to AWS Athena and AWS Lake Formation via Tableau Desktop using the Simba Athena JDBC Driver and Okta as Identity Provider

Hello, according to the step-by-step guide in the official AWS Athena user guide (link at the end of the question), it should be possible to connect Tableau Desktop to Athena and Lake Formation via the Simba Athena JDBC driver using Okta as IdP. The challenge I am facing is that although I followed each step as documented in the Athena user guide, I cannot make the connection work. The error message I receive whenever I try to connect from Tableau Desktop states:

> [Simba][AthenaJDBC](100071) An error has been thrown from the AWS Athena client. The security token included in the request is invalid. [Execution ID not available] Invalid Username or Password.

My athena.properties file to configure the driver in Tableau via the connection string URL looks as follows (user name and password are masked):

```
jdbc:awsathena://AwsRegion=eu-central-1;
S3OutputLocation=s3://athena-query-results;
AwsCredentialsProviderClass=com.simba.athena.iamsupport.plugin.OktaCredentialsProvider;
idp_host=1234.okta.com;
User=*****.*****@example.com;
Password=******************;
app_id=****************************;
ssl_insecure=true;
okta_mfa_type=oktaverifywithpush;
LakeFormationEnabled=true;
```

The configuration settings used here are from the official Simba Athena JDBC driver documentation (version 2.0.31). Furthermore, I assigned the required permissions for my users and groups inside Lake Formation as stated in the step-by-step guide linked below. Right now I am not able to pin down why the connection does not work, so I would be very grateful for any support or ideas on this topic. Best regards

Link: https://docs.aws.amazon.com/athena/latest/ug/security-athena-lake-formation-jdbc-okta-tutorial.html#security-athena-lake-formation-jdbc-okta-tutorial-step-1-create-an-okta-account
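For reference, the "security token included in the request is invalid" message usually points at the temporary STS credentials produced by the SAML federation step rather than at Tableau or the JDBC driver itself. A minimal boto3 sketch like the one below can confirm whether the Okta-to-STS leg works on its own; the role ARN, IdP ARN and the base64 SAML assertion are placeholders you would substitute from your own Okta app, so treat this as a diagnostic sketch, not a documented procedure:

```python
import boto3

# Hypothetical placeholders -- substitute the values from your own Okta/AWS setup.
ROLE_ARN = "arn:aws:iam::123456789012:role/athena-okta-user"
PRINCIPAL_ARN = "arn:aws:iam::123456789012:saml-provider/Okta"
saml_assertion = "<base64-encoded SAML response captured from the Okta app>"

sts = boto3.client("sts")

# Exchange the SAML assertion for temporary credentials, as the JDBC driver would.
resp = sts.assume_role_with_saml(
    RoleArn=ROLE_ARN,
    PrincipalArn=PRINCIPAL_ARN,
    SAMLAssertion=saml_assertion,
)
creds = resp["Credentials"]

# If this call fails, the problem is in the federation setup, not in Tableau.
whoami = boto3.client(
    "sts",
    aws_access_key_id=creds["AccessKeyId"],
    aws_secret_access_key=creds["SecretAccessKey"],
    aws_session_token=creds["SessionToken"],
).get_caller_identity()
print(whoami["Arn"])
```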
0 answers · 0 votes · 10 views · asked a day ago

Hudi Clustering

I am using EMR 6.6.0, which has Hudi 0.10.1. I am trying to bulk insert and do inline clustering with Hudi, but it does not seem to cluster the files to the configured target file size; the output files are still only a few KB in size. I tried the configuration below:

```
hudi_clusteringopt = {
    'hoodie.table.name': 'myhudidataset_upsert_legacy_new7',
    'hoodie.datasource.write.recordkey.field': 'id',
    'hoodie.datasource.write.partitionpath.field': 'creation_date',
    'hoodie.datasource.write.precombine.field': 'last_update_time',
    'hoodie.datasource.hive_sync.enable': 'true',
    'hoodie.datasource.hive_sync.database': 'my_hudi_db',
    'hoodie.datasource.hive_sync.table': 'myhudidataset_upsert_legacy_new7',
    'hoodie.datasource.hive_sync.partition_fields': 'creation_date',
    'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor',
    "hoodie.datasource.write.hive_style_partitioning": "true",
    "hoodie.datasource.write.operation": "bulk_insert",
}

try:
    inputDF.write.format("org.apache.hudi"). \
        options(**hudi_clusteringopt). \
        option("hoodie.parquet.small.file.limit", "0"). \
        option("hoodie.clustering.inline", "true"). \
        option("hoodie.clustering.inline.max.commits", "0"). \
        option("hoodie.clustering.plan.strategy.target.file.max.bytes", "1073741824"). \
        option("hoodie.clustering.plan.strategy.small.file.limit", "629145600"). \
        option("hoodie.clustering.plan.strategy.sort.columns", "pk_col"). \
        mode('append'). \
        save("s3://xxxxxxxxxxxxxx");
except Exception as e:
    print(e)
```

Here is the data set if someone wants to reproduce the issue:

```
inputDF = spark.createDataFrame(
    [
        ("1001", 1001, "2015-01-01", "2015-01-01T13:51:39.340396Z"),
        ("1011", 1011, "2015-01-01", "2015-01-01T12:14:58.597216Z"),
        ("1021", 1021, "2015-01-01", "2015-01-01T13:51:40.417052Z"),
        ("1031", 1031, "2015-01-01", "2015-01-01T13:51:40.519832Z"),
        ("1041", 1041, "2015-01-02", "2015-01-01T12:15:00.512679Z"),
        ("1051", 1051, "2015-01-02", "2015-01-01T13:51:42.248818Z"),
    ],
    ["id", "id_val", "creation_date", "last_update_time"],
)
```
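In case it helps anyone reproducing this, a quick way to quantify "the files are still only a few KB" is to list the Parquet files under the table's base path and print their sizes after each write. A minimal boto3 sketch, where the bucket name and prefix are placeholders for the Hudi base path used in `save()`:

```python
import boto3

# Hypothetical bucket/prefix -- point these at the Hudi table base path used in save().
BUCKET = "my-hudi-bucket"
PREFIX = "myhudidataset_upsert_legacy_new7/"

s3 = boto3.client("s3")
paginator = s3.get_paginator("list_objects_v2")

# Print every data file and its size in KiB so the clustering result is easy to inspect.
for page in paginator.paginate(Bucket=BUCKET, Prefix=PREFIX):
    for obj in page.get("Contents", []):
        if obj["Key"].endswith(".parquet"):
            print(f'{obj["Key"]}: {obj["Size"] / 1024:.1f} KiB')
```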
1 answer · 0 votes · 7 views · asked 9 days ago

Describe table in Athena fails with insufficient lake formation permissions

When I try to run the following query via the Athena JDBC driver:

```sql
describe gitlab.issues
```

I get the following error:

> [Simba][AthenaJDBC](100071) An error has been thrown from the AWS Athena client. FAILED: SemanticException Unable to fetch table gitlab. Insufficient Lake Formation permission(s) on gitlab (Service: AmazonDataCatalog; Status Code: 400; Error Code: AccessDeniedException; Request ID: be6aeb1b-fc06-410d-9723-2df066307b35; Proxy: null) [Execution ID: a2534d22-c4df-49e9-8515-80224779bf01]

The following query works:

```sql
select * from gitlab.issues limit 10
```

The role that is used has the `DESCRIBE` permission on the `gitlab` database and `DESCRIBE, SELECT` permissions on the table `issues`. It also has the following IAM permissions:

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Action": [
        "athena:BatchGetNamedQuery", "athena:BatchGetQueryExecution", "athena:CreatePreparedStatement",
        "athena:DeletePreparedStatement", "athena:GetDataCatalog", "athena:GetDatabase",
        "athena:GetNamedQuery", "athena:GetPreparedStatement", "athena:GetQueryExecution",
        "athena:GetQueryResults", "athena:GetQueryResultsStream", "athena:GetTableMetadata",
        "athena:GetWorkGroup", "athena:ListDatabases", "athena:ListNamedQueries",
        "athena:ListPreparedStatements", "athena:ListDataCatalogs", "athena:ListEngineVersions",
        "athena:ListQueryExecutions", "athena:ListTableMetadata", "athena:ListTagsForResource",
        "athena:ListWorkGroups", "athena:StartQueryExecution", "athena:StopQueryExecution",
        "athena:UpdatePreparedStatement"
      ],
      "Resource": "*",
      "Effect": "Allow"
    },
    {
      "Action": [
        "glue:BatchGetCustomEntityTypes", "glue:BatchGetPartition", "glue:GetCatalogImportStatus",
        "glue:GetColumnStatisticsForPartition", "glue:GetColumnStatisticsForTable", "glue:GetCustomEntityType",
        "glue:GetDatabase", "glue:GetDatabases", "glue:GetPartition", "glue:GetPartitionIndexes",
        "glue:GetPartitions", "glue:GetSchema", "glue:GetSchemaByDefinition", "glue:GetSchemaVersion",
        "glue:GetSchemaVersionsDiff", "glue:GetTable", "glue:GetTableVersion", "glue:GetTableVersions",
        "glue:GetTables", "glue:GetUserDefinedFunction", "glue:GetUserDefinedFunctions",
        "glue:ListCustomEntityTypes", "glue:ListSchemaVersions", "glue:ListSchemas",
        "glue:QuerySchemaVersionMetadata", "glue:SearchTables"
      ],
      "Resource": "*",
      "Effect": "Allow"
    },
    {
      "Condition": {
        "ForAnyValue:StringEquals": { "aws:CalledVia": "athena.amazonaws.com" }
      },
      "Action": [
        "s3:GetBucketLocation", "s3:GetObject", "s3:ListBucket", "s3:ListBucketMultipartUploads",
        "s3:ListMultipartUploadParts", "s3:AbortMultipartUpload", "s3:PutObject"
      ],
      "Resource": [
        "arn:aws:s3:::aws-athena-query-results-123456789012-eu-west-1",
        "arn:aws:s3:::aws-athena-query-results-123456789012-eu-west-1/*",
        "arn:aws:s3:::aws-athena-federation-spill-123456789012-eu-west-1",
        "arn:aws:s3:::aws-athena-federation-spill-123456789012-eu-west-1/*"
      ],
      "Effect": "Allow"
    },
    {
      "Action": [
        "lakeformation:CancelTransaction", "lakeformation:CommitTransaction", "lakeformation:DescribeResource",
        "lakeformation:DescribeTransaction", "lakeformation:ExtendTransaction", "lakeformation:GetDataAccess",
        "lakeformation:GetQueryState", "lakeformation:GetQueryStatistics", "lakeformation:GetTableObjects",
        "lakeformation:GetWorkUnitResults", "lakeformation:GetWorkUnits", "lakeformation:StartQueryPlanning",
        "lakeformation:StartTransaction"
      ],
      "Resource": "*",
      "Effect": "Allow"
    },
    {
      "Condition": {
        "ForAnyValue:StringEquals": { "aws:CalledVia": "athena.amazonaws.com" }
      },
      "Action": "lambda:InvokeFunction",
      "Resource": "arn:aws:lambda:*:*:function:athena-federation-*",
      "Effect": "Allow"
    },
    {
      "Condition": {
        "ForAnyValue:StringEquals": { "aws:CalledVia": "athena.amazonaws.com" }
      },
      "Action": ["s3:GetBucketLocation", "s3:GetObject", "s3:ListBucket"],
      "Resource": "*",
      "Effect": "Allow"
    }
  ]
}
```

Even if I make the role a Lake Formation admin and database creator, assign Super permissions on the table and database, and attach the AdministratorAccess IAM policy to the role, it still fails.
0 answers · 0 votes · 24 views · asked 12 days ago

_temp AWS Lake Formation blueprint pipeline tables appear to an IAM user in the Athena editor although I didn't give this user permission on them

The _temp Lake Formation blueprint pipeline tables appear to an IAM user in the Athena editor, although I didn't give this user permission on them. Below is the policy granted to this IAM user; also, in the Lake Formation permissions I didn't give this user any permissions on the _temp tables:

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "Stmt1652364721496",
      "Action": [
        "athena:BatchGetNamedQuery", "athena:BatchGetQueryExecution", "athena:GetDataCatalog",
        "athena:GetDatabase", "athena:GetNamedQuery", "athena:GetPreparedStatement",
        "athena:GetQueryExecution", "athena:GetQueryResults", "athena:GetQueryResultsStream",
        "athena:GetTableMetadata", "athena:GetWorkGroup", "athena:ListDataCatalogs",
        "athena:ListDatabases", "athena:ListEngineVersions", "athena:ListNamedQueries",
        "athena:ListPreparedStatements", "athena:ListQueryExecutions", "athena:ListTableMetadata",
        "athena:ListTagsForResource", "athena:ListWorkGroups", "athena:StartQueryExecution",
        "athena:StopQueryExecution"
      ],
      "Effect": "Allow",
      "Resource": "*"
    },
    {
      "Effect": "Allow",
      "Action": [
        "glue:GetDatabase", "glue:GetDatabases", "glue:BatchDeleteTable", "glue:GetTable",
        "glue:GetTables", "glue:GetPartition", "glue:GetPartitions", "glue:BatchGetPartition"
      ],
      "Resource": ["*"]
    },
    {
      "Sid": "Stmt1652365282568",
      "Action": "s3:*",
      "Effect": "Allow",
      "Resource": [
        "arn:aws:s3:::queryresults-all",
        "arn:aws:s3:::queryresults-all/*"
      ]
    },
    {
      "Effect": "Allow",
      "Action": ["lakeformation:GetDataAccess"],
      "Resource": ["*"]
    }
  ]
}
```
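For reference, when tables are visible despite no explicit grants, it is often because the blueprint workflow (or the Lake Formation default settings) left a grant to `IAMAllowedPrincipals`, which makes the metadata visible to any IAM user with Glue/Athena access. A minimal boto3 sketch that dumps every recorded Lake Formation grant so such entries stand out; no names are assumed beyond the default catalog:

```python
import boto3

lf = boto3.client("lakeformation")

# Page through all Lake Formation grants so any IAMAllowedPrincipals or other
# unexpected grant on the _temp blueprint tables becomes visible.
token = None
while True:
    kwargs = {"MaxResults": 100}
    if token:
        kwargs["NextToken"] = token
    resp = lf.list_permissions(**kwargs)
    for entry in resp.get("PrincipalResourcePermissions", []):
        print(entry["Principal"], entry["Resource"], entry["Permissions"])
    token = resp.get("NextToken")
    if not token:
        break
```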
1 answer · 0 votes · 15 views · asked 2 months ago

AWS Lake Formation: (AccessDeniedException) when calling the GetTable operation: Insufficient Lake Formation permission(s) on table

I have implemented Lake Formation on my data bucket. I have a Step Function in which one step consists of running a Glue job that reads and writes to the Data Catalog. I have upgraded my data lake permissions as described [here][1]. The service role that runs my Step Function has a root-type policy (granted just for debugging this issue):

```yaml
Statement:
  - Effect: "Allow"
    Action:
      - "*"
    Resource:
      - "*"
```

On Lake Formation the service role has:

- Administrator rights
- Database creation rights (and grantable)
- Data location access to the entire bucket (and grantable)
- Super rights on the read and write databases (and grantable)
- Super rights on ALL tables within the above databases (and grantable)

The bucket is not encrypted. But, somehow, the job's access to the tables is denied with the error:

```
(AccessDeniedException) when calling the GetTable operation: Insufficient Lake Formation permission(s) on table
```

What's really strange is that the Glue job succeeds when writing to some tables and fails on others, and there is no substantial difference across the tables: all of them are under the same S3 prefix, Parquet files, partitioned on the same key. Given the abundance of permissions granted, I am really clueless about what is causing the error. Please, send help.

[1]: https://docs.aws.amazon.com/lake-formation/latest/dg/upgrade-glue-lake-formation.html
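Since the job writes to some tables but not others under the same role, one way to narrow it down is to compare the Lake Formation grants recorded on a working table and on a failing one, rather than looking at the role side again. A minimal sketch where the database and table names are placeholders for one table of each kind:

```python
import boto3

lf = boto3.client("lakeformation")

def show_grants(database, table):
    """Print every Lake Formation grant recorded for one table."""
    resp = lf.list_permissions(
        Resource={"Table": {"DatabaseName": database, "Name": table}}
    )
    print(f"--- {database}.{table} ---")
    for entry in resp.get("PrincipalResourcePermissions", []):
        print(entry["Principal"].get("DataLakePrincipalIdentifier"), entry["Permissions"])

# Placeholder names -- compare a table the job writes successfully with one that fails.
show_grants("my_database", "table_that_works")
show_grants("my_database", "table_that_fails")
```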
0 answers · 0 votes · 23 views · asked 3 months ago

Grouping of partitioned dataframes

I have a large dataset (table) with >1e9 records (rows) in Glue. The table is partitioned by column A, which is an n-letter substring of column B. For example:

| A (partition key) | B | ... |
| --- | --- | --- |
| abc | abc123... | ... |
| abc | abc123... | ... |
| abc | abc456... | ... |
| abc | abc456... | ... |
| abc | abc456... | ... |
| abc | abc789... | ... |
| abc | abc789... | ... |
| ... | ... | ... |
| xyz | xyz123... | ... |
| xyz | xyz123... | ... |
| xyz | xyz123... | ... |
| xyz | xyz456... | ... |
| xyz | xyz456... | ... |
| xyz | xyz456... | ... |
| xyz | xyz789... | ... |
| xyz | xyz789... | ... |

There are >1e6 possible different values of column B and correspondingly significantly fewer for column A (maybe 1e3). Now I need to group records/rows by column B, and the assumption is that it could be advantageous that the table is partitioned by column A, since it would then be sufficient to load dataframes from single partitions for grouping instead of running the operation on the entire table. (Partitioning by column B would lead to an unreasonably large number of partitions.)

Is my assumption right? How would I tell my Glue job about the link between columns A and B and profit from the partitioning? Alternatively, I could handle the 1e3 dataframes (one for each partition) separately in my Glue job and merge them later on, but this looks a bit complicated to me.

This question is a follow-up to https://repost.aws/questions/QUwxdl4EwTQcKBuL8MKCU0EQ/are-partitions-advantageous-for-groupby-operations-in-glue-jobs.
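For what it's worth, the idea in the question can be expressed directly in a Glue job: because every value of B maps to exactly one value of A, a group-by on B never crosses partitions, so each partition can be loaded with a pushdown predicate and grouped on its own. A minimal sketch, assuming hypothetical catalog names `my_db`/`my_table` and columns literally named `A` and `B`:

```python
from pyspark.context import SparkContext
from awsglue.context import GlueContext

glueContext = GlueContext(SparkContext.getOrCreate())

def group_one_partition(a_value):
    """Load a single partition via a pushdown predicate and group it by B."""
    dyf = glueContext.create_dynamic_frame.from_catalog(
        database="my_db",                        # hypothetical catalog database
        table_name="my_table",                   # hypothetical catalog table
        push_down_predicate=f"A = '{a_value}'",  # only this partition is read from S3
    )
    return dyf.toDF().groupBy("B").count()

# Example: process one partition; in practice you would loop over the ~1e3 values of A,
# or simply group the full table by B and let Spark handle the shuffle.
result_abc = group_one_partition("abc")
```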
1 answer · 0 votes · 12 views · asked 4 months ago

Hive Sync fails: AWSGlueDataCatalogHiveClientFactory not found

I'm syncing data written to S3 using Apache Hudi with Hive & Glue.

Hudi options:

```
hudi_options:
  'hoodie.table.name': mytable
  'hoodie.datasource.write.recordkey.field': Id
  'hoodie.datasource.write.partitionpath.field': date
  'hoodie.datasource.write.table.name': mytable
  'hoodie.datasource.write.operation': upsert
  'hoodie.datasource.write.precombine.field': LastModifiedDate
  'hoodie.datasource.hive_sync.enable': true
  'hoodie.datasource.hive_sync.partition_fields': date
  'hoodie.datasource.hive_sync.database': hudi_lake_dev
  'hoodie.datasource.hive_sync.table': mytable
```

EMR configurations:

```
...
{
    "Classification": "yarn-site",
    "Properties": {
        "yarn.nodemanager.vmem-check-enabled": "false",
        "yarn.log-aggregation-enable": "true",
        "yarn.log-aggregation.retain-seconds": "-1",
        "yarn.nodemanager.remote-app-log-dir": config[
            "yarn_agg_log_uri_s3_path"
        ].format(current_date),
    },
    "Configurations": [],
},
{
    "Classification": "spark-hive-site",
    "Properties": {
        "hive.metastore.client.factory.class": "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory"  # noqa
    },
},
{
    "Classification": "presto-connector-hive",
    "Properties": {"hive.metastore.glue.datacatalog.enabled": "true"},
},
{
    "Classification": "hive-site",
    "Properties": {
        "hive.metastore.client.factory.class": "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory",  # noqa
        "hive.metastore.schema.verification": "false",
    },
},
...
```

Getting the following error:

```
Traceback (most recent call last):
  File "/home/hadoop/my_pipeline.py", line 31, in <module>
    ...
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 1109, in save
  File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1305, in __call__
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 111, in deco
  File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o84.save.
: org.apache.hudi.hive.HoodieHiveSyncException: Got runtime exception when hive syncing at org.apache.hudi.hive.HiveSyncTool.<init>(HiveSyncTool.java:74) at org.apache.hudi.HoodieSparkSqlWriter$.syncHive(HoodieSparkSqlWriter.scala:391) at org.apache.hudi.HoodieSparkSqlWriter$.$anonfun$metaSync$4(HoodieSparkSqlWriter.scala:440) at org.apache.hudi.HoodieSparkSqlWriter$.$anonfun$metaSync$4$adapted(HoodieSparkSqlWriter.scala:436) at scala.collection.mutable.HashSet.foreach(HashSet.scala:79) at org.apache.hudi.HoodieSparkSqlWriter$.metaSync(HoodieSparkSqlWriter.scala:436) at org.apache.hudi.HoodieSparkSqlWriter$.commitAndPerformPostOperations(HoodieSparkSqlWriter.scala:497) at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:223) at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:145) at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46) at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70) at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68) at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:90) at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:194) at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:232) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:229) at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:190) at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:134) at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:133) at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989) at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107) at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232) at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:110) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:135) at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107) at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:135) at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:253) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:134) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68) at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989) at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:438) at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:415) at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:293) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at 
py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) at py4j.Gateway.invoke(Gateway.java:282) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:238) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.hudi.hive.HoodieHiveSyncException: Failed to create HiveMetaStoreClient at org.apache.hudi.hive.HoodieHiveClient.<init>(HoodieHiveClient.java:93) at org.apache.hudi.hive.HiveSyncTool.<init>(HiveSyncTool.java:69) ... 46 more Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: org.apache.hadoop.hive.ql.metadata.HiveException: MetaException(message:Unable to instantiate a metastore client factory com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory due to: java.lang.ClassNotFoundException: Class com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory not found) at org.apache.hadoop.hive.ql.metadata.Hive.registerAllFunctionsOnce(Hive.java:239) at org.apache.hadoop.hive.ql.metadata.Hive.<init>(Hive.java:402) at org.apache.hadoop.hive.ql.metadata.Hive.create(Hive.java:335) at org.apache.hadoop.hive.ql.metadata.Hive.getInternal(Hive.java:315) at org.apache.hadoop.hive.ql.metadata.Hive.get(Hive.java:291) at org.apache.hudi.hive.HoodieHiveClient.<init>(HoodieHiveClient.java:91) ... 47 more Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: MetaException(message:Unable to instantiate a metastore client factory com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory due to: java.lang.ClassNotFoundException: Class com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory not found) at org.apache.hadoop.hive.ql.metadata.Hive.getAllFunctions(Hive.java:3991) at org.apache.hadoop.hive.ql.metadata.Hive.reloadFunctions(Hive.java:251) at org.apache.hadoop.hive.ql.metadata.Hive.registerAllFunctionsOnce(Hive.java:234) ... 52 more Caused by: MetaException(message:Unable to instantiate a metastore client factory com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory due to: java.lang.ClassNotFoundException: Class com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory not found) at org.apache.hadoop.hive.ql.metadata.HiveUtils.createMetaStoreClientFactory(HiveUtils.java:525) at org.apache.hadoop.hive.ql.metadata.HiveUtils.createMetaStoreClient(HiveUtils.java:506) at org.apache.hadoop.hive.ql.metadata.Hive.getMSC(Hive.java:3746) at org.apache.hadoop.hive.ql.metadata.Hive.getMSC(Hive.java:3726) at org.apache.hadoop.hive.ql.metadata.Hive.getAllFunctions(Hive.java:3988) ... 54 more 22/01/12 16:37:53 INFO SparkContext: Invoking stop() from shutdown hook ```
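A diagnostic sketch that may help with this class of error (not a fix): the `ClassNotFoundException` means the Hive client created during the Hudi sync cannot see the Glue catalog factory class, so it is worth checking from the same Spark session whether that class is on the driver classpath at all, and what the session's Hadoop configuration reports for the factory setting. Both checks below use only standard PySpark/py4j calls and assume `spark` is the session of the EMR step doing the Hudi write:

```python
# Run inside the same PySpark session / EMR step that performs the Hudi write.

# 1. What does the session's Hadoop configuration report for the factory class?
#    (A None here is only a hint, since hive-site may be applied elsewhere.)
print(spark.sparkContext._jsc.hadoopConfiguration().get(
    "hive.metastore.client.factory.class"))

# 2. Is the Glue catalog client class visible to the driver JVM?
try:
    spark.sparkContext._jvm.java.lang.Class.forName(
        "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory")
    print("Glue catalog factory class is on the driver classpath")
except Exception as exc:  # py4j surfaces the ClassNotFoundException here
    print("Class not found on the driver classpath:", exc)
```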
1 answer · 0 votes · 348 views · asked 6 months ago

Best way to set up a bucket with access points?

Hello,

As part of a SaaS solution, I'm currently setting up the structure for an S3 bucket which will contain multiple clients' data. The idea is to use one access point per client, in order to isolate the different clients' data. To be clear, the data is not made accessible to the client (not directly, at least); the bucket is only used to absorb data for processing and analysis purposes. This data is saved into different folders depending on the source type, so for example in a given access point one could have /images/, /logs/, etc.

However, I'm unsure whether I should add extra partitioning to that, for a few reasons. One is file collision: suppose access point A has a file /images/tree.png, and then access point B tries to add a file with the same path, how is the collision handled? That could be solved with something like a hash suffix, but I'd still like to know what would happen.

Then there is the question of scalability. This is not an issue per se, but I'm trying to think about what could happen in the future. It seems to me that having an extra partition on top of the access point would make any future migration or refactoring easier. My solution would be to add the organisation id as a prefix: each access point would only have access (through its policy) to files in a specific subdirectory, like /12345/*. However, this means that the callers to the access point need to add that prefix too, which adds an extra step for all inputs pushing data to the access point, instead of using the access point as if it were a bucket directly.

I'm not sure which way to go, whether I'm complicating things, or whether there is a simpler solution, hence my question. Any advice would be greatly appreciated!
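In case a concrete illustration of the prefix-per-tenant idea helps: an access point policy can restrict a caller to one sub-prefix, which is what the /12345/* suggestion amounts to. A minimal hedged sketch, where the account ID, access point name, role ARN, region and the 12345 prefix are all placeholders:

```python
import json
import boto3

ACCOUNT_ID = "123456789012"                                          # placeholder
ACCESS_POINT = "client-12345-ap"                                     # placeholder access point name
CLIENT_ROLE = "arn:aws:iam::123456789012:role/client-12345-ingest"   # placeholder ingest role

# Scope the access point to a single organisation prefix, e.g. /12345/images/..., /12345/logs/...
policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {"AWS": CLIENT_ROLE},
            "Action": ["s3:PutObject", "s3:GetObject"],
            "Resource": f"arn:aws:s3:eu-west-1:{ACCOUNT_ID}:accesspoint/{ACCESS_POINT}/object/12345/*",
        }
    ],
}

boto3.client("s3control").put_access_point_policy(
    AccountId=ACCOUNT_ID,
    Name=ACCESS_POINT,
    Policy=json.dumps(policy),
)
```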
1 answer · 1 vote · 24 views · asked 6 months ago

XML interpret one struct as an array

I've been trying this for a week but I'm starting to give up - I need some help understanding this.

I have an S3 bucket full of XML files, and I am creating a pyspark ETL job to convert them to Parquet so I can query them in Athena. Within each XML file, there is an XML tag called ORDER_LINE. This tag is supposed to be an array of items, however in many files there is only one item. XML does not have the concept of arrays, so when I pass this into my ETL job, Glue interprets the field as a Choice type in the schema, where it could either be an array or a struct type. I need to coerce this into an array type at all times.

Here's a list of everything I've tried:

1. Using ResolveChoice to cast to an array. This doesn't work because a struct can't be cast to an array.
2. Doing ResolveChoice to "make_struct", then a Map.apply() step to map the field so that if "struct" has data, it is transformed to [struct]. This doesn't work, and the Map docs hint that it does not support the python `map` function for arrays.
3. Converting the dynamic frame to a data frame, and then using pyspark withColumn(when(struct.isNotNull, [struct]).otherwise(array)) functions to convert the struct to an array, or make the array the main object, depending on which one is not null. This doesn't work because Glue is inferring the schema in the structs, and the fields in the structs are in a different order, so while all the fields in the schema are the same, Spark can't combine the result because the schema is not exactly the same.
4. Converting to a data frame, then using a pyspark UDF to transform the data. This worked on a small dev sample set, but failed on the production dataset. The error message was extremely cryptic and I wasn't able to find the cause. Maybe this could work, but I wasn't able to fully understand how to operate on the data in pyspark.
5. Trying to use the "withSchema" format_option when creating the dynamic frame from XML. The intention is to define the schema beforehand, but running this gives an error:

```
com.fasterxml.jackson.databind.exc.MismatchedInputException: Cannot deserialize instance of `java.util.LinkedHashMap<java.lang.Object,java.lang.Object>` out of VALUE_TRUE token
 at [Source: (String)" [...] (through reference chain: com.amazonaws.services.glue.schema.types.StructType["fields"]->java.util.ArrayList[0]->com.amazonaws.services.glue.schema.types.Field["properties"])
```

So my question is: how do I make the XML data source for Glue interpret a tag as always an array instead of a Choice, or how do I combine them? Even StackOverflow failed me here, and the forum post https://forums.aws.amazon.com/thread.jspa?messageID=931586&tstart=0 went unanswered.
Here's a snippet of my pyspark code:

```
import sys
import json
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.gluetypes import (
    StructType,
    Field,
    StringType,
    IntegerType,
    ArrayType,
)
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(
    sys.argv,
    [
        "JOB_NAME",
        "source_bucket_name",
        "target_bucket_name",
    ],
)

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

source_bucket_name = args["source_bucket_name"]
target_bucket_name = args["target_bucket_name"]

schema = StructType(
    [
        [fields removed as they are sensitive]
        Field(
            "ORDER_LINE",
            ArrayType(
                StructType(
                    [
                        Field("FIELD1", IntegerType(), True),
                        Field(
                            "FIELD2",
                            StructType([Field("CODE", StringType(), True)]),
                            True,
                        ),
                        Field(
                            "FIELD#",
                            StructType([Field("CODE", StringType(), True)]),
                            True,
                        ),
                        [fields removed]
                    ]
                )
            ),
            True,
        ),
    ]
)

datasource0 = glueContext.create_dynamic_frame.from_options(
    "s3",
    {"paths": [f"s3://{source_bucket_name}"]},
    format="xml",
    format_options={
        "rowTag": "ORDER",
        "withSchema": json.dumps(schema.jsonValue()),
    },
    transformation_ctx="datasource0",
)

[more steps after this]
```
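For reference, the schema-level version of the coercion described in attempt 3 looks roughly like the sketch below. It is only a sketch under an assumption: each batch you read resolves to a single type for ORDER_LINE (either struct or array), for example when file groups are processed separately, so it does not by itself solve the Choice type inside one DynamicFrame. The column name is the one from the question:

```python
from pyspark.sql import functions as F
from pyspark.sql.types import StructType

def order_line_as_array(df, col_name="ORDER_LINE"):
    """If the inferred type of col_name is a bare struct, wrap it in a one-element array.

    Assumes the whole DataFrame has one consistent type for the column; frames
    normalized this way can then be combined into a single array-typed column.
    """
    if isinstance(df.schema[col_name].dataType, StructType):
        return df.withColumn(col_name, F.array(F.col(col_name)))
    return df
```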
3 answers · 0 votes · 105 views · asked 7 months ago