AttributeError: 'DataFrame' object has no attribute '_get_object_id'


Using the Zeppelin notebook server, I have written the following script. The initialization is taken from the template created in Glue, but the rest of it is custom. I'm getting the error:

AttributeError: 'DataFrame' object has no attribute '_get_object_id'

when I run the script. I'm pretty confident the error is occurring during this line:

datasink = glueContext.write_dynamic_frame.from_catalog(frame = source_dynamic_frame, database = target_database, table_name = target_table_name, transformation_ctx = "datasink")

but I can't decipher what it's trying to tell me. Can anyone please help me out or point me in the right direction? Thanks!

%pyspark
import sys
from pyspark.context import SparkContext
from pyspark.sql.functions import lit, current_timestamp
from pyspark.sql.window import Window
from pyspark.sql.types import IntegerType, StructType, StructField, LongType
from awsglue.context import GlueContext
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from awsglue.dynamicframe import DynamicFrame
from awsglue.job import Job

def dfZipWithIndex (df, offset=1, colName="RID"):
    '''
        Enumerates dataframe rows in native order, like rdd.zipWithIndex(), but on a dataframe
        and preserves the schema

        :param df: source dataframe
        :param offset: adjustment to zipWithIndex()'s index
        :param colName: name of the index column
    '''
    new_schema = StructType(
                    df.schema.fields +                            # previous schema
                    [StructField(colName,LongType(),True)]        # new index field appended at the end
                )

    zipped_rdd = df.rdd.zipWithIndex()

    new_rdd = zipped_rdd.map(lambda (row,rowId): (list(row)+ [rowId +offset]))

    return spark.createDataFrame(new_rdd, new_schema)
    
# if len(sys.argv) == 0:
sys.argv.append('TEST_ETL_JOB')
print sys.argv

# args = getResolvedOptions(sys.argv, ['JOB_NAME'])
args = {}
args['JOB_NAME'] = 'TEST_ETL_JOB'
print args

sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# etl job configuration
schema_mapping = [("name", "string", "NAME", "string"), ("carrier id", "bigint", "DESCRIPTION", "string")]
source_file_name = "foo.csv"
source_database = "test016"
source_table_name = "5b3f6af0-9f34-4a6f-9c7d-c531b513f459-25a17d34_1f16_4013_9035_38783be0ba9b"
target_database = "test016"
target_table_name = "udf_udf_evt"

# Create a DynamicFrame using the uploaded table
source_dynamic_frame = glueContext.create_dynamic_frame.from_catalog(database=source_database, table_name=source_table_name)

# the mapping from CDF to UDF should appear here
source_dynamic_frame = ApplyMapping.apply(frame=source_dynamic_frame, mappings=schema_mapping)

# convert from dynamic frame to data frame to append columns
source_data_frame = source_dynamic_frame.toDF()

source_data_frame = source_data_frame.withColumn('SOURCE_ID', lit(0))
source_data_frame = source_data_frame.withColumn('TYPE_ID', lit(0))
source_data_frame = source_data_frame.withColumn('DELIVERY_ID', lit(0))

source_data_frame = source_data_frame.withColumn('SOURCE', lit('XXX'))
source_data_frame = source_data_frame.withColumn('NOTES', lit('YYY'))

# inject the source filename into the data frame
source_data_frame = source_data_frame.withColumn('SOURCE_FILE', lit(source_file_name))

# inject the row number from the source file into the data frame
source_data_frame = dfZipWithIndex( source_data_frame, 1, 'SOURCE_ROW' )

# inject the current date and time into the data frame for created and last modified
source_data_frame = source_data_frame.withColumn('DATE_CREATED', current_timestamp() )
source_data_frame = source_data_frame.withColumn('DATE_LASTMODIFIED', current_timestamp() )

# convert the data frame into a dynamic frame
source_dynamic_frame = DynamicFrame(source_data_frame, glueContext)

datasink = glueContext.write_dynamic_frame.from_catalog(frame = source_dynamic_frame, database = target_database, table_name = target_table_name, transformation_ctx = "datasink")

#print out information about this data frame for testing and debug
#print "Count: ", source_dynamic_frame.count()
source_dynamic_frame.show()

job.commit()

Edited by: Kindle Customer on Oct 11, 2018 4:02 PM

Asked 6 years ago · 6,824 views
3 Answers

Spark does lazy evaluation, so the error only surfaces when you call an action (the write). Could you try without the dfZipWithIndex function?

source_data_frame = dfZipWithIndex( source_data_frame, 1, 'SOURCE_ROW' )
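For example (a sketch only, not necessarily the final fix), the helper could be swapped for Spark's built-in monotonically_increasing_id, which assigns a unique but non-contiguous id per row and avoids the RDD round trip:

from pyspark.sql.functions import monotonically_increasing_id

# Diagnostic substitute for dfZipWithIndex: ids are unique, but not contiguous row numbers
source_data_frame = source_data_frame.withColumn('SOURCE_ROW', monotonically_increasing_id())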

Shivan
answered 6 years ago

Thanks, but unfortunately that didn't change the error. Any other suggestions?
For what it's worth, when I uncomment the following line and comment out the write_dynamic_frame line, the table is shown as expected without error.

source_dynamic_frame.show()

Error:

Traceback (most recent call last):
  File "/tmp/zeppelin_pyspark-444437833802934152.py", line 367, in <module>
    raise Exception(traceback.format_exc())
Exception: Traceback (most recent call last):
  File "/tmp/zeppelin_pyspark-444437833802934152.py", line 355, in <module>
    exec(code, _zcUserQueryNameSpace)
  File "<stdin>", line 55, in <module>
  File "/usr/share/aws/glue/etl/python/PyGlue.zip/awsglue/dynamicframe.py", line 591, in from_catalog
    return self._glue_context.write_dynamic_frame_from_catalog(frame, db, table_name, redshift_tmp_dir, transformation_ctx, additional_options)
  File "/usr/share/aws/glue/etl/python/PyGlue.zip/awsglue/context.py", line 247, in write_dynamic_frame_from_catalog
    return DataSink(j_sink, self).write(frame)
  File "/usr/share/aws/glue/etl/python/PyGlue.zip/awsglue/data_sink.py", line 32, in write
    return self.writeFrame(dynamic_frame_or_dfc, info)
  File "/usr/share/aws/glue/etl/python/PyGlue.zip/awsglue/data_sink.py", line 28, in writeFrame
    return DynamicFrame(self._jsink.pyWriteDynamicFrame(dynamic_frame._jdf, callsite(), info), dynamic_frame.glue_ctx, dynamic_frame.name + "_errors")
  File "/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1124, in __call__
    args_command, temp_args = self._build_args(*args)
  File "/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1094, in _build_args
    [get_command_part(arg, self.pool) for arg in new_args])
  File "/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", line 289, in get_command_part
    command_part = REFERENCE_TYPE + parameter._get_object_id()
  File "/usr/lib/spark/python/pyspark/sql/dataframe.py", line 1020, in __getattr__
    "'%s' object has no attribute '%s'" % (self.__class__.__name__, name))
AttributeError: 'DataFrame' object has no attribute '_get_object_id'

Source Code:

%pyspark
import sys
from pyspark.context import SparkContext
from pyspark.sql.functions import lit, current_timestamp
from pyspark.sql.window import Window
from pyspark.sql.types import IntegerType, StructType, StructField, LongType
from awsglue.context import GlueContext
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from awsglue.dynamicframe import DynamicFrame
from awsglue.job import Job

def dfZipWithIndex (df, offset=1, colName="RID"):
    '''
        Enumerates dataframe rows in native order, like rdd.zipWithIndex(), but on a dataframe
        and preserves the schema

        :param df: source dataframe
        :param offset: adjustment to zipWithIndex()'s index
        :param colName: name of the index column
    '''
    new_schema = StructType(
                    df.schema.fields +                            # previous schema
                    [StructField(colName,LongType(),True)]        # new index field appended at the end
                )

    zipped_rdd = df.rdd.zipWithIndex()

    new_rdd = zipped_rdd.map(lambda (row,rowId): (list(row)+ [rowId +offset]))

    return spark.createDataFrame(new_rdd, new_schema)
    
# if len(sys.argv) == 0:
sys.argv.append('TEST_ETL_JOB')
print sys.argv

# args = getResolvedOptions(sys.argv, ['JOB_NAME'])
args = {}
args['JOB_NAME'] = 'TEST_ETL_JOB'
print args

sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# etl job configuration
schema_mapping = [("name", "string", "NAME", "string"), ("carrier id", "bigint", "DESCRIPTION", "string")]
source_file_name = "foo.csv"
source_database = "test016"
source_table_name = "5b3f6af0-9f34-4a6f-9c7d-c531b513f459-25a17d34_1f16_4013_9035_38783be0ba9b"
target_database = "test016"
target_table_name = "udf_udf_evt"

# Create a DynamicFrame using the uploaded table
source_dynamic_frame = glueContext.create_dynamic_frame.from_catalog(database=source_database, table_name=source_table_name)

# the mapping from CDF to UDF should appear here
source_dynamic_frame = ApplyMapping.apply(frame=source_dynamic_frame, mappings=schema_mapping)

# convert from dynamic frame to data frame to append columns
source_data_frame = source_dynamic_frame.toDF()

source_data_frame = source_data_frame.withColumn('SOURCE_ID', lit(0))
source_data_frame = source_data_frame.withColumn('TYPE_ID', lit(0))
source_data_frame = source_data_frame.withColumn('DELIVERY_ID', lit(0))

source_data_frame = source_data_frame.withColumn('SOURCE', lit('XXX'))
source_data_frame = source_data_frame.withColumn('NOTES', lit('YYY'))

# inject the source filename into the data frame
source_data_frame = source_data_frame.withColumn('SOURCE_FILE', lit(source_file_name))

# inject the row number from the source file into the data frame
# source_data_frame = dfZipWithIndex( source_data_frame, 1, 'SOURCE_ROW' )
source_data_frame = source_data_frame.withColumn('SOURCE_ROW', lit(0))

# inject the current date and time into the data frame for created and last modified
source_data_frame = source_data_frame.withColumn('DATE_CREATED', current_timestamp() )
source_data_frame = source_data_frame.withColumn('DATE_LASTMODIFIED', current_timestamp() )

# convert the data frame into a dynamic frame
source_dynamic_frame = DynamicFrame(source_data_frame, glueContext)

datasink = glueContext.write_dynamic_frame.from_catalog(frame = source_dynamic_frame, database = target_database, table_name = target_table_name, transformation_ctx = "datasink")

#print out information about this data frame for testing and debug
#print "Count: ", source_dynamic_frame.count()
source_dynamic_frame.show()

job.commit()

Edited by: Kindle Customer on Oct 12, 2018 10:34 AM

answered 6 years ago

I was able to track down the issue. This line doesn't work:

# convert the data frame into a dynamic frame
source_dynamic_frame = DynamicFrame(source_data_frame, glueContext)

It should use the fromDF class method instead; the DynamicFrame constructor expects the underlying Java-side DynamicFrame object as its first argument, so handing it a PySpark DataFrame makes py4j later try to resolve that DataFrame as a JVM object reference, which is what raises the '_get_object_id' error. The working line is:

# convert the data frame into a dynamic frame
source_dynamic_frame = DynamicFrame.fromDF(source_data_frame, glueContext, "dynamic_frame")
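For reference, a minimal sketch of the round trip using the names from the question (the third argument to fromDF is just a name for the new frame and can be any string):

# convert the data frame back into a dynamic frame, then write it to the catalog target
source_dynamic_frame = DynamicFrame.fromDF(source_data_frame, glueContext, "source_dynamic_frame")
datasink = glueContext.write_dynamic_frame.from_catalog(frame = source_dynamic_frame, database = target_database, table_name = target_table_name, transformation_ctx = "datasink")

With that change, write_dynamic_frame.from_catalog receives a proper DynamicFrame and the py4j error goes away.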
answered 6 years ago
