How can I create a Glue workflow programmatically?


Is there a way to create a Glue workflow programmatically?

I looked at the CloudFormation examples, but all I could find was one that creates an empty workflow (just the workflow name, description, and properties): https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-glue-workflow.html

I also went through the API documentation (https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-workflow.html). It lists all the data types and structures that make up a Glue workflow, but the create API still only creates an empty workflow.

How can I create a workflow from a blueprint, as in Lake Formation? Is it just a matter of feeding some pre-assembled JSON file into the Glue workflow creation process?

Is anything like this supported yet, or do I need to wait for customizable blueprints?

Update:

As the code snippet in the accepted answer shows, the key is to use

AWS::Glue::Trigger

resources to build up the structure of the workflow.

Specifically, you need to:

  1. Create the workflow with AWS::Glue::Workflow
  2. Create the database and connection as well, if you need them (AWS::Glue::Database, AWS::Glue::Connection)
  3. Create any crawlers and jobs you want to add to the workflow with AWS::Glue::Crawler or AWS::Glue::Job
  4. Create the first trigger (AWS::Glue::Trigger) with Type ON_DEMAND, Actions pointing to the first crawler or job the workflow should start, and WorkflowName set to the workflow created in step 1
  5. Create any further triggers with Type CONDITIONAL
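The same five steps also map onto plain Glue API calls, which answers the original "programmatically" question without CloudFormation. This is only a sketch: the resource names (`my_workflow`, `my_crawler`, `my_job`) are hypothetical, and the payloads are shown as plain dicts you would pass to the corresponding boto3 calls.

```python
# Sketch of building a Glue workflow via the API (boto3-style payloads).
# Names are hypothetical. With boto3 you would run, e.g.:
#   glue = boto3.client("glue")
#   glue.create_workflow(**workflow)
#   glue.create_trigger(**start_trigger)
#   glue.create_trigger(**job_trigger)

# Step 1: CreateWorkflow only makes an empty container.
workflow = {
    "Name": "my_workflow",
    "Description": "Workflow assembled via the Glue API",
}

# Step 4: an ON_DEMAND trigger that starts the first crawler.
# Setting WorkflowName is what attaches the trigger (and its actions)
# to the workflow graph.
start_trigger = {
    "Name": "my_workflow_start",
    "WorkflowName": "my_workflow",
    "Type": "ON_DEMAND",
    "Actions": [{"CrawlerName": "my_crawler"}],
}

# Step 5: a CONDITIONAL trigger that runs the job once the crawler succeeds.
job_trigger = {
    "Name": "my_workflow_job",
    "WorkflowName": "my_workflow",
    "Type": "CONDITIONAL",
    "StartOnCreation": True,
    "Actions": [{"JobName": "my_job"}],
    "Predicate": {
        "Logical": "ANY",
        "Conditions": [{
            "LogicalOperator": "EQUALS",
            "CrawlerName": "my_crawler",
            "CrawlState": "SUCCEEDED",
        }],
    },
}
```

The crawler and job themselves (steps 2 and 3) would be created beforehand with `create_crawler` / `create_job`; the workflow graph is inferred entirely from the triggers' `WorkflowName`, `Actions`, and `Predicate` fields.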

Below is an example (it creates a workflow that starts a crawler on an S3 bucket with CloudTrail logs and, on success, starts a Python script that changes the table and partition schema so that they work with Athena).

Hope it helps!

---

AWSTemplateFormatVersion: '2010-09-09'
Description: Creates a CloudTrail crawler and catalog for Athena, and a job to transform to Parquet

Parameters: 
  CloudtrailS3: 
    Type: String
    Description: Enter the unique bucket name where the CloudTrail logs are stored

  CloudtrailS3Path: 
    Type: String
    Description: Enter the path/prefix that you want to crawl 

  CloudtrailDataLakeS3: 
    Type: String
    Description: Enter the unique bucket name for the data lake in which to store the logs in Parquet Format

Resources:
    
  CloudTrailGlueExecutionRole:
    Type: 'AWS::IAM::Role'
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - glue.amazonaws.com
            Action:
              - 'sts:AssumeRole'
      Path: /
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole
  
  GluePolicy:
    Properties:
      PolicyDocument:
        Version: '2012-10-17'
        Statement:
        - Action:
          - s3:GetBucketLocation
          - s3:GetObject
          - s3:PutObject
          - s3:ListBucket
          Effect: Allow
          Resource:
          - !Join ['', ['arn:aws:s3:::', !Ref CloudtrailS3] ]
          - !Join ['', ['arn:aws:s3:::', !Ref CloudtrailS3, '/*'] ]
          - !Join ['', ['arn:aws:s3:::', !Ref CloudtrailDataLakeS3] ]
          - !Join ['', ['arn:aws:s3:::', !Ref CloudtrailDataLakeS3, '/*'] ]
        - Action:
          - s3:DeleteObject
          Effect: Allow
          Resource:
          - !Join ['', ['arn:aws:s3:::', !Ref CloudtrailDataLakeS3] ]
          - !Join ['', ['arn:aws:s3:::', !Ref CloudtrailDataLakeS3, '/*'] ]
      PolicyName: glue_cloudtrail_S3_policy
      Roles:
      - Ref: CloudTrailGlueExecutionRole
    Type: AWS::IAM::Policy

  GlueWorkflow:
    Type: AWS::Glue::Workflow
    Properties: 
      Description: Workflow to crawl the cloudtrail logs
      Name: cloudtrail_discovery_workflow

  GlueDatabaseCloudTrail:
    Type: AWS::Glue::Database
    Properties:
      # The database is created in the Data Catalog for your account
      CatalogId: !Ref AWS::AccountId   
      DatabaseInput:
        # The name of the database is defined in the Parameters section above
        Name: cloudtrail_db
        Description: Database to hold tables for CloudTrail data
        LocationUri: !Ref CloudtrailDataLakeS3
  
  GlueCrawlerCTSource:
    Type: AWS::Glue::Crawler
    Properties:
      Name: cloudtrail_source_crawler
      Role: !GetAtt CloudTrailGlueExecutionRole.Arn
      #Classifiers: none, use the default classifier
      Description: AWS Glue crawler to crawl cloudtrail logs
      Schedule: 
        ScheduleExpression: 'cron(0 9 * * ? *)'
      DatabaseName: !Ref GlueDatabaseCloudTrail
      Targets:
        S3Targets:
          - Path: !Sub
            - s3://${bucket}/${path}
            - {
              bucket: !Ref CloudtrailS3,
              path : !Ref CloudtrailS3Path
              }
            Exclusions: 
              - '*/CloudTrail-Digest/**'
              - '*/Config/**'
            
      #TablePrefix: ''
      SchemaChangePolicy:
        UpdateBehavior: "UPDATE_IN_DATABASE"
        DeleteBehavior: "LOG"
      Configuration: "{\"Version\":1.0,\"CrawlerOutput\":{\"Partitions\":{\"AddOrUpdateBehavior\":\"InheritFromTable\"},\"Tables\":{\"AddOrUpdateBehavior\":\"MergeNewColumns\"}}}"

  GlueJobConvertTable:
    Type: AWS::Glue::Job
    Properties:
      Name: ct_change_table_schema
      Role:
        Fn::GetAtt: [CloudTrailGlueExecutionRole, Arn]
      ExecutionProperty:
        MaxConcurrentRuns: 1
      GlueVersion: '1.0'
      Command:
        Name: pythonshell
        PythonVersion: '3'
        ScriptLocation: !Sub
          - s3://${bucket}/python/ct_change_table_schema.py
          - {bucket: !Ref CloudtrailDataLakeS3}
      DefaultArguments:
        '--TempDir': !Sub
          - s3://${bucket}/glue_tmp/
          - {bucket: !Ref CloudtrailDataLakeS3}
        "--job-bookmark-option" : "job-bookmark-disable"
        "--enable-metrics" : ""
    DependsOn:
      - CloudTrailGlueExecutionRole

  GlueSourceCrawlerTrigger:
    Type: AWS::Glue::Trigger
    Properties:
      Name: ct_start_source_crawl_Trigger
      Type: ON_DEMAND
      Description: Source Crawler trigger
      WorkflowName: !Ref GlueWorkflow
      Actions:
      - CrawlerName:
          Ref: GlueCrawlerCTSource
    DependsOn:
      - GlueCrawlerCTSource

  GlueJobTrigger:
    Type: AWS::Glue::Trigger
    Properties:
      Name: ct_change_schema_Job_Trigger
      Type: CONDITIONAL
      Description: Job trigger
      WorkflowName: !Ref GlueWorkflow
      StartOnCreation: 'true'
      Actions:
      - JobName: !Ref GlueJobConvertTable
      Predicate:
        Conditions:
        - LogicalOperator: EQUALS
          CrawlerName: !Ref GlueCrawlerCTSource
          CrawlState: SUCCEEDED
        Logical: ANY
    DependsOn:
    - GlueJobConvertTable

Expert
asked 5 months ago · 20 views
1 Answer

For workflows you need to combine triggers, crawlers, and jobs. CloudFormation covers most of it, but you may still need some custom resources and/or something like a Step Function to kick it off.

Example (from the internet, [post](https://www.reddit.com/r/aws/comments/dl2f6p/aws_glue_workflow_cfn/)):

---
Parameters:
  OutputPathLocation:
    Description: Output path of the transformation file
    Type: String
  WorkFlowName:
    Description: Name of the workflow
    Type: String
    Default: test-workflow
  MyScriptLocation:
    Description: Location of ETL script
    Type: String
Resources:
  MyGlueWorkFlow:
    Type: AWS::Glue::Workflow
    Properties:
      Description: test cfn workflow
      Name:
        Ref: WorkFlowName
  MyGlueCrawler:
    Type: AWS::Glue::Crawler
    Properties:
      DatabaseName: cfndb
      Description: My crawler
      Name: MyGlueCrawler
      Role: AWSGlueServiceRole
      TablePrefix: cfn_
      Targets:
        S3Targets:
        - Path: s3://crawler-public-us-east-1/flight/2016/csv
    DependsOn:
    - MyGlueWorkFlow
  MyGlueCrawlerTrigger:
    Type: AWS::Glue::Trigger
    Properties:
      Name: MyCrawlerTrigger
      Type: ON_DEMAND
      Description: Crawler trigger
      WorkflowName:
        Ref: MyGlueWorkFlow
      Actions:
      - CrawlerName:
          Ref: MyGlueCrawler
    DependsOn:
    - MyGlueCrawler
  MyGlueJob:
    Type: AWS::Glue::Job
    Properties:
      Command:
        Name: glueetl
        ScriptLocation:
          Ref: MyScriptLocation
      Description: My workflow job
      GlueVersion: '1.0'
      Name: MyGlueJob
      Role: AWSGlueServiceRole
      DefaultArguments:
        "--outputpath":
          Ref: OutputPathLocation
    DependsOn:
    - MyGlueCrawler
  MyGlueJobTrigger:
    Type: AWS::Glue::Trigger
    Properties:
      Name: MyGlueJobTrigger
      Type: CONDITIONAL
      Description: Job trigger
      WorkflowName:
        Ref: MyGlueWorkFlow
      StartOnCreation: 'true'
      Actions:
      - JobName:
          Ref: MyGlueJob
      Predicate:
        Conditions:
        - LogicalOperator: EQUALS
          CrawlerName:
            Ref: MyGlueCrawler
          CrawlState: SUCCEEDED
        Logical: ANY
    DependsOn:
    - MyGlueJob
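As noted above, deploying the stack does not run anything by itself: the ON_DEMAND trigger still has to be fired, which is the part a custom resource or Step Functions task would handle. A hedged boto3-style sketch of that call (`test-workflow` is the template's default workflow name):

```python
# The Glue API call that starts a deployed workflow is StartWorkflowRun.
# With boto3 installed and credentials configured this would be:
#   import boto3
#   glue = boto3.client("glue")
#   run_id = glue.start_workflow_run(Name="test-workflow")["RunId"]
# Shown here as the plain request payload so the shape is clear:
start_workflow_request = {"Name": "test-workflow"}
```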



Expert
answered 5 months ago
