Hello, I have an AWS Glue ETL job that converts JSON to Parquet. It was working until I enabled job bookmarking via CloudFormation.
Now when my job runs, I get the following error:
An error occurred while calling o122.relationalize. com.amazonaws.services.glue.util.HadoopDataSourceJobBookmarkState cannot be cast to com.amazonaws.services.glue.util.RelationalizeJobBookmarkState
cloudformation.yml
AnalyticsGlueJob:
  Type: 'AWS::Glue::Job'
  Properties:
    Role: !Ref AnalyticsGlueRole
    Command:
      Name: 'glueetl'
      ScriptLocation: !Sub 's3://${AnalyticsS3Bucket}/analytics_etl.py'
    GlueVersion: '3.0'
    DefaultArguments:
      '--connection_type': 's3'
      '--db_name': !Ref AnalyticsGlueDatabase
      '--enable-metrics': ''
      '--job-bookmark-option': 'job-bookmark-enable'
      '--s3_dest': !Sub 's3://${AnalyticsS3ParquetBucket}/logs/'
      '--table_name': 'logs'
      '--temp_dir': !Sub 's3://${AnalyticsS3ParquetBucket}/temp/'
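Since the failure only started once `--job-bookmark-option` was set to `job-bookmark-enable`, it may help to look at what the bookmark currently stores for each `transformation_ctx`. The sketch below calls the Glue `GetJobBookmark` API through a boto3 client you pass in and parses the returned `JobBookmark` string. It assumes that string is a JSON object keyed by `transformation_ctx`, with each entry's state class under a `jsonClass` field; treat that format as an assumption, not documented behaviour. The class names in the error (`HadoopDataSourceJobBookmarkState`, `RelationalizeJobBookmarkState`) are the kind of value you would expect to find there. Both function names are hypothetical.

```python
import json


def parse_bookmark_classes(bookmark_json):
    """Return {transformation_ctx: jsonClass} from a raw JobBookmark string.

    Assumption: the string is a JSON object keyed by transformation_ctx,
    and each entry records its state class under "jsonClass".
    Entries that are not JSON objects map to None.
    """
    state = json.loads(bookmark_json or "{}")
    return {
        ctx: entry.get("jsonClass") if isinstance(entry, dict) else None
        for ctx, entry in state.items()
    }


def bookmark_state_classes(glue_client, job_name):
    """Fetch the job's current bookmark via GetJobBookmark and parse it.

    `glue_client` is expected to be a boto3 Glue client, e.g. boto3.client("glue").
    """
    entry = glue_client.get_job_bookmark(JobName=job_name)["JobBookmarkEntry"]
    return parse_bookmark_classes(entry.get("JobBookmark"))
```

Usage would look something like `bookmark_state_classes(boto3.client("glue"), "<your job name>")`. Note that the CloudFormation logical ID `AnalyticsGlueJob` is not necessarily the job's name unless the `Name` property is set. If the state stored under `relationalize_ctx` has a class that doesn't match the transform now using that context, `reset_job_bookmark(JobName=...)` clears the bookmark, with the trade-off that the next run reprocesses all of the source data.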
etl_job.py
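import sys
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.transforms import ApplyMapping, Relationalize
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
# Job parameters, resolved from the DefaultArguments in cloudformation.yml.
args = getResolvedOptions(
    sys.argv,
    ['JOB_NAME', 'connection_type', 'db_name', 's3_dest', 'table_name', 'temp_dir'],
)
CONNECTION_TYPE = args['connection_type']
DATABASE = args['db_name']
S3_DEST = args['s3_dest']
TABLE_NAME = args['table_name']
TEMP_DIR = args['temp_dir']
sc = SparkContext()
gc = GlueContext(sc)
spark = gc.spark_session
job = Job(gc)
job.init(args['JOB_NAME'], args)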
dyf = gc.create_dynamic_frame.from_catalog(
    database=DATABASE,
    table_name=TABLE_NAME,
    transformation_ctx='bookmark_ctx',
)
# transform certain field types.
dyf = ApplyMapping.apply(
    frame=dyf,
    mappings=[
        ('authorizer.error', 'string', 'authorizer.error', 'string'),
        ('authorizer.latency', 'string', 'authorizer.latency', 'int'),
        ('authorizer.principal', 'string', 'authorizer.principal', 'string'),
        ('authorizer.requestId', 'string', 'authorizer.requestId', 'string'),
        ('authorizer.status', 'string', 'authorizer.status', 'int'),
        ('caller', 'string', 'caller', 'string'),
        ('httpmethod', 'string', 'httpmethod', 'string'),
        ('ip', 'string', 'ip', 'string'),
        ('partition_0', 'string', 'partition_0', 'string'),
        ('partition_1', 'string', 'partition_1', 'string'),
        ('partition_2', 'string', 'partition_2', 'string'),
        ('partition_3', 'string', 'partition_3', 'string'),
        ('path', 'string', 'path', 'string'),
        ('protocol', 'string', 'protocol', 'string'),
        ('requestid', 'string', 'requestid', 'string'),
        ('requesttime', 'string', 'requesttime', 'timestamp'),
        ('responselength', 'string', 'responselength', 'int'),
        ('status', 'string', 'status', 'int'),
        ('user', 'string', 'user', 'string'),
        ('useragent', 'string', 'useragent', 'string'),
    ],
    transformation_ctx='applymapping_ctx',
)
# flatten nested json.
dyf = Relationalize.apply(
    frame=dyf,
    staging_path=TEMP_DIR,
    name='root',
    transformation_ctx='relationalize_ctx',
)
dyf = dyf.select('root')
# write partitioned parquet files.
dyf = gc.write_dynamic_frame.from_options(
    frame=dyf,
    connection_type=CONNECTION_TYPE,
    connection_options={
        'path': S3_DEST,
        'partitionKeys': [
            'partition_0',
            'partition_1',
            'partition_2',
            'partition_3',
        ],
    },
    format='glueparquet',
    transformation_ctx='parquet_ctx',
)
job.commit()
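Because the write uses `partitionKeys`, the S3 sink lays the Parquet files out in Hive-style `key=value` directories under `S3_DEST`, so each of `partition_0` to `partition_3` has to exist as a column in the frame being written. A small plain-Python sketch of the prefix a single row would land under; `partition_prefix` is a hypothetical helper, the bucket name and partition values in the example comment are made up, and escaping of special characters is ignored:

```python
PARTITION_KEYS = ["partition_0", "partition_1", "partition_2", "partition_3"]


def partition_prefix(s3_dest, row, partition_keys=PARTITION_KEYS):
    """Build the Hive-style key=value prefix a row would be written under.

    Raises KeyError if the row lacks one of the partition columns.
    """
    parts = [f"{key}={row[key]}" for key in partition_keys]
    return s3_dest.rstrip("/") + "/" + "/".join(parts) + "/"


# Hypothetical row; the values are illustrative, not taken from the real data.
# partition_prefix("s3://example-bucket/logs/", {"partition_0": "2022", "partition_1": "05",
#                  "partition_2": "10", "partition_3": "14", "ip": "203.0.113.7"})
# -> "s3://example-bucket/logs/partition_0=2022/partition_1=05/partition_2=10/partition_3=14/"
```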
The error mentions Relationalize, which I'm using to flatten a nested JSON structure, but I'm not sure why it's failing.
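Roughly, flattening the nested structs here (such as `authorizer` in the mappings) means promoting each nested field to a top-level column whose name joins the path with `.`, e.g. `authorizer.latency`. A plain-Python sketch of that idea on a single dict record; this is not Glue's implementation, and it ignores arrays, which Relationalize pivots out into separate frames:

```python
def flatten(record, parent="", sep="."):
    """Flatten nested dicts into a single level with dotted keys.

    Only nested dicts are handled; lists and scalars are kept as values.
    """
    flat = {}
    for key, value in record.items():
        name = f"{parent}{sep}{key}" if parent else key
        if isinstance(value, dict):
            # Recurse into the nested struct, prefixing its keys with this path.
            flat.update(flatten(value, name, sep))
        else:
            flat[name] = value
    return flat


# flatten({"authorizer": {"latency": 12, "status": 200}, "ip": "203.0.113.7"})
# -> {"authorizer.latency": 12, "authorizer.status": 200, "ip": "203.0.113.7"}
```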
Any help appreciated!
Edit:
I think I may have it working, but I've noticed that if the job runs when there is no new data in the source bucket, it fails with an error instead of showing as succeeded:
IllegalArgumentException: Partition column partition_0 not found in schema
When new records arrive in the source bucket and the job runs again, it works and shows as succeeded.
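A plausible reading of the `Partition column partition_0 not found in schema` error is that when nothing new has arrived since the last bookmark, the frame read from the catalog is empty and carries no columns, so the sink has nothing to partition on. One way to handle that is to skip the write when the frame has no rows but still commit the job. A minimal sketch, written against stand-in callables so it runs without Glue; `write_if_not_empty` is a hypothetical helper, and `frame` is assumed to expose `count()` the way a DynamicFrame does:

```python
def write_if_not_empty(frame, write_fn, commit_fn):
    """Write `frame` only when it has rows, then commit the job either way.

    `write_fn` stands in for the gc.write_dynamic_frame.from_options(...) call
    and `commit_fn` for job.commit. Returns True if a write happened.
    """
    wrote = False
    if frame.count() > 0:
        write_fn(frame)
        wrote = True
    else:
        print("No new records since the last bookmark; skipping the write.")
    commit_fn()
    return wrote
```

In the script, `frame` would be the `dyf` selected from the Relationalize output (or the frame right after the catalog read), `write_fn` a lambda wrapping the existing `gc.write_dynamic_frame.from_options` call, and `commit_fn` would be `job.commit`. Note that `count()` runs a Spark job over the frame, so it isn't free on large inputs.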
Still interested in feedback!