Questions tagged with Extract Transform & Load Data


AWS GLUE job changing from STANDARD to FLEX not working as expected

Hi, I'm having an issue with the new **FLEX** feature. In our company we are trying to save costs when running Glue jobs (we are validating our product fit). We tried Flex the same day it was released, and since then I haven't been able to make it work. I thought that simply checking the Flex checkbox would suffice, but I think I'm doing something wrong. The jobs still run 100% OK as before (without that checkbox checked). Simply put, we read from an RDS SQL Server table, do basic ETL, and store the result in an S3 bucket in CSV format. I don't think the job timeout is the issue, since it is set to 60 minutes and the job takes barely a couple of minutes to fail.

The failed job status shows:

* Glue version: 3.0
* Start-up time: 16 seconds
* Execution time: 6 minutes 25 seconds
* Timeout: 45 minutes
* Worker type: G.1X
* Number of workers: 10
* Execution class: FLEX
* Max capacity: 10 DPUs

The successful job status is the same, except:

* Execution class: STANDARD

In the job monitor we read:

> An error occurred while calling o87.getDynamicFrame. Job 0 cancelled because SparkContext was shut down caused by threshold for executors failed after launch reached. Note: This run was executed with Flex execution. Check the logs if run failed due to executor termination.

Part of the error output in the CloudWatch logs:

```
An error occurred while calling o90.getDynamicFrame.
: org.apache.spark.SparkException: Job 0 cancelled because SparkContext was shut down
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$cleanUpAfterSchedulerStop$1(DAGScheduler.scala:1130)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$cleanUpAfterSchedulerStop$1$adapted(DAGScheduler.scala:1128)
	at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
	at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:1128)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onStop(DAGScheduler.scala:2703)
	at org.apache.spark.util.EventLoop.stop(EventLoop.scala:84)
	at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:2603)
	at org.apache.spark.SparkContext.$anonfun$stop$12(SparkContext.scala:2111)
	at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1419)
	at org.apache.spark.SparkContext.stop(SparkContext.scala:2111)
	at org.apache.spark.SparkContext.$anonfun$new$39(SparkContext.scala:681)
	at org.apache.spark.util.SparkShutdownHook.run(ShutdownHookManager.scala:214)
	at org.apache.spark.util.SparkShutdownHookManager.$anonfun$runAll$2(ShutdownHookManager.scala:188)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1996)
	at org.apache.spark.util.SparkShutdownHookManager.$anonfun$runAll$1(ShutdownHookManager.scala:188)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at scala.util.Try$.apply(Try.scala:213)
	at org.apache.spark.util.SparkShutdownHookManager.runAll(ShutdownHookManager.scala:188)
	at org.apache.spark.util.SparkShutdownHookManager$$anon$2.run(ShutdownHookManager.scala:178)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:914)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2238)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2259)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2278)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:477)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:430)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:47)
	at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3733)
	at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2762)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3724)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
	at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:110)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:135)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:135)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:253)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:134)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3722)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2762)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:2969)
	at com.amazonaws.services.glue.JDBCDataSource.getLastRow(DataSource.scala:1089)
	at com.amazonaws.services.glue.JDBCDataSource.getJdbcJobBookmark(DataSource.scala:929)
	at com.amazonaws.services.glue.JDBCDataSource.getDynamicFrame(DataSource.scala:953)
	at com.amazonaws.services.glue.DataSource.getDynamicFrame(DataSource.scala:99)
	at com.amazonaws.services.glue.DataSource.getDynamicFrame$(DataSource.scala:99)
	at com.amazonaws.services.glue.SparkSQLDataSource.getDynamicFrame(DataSource.scala:714)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:750)
","Stack Trace":[
{"Declaring Class":"get_return_value","Method Name":"format(target_id, \".\", name), value)","File Name":"/opt/amazon/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py","Line Number":328},
{"Declaring Class":"deco","Method Name":"return f(*a, **kw)","File Name":"/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/utils.py","Line Number":111},
{"Declaring Class":"__call__","Method Name":"answer, self.gateway_client, self.target_id, self.name)","File Name":"/opt/amazon/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py","Line Number":1305},
{"Declaring Class":"getFrame","Method Name":"jframe = self._jsource.getDynamicFrame()","File Name":"/opt/amazon/lib/python3.6/site-packages/awsglue/data_source.py","Line Number":36},
{"Declaring Class":"create_dynamic_frame_from_catalog","Method Name":"return source.getFrame(**kwargs)","File Name":"/opt/amazon/lib/python3.6/site-packages/awsglue/context.py","Line Number":185},
{"Declaring Class":"from_catalog","Method Name":"return self._glue_context.create_dynamic_frame_from_catalog(db, table_name, redshift_tmp_dir, transformation_ctx, push_down_predicate, additional_options, catalog_id, **kwargs)","File Name":"/opt/amazon/lib/python3.6/site-packages/awsgl
```

Any help would be much appreciated, Agustin.
2 answers · 0 votes · 73 views · asked 3 months ago
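For reference, Flex can also be requested per run through the API rather than the console checkbox, which makes it easy to A/B the same job with and without Flex. A minimal sketch, assuming a hypothetical job name `my-etl-job` and that the job already meets the Flex requirements (Glue 3.0+, G.1X/G.2X workers); it starts one run with the FLEX execution class and surfaces the failure reason so it can be compared against the STANDARD run:

```
import time
import boto3

glue = boto3.client("glue")

# "my-etl-job" is a placeholder; substitute the real job name.
run_id = glue.start_job_run(
    JobName="my-etl-job",
    ExecutionClass="FLEX",  # per-run override; STANDARD is the default
)["JobRunId"]

# Poll until the run finishes, then print the state and error message (if any).
while True:
    run = glue.get_job_run(JobName="my-etl-job", RunId=run_id)["JobRun"]
    if run["JobRunState"] in ("SUCCEEDED", "FAILED", "STOPPED", "TIMEOUT", "ERROR"):
        break
    time.sleep(30)

print(run["JobRunState"], run.get("ErrorMessage", ""))
```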

An error occurred while calling o79.getDynamicFrame. java.lang.reflect.InvocationTargetException

I'm getting the error in the title while running a Glue job that transforms a CSV file to Parquet format. The code is below (unedited from the AWS recommendation):

```
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

## @type: DataSource
## @args: [database = "appdata", table_name = "payment_csv", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "appdata", table_name = "payment_csv", transformation_ctx = "datasource0")

## @type: ApplyMapping
## @args: [mapping = [("_id", "string", "_id", "string"), ("amountpaidcents", "long", "amountpaidcents", "long"), ("_p_establishment", "string", "_p_establishment", "string"), ("type", "string", "type", "string"), ("ispaid", "boolean", "ispaid", "boolean"), ("for", "string", "for", "string"), ("_created_at", "string", "_created_at", "string"), ("_updated_at", "string", "_updated_at", "string"), ("formailingorder", "string", "formailingorder", "string"), ("formailinglistorder", "string", "formailinglistorder", "string"), ("_p_user", "string", "_p_user", "string"), ("`spcharge.created`", "long", "`spcharge.created`", "long"), ("`spcharge.data.object.billing_details.address.postal_code`", "long", "`spcharge.data.object.billing_details.address.postal_code`", "long"), ("`spcharge.data.object.payment_method_details.card.brand`", "string", "`spcharge.data.object.payment_method_details.card.brand`", "string"), ("`spcharge.data.object.payment_method_details.card.funding`", "string", "`spcharge.data.object.payment_method_details.card.funding`", "string")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [
    ("_id", "string", "_id", "string"),
    ("amountpaidcents", "long", "amountpaidcents", "long"),
    ("_p_establishment", "string", "_p_establishment", "string"),
    ("type", "string", "type", "string"),
    ("ispaid", "boolean", "ispaid", "boolean"),
    ("for", "string", "for", "string"),
    ("_created_at", "string", "_created_at", "string"),
    ("_updated_at", "string", "_updated_at", "string"),
    ("formailingorder", "string", "formailingorder", "string"),
    ("formailinglistorder", "string", "formailinglistorder", "string"),
    ("_p_user", "string", "_p_user", "string"),
    ("`spcharge.created`", "long", "`spcharge.created`", "long"),
    ("`spcharge.data.object.billing_details.address.postal_code`", "long", "`spcharge.data.object.billing_details.address.postal_code`", "long"),
    ("`spcharge.data.object.payment_method_details.card.brand`", "string", "`spcharge.data.object.payment_method_details.card.brand`", "string"),
    ("`spcharge.data.object.payment_method_details.card.funding`", "string", "`spcharge.data.object.payment_method_details.card.funding`", "string")
], transformation_ctx = "applymapping1")

## @type: ResolveChoice
## @args: [choice = "make_struct", transformation_ctx = "resolvechoice2"]
## @return: resolvechoice2
## @inputs: [frame = applymapping1]
resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_struct", transformation_ctx = "resolvechoice2")

## @type: DropNullFields
## @args: [transformation_ctx = "dropnullfields3"]
## @return: dropnullfields3
## @inputs: [frame = resolvechoice2]
dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")

## @type: DataSink
## @args: [connection_type = "s3", connection_options = {"path": "s3://wp-datalake-v1/zohoexport"}, format = "parquet", transformation_ctx = "datasink4"]
## @return: datasink4
## @inputs: [frame = dropnullfields3]
datasink4 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields3, connection_type = "s3", connection_options = {"path": "s3://wp-datalake-v1/zohoexport"}, format = "parquet", transformation_ctx = "datasink4")

job.commit()
```

Does anyone know what this error means and how I can resolve it?
3 answers · 0 votes · 354 views · asked 3 months ago
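With an `InvocationTargetException` coming out of `getDynamicFrame`, the underlying cause is usually wrapped further down in the job's error logs, and it often lives in the source read rather than in the transforms. A minimal diagnostic sketch, assuming the same `appdata.payment_csv` catalog table as the script above; it reads the table with no transforms attached so that a catalog, S3 path, or IAM problem can be separated from the `ApplyMapping` step:

```
import sys
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from pyspark.context import SparkContext

args = getResolvedOptions(sys.argv, ["JOB_NAME"])
glueContext = GlueContext(SparkContext())

# Read the source table on its own, with no transforms attached.
src = glueContext.create_dynamic_frame.from_catalog(
    database="appdata",
    table_name="payment_csv",
)

# If either of these fails, the problem is in the source definition
# (catalog schema, S3 location, permissions), not in ApplyMapping.
src.printSchema()
print("row count:", src.count())
```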

HIVE_PARTITION_SCHEMA_MISMATCH

Error:

> HIVE_PARTITION_SCHEMA_MISMATCH: There is a mismatch between the table and partition schemas. The types are incompatible and cannot be coerced. The column 'isfallbackkey' in table 'analytics.events' is declared as type 'boolean', but partition 'dt=2022-08-22/appid=jovo-game-starter' declared column 'translationkey' as type 'string'.

Data was passed as JSON in this format:

```
{"type":"device_capabilities","supportScreen":true,"supportAudio":true,"supportLongformAudio":false,"supportVideo":false,"eventId":"668c9479-9eee-4025-8b9a-1323db06b21f","appId":"jovo-game-starter","eventDate":"2022-08-22T17:26:36.376Z","timestamp":1661189196,"locale":"en","timeZone":"America/Phoenix","userId":"a8ad82ba-bfac-4f93-a46d-aae37e842a7b","sessionId":"139163c0-fcf2-4bcc-9ece-a8e6ab5c322e"}
{"type":"session_start","userId":"a8ad82ba-bfac-4f93-a46d-aae37e842a7b","sessionId":"139163c0-fcf2-4bcc-9ece-a8e6ab5c322e","eventId":"411e3abf-07fc-453c-9edd-a0a84f29b75f","appId":"jovo-game-starter","eventDate":"2022-08-22T17:26:36.383Z","timestamp":1661189196,"locale":"en","timeZone":"America/Phoenix"}
{"type":"intent","userId":"a8ad82ba-bfac-4f93-a46d-aae37e842a7b","sessionId":"139163c0-fcf2-4bcc-9ece-a8e6ab5c322e","intent":"LAUNCH","eventId":"09287f39-e487-474b-bafc-c0c1b9f59959","appId":"jovo-game-starter","eventDate":"2022-08-22T17:26:36.387Z","timestamp":1661189196,"locale":"en","timeZone":"America/Phoenix"}
{"type":"translation","translationKey":"start","isFallbackKey":false,"translationLanguage":"en","translationPlatform":"core","eventId":"15b87be7-5349-4a9e-b950-76bd76b63972","appId":"jovo-game-starter","eventDate":"2022-08-22T17:26:37.889Z","timestamp":1661189198,"locale":"en","timeZone":"America/Phoenix","userId":"a8ad82ba-bfac-4f93-a46d-aae37e842a7b","sessionId":"139163c0-fcf2-4bcc-9ece-a8e6ab5c322e"}
```

Using Kinesis Firehose dynamic partitioning with the prefix: `events/dt=!{partitionKeyFromQuery:dt}/appid=!{partitionKeyFromQuery:appid}/`
0 answers · 0 votes · 53 views · asked 3 months ago
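The message says the Glue Data Catalog has one type recorded on the table and a different one on that day's partition. A minimal sketch, assuming boto3 access to the catalog and using the `analytics.events` table and the `dt=2022-08-22` / `appid=jovo-game-starter` partition named in the error; it lists every column whose type differs between the table and the partition, which helps decide whether to fix the table schema or update/re-crawl the partition:

```
import boto3

glue = boto3.client("glue")

def column_types(storage_descriptor):
    # Map column name -> Hive type as recorded in the catalog.
    return {c["Name"]: c["Type"] for c in storage_descriptor["Columns"]}

table = glue.get_table(DatabaseName="analytics", Name="events")["Table"]
partition = glue.get_partition(
    DatabaseName="analytics",
    TableName="events",
    PartitionValues=["2022-08-22", "jovo-game-starter"],  # dt, appid
)["Partition"]

table_cols = column_types(table["StorageDescriptor"])
part_cols = column_types(partition["StorageDescriptor"])

# Print only the columns whose recorded types disagree.
for name in sorted(set(table_cols) | set(part_cols)):
    t, p = table_cols.get(name), part_cols.get(name)
    if t != p:
        print(f"{name}: table={t} partition={p}")
```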

What should be the correct Exclude Pattern and Table level when dealing with folders with different names?

Hello, I have an S3 bucket with the following path: `s3://a/b/c`. Inside this `c` folder I have one folder per table, and inside each table folder a folder for each version. Each version is a database snapshot obtained on a weekly basis by a workflow. To clarify, the structure inside `c` looks like this:

1. products
    1. /version_0
        1. _temporary
            1. 0_$folder$
        2. part-00000-c5... ...c000.snappy.parquet
    2. /version_1
        1. _temporary
            1. 0_$folder$
        2. part-00000-c5... ...c000.snappy.parquet
2. locations
    1. /version_0
        1. _temporary
            1. 0_$folder$
        2. part-00000-c5... ...c000.snappy.parquet
    2. /version_1
        1. _temporary
            1. 0_$folder$
        2. part-00000-c5... ...c000.snappy.parquet

I have created a crawler (Include Path set to the same path mentioned above, `s3://a/b/c`) with the intention of merging all the versions together into one table for each table folder (products, locations). The schemas of the different partitions are always the same, and so is their structure. The `_temporary` folder is generated automatically by the workflow.

**What is the correct Exclude pattern (to ignore everything in the `_temporary` folders), and possibly Table level, so that the crawler creates only ONE table per table folder (products, locations), merging all versions together?**

In summary, I should end up with 2 tables:

1. products (containing the version_0 and version_1 rows)
2. locations (containing the version_0 and version_1 rows)

I really have no way of testing the exclude patterns. Is there any sandbox where we can actually test the glob exclude patterns? I found one online, but it doesn't seem to behave like what AWS is using. I have tried these exclude patterns, but neither worked (a table was still created for each table & each version):

1. `version*/_temporary**`
2. `/**/version*/_temporary**`
1 answer · 0 votes · 77 views · asked 3 months ago
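One way to iterate on this without re-editing the crawler in the console is to set the exclusions and the grouping options programmatically. A sketch, not verified against this exact layout, assuming a hypothetical crawler name `c-crawler`, that exclude globs are evaluated relative to the include path, and that the crawler's table level counts path components with the bucket as level 1 (which would put `products` and `locations` at level 4 under `s3://a/b/c/`):

```
import json
import boto3

glue = boto3.client("glue")

# Grouping options: combine compatible schemas and pin the table level so that
# the folders directly under "c" (assumed level 4: a=1, b=2, c=3, <table>=4)
# become the tables and everything deeper is treated as partition data.
configuration = {
    "Version": 1.0,
    "Grouping": {
        "TableGroupingPolicy": "CombineCompatibleSchemas",
        "TableLevelConfiguration": 4,
    },
}

glue.update_crawler(
    Name="c-crawler",  # placeholder crawler name
    Targets={
        "S3Targets": [
            {
                "Path": "s3://a/b/c",
                # Assumed glob: exclude anything under a _temporary folder at any depth.
                "Exclusions": ["**/_temporary/**"],
            }
        ]
    },
    Configuration=json.dumps(configuration),
)
```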