AWS Glue adding S3 input_file_name and persisting that in RDS

0

Hey, my ETL Glue job is:

  1. reading from Data Catalogue (S3 based),
  2. selecting specific fields from the input file (which is json)
  3. doing some mapping
  4. saving output data to Postgres Aurora Data Catalogue
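
Roughly, the job skeleton looks like this (database, table, and field names below are placeholders, not my real ones):

    from awsglue.context import GlueContext
    from awsglue.transforms import ApplyMapping
    from pyspark.context import SparkContext

    glueContext = GlueContext(SparkContext.getOrCreate())

    # Read the JSON source table registered in the Data Catalogue
    source = glueContext.create_dynamic_frame.from_catalog(
        database="my_source_db", table_name="my_json_table")

    # Select and rename the fields I need
    select1 = ApplyMapping.apply(frame=source, mappings=[
        ("fieldA", "string", "field_a", "string"),
    ])

    # Write to the Postgres Aurora table registered in the Data Catalogue
    glueContext.write_dynamic_frame.from_catalog(
        frame=select1, database="my_target_db", table_name="my_postgres_table")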

I also want to save things like the S3 input file name and the date of creation, and I'm currently using the input_file_name and current_timestamp functions from pyspark.sql.functions:

    select1_to_df = select1.toDF() \
        .withColumn("input_file_name", input_file_name()) \
        .withColumn("current_timestamp", current_timestamp())
    select1_enriched = select1.fromDF(select1_to_df, glueContext, "select1_enriched")

This works fine for small datasets, but for large ones I see empty input_file_name and current_timestamp columns in my DB.

Is there a better option for doing this with large data sets? It's possible that in the future I'll need to append more columns with custom calculations based on what is inside a given file, so this functionality is crucial for me.

Thanks

pkantor
asked a year ago · 280 views
2 Answers
0

For JSON it should be possible to get the filename using:

    format_options={
        "attachFilename": "filenameCol"
    }
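
For example, when reading straight from S3 with create_dynamic_frame.from_options (bucket and prefix below are placeholders):

    datasource = glueContext.create_dynamic_frame.from_options(
        connection_type="s3",
        connection_options={"paths": ["s3://your-bucket/your-prefix/"]},
        format="json",
        format_options={"attachFilename": "filenameCol"},
    )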

current_timestamp() is just a regular function, so I don't see why it wouldn't work (maybe it's a side effect of input_file_name() not working; check whether it behaves once you remove that column).

AWS
EXPERT
answered a year ago
  • Hey, thanks for the answer. How can I use these format_options? I'm operating on DynamicFrame nodes.

  • Okay, I found it; you mean as part of the input node definition. Unfortunately I'm using the Data Catalogue as the input node, not an S3 connection. My data in S3 is partitioned and I also need to persist that in the DB.

0

For large datasets, it's possible that your approach of adding the input_file_name and current_timestamp columns via pyspark.sql.functions is running into trouble at scale. To work around this, you can try a different approach using the mapPartitions transformation in PySpark, which processes rows one partition at a time and gives you full control over the extra columns you add.

Here's an example of how you can achieve this:

Define a function that processes each partition of the DataFrame, adding the current timestamp and any other custom calculations. (The input file name itself needs to already be present as a column on each row, for example from the attachFilename option mentioned in the other answer, because mapPartitions cannot see the source file path.)

from datetime import datetime

def process_partition(rows):
    # input_file_name is expected to already be a column on each row
    # (added upstream, e.g. when reading with attachFilename)
    for row in rows:
        # Custom calculations can read fields by name, e.g. row["someField"]
        current_timestamp = datetime.now()
        # Append new values positionally, matching the extended schema below
        yield tuple(row) + (current_timestamp,)

Apply the process_partition function to your DataFrame using mapPartitions.

select1_df = select1.toDF()
select1_rdd = select1_df.rdd.mapPartitions(process_partition)

Convert the RDD back to a DataFrame, extending the original schema with the new column.

from pyspark.sql.types import TimestampType

select1_schema = select1_df.schema.add("current_timestamp", TimestampType(), nullable=True)

select1_enriched_df = glueContext.createDataFrame(select1_rdd, schema=select1_schema)

Continue with your ETL process using the select1_enriched_df DataFrame. This approach should be more efficient and scalable, allowing you to handle large datasets and perform custom calculations based on the contents of the input file. Let me know if I answered your question.
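
If the rest of your job writes through a DynamicFrame sink (as in your original snippet), you can convert the enriched DataFrame back before writing, roughly like this:

from awsglue.dynamicframe import DynamicFrame

select1_enriched = DynamicFrame.fromDF(select1_enriched_df, glueContext, "select1_enriched")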

AWS
EXPERT
ZJon
answered a year ago
