Move multiple files from a source S3 bucket to a target S3 bucket, keeping the source file names

I have a requirement where we have 400+ Parquet files in an S3 bucket. I need to write an AWS Glue job that processes all of these files, transforms them (adding the same 2 extra columns to each file), and writes them to a target S3 bucket with the same names as the source files. I need help achieving this. So far, I have written a PySpark script with the following algorithm:

  1. List all the files in the source bucket.

  2. For each file:

    • Create a DataFrame from the S3 file name

    • Apply the transform logic

    • Write the DataFrame to the destination S3 bucket (here the file name is auto-generated)

    • Search for the new file created in the destination S3 bucket

    • Rename the file

  3. Finally, commit the job.

This logic is failing because the renaming code is inside the for loop, where the file has not yet been created in the destination folder, since job.commit() is not called inside the loop. Any suggestions to correct this algorithm, or any other way to achieve this requirement?
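The "search for the new file, then rename it" steps of the loop above can be sketched as a small planning helper. This is only an illustration under assumptions that are not in the original post: each file is written to its own staging prefix, the write produces exactly one data file ending in `.parquet` (for example after coalescing to a single partition), and the names `staging_prefix`, `plan_rename`, and `staging/` are hypothetical.

```python
import posixpath


def staging_prefix(source_key, staging_root="staging/"):
    """Build a per-file staging prefix (hypothetical layout) so that the
    auto-generated output of each write is easy to locate afterwards."""
    stem = posixpath.splitext(posixpath.basename(source_key))[0]
    return f"{staging_root}{stem}/"


def plan_rename(source_key, written_keys, target_prefix):
    """Return (auto_generated_key, target_key) for one source file.

    written_keys are the keys listed under the staging prefix after the
    write. Marker objects such as _SUCCESS are ignored; exactly one
    .parquet data file is assumed.
    """
    data_files = [k for k in written_keys if k.endswith(".parquet")]
    if len(data_files) != 1:
        raise ValueError(
            f"expected exactly one data file for {source_key}, "
            f"found {len(data_files)}"
        )
    # The target keeps the source file's base name.
    return data_files[0], target_prefix + posixpath.basename(source_key)
```

Keeping this step free of S3 calls makes it easy to check the name mapping for all 400+ files before any object is copied or deleted.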

Thanks in advance. Bhaskar

Asked 2 years ago · 2813 views
1 Answer
Hello Bhaskar,

Output file names are handled by the Spark execution engine. Spark does not offer a configuration option to set the names of these files, so there is no way to configure this in Glue. As a workaround, some of our customers run a small boto3 script at the end of the ETL job that lists the contents of the job's output path and renames the objects there. This can run as code appended to the end of the job, as a separate Python Shell job that is triggered whenever your job finishes, or as a Lambda function that is executed when the job is done.
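For the Lambda option, a minimal sketch could look like the following, assuming the function is invoked by an EventBridge rule on "Glue Job State Change" events. The job name `my-etl-job` and the `rename_outputs` placeholder are hypothetical and stand in for your own job and rename logic.

```python
JOB_NAME = "my-etl-job"  # hypothetical job name, replace with your own


def is_successful_run(event, job_name):
    """True only for a 'Glue Job State Change' event that reports
    SUCCEEDED for the given job."""
    detail = event.get("detail", {})
    return (
        event.get("detail-type") == "Glue Job State Change"
        and detail.get("jobName") == job_name
        and detail.get("state") == "SUCCEEDED"
    )


def rename_outputs():
    """Placeholder (assumption): call the boto3 rename logic here."""
    return None


def lambda_handler(event, context):
    # Ignore failed runs and events that belong to other jobs.
    if not is_successful_run(event, JOB_NAME):
        return {"renamed": False}
    rename_outputs()
    return {"renamed": True}
```

Filtering on the job name and state inside the handler is a defensive choice; the same filter can also be expressed in the EventBridge rule pattern.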

Here is a sample script for the rename step, written as code appended to the end of the job.

====Script====

import time
import boto3

BUCKET_NAME = "<bucket-name>"
PREFIX = "<prefix-name>/"  # output prefix of the job, with a trailing slash

## Glue transformations and actions code ##

# Sleep 30s to ensure consistency
time.sleep(30)

client = boto3.client("s3")

response = client.list_objects(
    Bucket=BUCKET_NAME,
    Prefix=PREFIX,
)

# Key of the first object listed under the prefix
name = response["Contents"][0]["Key"]

# Copy the object with the new name and delete the old one
client.copy_object(
    Bucket=BUCKET_NAME,
    CopySource={"Bucket": BUCKET_NAME, "Key": name},
    Key=PREFIX + "new_name",
)
client.delete_object(Bucket=BUCKET_NAME, Key=name)

============
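The sample above renames only the first object returned by the listing. For an output path with many objects, one possible extension is sketched below. It is not part of the original answer: the function name `rename_objects` and the `new_key_for` callback are hypothetical, and `client` is expected to be a boto3 S3 client (for example `boto3.client("s3")`).

```python
def rename_objects(client, bucket, prefix, new_key_for):
    """Rename every object under prefix by copying it to new_key_for(key)
    and then deleting the original.

    new_key_for returns the new key, or None to leave an object untouched.
    Returns the list of (old_key, new_key) pairs that were renamed.
    """
    # Collect all keys first, so that newly copied objects are never
    # picked up by the listing while it is still in progress.
    paginator = client.get_paginator("list_objects_v2")
    keys = []
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        keys.extend(obj["Key"] for obj in page.get("Contents", []))

    renamed = []
    for old_key in keys:
        new_key = new_key_for(old_key)
        if new_key is None or new_key == old_key:
            continue
        client.copy_object(
            Bucket=bucket,
            CopySource={"Bucket": bucket, "Key": old_key},
            Key=new_key,
        )
        client.delete_object(Bucket=bucket, Key=old_key)
        renamed.append((old_key, new_key))
    return renamed
```

A call might look like `rename_objects(boto3.client("s3"), BUCKET_NAME, PREFIX, my_mapping)`, where `my_mapping` is your own function from auto-generated key to desired key. Note that `copy_object` handles objects up to 5 GB; larger objects need a multipart copy.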

To troubleshoot further by looking at the backend logs, please feel free to open a support case with AWS and include the sanitized script and the job run details. We would be happy to help.

AWS
Support Engineer
Answered 2 years ago
