3 Answers
0
Spark uses lazy evaluation, so the error only occurs when you call an action (the write). Could you try it without the dfZipWithIndex function?
source_data_frame = dfZipWithIndex( source_data_frame, 1, 'SOURCE_ROW' )
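To illustrate the lazy-evaluation point, here is a minimal, hypothetical sketch (the broken UDF below is made up for the example and is not part of your job):
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
# deliberately broken UDF: raises ZeroDivisionError only when it actually runs
boom = udf(lambda x: x // 0, IntegerType())
df = spark.range(3).withColumn('bad', boom('id'))  # transformation: no error yet
df.count()  # the action forces execution, so the failure only surfaces here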
answered 6 years ago
0
Thanks, but unfortunately that didn't change the error. Any other suggestions?
For what it's worth, when I uncomment the following line and comment out the write_dynamic_frame line, the table is shown as expected without error.
source_dynamic_frame.show()
Error:
Traceback (most recent call last):
  File "/tmp/zeppelin_pyspark-444437833802934152.py", line 367, in <module>
    raise Exception(traceback.format_exc())
Exception: Traceback (most recent call last):
  File "/tmp/zeppelin_pyspark-444437833802934152.py", line 355, in <module>
    exec(code, _zcUserQueryNameSpace)
  File "<stdin>", line 55, in <module>
  File "/usr/share/aws/glue/etl/python/PyGlue.zip/awsglue/dynamicframe.py", line 591, in from_catalog
    return self._glue_context.write_dynamic_frame_from_catalog(frame, db, table_name, redshift_tmp_dir, transformation_ctx, additional_options)
  File "/usr/share/aws/glue/etl/python/PyGlue.zip/awsglue/context.py", line 247, in write_dynamic_frame_from_catalog
    return DataSink(j_sink, self).write(frame)
  File "/usr/share/aws/glue/etl/python/PyGlue.zip/awsglue/data_sink.py", line 32, in write
    return self.writeFrame(dynamic_frame_or_dfc, info)
  File "/usr/share/aws/glue/etl/python/PyGlue.zip/awsglue/data_sink.py", line 28, in writeFrame
    return DynamicFrame(self._jsink.pyWriteDynamicFrame(dynamic_frame._jdf, callsite(), info), dynamic_frame.glue_ctx, dynamic_frame.name + "_errors")
  File "/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1124, in __call__
    args_command, temp_args = self._build_args(*args)
  File "/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1094, in _build_args
    [get_command_part(arg, self.pool) for arg in new_args])
  File "/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", line 289, in get_command_part
    command_part = REFERENCE_TYPE + parameter._get_object_id()
  File "/usr/lib/spark/python/pyspark/sql/dataframe.py", line 1020, in __getattr__
    "'%s' object has no attribute '%s'" % (self.__class__.__name__, name))
AttributeError: 'DataFrame' object has no attribute '_get_object_id'
Source Code:
%pyspark
import sys
from pyspark.context import SparkContext
from pyspark.sql.functions import lit, current_timestamp
from pyspark.sql.window import Window
from pyspark.sql.types import IntegerType, StructType, StructField, LongType
from awsglue.context import GlueContext
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from awsglue.dynamicframe import DynamicFrame
from awsglue.job import Job
def dfZipWithIndex (df, offset=1, colName="RID"):
    '''
    Enumerates dataframe rows in native order, like rdd.zipWithIndex(), but on a dataframe
    and preserves the schema
    :param df: source dataframe
    :param offset: adjustment to zipWithIndex()'s index
    :param colName: name of the index column
    '''
    new_schema = StructType(
        df.schema.fields +                        # previous schema
        [StructField(colName, LongType(), True)]  # new index field appended at the end
    )
    zipped_rdd = df.rdd.zipWithIndex()
    # Python 2 tuple-unpacking lambda; this Glue/Zeppelin environment runs Python 2
    new_rdd = zipped_rdd.map(lambda (row, rowId): (list(row) + [rowId + offset]))
    return spark.createDataFrame(new_rdd, new_schema)
# if len(sys.argv) == 0:
sys.argv.append('TEST_ETL_JOB')
print sys.argv
# args = getResolvedOptions(sys.argv, ['JOB_NAME'])
args = {}
args['JOB_NAME'] = 'TEST_ETL_JOB'
print args
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
# etl job configuration
schema_mapping = [("name", "string", "NAME", "string"), ("carrier id", "bigint", "DESCRIPTION", "string")]
source_file_name = "foo.csv"
source_database = "test016"
source_table_name = "5b3f6af0-9f34-4a6f-9c7d-c531b513f459-25a17d34_1f16_4013_9035_38783be0ba9b"
target_database = "test016"
target_table_name = "udf_udf_evt"
# Create a DynamicFrame using the uploaded table
source_dynamic_frame = glueContext.create_dynamic_frame.from_catalog(database=source_database, table_name=source_table_name)
# the mapping from CDF to UDF should appear here
source_dynamic_frame = ApplyMapping.apply(frame=source_dynamic_frame, mappings=schema_mapping)
# convert from dynamic frame to data frame to append columns
source_data_frame = source_dynamic_frame.toDF()
source_data_frame = source_data_frame.withColumn('SOURCE_ID', lit(0))
source_data_frame = source_data_frame.withColumn('TYPE_ID', lit(0))
source_data_frame = source_data_frame.withColumn('DELIVERY_ID', lit(0))
source_data_frame = source_data_frame.withColumn('SOURCE', lit('XXX'))
source_data_frame = source_data_frame.withColumn('NOTES', lit('YYY'))
# inject the source filename into the data frame
source_data_frame = source_data_frame.withColumn('SOURCE_FILE', lit(source_file_name))
# inject the row number from the source file into the data frame
# source_data_frame = dfZipWithIndex( source_data_frame, 1, 'SOURCE_ROW' )
source_data_frame = source_data_frame.withColumn('SOURCE_ROW', lit(0))
# inject the current date and time into the data frame for created and last modified
source_data_frame = source_data_frame.withColumn('DATE_CREATED', current_timestamp() )
source_data_frame = source_data_frame.withColumn('DATE_LASTMODIFIED', current_timestamp() )
# convert the data frame into a dynamic frame
source_dynamic_frame = DynamicFrame(source_data_frame, glueContext)
datasink = glueContext.write_dynamic_frame.from_catalog(frame = source_dynamic_frame, database = target_database, table_name = target_table_name, transformation_ctx = "datasink")
#print out information about this data frame for testing and debug
#print "Count: ", source_dynamic_frame.count()
source_dynamic_frame.show()
job.commit()
Edited by: Kindle Customer on Oct 12, 2018 10:34 AM
answered 6 years ago
0
I was able to track down the issue. This line doesn't work:
# convert the data frame into a dynamic frame
source_dynamic_frame = DynamicFrame(source_data_frame, glueContext)
It should be:
# convert the data frame into a dynamic frame
source_dynamic_frame = DynamicFrame.fromDF(source_data_frame, glueContext, "dynamic_frame")
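For anyone hitting the same AttributeError: the DynamicFrame constructor expects a Java-side object as its first argument, so passing a plain pyspark DataFrame makes py4j call _get_object_id on it (the last frames of the traceback above). As a minimal sketch, the corrected conversion followed by the same catalog write from the script above (the frame name "dynamic_frame" is arbitrary):
from awsglue.dynamicframe import DynamicFrame
# fromDF() wraps the pyspark DataFrame in a DynamicFrame, which is what the Glue sink expects
source_dynamic_frame = DynamicFrame.fromDF(source_data_frame, glueContext, "dynamic_frame")
datasink = glueContext.write_dynamic_frame.from_catalog(frame = source_dynamic_frame, database = target_database, table_name = target_table_name, transformation_ctx = "datasink")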
answered 6 years ago