How do I add triggers and jobs to a Glue workflow with Boto3?


[The following question has been translated] In Python, I need to add jobs and triggers to a Glue workflow. I'm not sure how to do this; can anyone help?

Using the boto3 library I can create, start, and stop a workflow, but I can't specify its jobs or triggers.

The link below suggests this is possible through EventBridge, but I don't have IAM access and can't create any policies or roles: https://docs.aws.amazon.com/glue/latest/dg/starting-workflow-eventbridge.html

The link below contains a code snippet, but I'm not sure where and how to run it, or which service I need to use: https://repost.aws/questions/QU8oIj15VUTCS7d38ocHq7nQ/how-to-create-a-glue-workflow-programmatically

I need some working step-by-step examples and code snippets.

Expert · asked a year ago · 25 views
1 answer

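[The following answer has been translated] You need to use create_trigger to add jobs to the workflow. A trigger created with WorkflowName set belongs to that workflow, and the jobs and crawlers listed in its Actions become part of the workflow graph.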
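A minimal sketch of that call, assuming the workflow and the job already exist. The function name `add_start_trigger` and the names used in the usage line are hypothetical; the Glue client is passed in as a parameter so the function can be tried without AWS access.

```python
def add_start_trigger(glue, workflow_name, trigger_name, job_name):
    """Attach an ON_DEMAND start trigger that runs `job_name` in `workflow_name`.

    `glue` is expected to be a boto3 Glue client, e.g. boto3.client("glue").
    Returns the create_trigger response unchanged.
    """
    return glue.create_trigger(
        Name=trigger_name,
        # Setting WorkflowName is what places the trigger, and the job it starts, in the workflow
        WorkflowName=workflow_name,
        # ON_DEMAND start trigger: fires when a run of the workflow is started
        Type="ON_DEMAND",
        Actions=[{"JobName": job_name}],
    )
```

For example, with `glue = boto3.client("glue")`, calling `add_start_trigger(glue, "my_workflow", "my_workflow_start", "my_job")` attaches `my_job` to `my_workflow`, and `glue.start_workflow_run(Name="my_workflow")` then runs it.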

For my project, I used boto3 to create the workflow and add jobs (with parameters) and crawlers to it. The code is below for reference.
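Every step after the first is a CONDITIONAL trigger: its Predicate watches the previous job or crawler, and its Actions start the next one (a job action can carry run Arguments). The sketch below builds those keyword arguments as a plain dict, using the same condition shapes as the code in this answer (`State` for jobs, `CrawlState` for crawlers). The function name and the `("job", name)` / `("crawler", name)` tuple convention are hypothetical.

```python
def conditional_trigger_kwargs(workflow_name, trigger_name, after, action, arguments=None):
    """Build create_trigger kwargs for "run `action` once `after` has succeeded".

    `after` and `action` are ("job", name) or ("crawler", name) tuples.
    `arguments` (job actions only) become the job run arguments,
    e.g. {"--file_name": "sales_divisions.csv"}.
    """
    kind, name = after
    if kind == "job":
        condition = {"LogicalOperator": "EQUALS", "JobName": name, "State": "SUCCEEDED"}
    elif kind == "crawler":
        condition = {"LogicalOperator": "EQUALS", "CrawlerName": name, "CrawlState": "SUCCEEDED"}
    else:
        raise ValueError(f"unknown node kind: {kind!r}")

    action_kind, action_name = action
    if action_kind == "job":
        act = {"JobName": action_name}
        if arguments:
            act["Arguments"] = dict(arguments)
    elif action_kind == "crawler":
        if arguments:
            raise ValueError("crawler actions do not take Arguments")
        act = {"CrawlerName": action_name}
    else:
        raise ValueError(f"unknown node kind: {action_kind!r}")

    return {
        "Name": trigger_name,
        "WorkflowName": workflow_name,
        "Type": "CONDITIONAL",
        "Predicate": {"Conditions": [condition]},
        "Actions": [act],
    }
```

The result can be passed straight to the client, for example `glue.create_trigger(**kwargs, Tags={"project": "sales"})`; keeping the dict construction separate makes the wiring easy to print or test before anything is created in AWS.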

I pass in a list of dictionaries (one per workflow), each containing the job names and parameter values, for example:

{
    "file_name": "sales_divisions.csv",
    "landing_db": "sales_landing",
    "landing_s3_location": "s3://test-s3-landing-bucket/datalake/sales-landing",
    "landing_table_name": "sales_divisions",
    "landing_to_raw_job": "common_landing_to_raw",
    "raw_db": "sales_raw",
    "raw_s3_location": "s3://test-s3-raw-bucket/datalake/sales-raw",
    "raw_table_name": "sales_divisions",
    "raw_to_curated_job": "common_raw_to_curated",
    "curated_db": "sales_curated",
    "curated_s3_location": "s3://test-s3-curated-bucket/datalake/sales-curated",
    "curated_table_name": "sales_divisions"
}
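To see the chain that one metadata entry produces, here is a small sketch that lists, for each trigger, the node it watches and the node it starts. It follows the naming used by `create_workflows` further down (workflow and crawler names derived from `file_name` with `.` replaced by `_`); the helper `pipeline_triggers` itself is hypothetical and makes no AWS calls.

```python
def pipeline_triggers(entry):
    """Return (trigger_name, watched_node, action_node) tuples for one metadata entry.

    Nodes are ("crawler", name) or ("job", name); watched_node is None for the
    ON_DEMAND start trigger. Names mirror the convention in create_workflows.
    """
    prefix = entry["file_name"].replace(".", "_")  # also used as the workflow name
    landing_crawler = ("crawler", prefix + "_landing_crawler")
    raw_crawler = ("crawler", prefix + "_raw_crawler")
    curated_crawler = ("crawler", prefix + "_curated_crawler")
    landing_to_raw = ("job", entry["landing_to_raw_job"])
    raw_to_curated = ("job", entry["raw_to_curated_job"])
    return [
        (prefix + "_start", None, landing_crawler),
        (prefix + "_landing_crawler_complete", landing_crawler, landing_to_raw),
        (prefix + "_landing2raw_complete", landing_to_raw, raw_crawler),
        (prefix + "_raw_crawler_complete", raw_crawler, raw_to_curated),
        (prefix + "_raw2curated_complete", raw_to_curated, curated_crawler),
    ]
```

For the sample entry, `pipeline_triggers(entry)[0]` is `("sales_divisions_csv_start", None, ("crawler", "sales_divisions_csv_landing_crawler"))`, i.e. starting the workflow runs the landing crawler first, and each later trigger fires when the previous step succeeds.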

Given this metadata, the following code creates each workflow together with its jobs and crawlers:

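import time

import boto3

# Glue client used by every call below; region and credentials come from the default boto3 configuration
glue = boto3.client("glue")


# Create the workflows listed in the metadata.
# metadata_dict: list of metadata dictionaries like the one above, one per workflow
# tag_list: Glue expects Tags as a dict of string to string, e.g. {"project": "sales"}
def create_workflows(metadata_dict, tag_list):
       # Create the workflow (skipped if it already exists)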
       for wrkflw_list in metadata_dict:
           file = wrkflw_list["file_name"].replace('.','_')
           time.sleep(0.4)
           try:
               glue.create_workflow(
                   Name = file,
                   Description = file + " Glue workflow - landing to raw to curated",
                   MaxConcurrentRuns = 1,
                   Tags=tag_list
                   )
           except glue.exceptions.AlreadyExistsException:
               print(file, "- Already exists")
           # Any other error propagates to the caller unchanged
                           
           # Still inside the loop: create the ON_DEMAND start trigger, then the
           # CONDITIONAL triggers that chain each crawler and job to the next
           try:
               trigger_nm=file + "_start"
               response=glue.get_trigger(Name=trigger_nm)
               if response['ResponseMetadata']['HTTPStatusCode'] == 200:
                   print(trigger_nm,"- Already exists, Not creating")
               time.sleep(0.2)
           except glue.exceptions.EntityNotFoundException:
               print(trigger_nm, "- does not exist; creating it in the workflow")
               glue.create_trigger(
                   Name=trigger_nm,
                   WorkflowName=file,
                   Type="ON_DEMAND",
                   Actions=[
                       {'CrawlerName': file + "_landing_crawler"}
                       ],
                   Tags=tag_list
                   )
               time.sleep(0.2)
           try:
               trigger_nm=file + "_landing_crawler_complete"
               response=glue.get_trigger(Name=trigger_nm)
               if response['ResponseMetadata']['HTTPStatusCode'] == 200:
                   print(trigger_nm,"- Already exists, Not creating")
               time.sleep(0.2)
           except glue.exceptions.EntityNotFoundException:
               print(trigger_nm, "- does not exist; creating it in the workflow")
               response_add_raw_crawler = glue.create_trigger(
                   Name=trigger_nm,
                   WorkflowName=file,
                   Type='CONDITIONAL',
                   Predicate={
                       'Conditions': [
                               {
                                   'LogicalOperator': 'EQUALS',
                                   'CrawlerName': file + "_landing_crawler",
                                   'CrawlState': 'SUCCEEDED'
                               },
                           ]
                       },
                   Description=file + '-FILE INGESTION JOB EVENT TRIGGER',
                   Tags=tag_list,
                   Actions=[
                           {
                           "JobName":wrkflw_list["landing_to_raw_job"],
                           "Arguments": 
                                   {"--run_type":"NORMAL",
                                   "--file_name":wrkflw_list["file_name"],
                                   "--config_path":"file_ingestion_conf_DEV.json"
                                   }
                           }
                       ]
                   )
               time.sleep(0.2)
           try:
               trigger_nm=file + "_landing2raw_complete"
               response=glue.get_trigger(Name=trigger_nm)
               if response['ResponseMetadata']['HTTPStatusCode'] == 200:
                   print(trigger_nm,"- Already exists, Not creating")
               time.sleep(0.2)
           except glue.exceptions.EntityNotFoundException:
               print(trigger_nm, "- does not exist; creating it in the workflow")
               response_add_raw_crawler = glue.create_trigger(
                   Name=trigger_nm,
                   WorkflowName=file,
                   Type='CONDITIONAL',
                   Predicate={
                       'Conditions': [
                               {
                                   'LogicalOperator': 'EQUALS',
                                   'JobName': wrkflw_list["landing_to_raw_job"],
                                   'State': 'SUCCEEDED'
                               },
                           ]
                       },
                   Description=file + '-FILE INGESTION JOB EVENT TRIGGER',
                   Tags=tag_list,
                   Actions=[
                           {'CrawlerName': file + "_raw_crawler"}
                       ]
                   )
               time.sleep(0.2)
           try:
               trigger_nm=file + "_raw_crawler_complete"
               response=glue.get_trigger(Name=trigger_nm)
               if response['ResponseMetadata']['HTTPStatusCode'] == 200:
                   print(trigger_nm,"- Already exists, Not creating")
               time.sleep(0.2)
           except glue.exceptions.EntityNotFoundException:
               print(trigger_nm, "- does not exist; creating it in the workflow")
               response_add_raw_crawler = glue.create_trigger(
                   Name=trigger_nm,
                   WorkflowName=file,
                   Type='CONDITIONAL',
                   Predicate={
                       'Conditions': [
                               {
                                   'LogicalOperator': 'EQUALS',
                                   'CrawlerName': file + "_raw_crawler",
                                   'CrawlState': 'SUCCEEDED'
                               },
                           ]
                       },
                   Description=file + '-FILE INGESTION JOB EVENT TRIGGER',
                   Tags=tag_list,
                   Actions=[
                           {
                           "JobName":wrkflw_list["raw_to_curated_job"],
                           "Arguments": 
                                   {"--run_type":"NORMAL",
                                   "--file_name":wrkflw_list["file_name"],
                                   "--config_path":"file_ingestion_conf_DEV.json"
                                   }
                           }
                       ]
                   )
               time.sleep(0.2)
           try:
               trigger_nm=file + "_raw2curated_complete"
               response=glue.get_trigger(Name=trigger_nm)
               if response['ResponseMetadata']['HTTPStatusCode'] == 200:
                   print(trigger_nm,"- Already exists, Not creating")
               time.sleep(0.2)
           except glue.exceptions.EntityNotFoundException:
               print(trigger_nm, "- does not exist; creating it in the workflow")
               response_add_raw_crawler = glue.create_trigger(
                   Name=trigger_nm,
                   WorkflowName=file,
                   Type='CONDITIONAL',
                   Predicate={
                       'Conditions': [
                               {
                                   'LogicalOperator': 'EQUALS',
                                   "JobName":wrkflw_list["raw_to_curated_job"],
                                   'State': 'SUCCEEDED'
                               },
                           ]
                       },
                   Description=file + '-FILE INGESTION JOB EVENT TRIGGER',
                   Tags=tag_list,
                   Actions=[
                           {
                           'CrawlerName': file + "_curated_crawler"
                           }
                       ]
                   )
               time.sleep(0.2)
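The five get_trigger/create_trigger blocks above all follow the same "create only if missing" pattern. A compact way to express it is a single helper; the name `create_trigger_if_missing` is hypothetical, and it relies only on `get_trigger`, `create_trigger`, and the client's `EntityNotFoundException`, as the code above does.

```python
def create_trigger_if_missing(glue, **trigger_kwargs):
    """Create a Glue trigger unless one with the same Name already exists.

    `glue` is expected to be a boto3 Glue client; `trigger_kwargs` are passed
    to create_trigger unchanged. Returns True if a trigger was created.
    """
    name = trigger_kwargs["Name"]
    try:
        glue.get_trigger(Name=name)  # raises EntityNotFoundException if the trigger is absent
    except glue.exceptions.EntityNotFoundException:
        print(name, "- does not exist; creating it in the workflow")
        glue.create_trigger(**trigger_kwargs)
        return True
    print(name, "- already exists, not creating")
    return False
```

Inside the loop, the start trigger then becomes one call, for example `create_trigger_if_missing(glue, Name=file + "_start", WorkflowName=file, Type="ON_DEMAND", Actions=[{"CrawlerName": file + "_landing_crawler"}], Tags=tag_list)`, and the same helper covers each CONDITIONAL trigger. Once all triggers exist, `glue.start_workflow_run(Name=file)` starts a run.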
Expert · answered a year ago