How can I create a Glue workflow programmatically?


Is there a way to create a Glue workflow programmatically?

I looked at the CloudFormation examples, but I only found one that creates an empty workflow (just the workflow name, description, and properties): https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-glue-workflow.html

I also looked at the API documentation (https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-workflow.html). It lists all the data types and structures involved in a Glue workflow, but the create API still only creates an empty workflow.

How can I create a workflow from a blueprint as in Lake Formation? Is it just a matter of feeding some kind of pre-assembled JSON into the Glue workflow creation process?

Is something like this already supported, or do we have to wait for customizable blueprints?

Update:

As the code snippet in the accepted answer shows, the key is to use AWS::Glue::Trigger resources to build the structure of the workflow.

Specifically, you need to:

  1. Create the workflow with AWS::Glue::Workflow
  2. Create the database and connections, if you need them (AWS::Glue::Database, AWS::Glue::Connection)
  3. Create any crawlers and jobs you want in the workflow with AWS::Glue::Crawler or AWS::Glue::Job
  4. Create a first trigger (AWS::Glue::Trigger) of type ON_DEMAND, with Actions set to the first crawler or job your workflow should start, and WorkflowName set to the workflow created in step 1
  5. Create any further triggers of type CONDITIONAL
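The same wiring can also be done directly through the API (e.g. with boto3): CreateWorkflow only creates the empty container, and the workflow graph is defined implicitly by the triggers that reference it via WorkflowName. A minimal sketch, assuming the crawler and job already exist; all names below are placeholders:

```python
# Sketch: a Glue workflow is an empty container; its DAG is defined
# implicitly by the triggers that reference it via WorkflowName.
# Workflow, crawler, and job names below are placeholders.

def on_demand_trigger(workflow, crawler):
    """Entry trigger: starts the workflow's first crawler on demand."""
    return {
        "Name": f"{workflow}-start",
        "WorkflowName": workflow,
        "Type": "ON_DEMAND",
        "Actions": [{"CrawlerName": crawler}],
    }

def conditional_trigger(workflow, crawler, job):
    """Second trigger: runs the job once the crawler succeeds."""
    return {
        "Name": f"{workflow}-on-crawl-success",
        "WorkflowName": workflow,
        "Type": "CONDITIONAL",
        "StartOnCreation": True,
        "Actions": [{"JobName": job}],
        "Predicate": {
            "Logical": "ANY",
            "Conditions": [{
                "LogicalOperator": "EQUALS",
                "CrawlerName": crawler,
                "CrawlState": "SUCCEEDED",
            }],
        },
    }

if __name__ == "__main__":
    import boto3  # requires AWS credentials configured
    glue = boto3.client("glue")
    glue.create_workflow(Name="demo-wf", Description="demo workflow")
    glue.create_trigger(**on_demand_trigger("demo-wf", "my-crawler"))
    glue.create_trigger(**conditional_trigger("demo-wf", "my-crawler", "my-job"))
```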

Below is an example: it creates a workflow that starts a crawler on an S3 bucket of CloudTrail logs and, on success, starts a Python script that changes the table and partition schema so it works with Athena.

Hope this helps!

---

AWSTemplateFormatVersion: '2010-09-09'
Description: Creates a cloudtrail crawler and catalog for Athena and a job to transform to Parquet

Parameters: 
  CloudtrailS3: 
    Type: String
    Description: Enter the unique bucket name where the cloud trails log are stored

  CloudtrailS3Path: 
    Type: String
    Description: Enter the path/prefix that you want to crawl 

  CloudtrailDataLakeS3: 
    Type: String
    Description: Enter the unique bucket name for the data lake in which to store the logs in Parquet Format

Resources:
    
  CloudTrailGlueExecutionRole:
    Type: 'AWS::IAM::Role'
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - glue.amazonaws.com
            Action:
              - 'sts:AssumeRole'
      Path: /
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole
  
  GluePolicy:
    Properties:
      PolicyDocument:
        Version: '2012-10-17'
        Statement:
        - Action:
          - s3:GetBucketLocation
          - s3:GetObject
          - s3:PutObject
          - s3:ListBucket
          Effect: Allow
          Resource:
          - !Join ['', ['arn:aws:s3:::', !Ref CloudtrailS3] ]
          - !Join ['', ['arn:aws:s3:::', !Ref CloudtrailS3, '/*'] ]
          - !Join ['', ['arn:aws:s3:::', !Ref CloudtrailDataLakeS3] ]
          - !Join ['', ['arn:aws:s3:::', !Ref CloudtrailDataLakeS3, '/*'] ]
        - Action:
          - s3:DeleteObject
          Effect: Allow
          Resource:
          - !Join ['', ['arn:aws:s3:::', !Ref CloudtrailDataLakeS3] ]
          - !Join ['', ['arn:aws:s3:::', !Ref CloudtrailDataLakeS3, '/*'] ]
      PolicyName: glue_cloudtrail_S3_policy
      Roles:
      - Ref: CloudTrailGlueExecutionRole
    Type: AWS::IAM::Policy

  GlueWorkflow:
    Type: AWS::Glue::Workflow
    Properties: 
      Description: Workflow to crawl the cloudtrail logs
      Name: cloudtrail_discovery_workflow

  GlueDatabaseCloudTrail:
    Type: AWS::Glue::Database
    Properties:
      # The database is created in the Data Catalog for your account
      CatalogId: !Ref AWS::AccountId   
      DatabaseInput:
        # The name of the database is defined in the Parameters section above
        Name: cloudtrail_db
        Description: Database to hold tables for the CloudTrail data
        LocationUri: !Ref CloudtrailDataLakeS3
  
  GlueCrawlerCTSource:
    Type: AWS::Glue::Crawler
    Properties:
      Name: cloudtrail_source_crawler
      Role: !GetAtt CloudTrailGlueExecutionRole.Arn
      #Classifiers: none, use the default classifier
      Description: AWS Glue crawler to crawl cloudtrail logs
      Schedule: 
        ScheduleExpression: 'cron(0 9 * * ? *)'
      DatabaseName: !Ref GlueDatabaseCloudTrail
      Targets:
        S3Targets:
          - Path: !Sub
            - s3://${bucket}/${path}
            - {
              bucket: !Ref CloudtrailS3,
              path : !Ref CloudtrailS3Path
              }
            Exclusions: 
              - '*/CloudTrail-Digest/**'
              - '*/Config/**'
            
      #TablePrefix: ''
      SchemaChangePolicy:
        UpdateBehavior: "UPDATE_IN_DATABASE"
        DeleteBehavior: "LOG"
      Configuration: "{\"Version\":1.0,\"CrawlerOutput\":{\"Partitions\":{\"AddOrUpdateBehavior\":\"InheritFromTable\"},\"Tables\":{\"AddOrUpdateBehavior\":\"MergeNewColumns\"}}}"

  GlueJobConvertTable:
    Type: AWS::Glue::Job
    Properties:
      Name: ct_change_table_schema
      Role:
        Fn::GetAtt: [CloudTrailGlueExecutionRole, Arn]
      ExecutionProperty:
        MaxConcurrentRuns: 1
      GlueVersion: '1.0'
      Command:
        Name: pythonshell
        PythonVersion: 3
        ScriptLocation: !Sub
          - s3://${bucket}/python/ct_change_table_schema.py
          - {bucket: !Ref CloudtrailDataLakeS3}
      DefaultArguments:
        '--TempDir': !Sub
          - s3://${bucket}/glue_tmp/
          - {bucket: !Ref CloudtrailDataLakeS3}
        "--job-bookmark-option" : "job-bookmark-disable"
        "--enable-metrics" : ""
    DependsOn:
      - CloudTrailGlueExecutionRole

  GlueSourceCrawlerTrigger:
    Type: AWS::Glue::Trigger
    Properties:
      Name: ct_start_source_crawl_Trigger
      Type: ON_DEMAND
      Description: Source Crawler trigger
      WorkflowName: !Ref GlueWorkflow
      Actions:
      - CrawlerName:
          Ref: GlueCrawlerCTSource
    DependsOn:
      - GlueCrawlerCTSource

  GlueJobTrigger:
    Type: AWS::Glue::Trigger
    Properties:
      Name: ct_change_schema_Job_Trigger
      Type: CONDITIONAL
      Description: Job trigger
      WorkflowName: !Ref GlueWorkflow
      StartOnCreation: 'true'
      Actions:
      - JobName: !Ref GlueJobConvertTable
      Predicate:
        Conditions:
        - LogicalOperator: EQUALS
          CrawlerName: !Ref GlueCrawlerCTSource
          CrawlState: SUCCEEDED
        Logical: ANY
    DependsOn:
    - GlueJobConvertTable
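Note that because the entry trigger is ON_DEMAND, the workflow above does not run on its own; once the stack is created it has to be started explicitly, via the console or the StartWorkflowRun API. A small sketch, assuming boto3 with valid AWS credentials (the client is passed in so it can be stubbed for testing):

```python
def start_workflow(glue_client, name):
    """Start an on-demand Glue workflow run and return its RunId.

    glue_client is expected to expose boto3's Glue API, i.e.
    glue_client.start_workflow_run(Name=...) -> {"RunId": ...}.
    """
    return glue_client.start_workflow_run(Name=name)["RunId"]

if __name__ == "__main__":
    import boto3  # requires AWS credentials configured
    run_id = start_workflow(boto3.client("glue"), "cloudtrail_discovery_workflow")
    print(run_id)
```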

EXPERT
asked 6 months ago · 23 views
1 Answer

For workflows you need a combination of triggers, crawlers, and jobs. CloudFormation can cover most of it, but you may still need some custom resources and/or to kick off something like a Step Function.

Example (from the internet, [post](https://www.reddit.com/r/aws/comments/dl2f6p/aws_glue_workflow_cfn/)):

---
Parameters:
  OutputPathLocation:
    Description: Output path of the transformation file
    Type: String
  WorkFlowName:
    Description: Name of the workflow
    Type: String
    Default: test-workflow
  MyScriptLocation:
    Description: Location of ETL script
    Type: String
Resources:
  MyGlueWorkFlow:
    Type: AWS::Glue::Workflow
    Properties:
      Description: test cfn workflow
      Name:
        Ref: WorkFlowName
  MyGlueCrawler:
    Type: AWS::Glue::Crawler
    Properties:
      DatabaseName: cfndb
      Description: My crawler
      Name: MyGlueCrawler
      Role: AWSGlueServiceRole
      TablePrefix: cfn_
      Targets:
        S3Targets:
        - Path: s3://crawler-public-us-east-1/flight/2016/csv
    DependsOn:
    - MyGlueWorkFlow
  MyGlueCrawlerTrigger:
    Type: AWS::Glue::Trigger
    Properties:
      Name: MyCrawlerTrigger
      Type: ON_DEMAND
      Description: Crawler trigger
      WorkflowName:
        Ref: MyGlueWorkFlow
      Actions:
      - CrawlerName:
          Ref: MyGlueCrawler
    DependsOn:
    - MyGlueCrawler
  MyGlueJob:
    Type: AWS::Glue::Job
    Properties:
      Command:
        Name: glueetl
        ScriptLocation:
          Ref: MyScriptLocation
      Description: My workflow job
      GlueVersion: '1.0'
      Name: MyGlueJob
      Role: AWSGlueServiceRole
      DefaultArguments:
        "--outputpath":
          Ref: OutputPathLocation
    DependsOn:
    - MyGlueCrawler
  MyGlueJobTrigger:
    Type: AWS::Glue::Trigger
    Properties:
      Name: MyGlueJobTrigger
      Type: CONDITIONAL
      Description: Job trigger
      WorkflowName:
        Ref: MyGlueWorkFlow
      StartOnCreation: 'true'
      Actions:
      - JobName:
          Ref: MyGlueJob
      Predicate:
        Conditions:
        - LogicalOperator: EQUALS
          CrawlerName:
            Ref: MyGlueCrawler
          CrawlState: SUCCEEDED
        Logical: ANY
    DependsOn:
    - MyGlueJob



EXPERT
answered 6 months ago
