Is it possible to consolidate the JSON data from every lambda_handler execution during an AWS S3 Batch operation?


I am trying to invoke an AWS Lambda function from an S3 Batch operation. The intention is to copy items from one bucket to another, and this code is placed in my lambda_handler method. Now I want to track, for every item in the source bucket, whether it was copied to the destination bucket or not. For that I use the sample JSON snippet below:

[
  {
    "object_key": "123",
    "version_id": "123",
    "status": "copied"
  },
  {
    "object_key": "123",
    "version_id": "123",
    "status": "copied"
  }
]

Here is my lambda_handler

import logging
import json
from urllib import parse
import boto3
import os
import traceback
from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)
logger.setLevel("INFO")

aws_region = os.environ['AWS_REGION']

s3 = boto3.resource("s3")
session = boto3.Session()
# session = boto3.Session(region_name=aws_region)
s3_client = session.client('s3')
s3_resource = session.resource('s3')

def lambda_handler(event, context):
    logger.info("S3 Migration Lambda Handler with [AWS_REGION]: %s", aws_region)
    invocation_id = event["invocationId"]
    invocation_schema_version = event["invocationSchemaVersion"]
    results = []
    result_code = None
    result_string = None
    json_data_list = []
    json_filename='dummy-file-name.json'
    output_bucket='s3migration-meta-info'
    
    for task in event['tasks']:
        task_id = task["taskId"]  
        logger.info("[INVOCATION_ID]: %s, [INVOCATION_SCHEMA_VERSION]: %s, [TASK]: %s, [TASK_ID]: %s", invocation_id, invocation_schema_version, task, task_id)

        s3_migration_source_bucket = task['s3Bucket']
        s3_migration_source_object_key = task['s3Key']
        s3_migration_source_object_version_id = task['s3VersionId']
        destination_bucket_name = s3_migration_source_bucket + '-migrated'

For every task execution I want to generate JSON data, which I expect to consolidate into a single file that I can ultimately upload as a combined status report covering every item.

asked 21 days ago, 207 views
1 Answer

Hello,

To consolidate the JSON data for every Lambda handler execution during an AWS S3 Batch operation, you can accumulate the status information for each item and then upload the consolidated data to another S3 bucket as a combined report. Here's how you can modify your lambda_handler function to achieve this:

import logging
import json
import boto3
import os

logger = logging.getLogger(__name__)
logger.setLevel("INFO")

s3_client = boto3.client('s3')
s3_migration_report_bucket = 's3migration-meta-info'
report_filename = 'migration-report.json'

def lambda_handler(event, context):
    logger.info("S3 Migration Lambda Handler")
    
    invocation_id = event["invocationId"]
    invocation_schema_version = event["invocationSchemaVersion"]
    
    migration_report = []
    results = []
    
    for task in event['tasks']:
        task_id = task["taskId"]
        logger.info("Processing Task ID: %s", task_id)
        
        s3_migration_source_bucket = task['s3Bucket']
        s3_migration_source_object_key = task['s3Key']
        s3_migration_source_object_version_id = task['s3VersionId']
        destination_bucket_name = s3_migration_source_bucket + '-migrated'
        
        # Your migration logic here
        
        # Assuming you have a status for each task
        migration_status = "copied"  # Example
        
        # Append status to migration report
        migration_report.append({
            "object_key": s3_migration_source_object_key,
            "version_id": s3_migration_source_object_version_id,
            "status": migration_status
        })
        
        # Tell S3 Batch Operations the outcome of this task
        results.append({
            "taskId": task_id,
            "resultCode": "Succeeded",
            "resultString": migration_status
        })
    
    # Upload migration report to S3
    upload_migration_report(invocation_id, migration_report)
    
    # Respond in the format S3 Batch Operations expects from a Lambda-invoked job
    return {
        "invocationSchemaVersion": invocation_schema_version,
        "treatMissingKeysAs": "PermanentFailure",
        "invocationId": invocation_id,
        "results": results
    }

def upload_migration_report(invocation_id, migration_report):
    report_key = f"{invocation_id}/{report_filename}"
    report_body = json.dumps(migration_report)
    
    s3_client.put_object(
        Bucket=s3_migration_report_bucket,
        Key=report_key,
        Body=report_body
    )
    logger.info("Migration report uploaded to S3: %s/%s", s3_migration_report_bucket, report_key)

In this modified lambda_handler function:

We accumulate the status information for each item in the migration_report list.

After processing all tasks, we upload the migration_report as a JSON file to an S3 bucket named s3migration-meta-info. The report is organized into folders based on the invocationId to keep reports from different invocations separate.

You can customize the migration_status based on your migration logic.

The handler also returns per-task results in the response format that S3 Batch Operations expects (invocationSchemaVersion, treatMissingKeysAs, invocationId, and results), so each task is reported back to the batch job.
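For the copy itself, a minimal sketch of the migration logic could look like the helper below. This is only an illustration: copy_object_with_status is a made-up name, and it assumes a versioned copy via the boto3 copy_object call plus the s3_client and logger already defined at module level in the code above. It simply returns "copied" or "failed" for the report.

from botocore.exceptions import ClientError

def copy_object_with_status(source_bucket, destination_bucket, object_key, version_id):
    # Copy a single versioned object and return a status string for the report
    try:
        s3_client.copy_object(
            Bucket=destination_bucket,
            Key=object_key,
            CopySource={
                "Bucket": source_bucket,
                "Key": object_key,
                "VersionId": version_id
            }
        )
        return "copied"
    except ClientError as error:
        # Record the failure instead of raising, so the report still gets an entry
        logger.error("Copy failed for %s (version %s): %s", object_key, version_id, error)
        return "failed"

Inside the loop you would then replace the hard-coded example with something like migration_status = copy_object_with_status(s3_migration_source_bucket, destination_bucket_name, s3_migration_source_object_key, s3_migration_source_object_version_id).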

answered 21 days ago
  • @Anil, I have a similar solution, but it uploads the report multiple times. In my case the report bucket is version-enabled, and when I check the report file in the report bucket, it shows 600+ versions under the file's Versions section.
