1 réponse
- Le plus récent
- Le plus de votes
- Le plus de commentaires
0
【以下的回答经过翻译处理】你需要使用 create_trigger 将任务（以及爬虫）添加到工作流中。
在我的项目中，我使用 boto3 创建了工作流，并添加了带参数的任务和爬虫。下面是代码片段，供你参考。
我传入的是一个字典列表，其中每个字典包含文件名、任务名称，以及各层（landing / raw / curated）的数据库、S3 位置和表名等参数，例如：
{"file_name": "sales_divisions.csv", "landing_db": "sales_landing", "landing_s3_location": "s3://test-s3-landing-bucket/datalake/sales-landing", "landing_table_name": "sales_divisions", "landing_to_raw_job": "common_landing_to_raw", "raw_db": "sales_raw", "raw_s3_location": "s3://test-s3-raw-bucket/datalake/sales-raw", "raw_table_name": "sales_divisions", "raw_to_curated_job": "common_raw_to_curated", "curated_db": "sales_curated", "curated_s3_location": "s3://test-s3-curated-bucket/datalake/sales-curated", "curated_table_name": "sales_divisions"}
传入这样的字典列表后，以下代码会创建包含任务和爬虫的工作流（代码假定已执行 `import time`，并已创建 `glue = boto3.client("glue")`）：
# Create workflows listed in metadata
def create_workflows(metadata_dict, tag_list, *, config_path="file_ingestion_conf_DEV.json"):
    """Create a Glue workflow and its trigger chain for each metadata entry.

    For every entry, the workflow name (NAME below) is ``entry["file_name"]``
    with every "." replaced by "_". Five triggers are added to it, forming a
    linear chain:

        NAME_start                     ON_DEMAND                 -> crawler NAME_landing_crawler
        NAME_landing_crawler_complete  NAME_landing_crawler OK   -> job landing_to_raw_job
        NAME_landing2raw_complete      landing_to_raw_job OK     -> crawler NAME_raw_crawler
        NAME_raw_crawler_complete      NAME_raw_crawler OK       -> job raw_to_curated_job
        NAME_raw2curated_complete      raw_to_curated_job OK     -> crawler NAME_curated_crawler

    The crawlers and jobs themselves are not created here; they are only
    referenced by name. Workflows and triggers that already exist are reported
    with ``print`` and left unchanged, so the function can be re-run.

    Args:
        metadata_dict: Despite its name, an iterable of dicts. Each dict must
            provide "file_name", "landing_to_raw_job" and "raw_to_curated_job".
            Other keys are not read here.
        tag_list: Passed unchanged as ``Tags`` to ``create_workflow`` and
            ``create_trigger``. boto3 expects a str -> str mapping here;
            confirm this with the caller.
        config_path: Value of the ``--config_path`` argument given to both
            jobs. Defaults to the previously hard-coded DEV config.

    Raises:
        Any Glue client error other than ``AlreadyExistsException`` from
        ``create_workflow`` or ``EntityNotFoundException`` from ``get_trigger``.
        A missing key in an entry raises ``KeyError``.

    NOTE(review): relies on a module-level boto3 Glue client named ``glue``
    that is defined outside this snippet.
    """
    import time

    for entry in metadata_dict:
        workflow_name = entry["file_name"].replace(".", "_")
        # Presumably throttles Glue API calls to avoid rate limiting.
        time.sleep(0.4)
        _ensure_workflow(workflow_name, tag_list)
        for trigger_name, trigger_args in _workflow_triggers(entry, workflow_name, config_path):
            _ensure_trigger(trigger_name, workflow_name, tag_list, trigger_args)


def _ensure_workflow(workflow_name, tag_list):
    """Create the Glue workflow ``workflow_name`` unless it already exists."""
    try:
        glue.create_workflow(
            Name=workflow_name,
            Description=workflow_name + " Glue workflow - landing to raw to curated",
            MaxConcurrentRuns=1,
            Tags=tag_list,
        )
    except glue.exceptions.AlreadyExistsException:
        print(workflow_name, "- Already exists")
    # Any other error propagates unchanged. The previous `raise "<str>"` was
    # itself a TypeError in Python 3 and masked the real Glue error.


def _ensure_trigger(trigger_name, workflow_name, tag_list, trigger_args):
    """Create ``trigger_name`` in ``workflow_name`` with ``trigger_args`` unless it exists."""
    import time

    try:
        # get_trigger raises EntityNotFoundException when the trigger is absent,
        # so a normal return means the trigger already exists.
        glue.get_trigger(Name=trigger_name)
    except glue.exceptions.EntityNotFoundException:
        print(trigger_name, "- does not exists. Creating trigger and within workflow")
        glue.create_trigger(
            Name=trigger_name,
            WorkflowName=workflow_name,
            Tags=tag_list,
            **trigger_args,
        )
    else:
        print(trigger_name, "- Already exists, Not creating")
    time.sleep(0.2)


def _workflow_triggers(entry, workflow_name, config_path):
    """Return ``(trigger_name, create_trigger kwargs)`` pairs in chain order."""
    landing_crawler = workflow_name + "_landing_crawler"
    raw_crawler = workflow_name + "_raw_crawler"
    curated_crawler = workflow_name + "_curated_crawler"
    landing_to_raw_job = entry["landing_to_raw_job"]
    raw_to_curated_job = entry["raw_to_curated_job"]
    description = workflow_name + "-FILE INGESTION JOB EVENT TRIGGER"

    def job_action(job_name):
        # Both jobs receive the same run arguments.
        return {
            "JobName": job_name,
            "Arguments": {
                "--run_type": "NORMAL",
                "--file_name": entry["file_name"],
                "--config_path": config_path,
            },
        }

    def crawler_ok(crawler_name):
        return {"CrawlerName": crawler_name, "CrawlState": "SUCCEEDED"}

    def job_ok(job_name):
        return {"JobName": job_name, "State": "SUCCEEDED"}

    def conditional(condition, action):
        # NOTE(review): created without StartOnCreation=True. Per the Glue
        # CreateTrigger API, that flag is what starts CONDITIONAL triggers on
        # creation. Confirm these triggers fire inside the workflow as intended.
        return {
            "Type": "CONDITIONAL",
            "Predicate": {"Conditions": [{"LogicalOperator": "EQUALS", **condition}]},
            "Description": description,
            "Actions": [action],
        }

    return [
        (workflow_name + "_start",
         {"Type": "ON_DEMAND", "Actions": [{"CrawlerName": landing_crawler}]}),
        (workflow_name + "_landing_crawler_complete",
         conditional(crawler_ok(landing_crawler), job_action(landing_to_raw_job))),
        (workflow_name + "_landing2raw_complete",
         conditional(job_ok(landing_to_raw_job), {"CrawlerName": raw_crawler})),
        (workflow_name + "_raw_crawler_complete",
         conditional(crawler_ok(raw_crawler), job_action(raw_to_curated_job))),
        (workflow_name + "_raw2curated_complete",
         conditional(job_ok(raw_to_curated_job), {"CrawlerName": curated_crawler})),
    ]
Contenus pertinents
- demandé il y a 2 jours
- demandé il y a 2 ans
- demandé il y a 10 mois
- AWS OFFICIEL — mis à jour il y a 2 ans
- AWS OFFICIEL — mis à jour il y a 2 ans
- AWS OFFICIEL — mis à jour il y a 3 ans
- AWS OFFICIEL — mis à jour il y a 2 ans