Glue ETL Job not working with error: o122.relationalize. com.amazonaws.services.glue.util.HadoopDataSourceJobBookmarkState cannot be cast to com.amazonaws.services.glue.util.RelationalizeJobBookmarkState


Hello, I have an ETL job that converts JSON to Parquet and was working up until I enabled job bookmarking via CloudFormation.

Now when my job runs I get the following error:

An error occurred while calling o122.relationalize. com.amazonaws.services.glue.util.HadoopDataSourceJobBookmarkState cannot be cast to com.amazonaws.services.glue.util.RelationalizeJobBookmarkState

cloudformation.yml

  AnalyticsGlueJob:
    Type: 'AWS::Glue::Job'
    Properties:
      Role: !Ref AnalyticsGlueRole
      Command:
        Name: 'glueetl'
        ScriptLocation: !Sub 's3://${AnalyticsS3Bucket}/analytics_etl.py'
      GlueVersion: '3.0'
      DefaultArguments:
        '--connection_type': 's3'
        '--db_name': !Ref AnalyticsGlueDatabase
        '--enable-metrics': ''
        '--job-bookmark-option': 'job-bookmark-enable'
        '--s3_dest': !Sub 's3://${AnalyticsS3ParquetBucket}/logs/'
        '--table_name': 'logs'
        '--temp_dir': !Sub 's3://${AnalyticsS3ParquetBucket}/temp/'

etl_job.py

import sys

from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.transforms import ApplyMapping, Relationalize
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext

# job parameters passed in via DefaultArguments in the CloudFormation template.
args = getResolvedOptions(
    sys.argv,
    ['JOB_NAME', 'connection_type', 'db_name', 's3_dest', 'table_name', 'temp_dir'],
)
DATABASE = args['db_name']
TABLE_NAME = args['table_name']
TEMP_DIR = args['temp_dir']
CONNECTION_TYPE = args['connection_type']
S3_DEST = args['s3_dest']

sc = SparkContext()
gc = GlueContext(sc)
spark = gc.spark_session
job = Job(gc)
job.init(args['JOB_NAME'], args)

# read the source table from the catalog; transformation_ctx ties it to the job bookmark.
dyf = gc.create_dynamic_frame.from_catalog(
    database=DATABASE,
    table_name=TABLE_NAME,
    transformation_ctx='bookmark_ctx',
)

# transform certain field types.
dyf = ApplyMapping.apply(
    frame=dyf,
    mappings=[
        ('authorizer.error', 'string', 'authorizer.error', 'string'),
        ('authorizer.latency', 'string', 'authorizer.latency', 'int'),
        ('authorizer.principal', 'string', 'authorizer.principal', 'string'),
        ('authorizer.requestId', 'string', 'authorizer.requestId', 'string'),
        ('authorizer.status', 'string', 'authorizer.status', 'int'),
        ('caller', 'string', 'caller', 'string'),
        ('httpmethod', 'string', 'httpmethod', 'string'),
        ('ip', 'string', 'ip', 'string'),
        ('partition_0', 'string', 'partition_0', 'string'),
        ('partition_1', 'string', 'partition_1', 'string'),
        ('partition_2', 'string', 'partition_2', 'string'),
        ('partition_3', 'string', 'partition_3', 'string'),
        ('path', 'string', 'path', 'string'),
        ('protocol', 'string', 'protocol', 'string'),
        ('requestid', 'string', 'requestid', 'string'),
        ('requesttime', 'string', 'requesttime', 'timestamp'),
        ('responselength', 'string', 'responselength', 'int'),
        ('status', 'string', 'status', 'int'),
        ('user', 'string', 'user', 'string'),
        ('useragent', 'string', 'useragent', 'string'),
    ],
    transformation_ctx='applymapping_ctx',
)

# flatten nested json.
dyf = Relationalize.apply(
    frame=dyf,
    staging_path=TEMP_DIR,
    name='root',
    transformation_ctx='relationalize_ctx',
)
dyf = dyf.select('root')

# write partitioned parquet files.
dyf = gc.write_dynamic_frame.from_options(
    frame=dyf,
    connection_type=CONNECTION_TYPE,
    connection_options={
        'path': S3_DEST,
        'partitionKeys': [
            'partition_0',
            'partition_1',
            'partition_2',
            'partition_3',
        ],
    },
    format='glueparquet',
    transformation_ctx='parquet_ctx',
)

job.commit()

The error mentions Relationalize, which I'm using to flatten a nested JSON structure, but I'm not sure why it's failing.

Any help appreciated!

edit

I think I may have it working now, but I'm noticing that if the job runs without any new data in the source bucket, it fails with an error rather than showing as succeeded.

IllegalArgumentException: Partition column partition_0 not found in schema

When new records arrive in the source bucket and the job runs again, it works and shows as succeeded.
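
For anyone hitting the same thing, logging the count and schema of the bookmarked source right after it is read makes the empty-frame case easy to spot (a quick diagnostic sketch, not part of the job above):

# diagnostic only: with bookmarks enabled and no new data, the source frame
# comes back empty, so the partition columns never appear in its schema.
print('record count:', dyf.count())
print('schema:', dyf.schema())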

Still interested in feedback!

borg
Asked 2 years ago, 874 views

1 Answer

Solved my own issue by wrapping the write call in a check for an empty schema, which happens with bookmarking enabled when the next scheduled run of the crawler contains no new data.

if dyf.count():
    dyf = gc.write_dynamic_frame.from_options(
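
Spelled out in full (reusing the same options as the original write call above; the count check is the only change), it looks roughly like this:

# skip the write entirely when the bookmarked source returned no new records;
# an empty frame has no partition columns, so the partitioned write would fail.
if dyf.count():
    dyf = gc.write_dynamic_frame.from_options(
        frame=dyf,
        connection_type=CONNECTION_TYPE,
        connection_options={
            'path': S3_DEST,
            'partitionKeys': [
                'partition_0',
                'partition_1',
                'partition_2',
                'partition_3',
            ],
        },
        format='glueparquet',
        transformation_ctx='parquet_ctx',
    )

# job.commit() stays outside the check, as in the original script.
job.commit()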
borg
Answered 2 years ago
Reviewed 2 years ago by an AWS Expert
