
Parameterized concurrent AWS Glue jobs invoked via Step Functions throw VersionMismatchException when bookmarks are enabled


I have a parameterized Glue job that is called in parallel (25 concurrent runs) through Step Functions. When job bookmarks are enabled, a version mismatch exception is thrown; when they are disabled, it runs fine. Below is the input to the Step Functions state:

```json
{
  "JobName": "gluejobname3",
  "Arguments": {
    "--cdb": "catalogdbname",
    "--ctbl": "tablename",
    "--dest": "s3://thepath/raw/",
    "--extra-py-files": "s3://thepath/mytransformation.py.zip"
  }
}
```

With bookmarks disabled, Step Functions calls the parameterized Glue job and the data loads into the different S3 locations. Below is the Glue job script:
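For context, a minimal sketch of how the 25 per-table inputs could be generated before being fanned out to the Glue job (the table names and per-table destination prefix here are hypothetical placeholders, not values from the actual setup):

```python
# Sketch: generating one Step Functions / Glue job input per catalog table.
# TABLES and the per-table "--dest" layout are assumptions for illustration.
TABLES = ["customers", "orders", "payments"]

def build_job_input(table):
    """Mirror the Step Functions input shown above for a single table."""
    return {
        "JobName": "gluejobname3",
        "Arguments": {
            "--cdb": "catalogdbname",
            "--ctbl": table,
            "--dest": f"s3://thepath/raw/{table}/",
            "--extra-py-files": "s3://thepath/mytransformation.py.zip",
        },
    }

inputs = [build_job_input(t) for t in TABLES]
```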

```python
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
import mytransformation

args = getResolvedOptions(sys.argv, ["JOB_NAME", "cdb", "ctbl", "dest"])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)

# The table name is appended so that each run is initialized with its own unique id
job.init(args["JOB_NAME"] + args["ctbl"], args)

# Custom parameters passed dynamically with each call to the parameterized Glue job
db = args["cdb"]
tbl = args["ctbl"]
dest = args["dest"]
```
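To illustrate what the `job.init` call above produces, a quick check of the name it builds per run (the argument values here are hypothetical examples):

```python
# Hypothetical resolved arguments for one of the 25 runs
args = {"JOB_NAME": "gluejobname3", "ctbl": "customers"}

# Same expression as in job.init(...): job name with the table name appended,
# intended to give each concurrent run its own unique init name
unique_init_name = args["JOB_NAME"] + args["ctbl"]
print(unique_init_name)  # gluejobname3customers
```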

```python
# Script generated for node MySQL table
# Dynamically creating the variable name so that transformation_ctx is unique per job
globals()[f'MySQLtable_sourceDf{tbl}'] = glueContext.create_dynamic_frame.from_catalog(
    database=db,
    table_name=tbl,
    # pass the name directly; concatenating '"' characters would embed literal quotes
    transformation_ctx=f'MySQLtable_sourceDf{tbl}'
)

# Passing the source frame into the transformation
destination_Df = mytransformation.tansform(globals()[f'MySQLtable_sourceDf{tbl}'])

# Script generated for node Amazon S3 # : "s3://anvil-houston-preprod"
# Creating the dynamic unique transformation_ctx name since the jobs run concurrently
globals()[f'S3bucket_node{tbl}'] = glueContext.write_dynamic_frame.from_options(
    frame=destination_Df,
    connection_type="s3",
    format="glueparquet",
    connection_options={"path": dest, "partitionKeys": []},
    format_options={"compression": "snappy"},
    transformation_ctx=f'S3bucket_node{tbl}'
)

job.commit()
```
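As an aside on the `transformation_ctx` expressions: an earlier version of this script built them as `'"' + f'MySQLtable_sourceDf{tbl}' + '"'`, which embeds literal quote characters in the value Glue stores as the bookmark key. A quick standalone check of what the two expressions yield (the table name here is a hypothetical example):

```python
tbl = "tablename"  # hypothetical table name for illustration

# Concatenating quote characters puts literal quotes inside the string
ctx_concat = '"' + f'MySQLtable_sourceDf{tbl}' + '"'

# Passing the f-string directly yields the bare name
ctx_plain = f'MySQLtable_sourceDf{tbl}'

print(ctx_concat)  # "MySQLtable_sourceDftablename"  (quotes are part of the value)
print(ctx_plain)   # MySQLtable_sourceDftablename
```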

The above runs fine with bookmarks disabled: when execution is started through Step Functions (25 parallel parameterized Glue job runs), every job completes and loads into its own of the 25 different locations. When bookmarks are enabled, the jobs fail at commit with a version mismatch:

An error occurred while calling z:com.amazonaws.services.glue.util.Job.commit. Continuation update failed due to version mismatch. Expected version 1 but found version 2 (Service: AWSGlueJobExecutor; Status Code: 400; Error Code: VersionMismatchException;

Please help