How to resolve a list of "eventIds" into event attributes?


I have worked out how to gather a list of Event Ids, but cannot work out how to use them.

When my Glue workflow is triggered (by a change to an S3 bucket), I was hoping to read some parameters or attributes from the triggering event, so that I can decide what the workflow should do. Is this possible?

Other than to dedupe, what use is it to have the eventId?

glue_client = boto3.client('glue')
workflow_args = glue_client.get_workflow_run_properties(
    Name=args['WORKFLOW_NAME'], RunId=args['WORKFLOW_RUN_ID']
)["RunProperties"]
logger.info(pprint.pformat(workflow_args))
#  {'aws:eventIds': '[bc432a05-078e-21d8-07c4-2bc86175c476]'}
Asked a year ago, 270 views
1 Answer

Hi. From your post, I understand that your Glue workflow is triggered whenever there is an S3 event, and that you are looking for a way to get the other details of the event behind each event ID. The "get_workflow_run_properties" API call only returns "aws:eventIds" in its response. The value in "aws:eventIds" matches the "id" field returned from the "lookup_events" CloudTrail API call.
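Since "aws:eventIds" arrives as a single string rather than a list, it may help to split it before comparing IDs. This is a minimal sketch: `parse_event_ids` is a hypothetical helper, and the comma-separated form for batches of several IDs is an assumption, because the question only shows the single-ID form.

```python
from typing import List


def parse_event_ids(raw: str) -> List[str]:
    """Split a run-property string such as '[id-1, id-2]' into a list of IDs.

    Assumption: several IDs are separated by commas inside the brackets.
    """
    inner = raw.strip().strip("[]")  # drop surrounding whitespace and brackets
    return [part.strip() for part in inner.split(",") if part.strip()]


if __name__ == "__main__":
    # Value copied from the question's log output.
    print(parse_event_ids("[bc432a05-078e-21d8-07c4-2bc86175c476]"))
```

The resulting list can then be compared against the event IDs found in CloudTrail one by one, instead of matching the whole bracketed string.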

As per the documentation [1], if you are using an EventBridge event for this use case, note that EventBridge emits a "NotifyEvent" event when it triggers the Glue workflow. You can look for Glue NotifyEvent API calls in CloudTrail to fetch the entire event that corresponds to the event ID of the workflow run.

Sample code:

    import json
    import sys
    from datetime import datetime, timedelta, timezone

    import boto3
    from awsglue.utils import getResolvedOptions

    args = getResolvedOptions(sys.argv, ['JOB_NAME', 'WORKFLOW_NAME', 'WORKFLOW_RUN_ID'])
    workflow_name = args['WORKFLOW_NAME']
    workflow_run_id = args['WORKFLOW_RUN_ID']

    glue_client = boto3.client('glue')
    workflow_params = glue_client.get_workflow_run_properties(
        Name=workflow_name, RunId=workflow_run_id
    )['RunProperties']
    # 'aws:eventIds' is a string such as '[id-1]'; splitting on commas for
    # batches of several IDs is an assumption about the format.
    batched_events = {
        event_id.strip()
        for event_id in workflow_params['aws:eventIds'].strip('[]').split(',')
    }

    # Use the Region in which the workflow runs.
    cloudtrail_client = boto3.client('cloudtrail', region_name='us-east-1')

    now = datetime.now(timezone.utc)
    response = cloudtrail_client.lookup_events(
        LookupAttributes=[
            {'AttributeKey': 'EventName', 'AttributeValue': 'NotifyEvent'},
        ],
        StartTime=now - timedelta(minutes=10),
        EndTime=now,
        MaxResults=50,  # lookup_events accepts at most 50 results per call
    )

    for event in response.get('Events', []):
        # 'CloudTrailEvent' is a JSON string holding the full CloudTrail record.
        event_payload = json.loads(event['CloudTrailEvent'])['requestParameters']['eventPayload']
        if event_payload['eventId'] in batched_events:
            print('Details ::', event_payload['eventBody'])

As a workaround, you can configure a Lambda function as the EventBridge rule target (instead of setting the Glue workflow as the target), then invoke the Glue workflow or job from the Lambda function and pass the event details as workflow run properties or job parameters.

References:
[1] https://docs.aws.amazon.com/glue/latest/dg/workflows_overview.html
[2] https://aws.amazon.com/blogs/big-data/build-a-serverless-event-driven-workflow-with-aws-glue-and-amazon-eventbridge/
[3] https://docs.aws.amazon.com/glue/latest/dg/workflows_overview.html
[4] https://docs.aws.amazon.com/glue/latest/dg/starting-workflow-eventbridge.html
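The Lambda workaround described above could look roughly like the following. It is a sketch under stated assumptions, not a tested deployment: the `WORKFLOW_NAME` environment variable, the property names (`event_id`, `bucket`, `key`), and the helper `build_run_properties` are all hypothetical, and the event layout assumed is the one S3 uses for EventBridge notifications (`detail.bucket.name`, `detail.object.key`). Events that reach EventBridge through CloudTrail have a different `detail` layout.

```python
import os


def build_run_properties(event: dict) -> dict:
    """Flatten the parts of the event the workflow needs into string properties.

    Assumption: the event follows the S3-to-EventBridge notification layout.
    Run property values must be strings, hence the str() calls.
    """
    detail = event.get("detail", {})
    return {
        "event_id": str(event.get("id", "")),
        "bucket": str(detail.get("bucket", {}).get("name", "")),
        "key": str(detail.get("object", {}).get("key", "")),
    }


def lambda_handler(event, context, glue_client=None):
    """Start the Glue workflow and hand it the event details as run properties."""
    if glue_client is None:
        # Imported lazily so build_run_properties can be exercised without boto3.
        import boto3

        glue_client = boto3.client("glue")
    response = glue_client.start_workflow_run(
        Name=os.environ["WORKFLOW_NAME"],  # hypothetical environment variable
        RunProperties=build_run_properties(event),
    )
    return {"RunId": response["RunId"]}
```

Inside the workflow's jobs, the same "get_workflow_run_properties" call shown in the question would then return these properties alongside any others, so the job can branch on the bucket or key directly.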

AWS
Support Engineer
Answered a year ago
