Questions tagged with Extract Transform & Load Data


AWS GLUE job changing from STANDARD to FLEX not working as expected

Hi, I'm having an issue with the new **FLEX** feature. In our company we are trying to save costs when running Glue jobs (we are still validating our product fit), and we tried FLEX the same day it was released. Since then I have not been able to make it work. I thought that simply checking the Flex checkbox would suffice, but I think I'm doing something wrong. The jobs still run 100% OK as before (with that checkbox unchecked). Simply put, we are reading from an RDS SQL Server table, doing basic ETL processing, and storing the result in an S3 bucket in CSV format. I also don't think it's a job timeout issue, since the timeout is set to 60 minutes and the job takes barely a couple of minutes to fail.

The failed job status shows:

* Glue version: 3.0
* Start-up time: 16 seconds
* Execution time: 6 minutes 25 seconds
* Timeout: 45 minutes
* Worker type: G.1X
* Number of workers: 10
* Execution class: FLEX
* Max capacity: 10 DPUs

The successful job status is the same except:

* Execution class: STANDARD

In the job monitor we read:

> An error occurred while calling o87.getDynamicFrame. Job 0 cancelled because SparkContext was shut down caused by threshold for executors failed after launch reached. Note: This run was executed with Flex execution. Check the logs if run failed due to executor termination.

In the CloudWatch logs, part of the error output is: `An error occurred while calling o90.getDynamicFrame.\n: org.apache.spark.SparkException: Job 0 cancelled because SparkContext was shut down\n\tat org.apache.spark.scheduler.DAGScheduler.$anonfun$cleanUpAfterSchedulerStop$1(DAGScheduler.scala:1130)\n\tat org.apache.spark.scheduler.DAGScheduler.$anonfun$cleanUpAfterSchedulerStop$1$adapted(DAGScheduler.scala:1128)\n\tat scala.collection.mutable.HashSet.foreach(HashSet.scala:79)\n\tat org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:1128)\n\tat org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onStop(DAGScheduler.scala:2703)\n\tat org.apache.spark.util.EventLoop.stop(EventLoop.scala:84)\n\tat org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:2603)\n\tat org.apache.spark.SparkContext.$anonfun$stop$12(SparkContext.scala:2111)\n\tat org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1419)\n\tat org.apache.spark.SparkContext.stop(SparkContext.scala:2111)\n\tat org.apache.spark.SparkContext.$anonfun$new$39(SparkContext.scala:681)\n\tat org.apache.spark.util.SparkShutdownHook.run(ShutdownHookManager.scala:214)\n\tat org.apache.spark.util.SparkShutdownHookManager.$anonfun$runAll$2(ShutdownHookManager.scala:188)\n\tat scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)\n\tat org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1996)\n\tat org.apache.spark.util.SparkShutdownHookManager.$anonfun$runAll$1(ShutdownHookManager.scala:188)\n\tat scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)\n\tat scala.util.Try$.apply(Try.scala:213)\n\tat org.apache.spark.util.SparkShutdownHookManager.runAll(ShutdownHookManager.scala:188)\n\tat org.apache.spark.util.SparkShutdownHookManager$$anon$2.run(ShutdownHookManager.scala:178)\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat java.lang.Thread.run(Thread.java:750)\n\tat 
org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:914)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:2238)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:2259)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:2278)\n\tat org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:477)\n\tat org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:430)\n\tat org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:47)\n\tat org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3733)\n\tat org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2762)\n\tat org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3724)\n\tat org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)\n\tat org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)\n\tat org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:110)\n\tat org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:135)\n\tat org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)\n\tat org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)\n\tat org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:135)\n\tat org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:253)\n\tat org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:134)\n\tat org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)\n\tat org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)\n\tat org.apache.spark.sql.Dataset.withAction(Dataset.scala:3722)\n\tat org.apache.spark.sql.Dataset.head(Dataset.scala:2762)\n\tat org.apache.spark.sql.Dataset.take(Dataset.scala:2969)\n\tat com.amazonaws.services.glue.JDBCDataSource.getLastRow(DataSource.scala:1089)\n\tat com.amazonaws.services.glue.JDBCDataSource.getJdbcJobBookmark(DataSource.scala:929)\n\tat com.amazonaws.services.glue.JDBCDataSource.getDynamicFrame(DataSource.scala:953)\n\tat com.amazonaws.services.glue.DataSource.getDynamicFrame(DataSource.scala:99)\n\tat com.amazonaws.services.glue.DataSource.getDynamicFrame$(DataSource.scala:99)\n\tat com.amazonaws.services.glue.SparkSQLDataSource.getDynamicFrame(DataSource.scala:714)\n\tat sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\n\tat sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)\n\tat sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\n\tat java.lang.reflect.Method.invoke(Method.java:498)\n\tat py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)\n\tat py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)\n\tat py4j.Gateway.invoke(Gateway.java:282)\n\tat py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)\n\tat py4j.commands.CallCommand.execute(CallCommand.java:79)\n\tat py4j.GatewayConnection.run(GatewayConnection.java:238)\n\tat java.lang.Thread.run(Thread.java:750)\n","Stack Trace":[{"Declaring Class":"get_return_value","Method Name":"format(target_id, \".\", name), value)","File Name":"/opt/amazon/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py","Line Number":328},{"Declaring Class":"deco","Method Name":"return f(*a, **kw)","File 
Name":"/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/utils.py","Line Number":111},{"Declaring Class":"__call__","Method Name":"answer, self.gateway_client, self.target_id, self.name)","File Name":"/opt/amazon/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py","Line Number":1305},{"Declaring Class":"getFrame","Method Name":"jframe = self._jsource.getDynamicFrame()","File Name":"/opt/amazon/lib/python3.6/site-packages/awsglue/data_source.py","Line Number":36},{"Declaring Class":"create_dynamic_frame_from_catalog","Method Name":"return source.getFrame(**kwargs)","File Name":"/opt/amazon/lib/python3.6/site-packages/awsglue/context.py","Line Number":185},{"Declaring Class":"from_catalog","Method Name":"return self._glue_context.create_dynamic_frame_from_catalog(db, table_name, redshift_tmp_dir, transformation_ctx, push_down_predicate, additional_options, catalog_id, **kwargs)","File Name":"/opt/amazon/lib/python3.6/site-packages/awsgl` Any help would be much appreciated, Agustin.
2 answers · 0 votes · 75 views · asked 3 months ago

An error occurred while calling o79.getDynamicFrame. java.lang.reflect.InvocationTargetException

I'm getting the error in the title while running a Glue job to transform a CSV file to Parquet format. The code is below (unedited from the AWS recommendation):

```
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

## @type: DataSource
## @args: [database = "appdata", table_name = "payment_csv", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "appdata", table_name = "payment_csv", transformation_ctx = "datasource0")

## @type: ApplyMapping
## @args: [mapping = [("_id", "string", "_id", "string"), ("amountpaidcents", "long", "amountpaidcents", "long"), ("_p_establishment", "string", "_p_establishment", "string"), ("type", "string", "type", "string"), ("ispaid", "boolean", "ispaid", "boolean"), ("for", "string", "for", "string"), ("_created_at", "string", "_created_at", "string"), ("_updated_at", "string", "_updated_at", "string"), ("formailingorder", "string", "formailingorder", "string"), ("formailinglistorder", "string", "formailinglistorder", "string"), ("_p_user", "string", "_p_user", "string"), ("`spcharge.created`", "long", "`spcharge.created`", "long"), ("`spcharge.data.object.billing_details.address.postal_code`", "long", "`spcharge.data.object.billing_details.address.postal_code`", "long"), ("`spcharge.data.object.payment_method_details.card.brand`", "string", "`spcharge.data.object.payment_method_details.card.brand`", "string"), ("`spcharge.data.object.payment_method_details.card.funding`", "string", "`spcharge.data.object.payment_method_details.card.funding`", "string")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("_id", "string", "_id", "string"), ("amountpaidcents", "long", "amountpaidcents", "long"), ("_p_establishment", "string", "_p_establishment", "string"), ("type", "string", "type", "string"), ("ispaid", "boolean", "ispaid", "boolean"), ("for", "string", "for", "string"), ("_created_at", "string", "_created_at", "string"), ("_updated_at", "string", "_updated_at", "string"), ("formailingorder", "string", "formailingorder", "string"), ("formailinglistorder", "string", "formailinglistorder", "string"), ("_p_user", "string", "_p_user", "string"), ("`spcharge.created`", "long", "`spcharge.created`", "long"), ("`spcharge.data.object.billing_details.address.postal_code`", "long", "`spcharge.data.object.billing_details.address.postal_code`", "long"), ("`spcharge.data.object.payment_method_details.card.brand`", "string", "`spcharge.data.object.payment_method_details.card.brand`", "string"), ("`spcharge.data.object.payment_method_details.card.funding`", "string", "`spcharge.data.object.payment_method_details.card.funding`", "string")], transformation_ctx = "applymapping1")

## @type: ResolveChoice
## @args: [choice = "make_struct", transformation_ctx = "resolvechoice2"]
## @return: resolvechoice2
## @inputs: [frame = applymapping1]
resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_struct", transformation_ctx = "resolvechoice2")

## @type: DropNullFields
## @args: [transformation_ctx = "dropnullfields3"]
## @return: dropnullfields3
## @inputs: [frame = resolvechoice2]
dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")

## @type: DataSink
## @args: [connection_type = "s3", connection_options = {"path": "s3://wp-datalake-v1/zohoexport"}, format = "parquet", transformation_ctx = "datasink4"]
## @return: datasink4
## @inputs: [frame = dropnullfields3]
datasink4 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields3, connection_type = "s3", connection_options = {"path": "s3://wp-datalake-v1/zohoexport"}, format = "parquet", transformation_ctx = "datasink4")
job.commit()
```

Does anyone know what this error means and how I can resolve it?
3 answers · 0 votes · 381 views · asked 3 months ago

HIVE_PARTITION_SCHEMA_MISMATCH

Error: HIVE_PARTITION_SCHEMA_MISMATCH: There is a mismatch between the table and partition schemas. The types are incompatible and cannot be coerced. The column 'isfallbackkey' in table 'analytics.events' is declared as type 'boolean', but partition 'dt=2022-08-22/appid=jovo-game-starter' declared column 'translationkey' as type 'string'.

Data was passed as JSON in this format:

```
{"type":"device_capabilities","supportScreen":true,"supportAudio":true,"supportLongformAudio":false,"supportVideo":false,"eventId":"668c9479-9eee-4025-8b9a-1323db06b21f","appId":"jovo-game-starter","eventDate":"2022-08-22T17:26:36.376Z","timestamp":1661189196,"locale":"en","timeZone":"America/Phoenix","userId":"a8ad82ba-bfac-4f93-a46d-aae37e842a7b","sessionId":"139163c0-fcf2-4bcc-9ece-a8e6ab5c322e"}
{"type":"session_start","userId":"a8ad82ba-bfac-4f93-a46d-aae37e842a7b","sessionId":"139163c0-fcf2-4bcc-9ece-a8e6ab5c322e","eventId":"411e3abf-07fc-453c-9edd-a0a84f29b75f","appId":"jovo-game-starter","eventDate":"2022-08-22T17:26:36.383Z","timestamp":1661189196,"locale":"en","timeZone":"America/Phoenix"}
{"type":"intent","userId":"a8ad82ba-bfac-4f93-a46d-aae37e842a7b","sessionId":"139163c0-fcf2-4bcc-9ece-a8e6ab5c322e","intent":"LAUNCH","eventId":"09287f39-e487-474b-bafc-c0c1b9f59959","appId":"jovo-game-starter","eventDate":"2022-08-22T17:26:36.387Z","timestamp":1661189196,"locale":"en","timeZone":"America/Phoenix"}
{"type":"translation","translationKey":"start","isFallbackKey":false,"translationLanguage":"en","translationPlatform":"core","eventId":"15b87be7-5349-4a9e-b950-76bd76b63972","appId":"jovo-game-starter","eventDate":"2022-08-22T17:26:37.889Z","timestamp":1661189198,"locale":"en","timeZone":"America/Phoenix","userId":"a8ad82ba-bfac-4f93-a46d-aae37e842a7b","sessionId":"139163c0-fcf2-4bcc-9ece-a8e6ab5c322e"}
```

Using Kinesis Firehose dynamic partitioning with the prefix: `events/dt=!{partitionKeyFromQuery:dt}/appid=!{partitionKeyFromQuery:appid}/`
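One way to pinpoint which column types actually diverge is to compare the table schema with the partition's own StorageDescriptor through the Glue Data Catalog API. A hedged sketch (the database and table names come from the error message; the partition values are assumed to match the `dt`/`appid` keys shown above):

```
# Sketch: compare column types declared on the table vs. on one partition.
# Database/table names are from the error message; the partition values are
# assumed to match the dt/appid keys used by Firehose dynamic partitioning.
import boto3

glue = boto3.client("glue")

table = glue.get_table(DatabaseName="analytics", Name="events")["Table"]
partition = glue.get_partition(
    DatabaseName="analytics",
    TableName="events",
    PartitionValues=["2022-08-22", "jovo-game-starter"],
)["Partition"]

table_cols = {c["Name"]: c["Type"] for c in table["StorageDescriptor"]["Columns"]}
part_cols = {c["Name"]: c["Type"] for c in partition["StorageDescriptor"]["Columns"]}

# Print every column whose type differs between the table and the partition
for name in sorted(set(table_cols) | set(part_cols)):
    if table_cols.get(name) != part_cols.get(name):
        print(name, "table:", table_cols.get(name), "partition:", part_cols.get(name))
```

If the partition was added by a Glue crawler, configuring the crawler to inherit partition metadata from the table (the `InheritFromTable` partition update behavior) is a common way to avoid this kind of mismatch.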
0 answers · 0 votes · 54 views · asked 3 months ago