Move multiple files from a source S3 bucket into a target S3 bucket with the source file names


I have a requirement where we have 400+ Parquet files in an S3 bucket. I need to write an AWS Glue job to process all of these 400+ files, transform them (adding the same 2 extra columns to each file), and write them into a target S3 bucket with the same name as the source file. I need some help achieving this. I actually wrote a PySpark script with the following algorithm:

  1. List all the files from the source bucket

  2. For each file:

    • Create a DataFrame from the S3 file

    • Apply the transform logic

    • Write the DataFrame into the destination S3 bucket (here the file name is auto-generated)

    • Search for / get the new file created in the destination S3 bucket

    • Rename the file

  3. Finally, commit the job.

This logic is failing because the renaming code is inside the for loop, and the file has not been created in the destination folder yet, since Job.commit() is not called inside the loop. Any suggestions to correct this algorithm, or any other way to achieve this requirement? A trimmed-down sketch of the loop I have in mind follows below.
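For reference, here is a minimal sketch of that loop. The bucket names, prefixes, and the extra-column transform are placeholders, not the real values; each file is written to its own temporary prefix with coalesce(1) so exactly one part file is produced per source file.

====Sketch====

import boto3
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()
s3 = boto3.client("s3")

SRC_BUCKET = "source-bucket"   # placeholder
DST_BUCKET = "target-bucket"   # placeholder
SRC_PREFIX = "input/"          # placeholder
TMP_PREFIX = "tmp-output/"     # placeholder
DST_PREFIX = "output/"         # placeholder

# 1. List all the files from the source bucket
paginator = s3.get_paginator("list_objects_v2")
for page in paginator.paginate(Bucket=SRC_BUCKET, Prefix=SRC_PREFIX):
    for obj in page.get("Contents", []):
        key = obj["Key"]
        if not key.endswith(".parquet"):
            continue
        file_name = key.rsplit("/", 1)[-1]

        # 2a. Create a DataFrame from the S3 file
        df = spark.read.parquet(f"s3://{SRC_BUCKET}/{key}")

        # 2b. Apply the transform logic (placeholder columns)
        df = (df.withColumn("extra_col_1", F.lit("x"))
                .withColumn("extra_col_2", F.lit("y")))

        # 2c. Write to a temporary prefix; coalesce(1) yields one part file
        tmp_prefix = f"{TMP_PREFIX}{file_name}/"
        df.coalesce(1).write.mode("overwrite").parquet(
            f"s3://{DST_BUCKET}/{tmp_prefix}")

        # 2d. Find the auto-generated part file in the destination bucket
        parts = s3.list_objects_v2(Bucket=DST_BUCKET, Prefix=tmp_prefix)
        part_key = next(o["Key"] for o in parts["Contents"]
                        if o["Key"].endswith(".parquet"))

        # 2e. "Rename": copy to the final name, then delete the part file
        s3.copy_object(Bucket=DST_BUCKET,
                       CopySource={"Bucket": DST_BUCKET, "Key": part_key},
                       Key=DST_PREFIX + file_name)
        s3.delete_object(Bucket=DST_BUCKET, Key=part_key)

============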

Thanks in advance. Bhaskar

Asked 2 years ago · 2,813 views
1 Answer

Hello Bhaskar,

The output file names are something that is handled by the Spark execution engine. Spark does not offer any configuration option to set the name of these files, so there's no way to configure this on Glue. As a workaround, what some of our customers do is to run a small boto3 script at the end of the ETL job to list all the contents of the job's output path and then rename the objects in there. This could be run as code appended to the end of the job, as a separate ETL Python Shell job that is triggered whenever your job finishes, or even as a Lambda Function that is executed when the job is done.

Please find a sample script for this below.

====Script====

import time
import boto3

BUCKET_NAME = "<bucket-name>"
PREFIX = "<prefix-name>"

## Glue transformations and actions code ##

# Sleep 30s to ensure the new object is visible to the listing below
time.sleep(30)

client = boto3.client('s3')

response = client.list_objects(
    Bucket=BUCKET_NAME,
    Prefix=PREFIX,
)

name = response["Contents"][0]["Key"]

# Copy the object with the new name, then delete the old one
client.copy_object(
    Bucket=BUCKET_NAME,
    CopySource={"Bucket": BUCKET_NAME, "Key": name},
    Key=PREFIX + "new_name",
)
client.delete_object(Bucket=BUCKET_NAME, Key=name)

============
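If you go with the Lambda option mentioned above, a minimal sketch could look like the following. It assumes an EventBridge rule matching "Glue Job State Change" events that invokes the function when your job finishes; the bucket, prefix, and renaming rule are placeholders to adapt to your own mapping of part files to source file names.

====Script====

import boto3

client = boto3.client('s3')

BUCKET_NAME = "<bucket-name>"  # placeholder
PREFIX = "<prefix-name>"       # placeholder

def handler(event, context):
    # Glue emits "Glue Job State Change" events; only act on success.
    if event.get("detail", {}).get("state") != "SUCCEEDED":
        return

    # Collect the auto-generated part files first, then rename them,
    # so objects copied below are not picked up by the same listing.
    keys = []
    paginator = client.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=BUCKET_NAME, Prefix=PREFIX):
        for obj in page.get("Contents", []):
            # Skip marker files such as _SUCCESS
            if obj["Key"].endswith(".parquet"):
                keys.append(obj["Key"])

    for i, old_key in enumerate(keys):
        # Placeholder naming rule; replace with your own mapping
        new_key = f"{PREFIX}renamed_{i}.parquet"
        client.copy_object(
            Bucket=BUCKET_NAME,
            CopySource={"Bucket": BUCKET_NAME, "Key": old_key},
            Key=new_key,
        )
        client.delete_object(Bucket=BUCKET_NAME, Key=old_key)

============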

If you would like us to troubleshoot further by taking a look at the logs in the backend, please feel free to open a support case with AWS, including the sanitized script and the job run details, and we would be happy to help.

AWS
Support Engineer
Answered 2 years ago
