Boto3: Adding Triggers and Jobs to a Glue Workflow


I need to add jobs and triggers to a Glue workflow programmatically in Python. I am not sure how to do it; can anyone help?

I have tried the boto3 library. I can create, start, and stop a workflow, but I am unable to assign jobs or triggers to it.

The link below suggests this is possible via EventBridge, but I don't have IAM access and cannot create policies or roles: https://docs.aws.amazon.com/glue/latest/dg/starting-workflow-eventbridge.html

The link below has a code snippet, but I am not sure where and how to execute it or which service needs to be used: https://repost.aws/questions/QU8oIj15VUTCS7d38ocHq7nQ/how-to-create-a-glue-workflow-programmatically

I need a working step-by-step example with code snippets.

asked 2 years ago · 471 views
1 Answer
Accepted Answer

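To answer your question: use create_trigger with the WorkflowName parameter to add jobs (and crawlers) to the workflow. Each trigger's Actions list names the jobs or crawlers that the trigger starts.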
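
As a minimal sketch of that call, assuming the workflow and the job already exist: the helper name `add_start_trigger`, the `_start` suffix, and the placeholder names in the usage comment are illustrative only. The client is passed in as an argument so the function can be tried against a stub without AWS access.

```python
def add_start_trigger(glue_client, workflow_name, job_name):
    """Attach an on-demand start trigger that runs job_name to workflow_name."""
    trigger_name = workflow_name + "_start"  # illustrative naming convention
    glue_client.create_trigger(
        Name=trigger_name,
        WorkflowName=workflow_name,  # this parameter ties the trigger to the workflow
        Type="ON_DEMAND",            # the start trigger of the workflow
        Actions=[{"JobName": job_name}],
    )
    return trigger_name


# Usage, assuming credentials with Glue permissions are configured:
#   import boto3
#   glue = boto3.client("glue")
#   add_start_trigger(glue, "my_workflow", "my_job")  # placeholder names
```

No EventBridge rule or extra IAM role is involved in this path; the caller only needs permission to call the Glue API.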

For my project, I created workflows and added jobs and crawlers with parameters using boto3. I am sharing snippets of that code below for reference.

The function takes a list of dicts, one per file, holding the job names and parameters. One entry looks like this:

{
    "file_name": "sales_divisions.csv",
    "landing_db": "sales_landing",
    "landing_s3_location": "s3://test-s3-landing-bucket/datalake/sales-landing",
    "landing_table_name": "sales_divisions",
    "landing_to_raw_job": "common_landing_to_raw",
    "raw_db": "sales_raw",
    "raw_s3_location": "s3://test-s3-raw-bucket/datalake/sales-raw",
    "raw_table_name": "sales_divisions",
    "raw_to_curated_job": "common_raw_to_curated",
    "curated_db": "sales_curated",
    "curated_s3_location": "s3://test-s3-curated-bucket/datalake/sales-curated",
    "curated_table_name": "sales_divisions"
}
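
Every workflow, crawler, and trigger name used by the function is derived from file_name, while the job names are read directly from the entry. The sketch below spells out that convention; `derive_names` is a hypothetical helper that is not part of my project code, and it assumes the crawlers and jobs already exist under these names, since the function only creates the workflow and its triggers.

```python
def derive_names(entry):
    """Return the workflow, crawler, job, and trigger names for one metadata entry."""
    base = entry["file_name"].replace(".", "_")  # dots are swapped for underscores
    crawler_suffixes = ("_landing_crawler", "_raw_crawler", "_curated_crawler")
    trigger_suffixes = (
        "_start",
        "_landing_crawler_complete",
        "_landing2raw_complete",
        "_raw_crawler_complete",
        "_raw2curated_complete",
    )
    return {
        "workflow": base,
        "crawlers": [base + suffix for suffix in crawler_suffixes],
        "jobs": [entry["landing_to_raw_job"], entry["raw_to_curated_job"]],
        "triggers": [base + suffix for suffix in trigger_suffixes],
    }


entry = {
    "file_name": "sales_divisions.csv",
    "landing_to_raw_job": "common_landing_to_raw",
    "raw_to_curated_job": "common_raw_to_curated",
}
names = derive_names(entry)
print(names["workflow"])     # sales_divisions_csv
print(names["triggers"][0])  # sales_divisions_csv_start
```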

Given this metadata, the code below creates a workflow with its jobs and crawlers:


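import time

import boto3

# Glue client; region and credentials come from the default boto3 configuration
glue = boto3.client("glue")

# Create one workflow per metadata entry, then chain its crawlers and jobs with triggers.
# metadata_dict is iterated as a list of dicts like the one above; tag_list is passed
# straight to Tags, which the Glue API expects as a dict of key/value strings.
def create_workflows(metadata_dict,tag_list):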
        #Create workflow
        for wrkflw_list in metadata_dict:
            file = wrkflw_list["file_name"].replace('.','_')
            time.sleep(0.4)
            try:
                glue.create_workflow(
                    Name = file,
                    Description = file + " Glue workflow - landing to raw to curated",
                    MaxConcurrentRuns = 1,
                    Tags=tag_list
                    )
            except glue.exceptions.AlreadyExistsException:
                print(file, "- Already exists")
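            except Exception as err:
                # A plain string cannot be raised in Python 3; wrap the original error instead
                raise RuntimeError("error -- check the code") from err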
                            
            # Still inside the loop: create the on-demand start trigger and add it to the workflow
            try:
                trigger_nm=file + "_start"
                response=glue.get_trigger(Name=trigger_nm)
                if response['ResponseMetadata']['HTTPStatusCode'] == 200:
                    print(trigger_nm,"- Already exists, Not creating")
                time.sleep(0.2)
            except glue.exceptions.EntityNotFoundException:
                print(trigger_nm,"- does not exist. Creating trigger within workflow")
                glue.create_trigger(
                    Name=trigger_nm,
                    WorkflowName=file,
                    Type="ON_DEMAND",
                    Actions=[
                        {'CrawlerName': file + "_landing_crawler"}
                        ],
                    Tags=tag_list
                    )
                time.sleep(0.2)
            try:
                trigger_nm=file + "_landing_crawler_complete"
                response=glue.get_trigger(Name=trigger_nm)
                if response['ResponseMetadata']['HTTPStatusCode'] == 200:
                    print(trigger_nm,"- Already exists, Not creating")
                time.sleep(0.2)
            except glue.exceptions.EntityNotFoundException:
                print(trigger_nm,"- does not exist. Creating trigger within workflow")
                response_add_raw_crawler = glue.create_trigger(
                    Name=trigger_nm,
                    WorkflowName=file,
                    Type='CONDITIONAL',
                    Predicate={
                        'Conditions': [
                                {
                                    'LogicalOperator': 'EQUALS',
                                    'CrawlerName': file + "_landing_crawler",
                                    'CrawlState': 'SUCCEEDED'
                                },
                            ]
                        },
                    Description=file + '-FILE INGESTION JOB EVENT TRIGGER',
                    Tags=tag_list,
                    Actions=[
                            {
                            "JobName":wrkflw_list["landing_to_raw_job"],
                            "Arguments": 
                                    {"--run_type":"NORMAL",
                                    "--file_name":wrkflw_list["file_name"],
                                    "--config_path":"file_ingestion_conf_DEV.json"
                                    }
                            }
                        ]
                    )
                time.sleep(0.2)
            try:
                trigger_nm=file + "_landing2raw_complete"
                response=glue.get_trigger(Name=trigger_nm)
                if response['ResponseMetadata']['HTTPStatusCode'] == 200:
                    print(trigger_nm,"- Already exists, Not creating")
                time.sleep(0.2)
            except glue.exceptions.EntityNotFoundException:
                print(trigger_nm,"- does not exist. Creating trigger within workflow")
                response_add_raw_crawler = glue.create_trigger(
                    Name=trigger_nm,
                    WorkflowName=file,
                    Type='CONDITIONAL',
                    Predicate={
                        'Conditions': [
                                {
                                    'LogicalOperator': 'EQUALS',
                                    'JobName': wrkflw_list["landing_to_raw_job"],
                                    'State': 'SUCCEEDED'
                                },
                            ]
                        },
                    Description=file + '-FILE INGESTION JOB EVENT TRIGGER',
                    Tags=tag_list,
                    Actions=[
                            {'CrawlerName': file + "_raw_crawler"}
                        ]
                    )
                time.sleep(0.2)
            try:
                trigger_nm=file + "_raw_crawler_complete"
                response=glue.get_trigger(Name=trigger_nm)
                if response['ResponseMetadata']['HTTPStatusCode'] == 200:
                    print(trigger_nm,"- Already exists, Not creating")
                time.sleep(0.2)
            except glue.exceptions.EntityNotFoundException:
                print(trigger_nm,"- does not exist. Creating trigger within workflow")
                response_add_raw_crawler = glue.create_trigger(
                    Name=trigger_nm,
                    WorkflowName=file,
                    Type='CONDITIONAL',
                    Predicate={
                        'Conditions': [
                                {
                                    'LogicalOperator': 'EQUALS',
                                    'CrawlerName': file + "_raw_crawler",
                                    'CrawlState': 'SUCCEEDED'
                                },
                            ]
                        },
                    Description=file + '-FILE INGESTION JOB EVENT TRIGGER',
                    Tags=tag_list,
                    Actions=[
                            {
                            "JobName":wrkflw_list["raw_to_curated_job"],
                            "Arguments": 
                                    {"--run_type":"NORMAL",
                                    "--file_name":wrkflw_list["file_name"],
                                    "--config_path":"file_ingestion_conf_DEV.json"
                                    }
                            }
                        ]
                    )
                time.sleep(0.2)
            try:
                trigger_nm=file + "_raw2curated_complete"
                response=glue.get_trigger(Name=trigger_nm)
                if response['ResponseMetadata']['HTTPStatusCode'] == 200:
                    print(trigger_nm,"- Already exists, Not creating")
                time.sleep(0.2)
            except glue.exceptions.EntityNotFoundException:
                print(trigger_nm,"- does not exist. Creating trigger within workflow")
                response_add_raw_crawler = glue.create_trigger(
                    Name=trigger_nm,
                    WorkflowName=file,
                    Type='CONDITIONAL',
                    Predicate={
                        'Conditions': [
                                {
                                    'LogicalOperator': 'EQUALS',
                                    "JobName":wrkflw_list["raw_to_curated_job"],
                                    'State': 'SUCCEEDED'
                                },
                            ]
                        },
                    Description=file + '-FILE INGESTION JOB EVENT TRIGGER',
                    Tags=tag_list,
                    Actions=[
                            {
                            'CrawlerName': file + "_curated_crawler"
                            }
                        ]
                    )
                time.sleep(0.2)
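
The five trigger blocks above all follow the same pattern: look the trigger up, and create it only if Glue reports that it is missing. A compact sketch of that pattern follows. The helpers `succeeded` and `ensure_trigger` are hypothetical names and not part of my project code; `get_trigger`, `create_trigger`, and `EntityNotFoundException` are the same boto3 calls used above.

```python
def succeeded(job=None, crawler=None):
    """Build a one-condition Predicate that matches a successful job or crawler run."""
    if job is not None:
        condition = {"LogicalOperator": "EQUALS", "JobName": job, "State": "SUCCEEDED"}
    else:
        condition = {
            "LogicalOperator": "EQUALS",
            "CrawlerName": crawler,
            "CrawlState": "SUCCEEDED",
        }
    return {"Conditions": [condition]}


def ensure_trigger(glue_client, name, **create_kwargs):
    """Create the trigger only if get_trigger says it does not exist.

    Returns True when a trigger was created and False when it was already there.
    """
    try:
        glue_client.get_trigger(Name=name)
        print(name, "- already exists, not creating")
        return False
    except glue_client.exceptions.EntityNotFoundException:
        glue_client.create_trigger(Name=name, **create_kwargs)
        return True


# Usage inside the loop above, for the landing-to-raw step:
#   ensure_trigger(
#       glue, file + "_landing2raw_complete",
#       WorkflowName=file,
#       Type="CONDITIONAL",
#       Predicate=succeeded(job=wrkflw_list["landing_to_raw_job"]),
#       Actions=[{"CrawlerName": file + "_raw_crawler"}],
#       Tags=tag_list,
#   )
```

One thing to verify in your account: create_trigger also accepts StartOnCreation=True for CONDITIONAL triggers. As far as I know, a conditional trigger created without it stays deactivated until start_trigger is called, so check the trigger state if a workflow run stops after the first crawler.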
AWS
answered 2 years ago
