1 Answer
- Newest
- Most votes
- Most comments
2
To answer your question, you need to use create_trigger to add the jobs to the workflow.
For my project, I have created workflows and added jobs and crawlers with parameters using boto3. I will share the snippets of the code below for your reference.
I passed a dict containing the job names and their parameters.
{"file_name": "sales_divisions.csv", "landing_db": "sales_landing", "landing_s3_location": "s3://test-s3-landing-bucket/datalake/sales-landing", "landing_table_name": "sales_divisions", "landing_to_raw_job": "common_landing_to_raw", "raw_db": "sales_raw", "raw_s3_location": "s3://test-s3-raw-bucket/datalake/sales-raw", "raw_table_name": "sales_divisions", "raw_to_curated_job": "common_raw_to_curated", "curated_db": "sales_curated", "curated_s3_location": "s3://test-s3-curated-bucket/datalake/sales-curated", "curated_table_name": "sales_divisions"}
When this dict is passed in, the code below creates the workflow together with its jobs, crawlers, and triggers:
#Create workflows listed in metadata
def create_workflows(metadata_dict, tag_list):
    """Create one Glue workflow per metadata entry and wire it with triggers.

    For each entry the trigger chain built is:
        on-demand start -> landing crawler -> landing-to-raw job
        -> raw crawler -> raw-to-curated job -> curated crawler

    Args:
        metadata_dict: iterable of dicts, one per dataset. Keys read here:
            "file_name", "landing_to_raw_job", "raw_to_curated_job".
        tag_list: tags (dict) applied to every workflow and trigger created.

    Relies on the module-level ``glue`` boto3 client and ``time`` module.
    Idempotent: existing workflows/triggers are detected and skipped.
    """

    def _ensure_trigger(trigger_nm, **create_kwargs):
        # Get-or-create helper: Glue has no upsert for triggers, so probe
        # with get_trigger and only create on EntityNotFoundException.
        try:
            response = glue.get_trigger(Name=trigger_nm)
            if response['ResponseMetadata']['HTTPStatusCode'] == 200:
                print(trigger_nm, "- Already exists, Not creating")
        except glue.exceptions.EntityNotFoundException:
            print(trigger_nm, "- does not exists. Creating trigger and within workflow")
            glue.create_trigger(Name=trigger_nm, **create_kwargs)
        # Brief pause to stay under Glue API rate limits.
        time.sleep(0.2)

    #Create workflow
    for wrkflw_list in metadata_dict:
        # Workflow name derived from the file name ("x.csv" -> "x_csv").
        file = wrkflw_list["file_name"].replace('.', '_')
        time.sleep(0.4)
        try:
            glue.create_workflow(
                Name=file,
                Description=file + " Glue workflow - landing to raw to curated",
                MaxConcurrentRuns=1,
                Tags=tag_list
            )
        except glue.exceptions.AlreadyExistsException:
            print(file, "- Already exists")
        except Exception as err:
            # Original code did `raise "error -- check the code"`, which is a
            # TypeError in Python 3 and hides the cause; chain it instead.
            raise RuntimeError("error -- check the code") from err

        # Start trigger: on-demand, kicks off the landing crawler.
        _ensure_trigger(
            file + "_start",
            WorkflowName=file,
            Type="ON_DEMAND",
            Actions=[
                {'CrawlerName': file + "_landing_crawler"}
            ],
            Tags=tag_list
        )

        # Landing crawler succeeded -> run the landing-to-raw job.
        _ensure_trigger(
            file + "_landing_crawler_complete",
            WorkflowName=file,
            Type='CONDITIONAL',
            Predicate={
                'Conditions': [
                    {
                        'LogicalOperator': 'EQUALS',
                        'CrawlerName': file + "_landing_crawler",
                        'CrawlState': 'SUCCEEDED'
                    },
                ]
            },
            Description=file + '-FILE INGESTION JOB EVENT TRIGGER',
            Tags=tag_list,
            Actions=[
                {
                    "JobName": wrkflw_list["landing_to_raw_job"],
                    "Arguments": {
                        "--run_type": "NORMAL",
                        "--file_name": wrkflw_list["file_name"],
                        "--config_path": "file_ingestion_conf_DEV.json"
                    }
                }
            ]
        )

        # Landing-to-raw job succeeded -> run the raw crawler.
        _ensure_trigger(
            file + "_landing2raw_complete",
            WorkflowName=file,
            Type='CONDITIONAL',
            Predicate={
                'Conditions': [
                    {
                        'LogicalOperator': 'EQUALS',
                        'JobName': wrkflw_list["landing_to_raw_job"],
                        'State': 'SUCCEEDED'
                    },
                ]
            },
            Description=file + '-FILE INGESTION JOB EVENT TRIGGER',
            Tags=tag_list,
            Actions=[
                {'CrawlerName': file + "_raw_crawler"}
            ]
        )

        # Raw crawler succeeded -> run the raw-to-curated job.
        _ensure_trigger(
            file + "_raw_crawler_complete",
            WorkflowName=file,
            Type='CONDITIONAL',
            Predicate={
                'Conditions': [
                    {
                        'LogicalOperator': 'EQUALS',
                        'CrawlerName': file + "_raw_crawler",
                        'CrawlState': 'SUCCEEDED'
                    },
                ]
            },
            Description=file + '-FILE INGESTION JOB EVENT TRIGGER',
            Tags=tag_list,
            Actions=[
                {
                    "JobName": wrkflw_list["raw_to_curated_job"],
                    "Arguments": {
                        "--run_type": "NORMAL",
                        "--file_name": wrkflw_list["file_name"],
                        "--config_path": "file_ingestion_conf_DEV.json"
                    }
                }
            ]
        )

        # Raw-to-curated job succeeded -> run the curated crawler.
        _ensure_trigger(
            file + "_raw2curated_complete",
            WorkflowName=file,
            Type='CONDITIONAL',
            Predicate={
                'Conditions': [
                    {
                        'LogicalOperator': 'EQUALS',
                        "JobName": wrkflw_list["raw_to_curated_job"],
                        'State': 'SUCCEEDED'
                    },
                ]
            },
            Description=file + '-FILE INGESTION JOB EVENT TRIGGER',
            Tags=tag_list,
            Actions=[
                {
                    'CrawlerName': file + "_curated_crawler"
                }
            ]
        )
answered 2 years ago
Relevant content
- Accepted Answerasked 4 months ago
- asked 8 months ago
- AWS OFFICIALUpdated 3 years ago
- AWS OFFICIALUpdated 3 years ago
- AWS OFFICIALUpdated 3 years ago
- AWS OFFICIALUpdated 2 years ago
Hi Ananth, thanks a lot for your time and for posting the code snippet and solution. This is really helpful.
Hi Ananth, can we add two triggers of different types to the same workflow — one CONDITIONAL and the other SCHEDULED?
https://docs.aws.amazon.com/glue/latest/dg/workflows_overview.html Only one start trigger is possible in each workflow. Each workflow has a start trigger, and there are three types of start trigger: Schedule, On Demand, or EventBridge Event.