
AWS Glue Job: Error calling o92.pyWriteDynamicFrame


I would like to resolve an error for job run ID jr_d67e4c6d24e9feb7e071bc5a2989be9acdce32bbea82596b8f953e96808c3d5e.

The error:

Traceback (most recent call last):
  File "/tmp/ga-overview-gluejob", line 52, in <module>
    datasink1 = glueContext.write_dynamic_frame.from_catalog(frame = transform1, name_space = "test-hud", table_name = "ga_overview", transformation_ctx = "datasink1")
  File "/opt/amazon/lib/python3.6/site-packages/awsglue/dynamicframe.py", line 657, in from_catalog
    return self._glue_context.write_dynamic_frame_from_catalog(frame, db, table_name, redshift_tmp_dir, transformation_ctx, additional_options, catalog_id)
  File "/opt/amazon/lib/python3.6/site-packages/awsglue/context.py", line 298, in write_dynamic_frame_from_catalog
    return DataSink(j_sink, self).write(frame)
  File "/opt/amazon/lib/python3.6/site-packages/awsglue/data_sink.py", line 35, in write
    return self.writeFrame(dynamic_frame_or_dfc, info)
  File "/opt/amazon/lib/python3.6/site-packages/awsglue/data_sink.py", line 31, in writeFrame
    return DynamicFrame(self._jsink.pyWriteDynamicFrame(dynamic_frame._jdf, callsite(), info), dynamic_frame.glue_ctx, dynamic_frame.name + "_errors")
  File "/opt/amazon/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/opt/amazon/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o92.pyWriteDynamicFrame.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 4 times, most recent failure: Lost task 0.3 in stage 2.0 (TID 38, 172.36.108.132, executor 1): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/worker.py", line 377, in main
    process()
  File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/worker.py", line 372, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 345, in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
  File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 141, in dump_stream
    for obj in iterator:
  File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 334, in _batched
    for item in iterator:
  File "<string>", line 1, in <lambda>
  File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/worker.py", line 83, in <lambda>
    return lambda *a: toInternal(f(*a))
  File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/util.py", line 99, in wrapper
    return f(*args, **kwargs)
  File "/tmp/ga-overview-gluejob", line 35, in <lambda>
TypeError: strptime() argument 1 must be str, not int
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:452)
    at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:81)
    at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:64)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:406)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:462)
    at com.amazonaws.services.glue.sinks.HadoopWriters$$anonfun$com$amazonaws$services$glue$sinks$HadoopWriters$$writeNotPartitioned$1.apply$mcV$sp(HadoopWriters.scala:121)
    at com.amazonaws.services.glue.sinks.HadoopWriters$$anonfun$com$amazonaws$services$glue$sinks$HadoopWriters$$writeNotPartitioned$1.apply(HadoopWriters.scala:121)
    at com.amazonaws.services.glue.sinks.HadoopWriters$$anonfun$com$amazonaws$services$glue$sinks$HadoopWriters$$writeNotPartitioned$1.apply(HadoopWriters.scala:121)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
    at org.apache.spark.sql.glue.SparkUtility$.tryWithSafeFinallyAndFailureCallbacks(SparkUtility.scala:39)
    at com.amazonaws.services.glue.sinks.HadoopWriters$.com$amazonaws$services$glue$sinks$HadoopWriters$$writeNotPartitioned(HadoopWriters.scala:125)
    at com.amazonaws.services.glue.sinks.HadoopWriters$$anonfun$doStreamWrite$1.apply(HadoopWriters.scala:138)
    at com.amazonaws.services.glue.sinks.HadoopWriters$$anonfun$doStreamWrite$1.apply(HadoopWriters.scala:129)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1876)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2110)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2114)
    at com.amazonaws.services.glue.sinks.HadoopWriters$.doStreamWrite(HadoopWriters.scala:129)
    at com.amazonaws.services.glue.sinks.HadoopDataSink$$anonfun$writeDynamicFrame$1.apply(HadoopDataSink.scala:258)
    at com.amazonaws.services.glue.sinks.HadoopDataSink$$anonfun$writeDynamicFrame$1.apply(HadoopDataSink.scala:148)
    at com.amazonaws.services.glue.util.FileSchemeWrapper$$anonfun$executeWithQualifiedScheme$1.apply(FileSchemeWrapper.scala:66)
    at com.amazonaws.services.glue.util.FileSchemeWrapper$$anonfun$executeWithQualifiedScheme$1.apply(FileSchemeWrapper.scala:66)
    at com.amazonaws.services.glue.util.FileSchemeWrapper.executeWith(FileSchemeWrapper.scala:58)
    at com.amazonaws.services.glue.util.FileSchemeWrapper.executeWithQualifiedScheme(FileSchemeWrapper.scala:66)
    at com.amazonaws.services.glue.sinks.HadoopDataSink.writeDynamicFrame(HadoopDataSink.scala:147)
    at com.amazonaws.services.glue.DataSink.pyWriteDynamicFrame(DataSink.scala:63)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/worker.py", line 377, in main
    process()
  File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/worker.py", line 372, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 345, in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
  File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 141, in dump_stream
    for obj in iterator:
  File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 334, in _batched
    for item in iterator:
  File "<string>", line 1, in <lambda>
  File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/worker.py", line 83, in <lambda>
    return lambda *a: toInternal(f(*a))
  File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/util.py", line 99, in wrapper
    return f(*args, **kwargs)
  File "/tmp/ga-overview-gluejob", line 35, in <lambda>
TypeError: strptime() argument 1 must be str, not int
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:452)
    at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:81)
    at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:64)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:406)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:462)
    at com.amazonaws.services.glue.sinks.HadoopWriters$$anonfun$com$amazonaws$services$glue$sinks$HadoopWriters$$writeNotPartitioned$1.apply$mcV$sp(HadoopWriters.scala:121)
    at com.amazonaws.services.glue.sinks.HadoopWriters$$anonfun$com$amazonaws$services$glue$sinks$HadoopWriters$$writeNotPartitioned$1.apply(HadoopWriters.scala:121)
    at com.amazonaws.services.glue.sinks.HadoopWriters$$anonfun$com$amazonaws$services$glue$sinks$HadoopWriters$$writeNotPartitioned$1.apply(HadoopWriters.scala:121)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
    at org.apache.spark.sql.glue.SparkUtility$.tryWithSafeFinallyAndFailureCallbacks(SparkUtility.scala:39)
    at com.amazonaws.services.glue.sinks.HadoopWriters$.com$amazonaws$services$glue$sinks$HadoopWriters$$writeNotPartitioned(HadoopWriters.scala:125)
    at com.amazonaws.services.glue.sinks.HadoopWriters$$anonfun$doStreamWrite$1.apply(HadoopWriters.scala:138)
    at com.amazonaws.services.glue.sinks.HadoopWriters$$anonfun$doStreamWrite$1.apply(HadoopWriters.scala:129)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more
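
The decisive line is near the top of both worker tracebacks: File "/tmp/ga-overview-gluejob", line 35, in <lambda>, followed by TypeError: strptime() argument 1 must be str, not int. In other words, the ga:date values reach the UDF as integers, while datetime.strptime accepts only strings. A minimal sketch of the failure and the cast-to-str workaround (the sample value 20200131 is hypothetical, not taken from the actual table):

from datetime import datetime

raw = 20200131                                   # hypothetical ga:date value, typed as int in the catalog
# datetime.strptime(raw, '%Y%m%d')               # raises TypeError: strptime() argument 1 must be str, not int
parsed = datetime.strptime(str(raw), '%Y%m%d')   # casting to str first parses cleanly
print(parsed.date())                             # 2020-01-31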


The script:

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.functions import *
import pyspark.sql.functions as F
from pyspark.sql.types import DateType
from datetime import datetime
from awsglue.job import Job

# Get job name

args = getResolvedOptions(sys.argv, ['JOB_NAME'])

# Create Glue and Spark contexts

glueContext = GlueContext(SparkContext.getOrCreate())
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# EXTRACT data from the Redshift table

datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "test-hud", table_name = "ga_overview")

# Convert from Glue DynamicFrame to Spark DataFrame

df = datasource0.toDF()

# TRANSFORM - page cleaning and date formatting

# UDF to convert string to date

date_func = udf(lambda x: datetime.strptime(x, '%Y%m%d'), DateType())
df1 = df.withColumn('ga:date', date_format(date_func(col('ga:date')), 'yyyy-dd-MM'))
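# Note: the "<lambda>" at line 35 of /tmp/ga-overview-gluejob in the traceback
# is this date_func; strptime() receives ga:date as an int, hence the TypeError.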

# Trim URL parameters from page path columns, retain string dtype

url_cols_to_clean = ['ga:pagePath', 'ga:pagePathLevel1']
for url_col in url_cols_to_clean:
    df2 = df1.withColumn(url_col, F.regexp_extract(url_col, r'([a-z0-9-._~%!$&()+,;=:@/])', 0))

# Transform the Spark DataFrame back to a Glue DynamicFrame

transform1 = DynamicFrame.fromDF(df2, glueContext, 'transform1')

# LOAD

# Store the transformed data in the same Redshift table

datasink1 = glueContext.write_dynamic_frame.from_catalog(frame = transform1, name_space = "test-hud", table_name = "ga_overview", transformation_ctx = "datasink1")
job.commit()
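
For reference, a hedged sketch of the TRANSFORM block that avoids both pitfalls: it parses ga:date with Spark-native functions after casting to string (so no Python UDF ever sees an int), and it chains withColumn on the running DataFrame, since the loop above rebuilds df2 from df1 on every pass and therefore keeps only the last column's cleanup. The regexp_replace pattern assumes "trim URL parameters" means dropping '?' and everything after it; adjust if the intended rule differs.

import pyspark.sql.functions as F

# Parse the integer-typed ga:date without a Python UDF: cast to string,
# then to_date with the yyyyMMdd pattern; keep the original yyyy-dd-MM output.
df1 = df.withColumn(
    'ga:date',
    F.date_format(F.to_date(F.col('ga:date').cast('string'), 'yyyyMMdd'), 'yyyy-dd-MM')
)

# Chain the cleanup on the running DataFrame so each column keeps its fix.
df2 = df1
for url_col in ['ga:pagePath', 'ga:pagePathLevel1']:
    # Assumption: trimming URL parameters = removing '?' and the query string.
    df2 = df2.withColumn(url_col, F.regexp_replace(F.col(url_col), r'\?.*$', ''))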

asked 5 years ago · 6K views
1 Answer

Were you able to solve this error? Apparently, I'm having a similar error.

answered 4 years ago

