AttributeError: 'DataFrame' object has no attribute '_get_object_id'


Using the Zeppelin notebook server, I have written the following script. The initialization is taken from the template that Glue generates, but the rest is custom. I'm getting the error:

AttributeError: 'DataFrame' object has no attribute '_get_object_id'

when I run the script. I'm fairly confident the error occurs on this line:

datasink = glueContext.write_dynamic_frame.from_catalog(frame = source_dynamic_frame, database = target_database, table_name = target_table_name, transformation_ctx = "datasink")

but I can't decipher what it's trying to tell me. Can anyone please help me out or point me in the right direction? Thanks!

%pyspark
import sys
from pyspark.context import SparkContext
from pyspark.sql.functions import lit, current_timestamp
from pyspark.sql.window import Window
from pyspark.sql.types import IntegerType, StructType, StructField, LongType
from awsglue.context import GlueContext
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from awsglue.dynamicframe import DynamicFrame
from awsglue.job import Job

def dfZipWithIndex (df, offset=1, colName="RID"):
    '''
        Enumerates dataframe rows in native order, like rdd.zipWithIndex(), but on a dataframe
        and preserves the schema

        :param df: source dataframe
        :param offset: adjustment to zipWithIndex()'s index
        :param colName: name of the index column
    '''
    new_schema = StructType(
                    df.schema.fields +                            # previous schema
                    [StructField(colName,LongType(),True)]        # new field appended at the end
                )

    zipped_rdd = df.rdd.zipWithIndex()

    # each element is a (row, rowId) pair; append rowId + offset as the last column
    new_rdd = zipped_rdd.map(lambda pair: list(pair[0]) + [pair[1] + offset])

    return spark.createDataFrame(new_rdd, new_schema)
    
# if len(sys.argv) == 0:
sys.argv.append('TEST_ETL_JOB')
print(sys.argv)

# args = getResolvedOptions(sys.argv, ['JOB_NAME'])
args = {}
args['JOB_NAME'] = 'TEST_ETL_JOB'
print(args)

sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# etl job configuration
schema_mapping = [("name", "string", "NAME", "string"), ("carrier id", "bigint", "DESCRIPTION", "string")]
source_file_name = "foo.csv"
source_database = "test016"
source_table_name = "5b3f6af0-9f34-4a6f-9c7d-c531b513f459-25a17d34_1f16_4013_9035_38783be0ba9b"
target_database = "test016"
target_table_name = "udf_udf_evt"

# Create a DynamicFrame using the uploaded table
source_dynamic_frame = glueContext.create_dynamic_frame.from_catalog(database=source_database, table_name=source_table_name)

# the mapping from CDF to UDF should appear here
source_dynamic_frame = ApplyMapping.apply(frame=source_dynamic_frame, mappings=schema_mapping)

# convert from dynamic frame to data frame to append columns
source_data_frame = source_dynamic_frame.toDF()

source_data_frame = source_data_frame.withColumn('SOURCE_ID', lit(0))
source_data_frame = source_data_frame.withColumn('TYPE_ID', lit(0))
source_data_frame = source_data_frame.withColumn('DELIVERY_ID', lit(0))

source_data_frame = source_data_frame.withColumn('SOURCE', lit('XXX'))
source_data_frame = source_data_frame.withColumn('NOTES', lit('YYY'))

# inject the source filename into the data frame
source_data_frame = source_data_frame.withColumn('SOURCE_FILE', lit(source_file_name))

# inject the row number from the source file into the data frame
source_data_frame = dfZipWithIndex( source_data_frame, 1, 'SOURCE_ROW' )

# inject the current date and time into the data frame for created and last modified
source_data_frame = source_data_frame.withColumn('DATE_CREATED', current_timestamp() )
source_data_frame = source_data_frame.withColumn('DATE_LASTMODIFIED', current_timestamp() )

# convert the data frame into a dynamic frame
source_dynamic_frame = DynamicFrame(source_data_frame, glueContext)

datasink = glueContext.write_dynamic_frame.from_catalog(frame = source_dynamic_frame, database = target_database, table_name = target_table_name, transformation_ctx = "datasink")

#print out information about this data frame for testing and debug
#print "Count: ", source_dynamic_frame.count()
source_dynamic_frame.show()

job.commit()

Edited by: Kindle Customer on Oct 11, 2018 4:02 PM

asked 5 years ago, 6401 views
3 Answers

Spark evaluates lazily, so the error only surfaces when you call an action (the write). Could you try without the dfZipWithIndex function?

source_data_frame = dfZipWithIndex( source_data_frame, 1, 'SOURCE_ROW' )

Shivan
answered 5 years ago
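For reference, here is what the RDD step of the dfZipWithIndex helper does, modeled in plain Python with a list standing in for an RDD (an assumption for illustration; zip_with_index_rows is a hypothetical name, not a Spark API). RDD.zipWithIndex() pairs each element with its 0-based position, and the map then appends that position plus offset as the new last column:

```python
def zip_with_index_rows(rows, offset=1):
    """Pure-Python model of df.rdd.zipWithIndex().map(...) in dfZipWithIndex:
    each row gets its 0-based position plus offset appended as a new last column."""
    # enumerate() yields (index, row); RDD.zipWithIndex() yields (row, index),
    # so swap the pair to match Spark's ordering.
    zipped = [(row, index) for index, row in enumerate(rows)]
    # Unpack inside the comprehension instead of using "lambda (row, rowId):",
    # a tuple-parameter form that Python 3 removed.
    return [list(row) + [row_id + offset] for row, row_id in zipped]


rows = [("alice", 10), ("bob", 20)]
print(zip_with_index_rows(rows, offset=1))
# [['alice', 10, 1], ['bob', 20, 2]]
```

The result is a plain list of values per row, which is why the helper has to rebuild the DataFrame with an explicit schema that includes the extra LongType column.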

Thanks, but unfortunately that didn't change the error. Any other suggestions?
For what it's worth, when I uncomment the following line and comment out the write_dynamic_frame line, the table is shown as expected without error.

source_dynamic_frame.show()

Error:

Traceback (most recent call last):
  File "/tmp/zeppelin_pyspark-444437833802934152.py", line 367, in <module>
    raise Exception(traceback.format_exc())
Exception: Traceback (most recent call last):
  File "/tmp/zeppelin_pyspark-444437833802934152.py", line 355, in <module>
    exec(code, _zcUserQueryNameSpace)
  File "<stdin>", line 55, in <module>
  File "/usr/share/aws/glue/etl/python/PyGlue.zip/awsglue/dynamicframe.py", line 591, in from_catalog
    return self._glue_context.write_dynamic_frame_from_catalog(frame, db, table_name, redshift_tmp_dir, transformation_ctx, additional_options)
  File "/usr/share/aws/glue/etl/python/PyGlue.zip/awsglue/context.py", line 247, in write_dynamic_frame_from_catalog
    return DataSink(j_sink, self).write(frame)
  File "/usr/share/aws/glue/etl/python/PyGlue.zip/awsglue/data_sink.py", line 32, in write
    return self.writeFrame(dynamic_frame_or_dfc, info)
  File "/usr/share/aws/glue/etl/python/PyGlue.zip/awsglue/data_sink.py", line 28, in writeFrame
    return DynamicFrame(self._jsink.pyWriteDynamicFrame(dynamic_frame._jdf, callsite(), info), dynamic_frame.glue_ctx, dynamic_frame.name + "_errors")
  File "/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1124, in __call__
    args_command, temp_args = self._build_args(*args)
  File "/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1094, in _build_args
    [get_command_part(arg, self.pool) for arg in new_args])
  File "/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", line 289, in get_command_part
    command_part = REFERENCE_TYPE + parameter._get_object_id()
  File "/usr/lib/spark/python/pyspark/sql/dataframe.py", line 1020, in __getattr__
    "'%s' object has no attribute '%s'" % (self.__class__.__name__, name))
AttributeError: 'DataFrame' object has no attribute '_get_object_id'

Source Code:

%pyspark
import sys
from pyspark.context import SparkContext
from pyspark.sql.functions import lit, current_timestamp
from pyspark.sql.window import Window
from pyspark.sql.types import IntegerType, StructType, StructField, LongType
from awsglue.context import GlueContext
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from awsglue.dynamicframe import DynamicFrame
from awsglue.job import Job

def dfZipWithIndex (df, offset=1, colName="RID"):
    '''
        Enumerates dataframe rows in native order, like rdd.zipWithIndex(), but on a dataframe
        and preserves the schema

        :param df: source dataframe
        :param offset: adjustment to zipWithIndex()'s index
        :param colName: name of the index column
    '''
    new_schema = StructType(
                    df.schema.fields +                            # previous schema
                    [StructField(colName,LongType(),True)]        # new field appended at the end
                )

    zipped_rdd = df.rdd.zipWithIndex()

    # each element is a (row, rowId) pair; append rowId + offset as the last column
    new_rdd = zipped_rdd.map(lambda pair: list(pair[0]) + [pair[1] + offset])

    return spark.createDataFrame(new_rdd, new_schema)
    
# if len(sys.argv) == 0:
sys.argv.append('TEST_ETL_JOB')
print(sys.argv)

# args = getResolvedOptions(sys.argv, ['JOB_NAME'])
args = {}
args['JOB_NAME'] = 'TEST_ETL_JOB'
print(args)

sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# etl job configuration
schema_mapping = [("name", "string", "NAME", "string"), ("carrier id", "bigint", "DESCRIPTION", "string")]
source_file_name = "foo.csv"
source_database = "test016"
source_table_name = "5b3f6af0-9f34-4a6f-9c7d-c531b513f459-25a17d34_1f16_4013_9035_38783be0ba9b"
target_database = "test016"
target_table_name = "udf_udf_evt"

# Create a DynamicFrame using the uploaded table
source_dynamic_frame = glueContext.create_dynamic_frame.from_catalog(database=source_database, table_name=source_table_name)

# the mapping from CDF to UDF should appear here
source_dynamic_frame = ApplyMapping.apply(frame=source_dynamic_frame, mappings=schema_mapping)

# convert from dynamic frame to data frame to append columns
source_data_frame = source_dynamic_frame.toDF()

source_data_frame = source_data_frame.withColumn('SOURCE_ID', lit(0))
source_data_frame = source_data_frame.withColumn('TYPE_ID', lit(0))
source_data_frame = source_data_frame.withColumn('DELIVERY_ID', lit(0))

source_data_frame = source_data_frame.withColumn('SOURCE', lit('XXX'))
source_data_frame = source_data_frame.withColumn('NOTES', lit('YYY'))

# inject the source filename into the data frame
source_data_frame = source_data_frame.withColumn('SOURCE_FILE', lit(source_file_name))

# inject the row number from the source file into the data frame
# source_data_frame = dfZipWithIndex( source_data_frame, 1, 'SOURCE_ROW' )
source_data_frame = source_data_frame.withColumn('SOURCE_ROW', lit(0))

# inject the current date and time into the data frame for created and last modified
source_data_frame = source_data_frame.withColumn('DATE_CREATED', current_timestamp() )
source_data_frame = source_data_frame.withColumn('DATE_LASTMODIFIED', current_timestamp() )

# convert the data frame into a dynamic frame
source_dynamic_frame = DynamicFrame(source_data_frame, glueContext)

datasink = glueContext.write_dynamic_frame.from_catalog(frame = source_dynamic_frame, database = target_database, table_name = target_table_name, transformation_ctx = "datasink")

#print out information about this data frame for testing and debug
#print "Count: ", source_dynamic_frame.count()
source_dynamic_frame.show()

job.commit()

Edited by: Kindle Customer on Oct 12, 2018 10:34 AM

answered 5 years ago

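I was able to track down the issue. This line doesn't work, because the DynamicFrame constructor expects a reference to a JVM-side frame, not a PySpark DataFrame. The wrong object is only touched when the write hands it to py4j, which calls _get_object_id() on it and fails: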

# convert the data frame into a dynamic frame
source_dynamic_frame = DynamicFrame(source_data_frame, glueContext)

It should use the DynamicFrame.fromDF() class method, which performs the conversion:

# convert the data frame into a dynamic frame
source_dynamic_frame = DynamicFrame.fromDF(source_data_frame, glueContext, "dynamic_frame")
answered 5 years ago
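A simplified, pure-Python model of why the traceback ends in _get_object_id. This is not Glue or py4j code: ToyDataFrame, JavaObjectRef, ToyDynamicFrame, get_command_part and write are hypothetical stand-ins. It assumes, as the traceback suggests, that the DynamicFrame constructor stores its first argument as the JVM handle (_jdf) without converting it, and that the write passes that handle to py4j, which calls _get_object_id() on reference arguments. A PySpark DataFrame has no such method, and its __getattr__ turns the miss into the AttributeError. The real fromDF roughly builds a JVM-side DynamicFrame from the DataFrame's own JVM handle; the toy from_df only unwraps the handle to show the difference.

```python
REFERENCE_TYPE = "r"  # py4j's prefix for object-reference arguments


class JavaObjectRef:
    """Stand-in for a py4j JavaObject: a handle to an object living in the JVM."""

    def __init__(self, object_id):
        self._object_id = object_id

    def _get_object_id(self):
        return self._object_id


class ToyDataFrame:
    """Stand-in for pyspark.sql.DataFrame: holds its JVM handle in _jdf and,
    like PySpark, routes unknown attribute names through __getattr__."""

    def __init__(self, jdf, columns):
        self._jdf = jdf
        self.columns = columns

    def __getattr__(self, name):
        # Only called when normal attribute lookup fails.
        if name in self.columns:
            return name  # PySpark would return a Column here
        raise AttributeError(
            "'%s' object has no attribute '%s'" % (self.__class__.__name__, name))


def get_command_part(arg):
    """Simplified py4j argument encoding: treat arg as a JVM reference."""
    return REFERENCE_TYPE + arg._get_object_id()


class ToyDynamicFrame:
    """The constructor stores whatever it receives as the JVM handle; it converts nothing."""

    def __init__(self, jdf, name=""):
        self._jdf = jdf
        self.name = name

    @classmethod
    def from_df(cls, dataframe, name):
        # Stand-in for DynamicFrame.fromDF: start from the DataFrame's JVM
        # handle instead of the Python wrapper object.
        return cls(dataframe._jdf, name)


def write(frame):
    """Stand-in for the sink call that sends frame._jdf across py4j."""
    return get_command_part(frame._jdf)


df = ToyDataFrame(JavaObjectRef("o42"), columns=["NAME", "DESCRIPTION"])

try:
    write(ToyDynamicFrame(df))  # mirrors DynamicFrame(source_data_frame, glueContext)
except AttributeError as exc:
    print(exc)  # 'ToyDataFrame' object has no attribute '_get_object_id'

print(write(ToyDynamicFrame.from_df(df, "dynamic_frame")))  # ro42
```

The model also hints at why show() could succeed while the write failed: a call that stays on the Python side never needs the object to behave like a JVM reference.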
