AWS Glue job changing from STANDARD to FLEX not working as expected


Hi, I'm having an issue with the new FLEX feature.

In our company we are trying to reduce costs when running Glue jobs (we are validating our product fit). We tried Flex the same day it was released, and since then I have not been able to make it work. I thought that simply checking the Flex checkbox would be enough, but apparently I'm doing something wrong. For now the jobs run exactly as before (with the checkbox unchecked) and they complete 100% OK. Simply put, we read from an RDS SQL Server table, do basic ETL processing, and store the result in an S3 bucket in CSV format. I also don't think this is a job-timeout issue, since the timeout is set to 60 minutes and the job takes barely a couple of minutes to fail.
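For context, the job script is essentially the usual Glue boilerplate. The sketch below is only illustrative of that shape (the database, table, and bucket names are hypothetical placeholders, not our real ones):

    # Illustrative sketch of the job: catalog JDBC source -> basic mapping -> CSV on S3.
    import sys
    from awsglue.transforms import ApplyMapping
    from awsglue.utils import getResolvedOptions
    from awsglue.context import GlueContext
    from awsglue.job import Job
    from pyspark.context import SparkContext

    args = getResolvedOptions(sys.argv, ["JOB_NAME"])
    sc = SparkContext()
    glue_context = GlueContext(sc)
    job = Job(glue_context)
    job.init(args["JOB_NAME"], args)

    # Read the RDS SQL Server table through the Data Catalog (JDBC connection).
    source = glue_context.create_dynamic_frame.from_catalog(
        database="my_catalog_db",          # hypothetical
        table_name="my_sqlserver_table",   # hypothetical
        transformation_ctx="source",
    )

    # Basic ETL step: keep/rename a few columns.
    mapped = ApplyMapping.apply(
        frame=source,
        mappings=[("id", "int", "id", "int"), ("name", "string", "name", "string")],
        transformation_ctx="mapped",
    )

    # Write the result to S3 as CSV.
    glue_context.write_dynamic_frame.from_options(
        frame=mapped,
        connection_type="s3",
        connection_options={"path": "s3://my-output-bucket/output/"},  # hypothetical
        format="csv",
        transformation_ctx="sink",
    )

    job.commit()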

The failed job status shows:

  • Glue version: 3.0
  • Start-up time: 16 seconds
  • Execution time: 6 minutes 25 seconds
  • Timeout: 45 minutes
  • Worker type: G.1X
  • Number of workers: 10
  • Execution class: FLEX
  • Max capacity: 10 DPUs

The successful job's status is the same, except:

  • Execution class: STANDARD

In the job monitor we read:

An error occurred while calling o87.getDynamicFrame. Job 0 cancelled because SparkContext was shut down caused by threshold for executors failed after launch reached. Note: This run was executed with Flex execution. Check the logs if run failed due to executor termination.

In the CloudWatch logs, part of the error output is:

An error occurred while calling o90.getDynamicFrame.
: org.apache.spark.SparkException: Job 0 cancelled because SparkContext was shut down
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$cleanUpAfterSchedulerStop$1(DAGScheduler.scala:1130)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$cleanUpAfterSchedulerStop$1$adapted(DAGScheduler.scala:1128)
    at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
    at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:1128)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onStop(DAGScheduler.scala:2703)
    at org.apache.spark.util.EventLoop.stop(EventLoop.scala:84)
    at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:2603)
    at org.apache.spark.SparkContext.$anonfun$stop$12(SparkContext.scala:2111)
    at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1419)
    at org.apache.spark.SparkContext.stop(SparkContext.scala:2111)
    at org.apache.spark.SparkContext.$anonfun$new$39(SparkContext.scala:681)
    at org.apache.spark.util.SparkShutdownHook.run(ShutdownHookManager.scala:214)
    at org.apache.spark.util.SparkShutdownHookManager.$anonfun$runAll$2(ShutdownHookManager.scala:188)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1996)
    at org.apache.spark.util.SparkShutdownHookManager.$anonfun$runAll$1(ShutdownHookManager.scala:188)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at scala.util.Try$.apply(Try.scala:213)
    at org.apache.spark.util.SparkShutdownHookManager.runAll(ShutdownHookManager.scala:188)
    at org.apache.spark.util.SparkShutdownHookManager$$anon$2.run(ShutdownHookManager.scala:178)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:914)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2238)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2259)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2278)
    at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:477)
    at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:430)
    at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:47)
    at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3733)
    at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2762)
    at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3724)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
    at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
    at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:110)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:135)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
    at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:135)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:253)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:134)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3722)
    at org.apache.spark.sql.Dataset.head(Dataset.scala:2762)
    at org.apache.spark.sql.Dataset.take(Dataset.scala:2969)
    at com.amazonaws.services.glue.JDBCDataSource.getLastRow(DataSource.scala:1089)
    at com.amazonaws.services.glue.JDBCDataSource.getJdbcJobBookmark(DataSource.scala:929)
    at com.amazonaws.services.glue.JDBCDataSource.getDynamicFrame(DataSource.scala:953)
    at com.amazonaws.services.glue.DataSource.getDynamicFrame(DataSource.scala:99)
    at com.amazonaws.services.glue.DataSource.getDynamicFrame$(DataSource.scala:99)
    at com.amazonaws.services.glue.SparkSQLDataSource.getDynamicFrame(DataSource.scala:714)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:750)

"Stack Trace":[
  {"Declaring Class":"get_return_value","Method Name":"format(target_id, \".\", name), value)","File Name":"/opt/amazon/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py","Line Number":328},
  {"Declaring Class":"deco","Method Name":"return f(*a, **kw)","File Name":"/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/utils.py","Line Number":111},
  {"Declaring Class":"__call__","Method Name":"answer, self.gateway_client, self.target_id, self.name)","File Name":"/opt/amazon/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py","Line Number":1305},
  {"Declaring Class":"getFrame","Method Name":"jframe = self._jsource.getDynamicFrame()","File Name":"/opt/amazon/lib/python3.6/site-packages/awsglue/data_source.py","Line Number":36},
  {"Declaring Class":"create_dynamic_frame_from_catalog","Method Name":"return source.getFrame(**kwargs)","File Name":"/opt/amazon/lib/python3.6/site-packages/awsglue/context.py","Line Number":185},
  {"Declaring Class":"from_catalog","Method Name":"return self._glue_context.create_dynamic_frame_from_catalog(db, table_name, redshift_tmp_dir, transformation_ctx, push_down_predicate, additional_options, catalog_id, **kwargs)","File Name":"/opt/amazon/lib/python3.6/site-packages/awsgl
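For completeness: the error stream for a single run can also be pulled from CloudWatch programmatically. A minimal sketch, assuming the default /aws-glue/jobs/error log group and that the log stream name starts with the run ID; the run ID below is a hypothetical placeholder.

    # Sketch: pull the driver error log for one Glue job run from CloudWatch.
    import boto3

    logs = boto3.client("logs")
    run_id = "jr_0123456789abcdef"  # hypothetical JobRunId

    response = logs.filter_log_events(
        logGroupName="/aws-glue/jobs/error",   # assumed default log group
        logStreamNamePrefix=run_id,            # assumed stream naming by run ID
        filterPattern="Exception",
    )
    for event in response["events"]:
        print(event["message"])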

Any help would be much appreciated, Agustin.

2 Answers

Hello Agustin,

Have you retried the Flex job, and was the behaviour the same?
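For example, one way to retry the same job under both execution classes and compare the behaviour is a small boto3 script like the sketch below (assuming a boto3 version recent enough to expose the ExecutionClass parameter; the job name is hypothetical):

    # Sketch: re-run the same job once with FLEX and once with STANDARD to compare.
    import boto3

    glue = boto3.client("glue")
    job_name = "my-etl-job"  # hypothetical

    for execution_class in ("FLEX", "STANDARD"):
        run = glue.start_job_run(JobName=job_name, ExecutionClass=execution_class)
        print(execution_class, run["JobRunId"])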

If it fails every time, I believe it would be best to raise a support ticket. That would give better visibility into the resources, and Support can check what could be causing the issue.

Based on the doc https://aws.amazon.com/blogs/big-data/introducing-aws-glue-flex-jobs-cost-savings-on-etl-workloads/, it is recommended to use Flex jobs for pre-production, testing, or non-urgent tasks.

Even if the retry doesn't work, please raise a support ticket. That would give better visibility into the resources and the cause of the issue.

Thank you!

AWS
Support Engineer
answered 2 years ago

We require the JobRunId to check the internal logs for a specific job run. To further help with this issue, could you please create a support ticket with the JobRunIds of both the successful and the failed run.
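For example, the JobRunId values of recent runs can be listed with boto3; a minimal sketch (the job name is hypothetical, and the ExecutionClass field assumes a recent SDK):

    # Sketch: list recent runs of the job with their IDs, states and execution class,
    # to find the JobRunId values to include in the support ticket.
    import boto3

    glue = boto3.client("glue")

    runs = glue.get_job_runs(JobName="my-etl-job", MaxResults=10)  # hypothetical job name
    for job_run in runs["JobRuns"]:
        print(job_run["Id"], job_run["JobRunState"], job_run.get("ExecutionClass"))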

AWS
answered 2 years ago
