How do I add triggers and jobs to a Glue workflow with Boto3?

In Python, I need to add jobs and triggers to a Glue workflow. I'm not sure how to do this; can anyone help?

Using the boto3 library I can create a workflow and start and stop it, but I cannot specify any jobs or triggers.
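
For reference, this is roughly what I have working so far (a minimal sketch; the workflow name is just a placeholder):

import boto3

glue = boto3.client('glue')

# Creating an empty workflow works ...
glue.create_workflow(Name='my_workflow', Description='created from boto3')

# ... and so does starting/stopping a run, but the workflow contains no jobs or triggers
run = glue.start_workflow_run(Name='my_workflow')
glue.stop_workflow_run(Name='my_workflow', RunId=run['RunId'])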

From the link below it appears this is possible through EventBridge, but I don't have the IAM access to create any policies or roles: https://docs.aws.amazon.com/glue/latest/dg/starting-workflow-eventbridge.html

The link below has a code snippet, but I'm not sure where and how to run it or which service it needs: https://repost.aws/questions/QU8oIj15VUTCS7d38ocHq7nQ/how-to-create-a-glue-workflow-programmatically

I need some working step-by-step examples and code snippets.

EXPERT
asked 10 months ago · 20 views
1 Answer

You need to use create_trigger to add the jobs to the workflow.
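
For example, a minimal sketch (the workflow, job and trigger names below are placeholders) of attaching a start trigger and a conditional trigger to a workflow:

import boto3

glue = boto3.client('glue')

# ON_DEMAND trigger that starts the first job when a workflow run is started
glue.create_trigger(
    Name='my_workflow_start',
    WorkflowName='my_workflow',
    Type='ON_DEMAND',
    Actions=[{'JobName': 'my_first_job'}]
)

# CONDITIONAL trigger that starts a second job once the first one succeeds.
# CONDITIONAL triggers may need StartOnCreation=True (or a later glue.start_trigger call)
# so that they are activated and actually fire.
glue.create_trigger(
    Name='my_workflow_first_job_complete',
    WorkflowName='my_workflow',
    Type='CONDITIONAL',
    StartOnCreation=True,
    Predicate={'Conditions': [{'LogicalOperator': 'EQUALS',
                               'JobName': 'my_first_job',
                               'State': 'SUCCEEDED'}]},
    Actions=[{'JobName': 'my_second_job'}]
)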

For my project I created the workflow with boto3 and added the jobs and crawlers, with parameters. Below is a code snippet for your reference.

I pass in a dictionary containing the job names and parameter names.

{"file_name": "sales_divisions.csv", "landing_db": "sales_landing", "landing_s3_location": "s3://test-s3-landing-bucket/datalake/sales-landing", "landing_table_name": "sales_divisions", "landing_to_raw_job": "common_landing_to_raw", "raw_db": "sales_raw", "raw_s3_location": "s3://test-s3-raw-bucket/datalake/sales-raw", "raw_table_name": "sales_divisions", "raw_to_curated_job": "common_raw_to_curated", "curated_db": "sales_curated", "curated_s3_location": "s3://test-s3-curated-bucket/datalake/sales-curated", "curated_table_name": "sales_divisions"}

With this dictionary passed in, the following code creates the workflow with its jobs and crawlers:

import time
import boto3

# Glue client used by the function below
glue = boto3.client('glue')

#Create workflows listed in metadata
def create_workflows(metadata_dict,tag_list):
       #Create workflow
       for wrkflw_list in metadata_dict:
           file = wrkflw_list["file_name"].replace('.','_')
           time.sleep(0.4)
           try:
               glue.create_workflow(
                   Name = file,
                   Description = file + " Glue workflow - landing to raw to curated",
                   MaxConcurrentRuns = 1,
                   Tags=tag_list
                   )
           except glue.exceptions.AlreadyExistsException:
               print(file, "- Already exists")
           except:
               raise "error -- check the code"
                           
       #Create start trigger and add to workflow
       # for wrkflw_list in metadata_dict:
           try:
               trigger_nm=file + "_start"
               response=glue.get_trigger(Name=trigger_nm)
               if response['ResponseMetadata']['HTTPStatusCode'] == 200:
                   print(trigger_nm,"- Already exists, Not creating")
               time.sleep(0.2)
           except glue.exceptions.EntityNotFoundException as err:
               print(trigger_nm,"- does not exists. Creating trigger and within workflow")
               glue.create_trigger(
                   Name=trigger_nm,
                   WorkflowName=file,
                   Type="ON_DEMAND",
                   Actions=[
                       {'CrawlerName': file + "_landing_crawler"}
                       ],
                   Tags=tag_list
                   )
               time.sleep(0.2)
           try:
               trigger_nm=file + "_landing_crawler_complete"
               response=glue.get_trigger(Name=trigger_nm)
               if response['ResponseMetadata']['HTTPStatusCode'] == 200:
                   print(trigger_nm,"- Already exists, Not creating")
               time.sleep(0.2)
           except glue.exceptions.EntityNotFoundException as err:
               print(trigger_nm,"- does not exists. Creating trigger and within workflow")
               response_add_raw_crawler = glue.create_trigger(
                   Name=trigger_nm,
                   WorkflowName=file,
                   Type='CONDITIONAL',
                   Predicate={
                       'Conditions': [
                               {
                                   'LogicalOperator': 'EQUALS',
                                   'CrawlerName': file + "_landing_crawler",
                                   'CrawlState': 'SUCCEEDED'
                               },
                           ]
                       },
                   Description=file + '-FILE INGESTION JOB EVENT TRIGGER',
                   Tags=tag_list,
                   Actions=[
                           {
                           "JobName":wrkflw_list["landing_to_raw_job"],
                           "Arguments": 
                                   {"--run_type":"NORMAL",
                                   "--file_name":wrkflw_list["file_name"],
                                   "--config_path":"file_ingestion_conf_DEV.json"
                                   }
                           }
                       ]
                   )
               time.sleep(0.2)
           try:
               trigger_nm=file + "_landing2raw_complete"
               response=glue.get_trigger(Name=trigger_nm)
               if response['ResponseMetadata']['HTTPStatusCode'] == 200:
                   print(trigger_nm,"- Already exists, Not creating")
               time.sleep(0.2)
           except glue.exceptions.EntityNotFoundException as err:
               print(trigger_nm,"- does not exists. Creating trigger and within workflow")
               response_add_raw_crawler = glue.create_trigger(
                   Name=trigger_nm,
                   WorkflowName=file,
                   Type='CONDITIONAL',
                   Predicate={
                       'Conditions': [
                               {
                                   'LogicalOperator': 'EQUALS',
                                   'JobName': wrkflw_list["landing_to_raw_job"],
                                   'State': 'SUCCEEDED'
                               },
                           ]
                       },
                   Description=file + '-FILE INGESTION JOB EVENT TRIGGER',
                   Tags=tag_list,
                   Actions=[
                           {'CrawlerName': file + "_raw_crawler"}
                       ]
                   )
               time.sleep(0.2)
           try:
               trigger_nm=file + "_raw_crawler_complete"
               response=glue.get_trigger(Name=trigger_nm)
               if response['ResponseMetadata']['HTTPStatusCode'] == 200:
                   print(trigger_nm,"- Already exists, Not creating")
               time.sleep(0.2)
           except glue.exceptions.EntityNotFoundException as err:
               print(trigger_nm,"- does not exists. Creating trigger and within workflow")
               response_add_raw_crawler = glue.create_trigger(
                   Name=trigger_nm,
                   WorkflowName=file,
                   Type='CONDITIONAL',
                   Predicate={
                       'Conditions': [
                               {
                                   'LogicalOperator': 'EQUALS',
                                   'CrawlerName': file + "_raw_crawler",
                                   'CrawlState': 'SUCCEEDED'
                               },
                           ]
                       },
                   Description=file + '-FILE INGESTION JOB EVENT TRIGGER',
                   Tags=tag_list,
                   Actions=[
                           {
                           "JobName":wrkflw_list["raw_to_curated_job"],
                           "Arguments": 
                                   {"--run_type":"NORMAL",
                                   "--file_name":wrkflw_list["file_name"],
                                   "--config_path":"file_ingestion_conf_DEV.json"
                                   }
                           }
                       ]
                   )
               time.sleep(0.2)
           try:
               trigger_nm=file + "_raw2curated_complete"
               response=glue.get_trigger(Name=trigger_nm)
               if response['ResponseMetadata']['HTTPStatusCode'] == 200:
                   print(trigger_nm,"- Already exists, Not creating")
               time.sleep(0.2)
           except glue.exceptions.EntityNotFoundException as err:
               print(trigger_nm,"- does not exists. Creating trigger and within workflow")
               response_add_raw_crawler = glue.create_trigger(
                   Name=trigger_nm,
                   WorkflowName=file,
                   Type='CONDITIONAL',
                   Predicate={
                       'Conditions': [
                               {
                                   'LogicalOperator': 'EQUALS',
                                   "JobName":wrkflw_list["raw_to_curated_job"],
                                   'State': 'SUCCEEDED'
                               },
                           ]
                       },
                   Description=file + '-FILE INGESTION JOB EVENT TRIGGER',
                   Tags=tag_list,
                   Actions=[
                           {
                           'CrawlerName': file + "_curated_crawler"
                           }
                       ]
                   )
               time.sleep(0.2)
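
For completeness, here is a sketch of how the function above could be invoked and the resulting workflow started. The metadata and tag values are illustrative (only the keys the function actually reads are shown), and the Glue jobs and crawlers referenced by the triggers are assumed to already exist:

import boto3

glue = boto3.client('glue')

# Illustrative metadata - one entry per workflow, mirroring the dictionary shown above
metadata = [
    {
        "file_name": "sales_divisions.csv",
        "landing_to_raw_job": "common_landing_to_raw",
        "raw_to_curated_job": "common_raw_to_curated"
    }
]
tags = {"project": "datalake", "environment": "dev"}   # placeholder tag values

create_workflows(metadata, tags)

# The start trigger is ON_DEMAND, so kick off a run explicitly
run = glue.start_workflow_run(Name="sales_divisions_csv")
print("Started workflow run:", run["RunId"])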
EXPERT
answered 10 months ago
