
Questions tagged with AWS Glue


Glue ETL job fails with error: o122.relationalize. com.amazonaws.services.glue.util.HadoopDataSourceJobBookmarkState cannot be cast to com.amazonaws.services.glue.util.RelationalizeJobBookmarkState

Hello, I have an ETL job that converts JSON -> Parquet and was working until I enabled job bookmarks via CloudFormation. Now when my job runs I get the following error:

```
An error occurred while calling o122.relationalize. com.amazonaws.services.glue.util.HadoopDataSourceJobBookmarkState cannot be cast to com.amazonaws.services.glue.util.RelationalizeJobBookmarkState
```

# cloudformation.yml

```
AnalyticsGlueJob:
  Type: 'AWS::Glue::Job'
  Properties:
    Role: !Ref AnalyticsGlueRole
    Command:
      Name: 'glueetl'
      ScriptLocation: !Sub 's3://${AnalyticsS3Bucket}/analytics_etl.py'
    GlueVersion: '3.0'
    DefaultArguments:
      '--connection_type': 's3'
      '--db_name': !Ref AnalyticsGlueDatabase
      '--enable-metrics': ''
      '--job-bookmark-option': 'job-bookmark-enable'
      '--s3_dest': !Sub 's3://${AnalyticsS3ParquetBucket}/logs/'
      '--table_name': 'logs'
      '--temp_dir': !Sub 's3://${AnalyticsS3ParquetBucket}/temp/'
```

# etl_job.py

```
import sys

from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.transforms import ApplyMapping, Relationalize
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext

# Job arguments supplied via DefaultArguments in the CloudFormation template.
args = getResolvedOptions(
    sys.argv,
    ['JOB_NAME', 'connection_type', 'db_name', 's3_dest', 'table_name', 'temp_dir'],
)
CONNECTION_TYPE = args['connection_type']
DATABASE = args['db_name']
S3_DEST = args['s3_dest']
TABLE_NAME = args['table_name']
TEMP_DIR = args['temp_dir']

sc = SparkContext()
gc = GlueContext(sc)
spark = gc.spark_session
job = Job(gc)
job.init(args['JOB_NAME'], args)

dyf = gc.create_dynamic_frame.from_catalog(
    database=DATABASE,
    table_name=TABLE_NAME,
    transformation_ctx='bookmark_ctx',
)

# transform certain field types.
dyf = ApplyMapping.apply(
    frame=dyf,
    mappings=[
        ('authorizer.error', 'string', 'authorizer.error', 'string'),
        ('authorizer.latency', 'string', 'authorizer.latency', 'int'),
        ('authorizer.principal', 'string', 'authorizer.principal', 'string'),
        ('authorizer.requestId', 'string', 'authorizer.requestId', 'string'),
        ('authorizer.status', 'string', 'authorizer.status', 'int'),
        ('caller', 'string', 'caller', 'string'),
        ('httpmethod', 'string', 'httpmethod', 'string'),
        ('ip', 'string', 'ip', 'string'),
        ('partition_0', 'string', 'partition_0', 'string'),
        ('partition_1', 'string', 'partition_1', 'string'),
        ('partition_2', 'string', 'partition_2', 'string'),
        ('partition_3', 'string', 'partition_3', 'string'),
        ('path', 'string', 'path', 'string'),
        ('protocol', 'string', 'protocol', 'string'),
        ('requestid', 'string', 'requestid', 'string'),
        ('requesttime', 'string', 'requesttime', 'timestamp'),
        ('responselength', 'string', 'responselength', 'int'),
        ('status', 'string', 'status', 'int'),
        ('user', 'string', 'user', 'string'),
        ('useragent', 'string', 'useragent', 'string'),
    ],
    transformation_ctx='applymapping_ctx',
)

# flatten nested json.
dyf = Relationalize.apply(
    frame=dyf,
    staging_path=TEMP_DIR,
    name='root',
    transformation_ctx='relationalize_ctx',
)
dyf = dyf.select('root')

# write partitioned parquet files.
dyf = gc.write_dynamic_frame.from_options(
    frame=dyf,
    connection_type=CONNECTION_TYPE,
    connection_options={
        'path': S3_DEST,
        'partitionKeys': [
            'partition_0',
            'partition_1',
            'partition_2',
            'partition_3',
        ],
    },
    format='glueparquet',
    transformation_ctx='parquet_ctx',
)

job.commit()
```

The error mentions `Relationalize`, which I'm using to flatten a nested JSON structure, but I'm not sure why it's failing. Any help appreciated!

## edit

I think I may have it working, but I'm noticing that if the job runs without any new data in the source bucket, it fails and throws an error rather than showing as succeeded.

```
IllegalArgumentException: Partition column partition_0 not found in schema
```

When new records arrive in the source bucket and the job runs again, it works and shows succeeded... Still interested in feedback!
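Regarding the edit: with job bookmarks enabled, a run with no new source data likely produces an empty DynamicFrame with an empty schema, which would explain why the partitioned write cannot find `partition_0`. A minimal sketch of one way to guard against that, assuming the same variable names as the script above:

```
dyf = gc.create_dynamic_frame.from_catalog(
    database=DATABASE,
    table_name=TABLE_NAME,
    transformation_ctx='bookmark_ctx',
)

# If the bookmark filtered everything out, skip the transforms and the partitioned
# write, but still call commit() so the run is recorded as succeeded.
if dyf.count() == 0:
    job.commit()
else:
    # ... ApplyMapping / Relationalize / write_dynamic_frame exactly as above ...
    job.commit()
```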
1
answers
0
votes
3
views
borg
asked 3 days ago

S3 event notification Glue Crawler fails with Internal Service Exception

We are using a Glue crawler and switched to S3 event notifications two months or so ago. It has been failing with an Internal Service Exception fairly regularly, so we had to switch back to scanning all folders, run the crawler that way, and then switch back to S3 event notifications again. Previously the problem seemed to happen when there were many events in SQS (40k+); the event-based crawl would work OK on smaller amounts. Since the start of 2022, the event-based crawler has been failing constantly with an Internal Service Exception. Even after successfully running the all-folder scan and switching back to event-based mode with just over 1k events, it is still failing. No other errors are provided in the log. The full S3 folder-scan crawler runs successfully, however it takes 6+ hours to complete, which is not acceptable for us. I did check https://aws.amazon.com/premiumsupport/knowledge-center/glue-crawler-internal-service-exception/ for this issue, however none of the mentioned conditions exist in our case and, as I mentioned, the full-scan crawler runs without issues. Log:
```
INFO : Crawler configured with Configuration
{
  "Version": 1,
  "CrawlerOutput": {
    "Partitions": { "AddOrUpdateBehavior": "InheritFromTable" }
  },
  "Grouping": {
    "TableGroupingPolicy": "CombineCompatibleSchemas",
    "TableLevelConfiguration": 2
  }
}
and SchemaChangePolicy
{
  "UpdateBehavior": "UPDATE_IN_DATABASE",
  "DeleteBehavior": "DEPRECATE_IN_DATABASE"
}
ERROR : Internal Service Exception
```
Any help would be appreciated.
2
answers
0
votes
7
views
AWS-User-4658641
asked 4 days ago

Hive Sync fails: AWSGlueDataCatalogHiveClientFactory not found

I'm syncing data written to S3 using Apache Hudi with Hive & Glue. Hudi options:
```
hudi_options:
  'hoodie.table.name': mytable
  'hoodie.datasource.write.recordkey.field': Id
  'hoodie.datasource.write.partitionpath.field': date
  'hoodie.datasource.write.table.name': mytable
  'hoodie.datasource.write.operation': upsert
  'hoodie.datasource.write.precombine.field': LastModifiedDate
  'hoodie.datasource.hive_sync.enable': true
  'hoodie.datasource.hive_sync.partition_fields': date
  'hoodie.datasource.hive_sync.database': hudi_lake_dev
  'hoodie.datasource.hive_sync.table': mytable
```
EMR configurations:
```
...
{
    "Classification": "yarn-site",
    "Properties": {
        "yarn.nodemanager.vmem-check-enabled": "false",
        "yarn.log-aggregation-enable": "true",
        "yarn.log-aggregation.retain-seconds": "-1",
        "yarn.nodemanager.remote-app-log-dir": config[
            "yarn_agg_log_uri_s3_path"
        ].format(current_date),
    },
    "Configurations": [],
},
{
    "Classification": "spark-hive-site",
    "Properties": {
        "hive.metastore.client.factory.class": "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory"  # noqa
    },
},
{
    "Classification": "presto-connector-hive",
    "Properties": {"hive.metastore.glue.datacatalog.enabled": "true"},
},
{
    "Classification": "hive-site",
    "Properties": {
        "hive.metastore.client.factory.class": "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory",  # noqa
        "hive.metastore.schema.verification": "false",
    },
},
...
```
Getting the following error:
```
Traceback (most recent call last):
  File "/home/hadoop/my_pipeline.py", line 31, in <module>
    ...
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 1109, in save
  File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1305, in __call__
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 111, in deco
  File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o84.save.
: org.apache.hudi.hive.HoodieHiveSyncException: Got runtime exception when hive syncing at org.apache.hudi.hive.HiveSyncTool.<init>(HiveSyncTool.java:74) at org.apache.hudi.HoodieSparkSqlWriter$.syncHive(HoodieSparkSqlWriter.scala:391) at org.apache.hudi.HoodieSparkSqlWriter$.$anonfun$metaSync$4(HoodieSparkSqlWriter.scala:440) at org.apache.hudi.HoodieSparkSqlWriter$.$anonfun$metaSync$4$adapted(HoodieSparkSqlWriter.scala:436) at scala.collection.mutable.HashSet.foreach(HashSet.scala:79) at org.apache.hudi.HoodieSparkSqlWriter$.metaSync(HoodieSparkSqlWriter.scala:436) at org.apache.hudi.HoodieSparkSqlWriter$.commitAndPerformPostOperations(HoodieSparkSqlWriter.scala:497) at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:223) at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:145) at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46) at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70) at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68) at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:90) at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:194) at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:232) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:229) at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:190) at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:134) at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:133) at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989) at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107) at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232) at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:110) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:135) at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107) at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:135) at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:253) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:134) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68) at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989) at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:438) at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:415) at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:293) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at 
py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) at py4j.Gateway.invoke(Gateway.java:282) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:238) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.hudi.hive.HoodieHiveSyncException: Failed to create HiveMetaStoreClient at org.apache.hudi.hive.HoodieHiveClient.<init>(HoodieHiveClient.java:93) at org.apache.hudi.hive.HiveSyncTool.<init>(HiveSyncTool.java:69) ... 46 more Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: org.apache.hadoop.hive.ql.metadata.HiveException: MetaException(message:Unable to instantiate a metastore client factory com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory due to: java.lang.ClassNotFoundException: Class com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory not found) at org.apache.hadoop.hive.ql.metadata.Hive.registerAllFunctionsOnce(Hive.java:239) at org.apache.hadoop.hive.ql.metadata.Hive.<init>(Hive.java:402) at org.apache.hadoop.hive.ql.metadata.Hive.create(Hive.java:335) at org.apache.hadoop.hive.ql.metadata.Hive.getInternal(Hive.java:315) at org.apache.hadoop.hive.ql.metadata.Hive.get(Hive.java:291) at org.apache.hudi.hive.HoodieHiveClient.<init>(HoodieHiveClient.java:91) ... 47 more Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: MetaException(message:Unable to instantiate a metastore client factory com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory due to: java.lang.ClassNotFoundException: Class com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory not found) at org.apache.hadoop.hive.ql.metadata.Hive.getAllFunctions(Hive.java:3991) at org.apache.hadoop.hive.ql.metadata.Hive.reloadFunctions(Hive.java:251) at org.apache.hadoop.hive.ql.metadata.Hive.registerAllFunctionsOnce(Hive.java:234) ... 52 more Caused by: MetaException(message:Unable to instantiate a metastore client factory com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory due to: java.lang.ClassNotFoundException: Class com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory not found) at org.apache.hadoop.hive.ql.metadata.HiveUtils.createMetaStoreClientFactory(HiveUtils.java:525) at org.apache.hadoop.hive.ql.metadata.HiveUtils.createMetaStoreClient(HiveUtils.java:506) at org.apache.hadoop.hive.ql.metadata.Hive.getMSC(Hive.java:3746) at org.apache.hadoop.hive.ql.metadata.Hive.getMSC(Hive.java:3726) at org.apache.hadoop.hive.ql.metadata.Hive.getAllFunctions(Hive.java:3988) ... 54 more 22/01/12 16:37:53 INFO SparkContext: Invoking stop() from shutdown hook ```
1
answers
0
votes
7
views
spree
asked 4 days ago

AWS Glue Job Redshift : Issue Reading Timestamp with timezone from Redshift in AWS Glue Jobs

I have an AWS Glue job set up that reads data from AWS Redshift using a JDBC connection.

* Column value from DBeaver: 2020-05-08 12:36:53.000 +0530
* Column value from the Redshift query editor: 2020-05-08 07:06:53+00

**Data type in Redshift**: timestamp with time zone
**Data type in the AWS Glue catalog table**: timestamp

I have written a crawler that maps the value to timestamp, but when I run the AWS Glue job I get the exception below. I have tried various approaches. The code works fine when I read the value from a CSV instead of the Redshift table; when reading from CSV the data type mapped by the crawler is string. I tried changing the Glue catalog table to string, but that is not working either. Someone else faced a similar issue, but that post is quite old and has no working solution: https://github.com/databricks/spark-redshift/issues/391

Option 1, converting to string:
`medicare_res_cast = medicare_dyf.resolveChoice(specs = [('updated_at','cast:string'),('created_at','cast:string')])`

Option 2, to_timestamp:
`df.withColumn("updated_at",to_timestamp("updated_at")).show(truncate=False)`

Option 3, split:
`df.withColumn("updated_at", split(col("updated_at"), "+").getItem(0)).show()`

```
ERROR ProcessLauncher: Error from Python:Traceback (most recent call last):
  File "/tmp/etl-custom-redshift", line 36, in <module>
    medicare_df.withColumn("updated_at", split(col("updated_at"), "+").getItem(0)).show()
  File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/dataframe.py", line 378, in show
    print(self._jdf.showString(n, 20, vertical))
  File "/opt/amazon/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/opt/amazon/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o92.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, 172.31.27.144, executor 1): java.lang.NumberFormatException: For input string: "53+00" at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65) at java.lang.Integer.parseInt(Integer.java:580) at java.lang.Integer.parseInt(Integer.java:615) at java.sql.Timestamp.valueOf(Timestamp.java:259) at com.databricks.spark.redshift.Conversions$$anonfun$1$$anonfun$apply$11.apply(Conversions.scala:108) at com.databricks.spark.redshift.Conversions$$anonfun$1$$anonfun$apply$11.apply(Conversions.scala:108) at com.databricks.spark.redshift.Conversions$$anonfun$createRowConverter$1.apply(Conversions.scala:120) at com.databricks.spark.redshift.Conversions$$anonfun$createRowConverter$1.apply(Conversions.scala:116) at scala.collection.Iterator$$anon$11.next(Iterator.scala:410) at scala.collection.Iterator$$anon$11.next(Iterator.scala:410) at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.next(FileScanRDD.scala:104) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636) at org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:216) at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$2.apply(ShuffleExchangeExec.scala:295) at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$2.apply(ShuffleExchangeExec.scala:266) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55) at org.apache.spark.scheduler.Task.run(Task.scala:121) at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) ... 1 more ```
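Two small things are worth noting about the snippets above, independent of the connector-side cast error: PySpark's `split()` takes a Java regular expression, so a literal `+` has to be escaped, and once the offset is stripped the default timestamp parser handles the rest. A sketch against a hard-coded sample value, assuming the column has already reached Spark as a plain string:

```
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.getOrCreate()

# Sample value as the Redshift query editor shows it.
df = spark.createDataFrame([("2020-05-08 07:06:53+00",)], ["updated_at"])

# split() takes a regex, so escape the '+' before using it as a separator.
df = df.withColumn("updated_at_no_tz", F.split(F.col("updated_at"), r"\+").getItem(0))

# With the "+00" offset removed, to_timestamp() parses the value without an explicit format.
df = df.withColumn("updated_at_ts", F.to_timestamp("updated_at_no_tz"))

df.show(truncate=False)
```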
0
answers
0
votes
6
views
AWS-User-6422649
asked 21 days ago

Error in AWS Glue while trying to do the BYOD lab

Hello I am new to working with this product and have no idea what this error message means. Would anyone be able to help me. Or direct me to where I can figure out what is going on. TIA Dec 24, 2021, 11:32:42 AM Pending execution 2021-12-24 16:33:07,324 Thread-4 WARN JNDI lookup class is not available because this JRE does not support JNDI. JNDI string lookups will not be available, continuing configuration. java.lang.ClassNotFoundException: org.apache.logging.log4j.core.lookup.JndiLookup at java.net.URLClassLoader.findClass(URLClassLoader.java:382) at java.lang.ClassLoader.loadClass(ClassLoader.java:418) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352) at java.lang.ClassLoader.loadClass(ClassLoader.java:351) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:264) at org.apache.logging.log4j.util.LoaderUtil.loadClass(LoaderUtil.java:173) at org.apache.logging.log4j.util.LoaderUtil.newInstanceOf(LoaderUtil.java:211) at org.apache.logging.log4j.util.LoaderUtil.newCheckedInstanceOf(LoaderUtil.java:232) at org.apache.logging.log4j.core.util.Loader.newCheckedInstanceOf(Loader.java:301) at org.apache.logging.log4j.core.lookup.Interpolator.<init>(Interpolator.java:95) at org.apache.logging.log4j.core.config.AbstractConfiguration.<init>(AbstractConfiguration.java:114) at org.apache.logging.log4j.core.config.DefaultConfiguration.<init>(DefaultConfiguration.java:55) at org.apache.logging.log4j.core.layout.PatternLayout$Builder.build(PatternLayout.java:430) at org.apache.logging.log4j.core.layout.PatternLayout.createDefaultLayout(PatternLayout.java:324) at org.apache.logging.log4j.core.appender.ConsoleAppender$Builder.<init>(ConsoleAppender.java:121) at org.apache.logging.log4j.core.appender.ConsoleAppender.newBuilder(ConsoleAppender.java:111) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.logging.log4j.core.config.plugins.util.PluginBuilder.createBuilder(PluginBuilder.java:158) at org.apache.logging.log4j.core.config.plugins.util.PluginBuilder.build(PluginBuilder.java:119) at org.apache.logging.log4j.core.config.AbstractConfiguration.createPluginObject(AbstractConfiguration.java:813) at org.apache.logging.log4j.core.config.AbstractConfiguration.createConfiguration(AbstractConfiguration.java:753) at org.apache.logging.log4j.core.config.AbstractConfiguration.createConfiguration(AbstractConfiguration.java:745) at org.apache.logging.log4j.core.config.AbstractConfiguration.doConfigure(AbstractConfiguration.java:389) at org.apache.logging.log4j.core.config.AbstractConfiguration.initialize(AbstractConfiguration.java:169) at org.apache.logging.log4j.core.config.AbstractConfiguration.start(AbstractConfiguration.java:181) at org.apache.logging.log4j.core.LoggerContext.setConfiguration(LoggerContext.java:446) at org.apache.logging.log4j.core.LoggerContext.reconfigure(LoggerContext.java:520) at org.apache.logging.log4j.core.LoggerContext.reconfigure(LoggerContext.java:536) at org.apache.logging.log4j.core.LoggerContext.start(LoggerContext.java:214) at org.apache.logging.log4j.core.impl.Log4jContextFactory.getContext(Log4jContextFactory.java:146) at org.apache.logging.log4j.core.impl.Log4jContextFactory.getContext(Log4jContextFactory.java:41) at org.apache.logging.log4j.LogManager.getContext(LogManager.java:194) at 
org.apache.logging.log4j.LogManager.getLogger(LogManager.java:597) at org.apache.spark.metrics.sink.MetricsConfigUtils.<clinit>(MetricsConfigUtils.java:12) at org.apache.spark.metrics.sink.MetricsProxyInfo.fromConfig(MetricsProxyInfo.java:17) at org.apache.spark.metrics.sink.GlueCloudwatchSink.<init>(GlueCloudwatchSink.scala:51) at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:423) at org.apache.spark.metrics.MetricsSystem$$anonfun$registerSinks$1.apply(MetricsSystem.scala:200) at org.apache.spark.metrics.MetricsSystem$$anonfun$registerSinks$1.apply(MetricsSystem.scala:194) at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130) at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130) at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236) at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40) at scala.collection.mutable.HashMap.foreach(HashMap.scala:130) at org.apache.spark.metrics.MetricsSystem.registerSinks(MetricsSystem.scala:194) at org.apache.spark.metrics.MetricsSystem.start(MetricsSystem.scala:102) at org.apache.spark.SparkContext.<init>(SparkContext.scala:514) at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58) at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:423) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) at py4j.Gateway.invoke(Gateway.java:238) at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80) at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69) at py4j.GatewayConnection.run(GatewayConnection.java:238)
0
answers
0
votes
2
views
AWS-User-Lynn
asked 23 days ago

Glue job hudi-init-load-job with script HudiInitLoadNYTaxiData.py fails

Hello. We have a POC of sorts and are currently evaluating the capabilities of Glue. As part of that evaluation I've recently activated the latest version of the ["AWS Glue Connector for Apache Hudi", which is 0.9.0](https://aws.amazon.com/marketplace/pp/prodview-zv3vmwbkuat2e?ref_=beagle&applicationId=GlueStudio). To be precise, I'm speaking about the results of implementing the steps from the [article](https://aws.amazon.com/blogs/big-data/writing-to-apache-hudi-tables-using-aws-glue-connector/). We currently don't use AWS Lake Formation, so I've successfully implemented every step except the part related to AWS Lake Formation. Once the CloudFormation work was successfully accomplished, I kicked off the hudi-init-load-job job. But the result was somewhat frustrating: the job failed with the following output: 2021-12-24 08:50:56,249 ERROR [main] glue.ProcessLauncher (Logging.scala:logError(70)): Error from Python:Traceback (most recent call last): File "/tmp/HudiInitLoadNYTaxiData.py", line 27, in <module> glueContext.write_dynamic_frame.from_options(frame = DynamicFrame.fromDF(inputDf, glueContext, "inputDf"), connection_type = "marketplace.spark", connection_options = combinedConf) File "/opt/amazon/lib/python3.6/site-packages/awsglue/dynamicframe.py", line 653, in from_options format_options, transformation_ctx) File "/opt/amazon/lib/python3.6/site-packages/awsglue/context.py", line 279, in write_dynamic_frame_from_options format, format_options, transformation_ctx) File "/opt/amazon/lib/python3.6/site-packages/awsglue/context.py", line 302, in write_from_options return sink.write(frame_or_dfc) File "/opt/amazon/lib/python3.6/site-packages/awsglue/data_sink.py", line 35, in write return self.writeFrame(dynamic_frame_or_dfc, info) File "/opt/amazon/lib/python3.6/site-packages/awsglue/data_sink.py", line 31, in writeFrame return DynamicFrame(self._jsink.pyWriteDynamicFrame(dynamic_frame._jdf, callsite(), info), dynamic_frame.glue_ctx, dynamic_frame.name + "_errors") File "/opt/amazon/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in _call_ answer, self.gateway_client, self.target_id, self.name) File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco return f(*a, **kw) File "/opt/amazon/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value format(target_id, ".", name), value) py4j.protocol.Py4JJavaError: An error occurred while calling o115.pyWriteDynamicFrame. 
: java.lang.NoSuchMethodError: scala.Predef$.refArrayOps([Ljava/lang/Object;)[Ljava/lang/Object; at org.apache.hudi.DataSourceOptionsHelper$.$anonfun$allAlternatives$1(DataSourceOptions.scala:749) at org.apache.hudi.DataSourceOptionsHelper$.$anonfun$allAlternatives$1$adapted(DataSourceOptions.scala:749) at scala.collection.immutable.List.foreach(List.scala:392) at org.apache.hudi.DataSourceOptionsHelper$.<init>(DataSourceOptions.scala:749) at org.apache.hudi.DataSourceOptionsHelper$.<clinit>(DataSourceOptions.scala) at org.apache.hudi.HoodieWriterUtils$.parametersWithWriteDefaults(HoodieWriterUtils.scala:80) at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:157) at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45) at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70) at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68) at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127) at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152) at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127) at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80) at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80) at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676) at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676) at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78) at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73) at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676) at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285) at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271) at com.amazonaws.services.glue.marketplace.connector.SparkCustomDataSink.writeDynamicFrame(CustomDataSink.scala:43) at com.amazonaws.services.glue.DataSink.pyWriteDynamicFrame(DataSink.scala:65) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) at py4j.Gateway.invoke(Gateway.java:282) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:238) at java.lang.Thread.run(Thread.java:748) I'm quite new to the AWS stack so could someone please give me a hand to fix it
6
answers
0
votes
4
views
AWS-User-0316168
asked 23 days ago

inject non quoted csv file into RDS via glue

I have a PySpark script generated by my Glue job that aims to read data from a CSV file in an S3 bucket and write it to my SQL RDS table. In my CSV file I have multi-line strings. If the strings are quoted the job passes, but in my case the multi-line strings are not quoted, so the job cannot insert the data into my table. I tried:

`spark.read.option("multiLine", "true").option("quoteChar", -1).option("header","true")`

and it doesn't work. I also tried:

```
datasink5 = glueContext.write_dynamic_frame.from_options(
    frame = dynamic_frame_write,
    connection_type = "s3",
    connection_options = {
        "path": "s3://mycsvFile"
    },
    format = "csv",
    format_options={
        "quoteChar": -1,
        "separator": ","
    },
    transformation_ctx = "datasink5")
```

but this wrote the data back to S3, not to my RDS table. This is my Glue job:

```
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.job import Job
import pyspark.sql.functions as f

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
## spark.read.option("multiLine", "true").option("quoteChar", -1).option("header","true").option("escape","\'")
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

def otherTreatment(dfa):
    ...
    return dfa

datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "db_rds", table_name = "tbl_csv_extract", transformation_ctx = "datasource0")

applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("id", "string", "id", "string"), ("created", "string", "created", "timestamp"), ("name", "string", "name", "string"), ("high", "string", "high", "decimal(22,7)")], transformation_ctx = "applymapping1")

selectfields2 = SelectFields.apply(frame = applymapping1, paths = ["created", "name", "high", "id"], transformation_ctx = "selectfields2")

resolvechoice3 = ResolveChoice.apply(frame = selectfields2, choice = "MATCH_CATALOG", database = "db_rds_sql", table_name = "tbl_teststring", transformation_ctx = "resolvechoice3")

resolvechoice4 = ResolveChoice.apply(frame = resolvechoice3, choice = "make_cols", transformation_ctx = "resolvechoice4")

data_frame = resolvechoice4.toDF()
data_frame = otherTreatment(data_frame)
dynamic_frame_write = DynamicFrame.fromDF(data_frame, glueContext, "dynamic_frame_write")

datasink5 = glueContext.write_dynamic_frame.from_catalog(frame = dynamic_frame_write, database = "db_rds_sql", table_name = "tbl_teststring", transformation_ctx = "datasink5")

## with the following script the output goes back to S3, not to my SQL table
datasink5 = glueContext.write_dynamic_frame.from_options(
    frame = dynamic_frame_write,
    connection_type = "s3",
    connection_options = {
        "path": "s3://mycsvFile"
    },
    format = "csv",
    format_options={
        "quoteChar": -1,
        "separator": ","
    },
    transformation_ctx = "datasink5")

job.commit()
```

Does anyone have any idea how I can write my CSV file with unquoted multi-line strings using Glue PySpark?
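One thing that stands out in the script above: the `spark.read.option(...)` line is commented out and, even uncommented, it never reads anything, so those reader options never apply to the data the job processes. A sketch of reading the file directly with Spark (reusing the script's session and imports) and handing it back to the catalog sink; the bucket path is a placeholder, and note that Spark's multiLine handling still relies on quoting, so truly unquoted multi-line fields may need pre-processing before any CSV reader can split rows reliably:

```
# Read the CSV with Spark so the reader options actually take effect,
# then convert back to a DynamicFrame for the catalog (RDS) sink.
df = (
    spark.read
    .option("header", "true")
    .option("multiLine", "true")
    .option("escape", "\"")
    .csv("s3://my-bucket/path/to/file.csv")  # placeholder path
)

dynamic_frame_write = DynamicFrame.fromDF(df, glueContext, "dynamic_frame_write")

glueContext.write_dynamic_frame.from_catalog(
    frame=dynamic_frame_write,
    database="db_rds_sql",
    table_name="tbl_teststring",
    transformation_ctx="datasink_rds",
)
```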
1
answers
0
votes
5
views
AWS-User-5483303
asked a month ago

XML interpret one struct as an array

I've been trying this for a week but I'm starting to give up - I need some help understanding this. I have an S3 bucket full of XML files, and I am creating a pyspark ETL job to convert them to Parquet so I can query them in Athena. Within each XML file, there is an XML tag called ORDER_LINE. This tag is supposed to be an array of items, however in many files, there is only one item. XML does not have the concept of arrays, so when I pass this into my ETL job, Glue interprets the field as a Choice type in the schema, where it could either be an array or a struct type. I need to coerce this into an array type at all times. Here's a list of everything I've tried: 1. Using ResolveChoice to cast to an array. This doesn't work because a struct can't be casted to an array 2. Doing ResolveChoice to "make_struct", then the Map.apply() step to map the field where if "struct" has data, transform it to [struct]. This doesn't work and the Map docs hint that it does not support the python `map` function for arrays. 3. Converting the dynamic frame to a data frame, and then using pyspark withColumn(when(struct.isNotNull, [struct]).otherwise(array)) functions to convert the struct to an array, or make the array the main object, depending on which one is not null. This doesn't work because Glue is inferring the schema in the structs, and the fields in the structs are in a different order, so while all the fields in the schema are the same, Spark can't combine the result because the schema is not exactly the same. 4. Converting to data frame, then using a pyspark UDF to transform the data. This worked on a small dev sample set, but failed on the production dataset. The error message was extremely cryptic and I wasn't able to find the cause. Maybe this could work but I wasn't able to fully understand how to operate on the data in pyspark. 5. Trying to use the "withSchema" format_option when creating the dynamic frame from XML. The intention is to define the schema beforehand, but running this gives an error: ``` com.fasterxml.jackson.databind.exc.MismatchedInputException: Cannot deserialize instance of `java.util.LinkedHashMap<java.lang.Object,java.lang.Object>` out of VALUE_TRUE token at [Source: (String)" [...] (through reference chain: com.amazonaws.services.glue.schema.types.StructType["fields"]->java.util.ArrayList[0]->com.amazonaws.services.glue.schema.types.Field["properties"]) ``` So my question is, how do I make the XML data source for Glue interpret a tag as always an array, instead of a Choice, or how do I combine them? Even StackOverflow failed me here, and the forum post https://forums.aws.amazon.com/thread.jspa?messageID=931586&tstart=0 went unanswered. 
Here's a snippet of my pyspark code:
```
import sys
import json
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.gluetypes import (
    StructType,
    Field,
    StringType,
    IntegerType,
    ArrayType,
)
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(
    sys.argv,
    [
        "JOB_NAME",
        "source_bucket_name",
        "target_bucket_name",
    ],
)

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

source_bucket_name = args["source_bucket_name"]
target_bucket_name = args["target_bucket_name"]

schema = StructType(
    [
        [fields removed as they are sensitive]
        Field(
            "ORDER_LINE",
            ArrayType(
                StructType(
                    [
                        Field("FIELD1", IntegerType(), True),
                        Field(
                            "FIELD2",
                            StructType([Field("CODE", StringType(), True)]),
                            True,
                        ),
                        Field(
                            "FIELD#",
                            StructType([Field("CODE", StringType(), True)]),
                            True,
                        ),
                        [fields removed]
                    ]
                )
            ),
            True,
        ),
    ]
)

datasource0 = glueContext.create_dynamic_frame.from_options(
    "s3",
    {"paths": [f"s3://{source_bucket_name}"]},
    format="xml",
    format_options={
        "rowTag": "ORDER",
        "withSchema": json.dumps(schema.jsonValue()),
    },
    transformation_ctx="datasource0",
)

[more steps after this]
```
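On attempt 3 specifically: the union fails because the two branches produce structs with different field orders, and that part can be worked around by rebuilding the struct with an explicit field order before wrapping it in an array (nested structs would need the same treatment). A sketch only, assuming Glue 3.0 (Spark 3.1+, so `functions.transform` is available), that `ResolveChoice` with `make_struct` has produced `ORDER_LINE.struct` / `ORDER_LINE.array`, and using a placeholder field list and a hypothetical `resolved_dyf` variable:

```
import pyspark.sql.functions as F

# LINE_FIELDS is a placeholder; in practice it would mirror the real ORDER_LINE schema.
LINE_FIELDS = ["FIELD1", "FIELD2", "FIELD3"]

def ordered(line):
    # Rebuild the struct with a fixed field order so both branches are union-compatible.
    return F.struct(*[line[name].alias(name) for name in LINE_FIELDS])

# resolved_dyf: output of ResolveChoice.apply(frame=..., choice="make_struct", ...)
df = resolved_dyf.toDF()

df = df.withColumn(
    "ORDER_LINE",
    F.when(
        F.col("ORDER_LINE.struct").isNotNull(),
        F.array(ordered(F.col("ORDER_LINE.struct"))),
    ).otherwise(
        F.transform(F.col("ORDER_LINE.array"), ordered)
    ),
)
```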
3
answers
0
votes
5
views
Eelviny
asked a month ago

Create an Athena-queryable CloudTrail with CDK (or CloudFormation?)

I'm trying to create an app/stack/solution which, when deployed, sets up the necessary infrastructure to programmatically query CloudTrail logs: In particular, to find resource creation requests in some services by a given execution role. It seemed (e.g. from this [Querying CloudTrail Logs page](https://docs.aws.amazon.com/athena/latest/ug/cloudtrail-logs.html) in the Athena developer guide) like Athena would be a good solution here, but I'm struggling to get the setup automated properly. Setting up the [Trail](https://docs.aws.amazon.com/cdk/api/latest/docs/aws-cloudtrail-readme.html#trail) is pretty straightforward. However, my current attempt at mapping the [Athena manual partitioning instructions](https://docs.aws.amazon.com/athena/latest/ug/cloudtrail-logs.html#create-cloudtrail-table) to CDK generating a Glue table, seems to come up with a table with 0 partitions... And I don't really understand how the [partition projection instructions](https://docs.aws.amazon.com/athena/latest/ug/cloudtrail-logs.html#create-cloudtrail-table-partition-projection) could translate to CDK? There are definitely CloudTrail events in the source bucket/prefix - does anybody know how to make this work? I'm not that deep on either Glue or Athena yet. Current draft CDK for the Glue table below: ```typescript const cloudTrailTable = new glue.Table(this, "CloudTrailGlueTable", { columns: [ { name: "eventversion", type: glue.Schema.STRING }, { name: "useridentity", type: glue.Schema.struct([ { name: "type", type: glue.Schema.STRING }, { name: "principalid", type: glue.Schema.STRING }, { name: "arn", type: glue.Schema.STRING }, { name: "accountid", type: glue.Schema.STRING }, { name: "invokedby", type: glue.Schema.STRING }, { name: "accesskeyid", type: glue.Schema.STRING }, { name: "userName", type: glue.Schema.STRING }, { name: "sessioncontext", type: glue.Schema.struct([ { name: "attributes", type: glue.Schema.struct([ { name: "mfaauthenticated", type: glue.Schema.STRING }, { name: "creationdate", type: glue.Schema.STRING }, ]), }, { name: "sessionissuer", type: glue.Schema.struct([ { name: "type", type: glue.Schema.STRING }, { name: "principalId", type: glue.Schema.STRING }, { name: "arn", type: glue.Schema.STRING }, { name: "accountId", type: glue.Schema.STRING }, { name: "userName", type: glue.Schema.STRING }, ]), }, ]), }, ]), }, { name: "eventtime", type: glue.Schema.STRING }, { name: "eventsource", type: glue.Schema.STRING }, { name: "eventname", type: glue.Schema.STRING }, { name: "awsregion", type: glue.Schema.STRING }, { name: "sourceipaddress", type: glue.Schema.STRING }, { name: "useragent", type: glue.Schema.STRING }, { name: "errorcode", type: glue.Schema.STRING }, { name: "errormessage", type: glue.Schema.STRING }, { name: "requestparameters", type: glue.Schema.STRING }, { name: "responseelements", type: glue.Schema.STRING }, { name: "additionaleventdata", type: glue.Schema.STRING }, { name: "requestid", type: glue.Schema.STRING }, { name: "eventid", type: glue.Schema.STRING }, { name: "resources", type: glue.Schema.array( glue.Schema.struct([ { name: "ARN", type: glue.Schema.STRING }, { name: "accountId", type: glue.Schema.STRING }, { name: "type", type: glue.Schema.STRING }, ]) ), }, { name: "eventtype", type: glue.Schema.STRING }, { name: "apiversion", type: glue.Schema.STRING }, { name: "readonly", type: glue.Schema.STRING }, { name: "recipientaccountid", type: glue.Schema.STRING }, { name: "serviceeventdetails", type: glue.Schema.STRING }, { name: "sharedeventid", type: glue.Schema.STRING }, 
{ name: "vpcendpointid", type: glue.Schema.STRING }, ], dataFormat: glue.DataFormat.CLOUDTRAIL_LOGS, database: myGlueDatabase, tableName: "cloudtrail_table", bucket: myCloudTrailBucket, description: "CloudTrail Glue table", s3Prefix: `AWSLogs/${cdk.Stack.of(this).account}/CloudTrail/`, partitionKeys: [ { name: "region", type: glue.Schema.STRING }, { name: "year", type: glue.Schema.STRING }, { name: "month", type: glue.Schema.STRING }, { name: "day", type: glue.Schema.STRING }, ], }); ```
1
answers
0
votes
17
views
EXPERT
Alex_T
asked a month ago

AWS Glue: DDB write_dynamic_frame_from_options fails with requests throttled

Hi, I have an AWS Glue job written in Python that reads a DynamoDB table (cross-account) and then attempts to write to another table in the current account. The code for writing to DynamoDB is very simple:
```
def WriteTable(gc, dyf, tableName):
    gc.write_dynamic_frame_from_options(
        frame=dyf,
        connection_type="dynamodb",
        connection_options={
            "dynamodb.output.tableName": tableName,
            "dynamodb.throughput.write.percent": "1"
        }
    )
```
I run it on a small test table with 20 million items, about 500 MB in size. The average item size is 28 bytes, and below is an item sample. The table has `pair` as the primary key and `date` as the sort key.
```
{
    "pair": { "S": "AMDANG" },
    "date": { "N": "20080101" },
    "value": { "N": "0.005845" }
}
```
The read side works just fine; the job reads items pretty fast with 3-5 workers and takes a few minutes. However, the write sink is a disaster. The requests get throttled and it can barely write 100k records a minute. I have already tried all Glue versions and 3-10 workers. It actually fails fast with retries exhausted; I had to change "dynamodb.output.retry" to 30-50 because the default of 10 fails the Glue job as soon as it starts writing with: An error occurred while calling o70.pyWriteDynamicFrame. DynamoDB write exceeds max retry 10. [DDB metrics screenshot when Glue writes](https://user-images.githubusercontent.com/62673610/144349675-431ce14c-e92d-4f9b-a78b-db71d15b2d3d.png) There are lots of write throttle events. What could be the problem here? I thought it could be a primary-key distribution problem, but the keys should be well distributed: I've looked at the `pair` distinct count and there are a few thousand pairs, and the dates are daily sequences spanning multiple years. Ref: https://github.com/awslabs/aws-glue-libs/issues/104
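For reference, a sketch of the same write with the throughput-related options I believe the Glue DynamoDB sink exposes (option names as I recall them from the Glue connection-options documentation, so worth double-checking). Since the sink derives its write rate from the table's write capacity, the target table's capacity mode and WCU are worth checking before touching these knobs:

```
def write_table(gc, dyf, table_name, num_tasks):
    # Same write as above, with additional sink options; names per the Glue
    # connection-options docs, treat them as something to verify.
    gc.write_dynamic_frame_from_options(
        frame=dyf,
        connection_type="dynamodb",
        connection_options={
            "dynamodb.output.tableName": table_name,
            # Fraction of the table's write capacity the sink will try to consume.
            "dynamodb.throughput.write.percent": "1.0",
            # Cap how many tasks write in parallel so executors don't all
            # hammer the table at once.
            "dynamodb.output.numParallelTasks": str(num_tasks),
            "dynamodb.output.retry": "30",
        },
    )
```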
2
answers
0
votes
27
views
catenary-dot-cloud
asked a month ago

[AI/ML] Data acquisition and preprocessing

Hi, a customer who loads e-bike data to S3 wants to get AI/ML insights from the sensor data. The e-bike sensor data files are about 4 KB each and are posted to S3 buckets. The sensor data is in a format like this:

```
timestamp1, sensorA, sensorB, sensorC, ..., sensorZ
timestamp2, sensorA, sensorB, sensorC, ..., sensorZ
timestamp3, sensorA, sensorB, sensorC, ..., sensorZ
...
```

These readings are then collected into one file of about 4 KB. The plan I have is to:

* Read the S3 objects.
* Parse each S3 object with Lambda. I thought about Glue, but I wanted to put the data in DynamoDB, which Glue does not seem to support as a target. Also, Glue seems to be more expensive.
* Put the data in DynamoDB with the bike ID as the primary key and the timestamp as the sort key (a sketch of this step follows below).
* Use SageMaker to learn from the DynamoDB data. There will be a separate discussion on choosing a model and doing time-series inferencing.
* If we need to re-train, it will use the DynamoDB data, not S3. I think it will be faster to get data from DynamoDB than from the raw S3 data.
* Also, I think we can filter out some bad input or apply small corrections to the DynamoDB data (shifting timestamps to the correct time, etc.).
* Make inferencing output based on the model.

What do you think? Would you agree, or would you approach the problem differently? Would you rather learn from S3 directly via Athena or direct S3 access? Or would you rather use Glue and Redshift? Data of about 100 MB would be sufficient to train the model we have in mind, so Glue and Redshift may be overkill. Currently, the Korea region does not support the Timestream database, so the closest thing to a time-series database in Korea could be DynamoDB. Please share your thoughts. Thanks!
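A minimal sketch of the parse-and-load Lambda step from the plan above, under hypothetical assumptions: the table name and key layout are placeholders, the object key is prefixed with the bike ID, and the Lambda is triggered by s3:ObjectCreated events.

```
import boto3

s3 = boto3.client("s3")
dynamodb = boto3.resource("dynamodb")
table = dynamodb.Table("ebike-sensor-data")  # hypothetical table: PK bike_id, SK timestamp

def handler(event, context):
    # One ~4 KB sensor file per S3 notification record.
    for record in event["Records"]:
        bucket = record["s3"]["bucket"]["name"]
        key = record["s3"]["object"]["key"]
        body = s3.get_object(Bucket=bucket, Key=key)["Body"].read().decode("utf-8")

        bike_id = key.split("/")[0]  # assumes the key is prefixed with the bike ID
        with table.batch_writer() as batch:
            for line in body.splitlines():
                if not line.strip():
                    continue
                timestamp, *sensors = [v.strip() for v in line.split(",")]
                item = {"bike_id": bike_id, "timestamp": timestamp}
                # Store each sensor reading as an attribute; filtering bad input or
                # shifting timestamps (the correction step in the plan) could go here.
                item.update({f"sensor_{i}": v for i, v in enumerate(sensors)})
                batch.put_item(Item=item)
```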
1
answers
0
votes
1
views
AWS-User-6598922
asked a year ago

How to create a Glue Workflow programmatically?

Is there a way to create a Glue workflow programmatically? I looked at CloudFormation but the only one I found is to create an empty Workflow (just Workflow name, Description and properties). https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-glue-workflow.html I tried to look at the APIs as well (https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-workflow.html), and even if there are all the data type for all the structures the only create API is again only adding the blank box. Am I missing something? How do we create the workflow from the blueprint in Lake Formation? is some sort of pre-assembled JSON file that we just link to the workflow in glue? can you do something similar, or need to wait for the customizable blueprints? Thank you ***UPDATE:*** As it can be derived from the snippet of code from the Accepted Answer, the key is to consider that it is actually the : **AWS::Glue::Trigger** construct that helps you build the Workflow. Specifically, you need to: 1. create the Workflow with AWS::Glue::Workflow 2. If you need create Database and connection as well ( AWS::Glue::Database , AWS::Glue::Connection) 3. Create any Crawler and any Job you want to add to the workflow using : AWS::Glue::Crawler or AWS::Glue::Job 4. Create a first Trigger (AWS::Glue::Trigger ) with Type : ON-DEMAND , and Actions = to the firs Crawler or job your Workflow need to launch and Workflowname referencing the Workflow created at point 1 5. Create any other Trigger with Type : CONDITIONAL Below an Example (to create a Workflow that launch a Crawler on an S3 Bucket (cloudtraillogs) , if successfull launch a python script to change the table and partition schema to make them work with Athena )). hope this helps ``` --- AWSTemplateFormatVersion: '2010-09-09' Description: Creates cloudtrail crwaler and catalog for Athena and a job to transform to Parquet Parameters: CloudtrailS3: Type: String Description: Enter the unique bucket name where the cloud trails log are stored CloudtrailS3Path: Type: String Description: Enter the path/prefix that you want to crawl CloudtrailDataLakeS3: Type: String Description: Enter the unique bucket name for the data lake in which to store the logs in Parquet Format Resources: CloudTrailGlueExecutionRole: Type: 'AWS::IAM::Role' Properties: AssumeRolePolicyDocument: Version: "2012-10-17" Statement: - Effect: Allow Principal: Service: - glue.amazonaws.com Action: - 'sts:AssumeRole' Path: / ManagedPolicyArns: - arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole GluePolicy: Properties: PolicyDocument: Version: '2012-10-17' Statement: - Action: - s3:GetBucketLocation - s3:GetObject - s3:PutObject - s3:ListBucket Effect: Allow Resource: - !Join ['', ['arn:aws:s3:::', !Ref CloudtrailS3] ] - !Join ['', ['arn:aws:s3:::', !Ref CloudtrailS3, '/*'] ] - !Join ['', ['arn:aws:s3:::', !Ref CloudtrailDataLakeS3] ] - !Join ['', ['arn:aws:s3:::', !Ref CloudtrailDataLakeS3, '/*'] ] - Action: - s3:DeleteObject Effect: Allow Resource: - !Join ['', ['arn:aws:s3:::', !Ref CloudtrailDataLakeS3] ] - !Join ['', ['arn:aws:s3:::', !Ref CloudtrailDataLakeS3, '/*'] ] PolicyName: glue_cloudtrail_S3_policy Roles: - Ref: CloudTrailGlueExecutionRole Type: AWS::IAM::Policy GlueWorkflow: Type: AWS::Glue::Workflow Properties: Description: Workflow to crawl the cloudtrail logs Name: cloudtrail_discovery_workflow GlueDatabaseCloudTrail: Type: AWS::Glue::Database Properties: # The database is created in the Data Catalog for your account CatalogId: !Ref AWS::AccountId DatabaseInput: # The 
name of the database is defined in the Parameters section above Name: cloudtrail_db Description: Database to hold tables for NY Philarmonica data LocationUri: !Ref CloudtrailDataLakeS3 GlueCrawlerCTSource: Type: AWS::Glue::Crawler Properties: Name: cloudtrail_source_crawler Role: !GetAtt CloudTrailGlueExecutionRole.Arn #Classifiers: none, use the default classifier Description: AWS Glue crawler to crawl cloudtrail logs Schedule: ScheduleExpression: 'cron(0 9 * * ? *)' DatabaseName: !Ref GlueDatabaseCloudTrail Targets: S3Targets: - Path: !Sub - s3://${bucket}/${path} - { bucket: !Ref CloudtrailS3, path : !Ref CloudtrailS3Path } Exclusions: - '*/CloudTrail-Digest/**' - '*/Config/**' #TablePrefix: '' SchemaChangePolicy: UpdateBehavior: "UPDATE_IN_DATABASE" DeleteBehavior: "LOG" Configuration: "{\"Version\":1.0,\"CrawlerOutput\":{\"Partitions\":{\"AddOrUpdateBehavior\":\"InheritFromTable\"},\"Tables\":{\"AddOrUpdateBehavior\":\"MergeNewColumns\"}}}" GlueJobConvertTable: Type: AWS::Glue::Job Properties: Name: ct_change_table_schema Role: Fn::GetAtt: [CloudTrailGlueExecutionRole, Arn] ExecutionProperty: MaxConcurrentRuns: 1 GlueVersion: 1.0 Command: Name: pythonshell PythonVersion: 3 ScriptLocation: !Sub - s3://${bucket}/python/ct_change_table_schema.py - {bucket: !Ref CloudtrailDataLakeS3} DefaultArguments: '--TempDir': !Sub - s3://${bucket}/glue_tmp/ - {bucket: !Ref CloudtrailDataLakeS3} "--job-bookmark-option" : "job-bookmark-disable" "--enable-metrics" : "" DependsOn: - CloudTrailGlueExecutionRole GlueSourceCrawlerTrigger: Type: AWS::Glue::Trigger Properties: Name: ct_start_source_crawl_Trigger Type: ON_DEMAND Description: Source Crawler trigger WorkflowName: !Ref GlueWorkflow Actions: - CrawlerName: Ref: GlueCrawlerCTSource DependsOn: - GlueCrawlerCTSource GlueJobTrigger: Type: AWS::Glue::Trigger Properties: Name: ct_change_schema_Job_Trigger Type: CONDITIONAL Description: Job trigger WorkflowName: !Ref GlueWorkflow StartOnCreation: 'true' Actions: - JobName: !Ref GlueJobConvertTable Predicate: Conditions: - LogicalOperator: EQUALS CrawlerName: !Ref GlueCrawlerCTSource CrawlState: SUCCEEDED Logical: ANY DependsOn: - GlueJobConvertTable
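```

The same trigger-based pattern can also be driven purely through the API. A boto3 sketch that mirrors the template above; the crawler and job are assumed to already exist under the names used in the template:

```
import boto3

glue = boto3.client("glue")

# 1. The workflow itself is just the empty container, as the question observes.
glue.create_workflow(
    Name="cloudtrail_discovery_workflow",
    Description="Workflow to crawl the CloudTrail logs",
)

# 2. An ON_DEMAND (or scheduled) trigger attached to the workflow starts the first node.
glue.create_trigger(
    Name="ct_start_source_crawl_trigger",
    WorkflowName="cloudtrail_discovery_workflow",
    Type="ON_DEMAND",
    Actions=[{"CrawlerName": "cloudtrail_source_crawler"}],  # crawler assumed to exist
)

# 3. CONDITIONAL triggers chain the remaining nodes together.
glue.create_trigger(
    Name="ct_change_schema_job_trigger",
    WorkflowName="cloudtrail_discovery_workflow",
    Type="CONDITIONAL",
    StartOnCreation=True,
    Actions=[{"JobName": "ct_change_table_schema"}],  # job assumed to exist
    Predicate={
        "Logical": "ANY",
        "Conditions": [
            {
                "LogicalOperator": "EQUALS",
                "CrawlerName": "cloudtrail_source_crawler",
                "CrawlState": "SUCCEEDED",
            }
        ],
    },
)
```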
1
answers
0
votes
4
views
EXPERT
Fabrizio@AWS
asked 2 years ago

Glue etl job fails to write to Redshift using dynamic frame - reason ?

We are observing that writing to Redshift using a Glue dynamic frame errors out when the input file is larger than 1 GB.

**Setup:** Redshift cluster: 2-node DC2

**Glue job:**
```
temp_df = glueContext.create_dynamic_frame.from_options(connection_type="s3", format="csv", connection_options={"paths": [source]}, format_options={"withHeader": True, "separator": ","}, transformation_ctx="path={}".format(source)).toDF()

redshift_df = DynamicFrame.fromDF(output_df, glueContext, "redshift_df")

datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(frame=redshift_df, catalog_connection="pilot-rs", connection_options={"preactions": "truncate table tablename;", "dbtable": "tablename", "database": "dev"}, redshift_tmp_dir='s3://bucket/path/', transformation_ctx="datasink4")
```

**Observation:** The code works when the input file is under 1 GB and is able to write to the Redshift table. It fails when the input file is larger than 1 GB and the job run time is around 10 minutes.

**Error:** An error occurred while calling o260.save. Timeout waiting for connection from pool, and sometimes "An error occurred while calling o334.pyWriteDynamicFrame. Timeout waiting for connection from pool". Portion of the Glue error log:
```
Caused by: com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.conn.ConnectionPoolTimeoutException: Timeout waiting for connection from pool
  at com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.conn.PoolingHttpClientConnectionManager.leaseConnection(PoolingHttpClientConnectionManager.java:286)
  at com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.conn.PoolingHttpClientConnectionManager$1.get(PoolingHttpClientConnectionManager.java:263)
```
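One knob sometimes suggested for this particular "Timeout waiting for connection from pool" signature (it is raised by the shaded EMRFS S3 client, so the exhausted pool is the S3 HTTP connection pool, not the JDBC one) is raising the pool size on the Hadoop configuration. Whether the property is honoured by the Glue version in use is an assumption to verify; a sketch:

```
from awsglue.context import GlueContext
from pyspark.context import SparkContext

sc = SparkContext()

# fs.s3.maxConnections is the EMRFS property for the S3 connection pool size;
# treat its effect on a Glue job as something to test rather than a given.
sc._jsc.hadoopConfiguration().set("fs.s3.maxConnections", "500")

glueContext = GlueContext(sc)
```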
1
answers
0
votes
2
views
EXPERT
Dipankar_G
asked 2 years ago

Can't install pyarrow on AWS Glue python shell

I want to import pyarrow in a Python shell Glue script because I need to export a dataframe as parquet (i.e. with DataFrame.to_parquet()). The way to add custom dependencies suggested in the AWS docs is to use .egg or .whl files (https://docs.aws.amazon.com/glue/latest/dg/add-job-python.html#create-python-extra-library). The library pyarrow has numpy and six as dependencies:
- numpy is already pre-installed on Glue, with version 1.16.2, as I checked with a simple print(numpy.version.version)
- six is not pre-installed, so I downloaded six-1.14.0-py2.py3-none-any.whl from PyPI and uploaded it to S3.
- pyarrow is not pre-installed, so I downloaded the wheel file pyarrow-0.16.0-cp36-cp36m-manylinux2014_x86_64.whl from PyPI and uploaded it to S3.

The minimal script is this:
```
import pandas as pd
import six
import numpy
from pyarrow import *

data = [['Alex', 10], ['Bob', 12], ['Clarke', 13]]
df = pd.DataFrame(data, columns=['Name', 'Age'], dtype=float)
df.to_parquet('test.parquet')
```
When I run the script with the wheel files of six and pyarrow added as libraries, I get the following message:
```
Processing ./glue-python-libs-f8nyy9el/six-1.14.0-py2.py3-none-any.whl
Installing collected packages: six
Successfully installed six-1.14.0
Processing ./glue-python-libs-f8nyy9el/pyarrow-0.16.0-cp36-cp36m-manylinux2014_x86_64.whl
```
and the following error:
```
WARNING: Retrying (Retry(total=4, connect=None, read=None, redirect=None, status=None)) after connection broken by 'ConnectTimeoutError(<pip._vendor.urllib3.connection.VerifiedHTTPSConnection object at 0x7f7d78c96e10>, 'Connection to pypi.org timed out. (connect timeout=15)')': /simple/numpy/
WARNING: Retrying (Retry(total=3, connect=None, read=None, redirect=None, status=None)) after connection broken by 'ConnectTimeoutError(<pip._vendor.urllib3.connection.VerifiedHTTPSConnection object at 0x7f7d78c96c88>, 'Connection to pypi.org timed out. (connect timeout=15)')': /simple/numpy/
WARNING: Retrying (Retry(total=2, connect=None, read=None, redirect=None, status=None)) after connection broken by 'ConnectTimeoutError(<pip._vendor.urllib3.connection.VerifiedHTTPSConnection object at 0x7f7d78c96dd8>, 'Connection to pypi.org timed out. (connect timeout=15)')': /simple/numpy/
WARNING: Retrying (Retry(total=1, connect=None, read=None, redirect=None, status=None)) after connection broken by 'ConnectTimeoutError(<pip._vendor.urllib3.connection.VerifiedHTTPSConnection object at 0x7f7d78c969b0>, 'Connection to pypi.org timed out. (connect timeout=15)')': /simple/numpy/
WARNING: Retrying (Retry(total=0, connect=None, read=None, redirect=None, status=None)) after connection broken by 'ConnectTimeoutError(<pip._vendor.urllib3.connection.VerifiedHTTPSConnection object at 0x7f7d78c96898>, 'Connection to pypi.org timed out. (connect timeout=15)')': /simple/numpy/
ERROR: Could not find a version that satisfies the requirement numpy>=1.14 (from pyarrow==0.16.0) (from versions: none)
ERROR: No matching distribution found for numpy>=1.14 (from pyarrow==0.16.0)
Traceback (most recent call last):
  File "/tmp/runscript.py", line 112, in <module>
    download_and_install(args.extra_py_files)
  File "/tmp/runscript.py", line 62, in download_and_install
    subprocess.check_call([sys.executable, "-m", "pip", "install", "--target={}".format(install_path), local_file_path])
  File "/usr/local/lib/python3.6/subprocess.py", line 311, in check_call
    raise CalledProcessError(retcode, cmd)
subprocess.CalledProcessError: Command '['/usr/local/bin/python', '-m', 'pip', 'install', '--target=/glue/lib/installation', '/tmp/glue-python-libs-f8nyy9el/pyarrow-0.16.0-cp36-cp36m-manylinux2014_x86_64.whl']' returned non-zero exit status 1.
```
So at first it seems that six is installed correctly, but then it looks like the job does not realize that numpy is already present with a compatible version. I then tried to also upload to S3 the wheel file s3://risultati-navigazione-wt-ga/libs/numpy-1.18.2-cp36-cp36m-manylinux1_x86_64.whl that I downloaded from PyPI. In this case I get the message:
```
Processing ./glue-python-libs-xzfdvgzd/numpy-1.18.2-cp36-cp36m-manylinux1_x86_64.whl
Installing collected packages: numpy
Successfully installed numpy-1.18.2
Processing ./glue-python-libs-xzfdvgzd/six-1.14.0-py2.py3-none-any.whl
Installing collected packages: six
Successfully installed six-1.14.0
Processing ./glue-python-libs-xzfdvgzd/pyarrow-0.16.0-cp36-cp36m-manylinux2014_x86_64.whl
```
and the error:
```
WARNING: Retrying (Retry(total=4, connect=None, read=None, redirect=None, status=None)) after connection broken by 'ConnectTimeoutError(<pip._vendor.urllib3.connection.VerifiedHTTPSConnection object at 0x7fca02861cc0>, 'Connection to pypi.org timed out. (connect timeout=15)')': /simple/six/
WARNING: Retrying (Retry(total=3, connect=None, read=None, redirect=None, status=None)) after connection broken by 'ConnectTimeoutError(<pip._vendor.urllib3.connection.VerifiedHTTPSConnection object at 0x7fca02861cc0>, 'Connection to pypi.org timed out. (connect timeout=15)')': /simple/six/
WARNING: Retrying (Retry(total=2, connect=None, read=None, redirect=None, status=None)) after connection broken by 'ConnectTimeoutError(<pip._vendor.urllib3.connection.VerifiedHTTPSConnection object at 0x7fca02861cc0>, 'Connection to pypi.org timed out. (connect timeout=15)')': /simple/six/
WARNING: Retrying (Retry(total=1, connect=None, read=None, redirect=None, status=None)) after connection broken by 'ConnectTimeoutError(<pip._vendor.urllib3.connection.VerifiedHTTPSConnection object at 0x7fca02861cc0>, 'Connection to pypi.org timed out. (connect timeout=15)')': /simple/six/
WARNING: Retrying (Retry(total=0, connect=None, read=None, redirect=None, status=None)) after connection broken by 'ConnectTimeoutError(<pip._vendor.urllib3.connection.VerifiedHTTPSConnection object at 0x7fca02861cc0>, 'Connection to pypi.org timed out. (connect timeout=15)')': /simple/six/
ERROR: Could not find a version that satisfies the requirement six>=1.0.0 (from pyarrow==0.16.0) (from versions: none)
ERROR: No matching distribution found for six>=1.0.0 (from pyarrow==0.16.0)
Traceback (most recent call last):
  File "/tmp/runscript.py", line 112, in <module>
    download_and_install(args.extra_py_files)
  File "/tmp/runscript.py", line 62, in download_and_install
    subprocess.check_call([sys.executable, "-m", "pip", "install", "--target={}".format(install_path), local_file_path])
  File "/usr/local/lib/python3.6/subprocess.py", line 311, in check_call
    raise CalledProcessError(retcode, cmd)
subprocess.CalledProcessError: Command '['/usr/local/bin/python', '-m', 'pip', 'install', '--target=/glue/lib/installation', '/tmp/glue-python-libs-xzfdvgzd/pyarrow-0.16.0-cp36-cp36m-manylinux2014_x86_64.whl']' returned non-zero exit status 1.
```
So this time numpy is recognized during the installation of pyarrow, but, as far as I understand, although six is installed correctly, for some reason pyarrow can't find it during the installation and instead tries to download it from the Internet (it gets stuck for a few minutes during that operation). Can anybody help me? Thanks!
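One workaround sometimes used for dependency-heavy jobs is to let Glue pip-install the packages itself via the `--additional-python-modules` default argument instead of supplying individual wheels. Whether the Python shell job type and Glue version in use here support that argument is an assumption, not something confirmed by this question; the job name, role, and version pin below are hypothetical placeholders.

```python
# Hedged sketch: set '--additional-python-modules' on the job definition so the
# Glue runtime installs pyarrow and its dependencies. Support for this argument
# on Python shell jobs is an assumption; names and versions are illustrative.
import boto3

glue = boto3.client("glue")
glue.update_job(
    JobName="my-python-shell-job",  # hypothetical job name
    JobUpdate={
        "Role": "my-glue-role",  # hypothetical role
        "Command": {
            "Name": "pythonshell",
            "ScriptLocation": "s3://my-bucket/scripts/export_parquet.py",  # hypothetical path
        },
        "DefaultArguments": {
            "--additional-python-modules": "pyarrow==0.16.0",  # illustrative pin
        },
    },
)
```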
1 answer · 0 votes · 3 views · ecanovi · asked 2 years ago

AWS Glue Dynamic DataFrame relationalize

I load JSON data and use the relationalize method on a dynamic frame to flatten the otherwise nested JSON object, then save it in parquet format. The problem is that once it is saved as parquet for faster Athena queries, the column names contain dots, which is against the Athena SQL query syntax, so I am unable to make column-specific queries. To tackle this problem I also rename the columns in the Glue job to replace the dots with underscores. My question is which of the two approaches below would be better and why (efficiency, memory, execution speed on the nodes, etc.)? Also, given the horrible AWS Glue documentation, I could not come up with a dynamic-frame-only solution; I have problems getting the column names dynamically, so I am using toDF().

1) The first approach gets the column names from a DataFrame extracted from the dynamic frame:
```
relationalize1 = Relationalize.apply(
    frame=datasource0,
    transformation_ctx="relationalize1",
).select("roottable")

df_relationalize1 = relationalize1.toDF()
for field in df_relationalize1.schema.fields:
    relationalize1 = RenameField.apply(
        frame=relationalize1,
        old_name="`" + field.name + "`",
        new_name=field.name.replace(".", "_"),
        transformation_ctx="renamefield_" + field.name,
    )
```
2) The second approach would be to extract the DataFrame from the dynamic frame, rename the fields on the PySpark DataFrame (instead of the dynamic frame), then convert back to a dynamic frame and save it in parquet format.

Is there a better approach? Can a crawler rename columns? How fast is the .fromDF() method? Is there better documentation on functions and methods than the PDF developer guide?
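A minimal sketch of the second approach described above, assuming the goal is simply to replace dots with underscores in every top-level column name. `relationalize1` and `glueContext` are taken from the question's script; the output path is a hypothetical placeholder.

```python
# Hedged sketch of approach 2: rename on the Spark DataFrame, then convert back
# to a DynamicFrame for the parquet write.
from awsglue.dynamicframe import DynamicFrame

df = relationalize1.toDF()

# Replace dots with underscores in all column names in a single pass.
renamed_df = df.toDF(*[c.replace(".", "_") for c in df.columns])

renamed_dyf = DynamicFrame.fromDF(renamed_df, glueContext, "renamed_dyf")

glueContext.write_dynamic_frame.from_options(
    frame=renamed_dyf,
    connection_type="s3",
    connection_options={"path": "s3://my-bucket/output/"},  # hypothetical path
    format="parquet",
)
```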
3 answers · 0 votes · 1 view · cell · asked 2 years ago

Unable to read ORC file using Glue context or Spark

Hi, I am trying to read an ORC file from an S3 bucket. I tried using the Glue Catalog, which gives the same schema as below. I am able to query the data via Athena without any issues, but when I use a Glue dynamic frame with the catalog, or a Spark DataFrame, to read or manipulate the ORC file, it gives the error below. I am using Spark 2.4.

Glue context code:
```
datasource0 = glueContext.create_dynamic_frame.from_catalog(database="<database>", table_name="<table>", transformation_ctx="datasource0")
datasource0_df = datasource0.toDF()
datasource0_df = datasource0_df.drop("operation", "originaltransaction", "bucket", "rowid", "currenttransaction")
```
Spark code:
```
df = spark.read.format('orc').load('<S3 file path>')
df.printSchema()
df.show()
```
printSchema shows the schema as follows:
```
root
 |-- operation: integer (nullable = true)
 |-- originalTransaction: long (nullable = true)
 |-- bucket: integer (nullable = true)
 |-- rowId: long (nullable = true)
 |-- currentTransaction: long (nullable = true)
 |-- row: struct (nullable = true)
 |    |-- _col0: string (nullable = true)
 |    |-- _col1: string (nullable = true)
 |    |-- _col2: string (nullable = true)
 |    |-- _col3: timestamp (nullable = true)
 |    |-- _col4: timestamp (nullable = true)
 |    |-- _col5: string (nullable = true)
 |    |-- _col6: string (nullable = true)
 |    |-- _col7: boolean (nullable = true)
 |    |-- _col8: timestamp (nullable = true)
 |    |-- _col9: timestamp (nullable = true)
 |    |-- _col10: string (nullable = true)
 |    |-- _col11: string (nullable = true)
 |    |-- _col12: timestamp (nullable = true)
 |    |-- _col13: string (nullable = true)
 |    |-- _col14: timestamp (nullable = true)
 |    |-- _col15: string (nullable = true)
 |    |-- _col16: string (nullable = true)
 |    |-- _col17: string (nullable = true)
 |    |-- _col18: string (nullable = true)
 |    |-- _col19: string (nullable = true)
 |    |-- _col20: string (nullable = true)
 |    |-- _col21: timestamp (nullable = true)
 |    |-- _col22: timestamp (nullable = true)
 |    |-- _col23: string (nullable = true)
 |    |-- _col24: string (nullable = true)
 |    |-- _col25: string (nullable = true)
 |    |-- _col26: string (nullable = true)
 |    |-- _col27: string (nullable = true)
 |    |-- _col28: string (nullable = true)
 |    |-- _col29: string (nullable = true)
 |    |-- _col30: string (nullable = true)
 |    |-- _col31: string (nullable = true)
 |    |-- _col32: integer (nullable = true)
 |    |-- _col33: integer (nullable = true)
 |    |-- _col34: integer (nullable = true)
 |    |-- _col35: integer (nullable = true)
 |    |-- _col36: integer (nullable = true)
 |    |-- _col37: integer (nullable = true)
```
However, when I try to do any operation on the DataFrame, it gives the error below.
```
ERROR [Executor task launch worker for task 0] executor.Executor (Logging.scala:logError(91)) - Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.IllegalArgumentException: Include vector the wrong length: {"category": "struct", "id": 0, "max": 44, "fields": [ "operation": {"category": "int", "id": 1, "max": 1}, "originalTransaction": {"category": "bigint", "id": 2, "max": 2}, "bucket": {"category": "int", "id": 3, "max": 3}, "rowId": {"category": "bigint", "id": 4, "max": 4}, "currentTransaction": {"category": "bigint", "id": 5, "max": 5}, "row": {"category": "struct", "id": 6, "max": 44, "fields": [ "_col0": {"category": "string", "id": 7, "max": 7}, "_col1": {"category": "string", "id": 8, "max": 8}, "_col2": {"category": "string", "id": 9, "max": 9}, "_col3": {"category": "timestamp", "id": 10, "max": 10}, "_col4": {"category": "timestamp", "id": 11, "max": 11}, "_col5": {"category": "string", "id": 12, "max": 12}, "_col6": {"category": "string", "id": 13, "max": 13}, "_col7": {"category": "boolean", "id": 14, "max": 14}, "_col8": {"category": "timestamp", "id": 15, "max": 15}, "_col9": {"category": "timestamp", "id": 16, "max": 16}, "_col10": {"category": "string", "id": 17, "max": 17}, "_col11": {"category": "string", "id": 18, "max": 18}, "_col12": {"category": "timestamp", "id": 19, "max": 19}, "_col13": {"category": "string", "id": 20, "max": 20}, "_col14": {"category": "timestamp", "id": 21, "max": 21}, "_col15": {"category": "string", "id": 22, "max": 22}, "_col16": {"category": "string", "id": 23, "max": 23}, "_col17": {"category": "string", "id": 24, "max": 24}, "_col18": {"category": "string", "id": 25, "max": 25}, "_col19": {"category": "string", "id": 26, "max": 26}, "_col20": {"category": "string", "id": 27, "max": 27}, "_col21": {"category": "timestamp", "id": 28, "max": 28}, "_col22": {"category": "timestamp", "id": 29, "max": 29}, "_col23": {"category": "string", "id": 30, "max": 30}, "_col24": {"category": "string", "id": 31, "max": 31}, "_col25": {"category": "string", "id": 32, "max": 32}, "_col26": {"category": "string", "id": 33, "max": 33}, "_col27": {"category": "string", "id": 34, "max": 34}, "_col28": {"category": "string", "id": 35, "max": 35}, "_col29": {"category": "string", "id": 36, "max": 36}, "_col30": {"category": "string", "id": 37, "max": 37}, "_col31": {"category": "string", "id": 38, "max": 38}, "_col32": {"category": "int", "id": 39, "max": 39}, "_col33": {"category": "int", "id": 40, "max": 40}, "_col34": {"category": "int", "id": 41, "max": 41}, "_col35": {"category": "int", "id": 42, "max": 42}, "_col36": {"category": "int", "id": 43, "max": 43}, "_col37": {"category": "int", "id": 44, "max": 44}]}]} with include length 45
```
Please let me know what needs to be done to fix this.
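The operation/originalTransaction/bucket/rowId/currentTransaction columns and the nested row struct resemble the layout of Hive ACID (transactional) ORC files, which plain Spark 2.4 and the Glue readers do not handle transactionally; that interpretation is an assumption, not something confirmed in the question. If the files can be read at all, a hedged sketch of working with just the payload is to project the nested row struct and rename its generic _colN fields (the target column names below are illustrative placeholders):

```python
# Hedged sketch: keep only the nested 'row' struct and rename its positional
# fields. Assumes the files are readable as plain ORC; the S3 path and the
# renamed column names are hypothetical.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = spark.read.format("orc").load("s3://my-bucket/path/")  # hypothetical path

# Drop the ACID-style metadata columns by selecting only the payload struct.
flattened = df.select("row.*")

# Rename the generic positional fields to meaningful names (illustrative mapping).
flattened = (
    flattened.withColumnRenamed("_col0", "customer_id")
             .withColumnRenamed("_col1", "order_id")
)
flattened.printSchema()
```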
1 answer · 0 votes · 0 views · amit94ece · asked 2 years ago

Syntax error with Glue dynamic data frame

I tried to use Glue ETL to convert nested JSON data to parquet. It works, but since it samples the data it couldn't determine the data type of some fields and used a struct of all possible types, which changed the schema. I tried to use ResolveChoice to force it to use one type instead of a struct, but I keep getting a syntax error. I followed the docs but still cannot figure it out. Here is my code; could someone help? What's the right syntax? Does it support nested data?

```
resolvechoice2 = ResolveChoice.apply(frame = applymapping1, specs = [("in_reply_to_user_id", "project:long"),("user.id", "project:long"),("quoted_status.user.id", "project:long"),("entities.user_mentions.element.id", "project:long"),("entities.media.element.source_user_id", "project:long"),("retweeted_status.user.id", "project:long"),("extended_entities.media.element.source_user_id", "project:long")]), transformation_ctx = "resolvechoice2")
```

Syntax error:
```
File "/tmp/g-7d4adc26f6e5bb15ba8d86e7b4fced4ba08ca29d-6847579850030189659/script_2019-04-30-06-11-30.py", line 30
    resolvechoice2 = ResolveChoice.apply(frame = applymapping1, specs = [("in_reply_to_user_id", "project:long"),("user.id", "project:long"),("quoted_status.user.id", "project:long"),("entities.user_mentions.element.id", "project:long"),("entities.media.element.source_user_id", "project:long"),("retweeted_status.user.id", "project:long"),("extended_entities.media.element.source_user_id", "project:long")]), transformation_ctx = "resolvechoice2")
SyntaxError: invalid syntax
```

Thanks, Juan
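For reference, the traceback points at an extra closing parenthesis right after the specs list, which ends the ResolveChoice.apply call before transformation_ctx is passed. A sketch of the same call from the question with that stray parenthesis removed (applymapping1 comes from the question's script):

```python
# The stray ')' after the specs list closed the call early; transformation_ctx
# must stay inside the same ResolveChoice.apply(...) call.
from awsglue.transforms import ResolveChoice

resolvechoice2 = ResolveChoice.apply(
    frame=applymapping1,
    specs=[
        ("in_reply_to_user_id", "project:long"),
        ("user.id", "project:long"),
        ("quoted_status.user.id", "project:long"),
        ("entities.user_mentions.element.id", "project:long"),
        ("entities.media.element.source_user_id", "project:long"),
        ("retweeted_status.user.id", "project:long"),
        ("extended_entities.media.element.source_user_id", "project:long"),
    ],
    transformation_ctx="resolvechoice2",
)
```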
1 answer · 0 votes · 1 view · MODERATOR AWS-User-9812337 · asked 3 years ago