Questions tagged with Data Lakes

Hive Sync fails: AWSGlueDataCatalogHiveClientFactory not found

I'm syncing data written to S3 using Apache Hudi with Hive & Glue.

Hudi options:

```
hudi_options:
  'hoodie.table.name': mytable
  'hoodie.datasource.write.recordkey.field': Id
  'hoodie.datasource.write.partitionpath.field': date
  'hoodie.datasource.write.table.name': mytable
  'hoodie.datasource.write.operation': upsert
  'hoodie.datasource.write.precombine.field': LastModifiedDate
  'hoodie.datasource.hive_sync.enable': true
  'hoodie.datasource.hive_sync.partition_fields': date
  'hoodie.datasource.hive_sync.database': hudi_lake_dev
  'hoodie.datasource.hive_sync.table': mytable
```

EMR Configurations:

```
...
{
    "Classification": "yarn-site",
    "Properties": {
        "yarn.nodemanager.vmem-check-enabled": "false",
        "yarn.log-aggregation-enable": "true",
        "yarn.log-aggregation.retain-seconds": "-1",
        "yarn.nodemanager.remote-app-log-dir": config[
            "yarn_agg_log_uri_s3_path"
        ].format(current_date),
    },
    "Configurations": [],
},
{
    "Classification": "spark-hive-site",
    "Properties": {
        "hive.metastore.client.factory.class": "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory"  # noqa
    },
},
{
    "Classification": "presto-connector-hive",
    "Properties": {"hive.metastore.glue.datacatalog.enabled": "true"},
},
{
    "Classification": "hive-site",
    "Properties": {
        "hive.metastore.client.factory.class": "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory",  # noqa
        "hive.metastore.schema.verification": "false",
    },
},
...
```

Getting the following error:

```
Traceback (most recent call last):
  File "/home/hadoop/my_pipeline.py", line 31, in <module>
    ...
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 1109, in save
  File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1305, in __call__
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 111, in deco
  File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o84.save.
: org.apache.hudi.hive.HoodieHiveSyncException: Got runtime exception when hive syncing
    at org.apache.hudi.hive.HiveSyncTool.<init>(HiveSyncTool.java:74)
    at org.apache.hudi.HoodieSparkSqlWriter$.syncHive(HoodieSparkSqlWriter.scala:391)
    at org.apache.hudi.HoodieSparkSqlWriter$.$anonfun$metaSync$4(HoodieSparkSqlWriter.scala:440)
    at org.apache.hudi.HoodieSparkSqlWriter$.$anonfun$metaSync$4$adapted(HoodieSparkSqlWriter.scala:436)
    at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
    at org.apache.hudi.HoodieSparkSqlWriter$.metaSync(HoodieSparkSqlWriter.scala:436)
    at org.apache.hudi.HoodieSparkSqlWriter$.commitAndPerformPostOperations(HoodieSparkSqlWriter.scala:497)
    at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:223)
    at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:145)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:90)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:194)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:232)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:229)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:190)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:134)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:133)
    at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
    at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
    at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:110)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:135)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
    at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:135)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:253)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:134)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:438)
    at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:415)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:293)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.hudi.hive.HoodieHiveSyncException: Failed to create HiveMetaStoreClient
    at org.apache.hudi.hive.HoodieHiveClient.<init>(HoodieHiveClient.java:93)
    at org.apache.hudi.hive.HiveSyncTool.<init>(HiveSyncTool.java:69)
    ... 46 more
Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: org.apache.hadoop.hive.ql.metadata.HiveException: MetaException(message:Unable to instantiate a metastore client factory com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory due to: java.lang.ClassNotFoundException: Class com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory not found)
    at org.apache.hadoop.hive.ql.metadata.Hive.registerAllFunctionsOnce(Hive.java:239)
    at org.apache.hadoop.hive.ql.metadata.Hive.<init>(Hive.java:402)
    at org.apache.hadoop.hive.ql.metadata.Hive.create(Hive.java:335)
    at org.apache.hadoop.hive.ql.metadata.Hive.getInternal(Hive.java:315)
    at org.apache.hadoop.hive.ql.metadata.Hive.get(Hive.java:291)
    at org.apache.hudi.hive.HoodieHiveClient.<init>(HoodieHiveClient.java:91)
    ... 47 more
Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: MetaException(message:Unable to instantiate a metastore client factory com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory due to: java.lang.ClassNotFoundException: Class com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory not found)
    at org.apache.hadoop.hive.ql.metadata.Hive.getAllFunctions(Hive.java:3991)
    at org.apache.hadoop.hive.ql.metadata.Hive.reloadFunctions(Hive.java:251)
    at org.apache.hadoop.hive.ql.metadata.Hive.registerAllFunctionsOnce(Hive.java:234)
    ... 52 more
Caused by: MetaException(message:Unable to instantiate a metastore client factory com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory due to: java.lang.ClassNotFoundException: Class com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory not found)
    at org.apache.hadoop.hive.ql.metadata.HiveUtils.createMetaStoreClientFactory(HiveUtils.java:525)
    at org.apache.hadoop.hive.ql.metadata.HiveUtils.createMetaStoreClient(HiveUtils.java:506)
    at org.apache.hadoop.hive.ql.metadata.Hive.getMSC(Hive.java:3746)
    at org.apache.hadoop.hive.ql.metadata.Hive.getMSC(Hive.java:3726)
    at org.apache.hadoop.hive.ql.metadata.Hive.getAllFunctions(Hive.java:3988)
    ... 54 more
22/01/12 16:37:53 INFO SparkContext: Invoking stop() from shutdown hook
```
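
The EMR configuration fragment in the question uses `config` and `current_date` without showing where they come from. As a rough, self-contained sketch of how such a list might be assembled: the function names, the `log_uri_template` parameter and the sample S3 path below are hypothetical, and only the classifications and property values are taken from the post. It covers just the four entries shown, not the elided ones, and it does not by itself resolve the `ClassNotFoundException`.

```python
from datetime import date
from typing import Any, Dict, List

# Factory class name exactly as it appears in the question.
GLUE_FACTORY = (
    "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory"
)


def build_emr_configurations(
    log_uri_template: str, current_date: date
) -> List[Dict[str, Any]]:
    """Return the four classifications shown in the post.

    `log_uri_template` is a hypothetical stand-in for
    config["yarn_agg_log_uri_s3_path"]; it must contain one "{}" placeholder.
    """
    return [
        {
            "Classification": "yarn-site",
            "Properties": {
                "yarn.nodemanager.vmem-check-enabled": "false",
                "yarn.log-aggregation-enable": "true",
                "yarn.log-aggregation.retain-seconds": "-1",
                "yarn.nodemanager.remote-app-log-dir": log_uri_template.format(
                    current_date
                ),
            },
            "Configurations": [],
        },
        {
            "Classification": "spark-hive-site",
            "Properties": {"hive.metastore.client.factory.class": GLUE_FACTORY},
        },
        {
            "Classification": "presto-connector-hive",
            "Properties": {"hive.metastore.glue.datacatalog.enabled": "true"},
        },
        {
            "Classification": "hive-site",
            "Properties": {
                "hive.metastore.client.factory.class": GLUE_FACTORY,
                "hive.metastore.schema.verification": "false",
            },
        },
    ]


def classifications_using_glue(configurations: List[Dict[str, Any]]) -> List[str]:
    """List the classifications that point the metastore client at Glue."""
    return [
        c["Classification"]
        for c in configurations
        if c["Properties"].get("hive.metastore.client.factory.class") == GLUE_FACTORY
    ]


if __name__ == "__main__":
    # Hypothetical bucket and date, for illustration only.
    configs = build_emr_configurations(
        "s3://example-bucket/yarn-logs/{}", date(2022, 1, 12)
    )
    print(classifications_using_glue(configs))
```

Listing which classifications reference the Glue factory is only a quick way to confirm the setting reaches both Spark and Hive; whether the class is actually on the classpath of the process running Hudi's Hive sync is a separate check on the cluster.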
1 answer · 0 votes · 913 views · asked 9 months ago

Best way to set up a bucket with access points?

Hello,

As part of a SaaS solution, I'm currently setting up the structure for an S3 bucket that will contain multiple clients' data. The idea is to use one access point per client in order to isolate each client's data. To be clear, the data is not made accessible to the client (not directly, at least). The bucket is only used to absorb data for processing and analysis. This data is saved into different folders depending on the source type, so a given access point could have, for example, /images/, /logs/, etc.

However, I'm unsure whether I should add extra partitioning on top of that, for a few reasons.

One is file collision. Suppose access point A has a file /images/tree.png, and then access point B tries to add a file with the same path: how is the collision handled? That could be solved with something like a hash suffix, but I'd still like to know what would happen.

Then there is the question of scalability. This is not an issue per se, but I'm trying to think about what could happen in the future. It seems to me that having an extra partition on top of the access point would make things easier later if any migration or refactoring is needed.

My solution would be to add the organisation ID as a prefix. Each access point would only have access (through its policy) to files in a specific subdirectory, like /12345/*. However, this means that callers of the access point need to add that prefix too, which adds an extra step for every input pushing data to the access point, instead of using the access point as if it were a bucket directly.

I'm not sure which way to go, whether I'm complicating things, or whether there is a simpler solution, hence my question. Any advice would be greatly appreciated!
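
The organisation-ID prefix idea can be sketched as follows. Access points are additional endpoints onto the same bucket, so two writers using the same key address the same object, and a per-organisation prefix keeps keys distinct. This is a minimal illustration rather than a recommended design: the function names, region, account ID, access point name and role ARN are all hypothetical, and the two actions listed are just an example set.

```python
import json
from typing import Any, Dict


def access_point_object_arn(
    region: str, account_id: str, access_point_name: str, org_id: str
) -> str:
    """ARN matching every object under the organisation's prefix,
    as addressed through one access point."""
    return (
        f"arn:aws:s3:{region}:{account_id}:accesspoint/"
        f"{access_point_name}/object/{org_id}/*"
    )


def build_access_point_policy(
    region: str,
    account_id: str,
    access_point_name: str,
    org_id: str,
    principal_arn: str,
) -> Dict[str, Any]:
    """Policy document limiting a principal to one organisation's prefix."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {"AWS": principal_arn},
                # Example actions only; adjust to what the ingest path needs.
                "Action": ["s3:GetObject", "s3:PutObject"],
                "Resource": access_point_object_arn(
                    region, account_id, access_point_name, org_id
                ),
            }
        ],
    }


def prefixed_key(org_id: str, key: str) -> str:
    """Build the object key a caller would use, e.g. 12345/images/tree.png."""
    return f"{org_id}/{key.lstrip('/')}"


if __name__ == "__main__":
    # All identifiers below are made up for illustration.
    policy = build_access_point_policy(
        region="us-west-2",
        account_id="123456789012",
        access_point_name="client-a-ap",
        org_id="12345",
        principal_arn="arn:aws:iam::123456789012:role/ingest-client-a",
    )
    print(json.dumps(policy, indent=2))
    print(prefixed_key("12345", "/images/tree.png"))
```

Putting the prefixing in one small helper such as `prefixed_key` keeps the extra step the question worries about in a single place instead of in every producer.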
1 answer · 1 vote · 48 views · asked 10 months ago

XML interpret one struct as an array

I've been trying this for a week but I'm starting to give up - I need some help understanding this.

I have an S3 bucket full of XML files, and I am creating a pyspark ETL job to convert them to Parquet so I can query them in Athena. Within each XML file, there is an XML tag called ORDER_LINE. This tag is supposed to be an array of items, but in many files there is only one item. XML does not have the concept of arrays, so when I pass this into my ETL job, Glue interprets the field as a Choice type in the schema, where it could be either an array or a struct type. I need to coerce this into an array type at all times.

Here's a list of everything I've tried:

1. Using ResolveChoice to cast to an array. This doesn't work because a struct can't be cast to an array.
2. Doing ResolveChoice to "make_struct", then the Map.apply() step to map the field so that if "struct" has data, it is transformed to [struct]. This doesn't work, and the Map docs hint that it does not support the python `map` function for arrays.
3. Converting the dynamic frame to a data frame, and then using pyspark withColumn(when(struct.isNotNull, [struct]).otherwise(array)) functions to convert the struct to an array, or make the array the main object, depending on which one is not null. This doesn't work because Glue is inferring the schema in the structs, and the fields in the structs are in a different order, so while all the fields in the schema are the same, Spark can't combine the result because the schema is not exactly the same.
4. Converting to a data frame, then using a pyspark UDF to transform the data. This worked on a small dev sample set, but failed on the production dataset. The error message was extremely cryptic and I wasn't able to find the cause. Maybe this could work, but I wasn't able to fully understand how to operate on the data in pyspark.
5. Trying to use the "withSchema" format_option when creating the dynamic frame from XML. The intention is to define the schema beforehand, but running this gives an error:

   ```
   com.fasterxml.jackson.databind.exc.MismatchedInputException: Cannot deserialize instance of `java.util.LinkedHashMap<java.lang.Object,java.lang.Object>` out of VALUE_TRUE token at [Source: (String)" [...] (through reference chain: com.amazonaws.services.glue.schema.types.StructType["fields"]->java.util.ArrayList[0]->com.amazonaws.services.glue.schema.types.Field["properties"])
   ```

So my question is: how do I make the XML data source for Glue always interpret a tag as an array instead of a Choice, or how do I combine the two? Even Stack Overflow failed me here, and the forum post https://forums.aws.amazon.com/thread.jspa?messageID=931586&tstart=0 went unanswered.

Here's a snippet of my pyspark code:

```
import sys
import json
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.gluetypes import (
    StructType,
    Field,
    StringType,
    IntegerType,
    ArrayType,
)
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(
    sys.argv,
    [
        "JOB_NAME",
        "source_bucket_name",
        "target_bucket_name",
    ],
)

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

source_bucket_name = args["source_bucket_name"]
target_bucket_name = args["target_bucket_name"]

schema = StructType(
    [
        # [fields removed as they are sensitive]
        Field(
            "ORDER_LINE",
            ArrayType(
                StructType(
                    [
                        Field("FIELD1", IntegerType(), True),
                        Field(
                            "FIELD2",
                            StructType([Field("CODE", StringType(), True)]),
                            True,
                        ),
                        Field(
                            "FIELD#",
                            StructType([Field("CODE", StringType(), True)]),
                            True,
                        ),
                        # [fields removed]
                    ]
                )
            ),
            True,
        ),
    ]
)

datasource0 = glueContext.create_dynamic_frame.from_options(
    "s3",
    {"paths": [f"s3://{source_bucket_name}"]},
    format="xml",
    format_options={
        "rowTag": "ORDER",
        "withSchema": json.dumps(schema.jsonValue()),
    },
    transformation_ctx="datasource0",
)

# [more steps after this]
```
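
The coercion the question is after (a lone struct becomes a one-element array, and every struct gets the same field order) can be sketched in plain Python, independent of Glue and Spark. This is only an illustration of the logic on dict-shaped records: the helper names and the sample field order are hypothetical, and how to wire it into a DynamicFrame or DataFrame step is left open.

```python
from typing import Any, Dict, List, Sequence


def as_list(value: Any) -> List[Any]:
    """Wrap a single item in a list; pass lists through; map None to []."""
    if value is None:
        return []
    if isinstance(value, list):
        return value
    return [value]


def reorder_fields(record: Dict[str, Any], field_order: Sequence[str]) -> Dict[str, Any]:
    """Rebuild a record with its keys in one canonical order.

    Missing fields become None so every record ends up with the same shape.
    """
    return {name: record.get(name) for name in field_order}


def normalize_order_lines(
    order: Dict[str, Any], field_order: Sequence[str]
) -> Dict[str, Any]:
    """Return a copy of `order` whose ORDER_LINE is always a list of
    records sharing the same field order."""
    normalized = dict(order)
    normalized["ORDER_LINE"] = [
        reorder_fields(line, field_order) for line in as_list(order.get("ORDER_LINE"))
    ]
    return normalized


if __name__ == "__main__":
    # Hypothetical sample: one order with a single line, one with two.
    fields = ["FIELD1", "FIELD2"]
    single = {"ORDER_LINE": {"FIELD2": {"CODE": "A"}, "FIELD1": 1}}
    many = {"ORDER_LINE": [{"FIELD1": 2, "FIELD2": {"CODE": "B"}}, {"FIELD1": 3}]}
    print(normalize_order_lines(single, fields))
    print(normalize_order_lines(many, fields))
```

Fixing the field order up front targets the mismatch described in attempt 3, where structs with the same fields in a different order could not be combined. Separately, the error text in attempt 5 ends at `Field["properties"]` with a `VALUE_TRUE` token, which suggests the third positional argument `True` may be landing in `properties` rather than acting as a nullable flag; that is an inference from the message, not something the post confirms.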
3 answers · 0 votes · 259 views · asked 10 months ago