For JSON it should be possible to get the filename using:
format_options={
    "attachFilename": "filenameCol"
}
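For reference, here's a minimal sketch of how that option could be passed when reading JSON directly from S3 with create_dynamic_frame.from_options; the S3 path and the column name "filenameCol" are placeholders:

# Sketch: reading JSON from S3 with the source file name attached as a column.
# The S3 path and "filenameCol" are placeholders.
dyf = glueContext.create_dynamic_frame.from_options(
    connection_type="s3",
    connection_options={"paths": ["s3://my-bucket/my-prefix/"]},
    format="json",
    format_options={"attachFilename": "filenameCol"},
)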
current_timestamp() is just a regular function, so I don't see why it wouldn't work. Maybe it's a side effect of input_file_name() not working, and once you remove that call it's fine.
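For reference, the approach being discussed presumably looks something like this (a sketch; df stands for the DataFrame read from the source):

from pyspark.sql import functions as F

# Sketch of the withColumn approach under discussion; df is assumed to be
# the Spark DataFrame read from the source.
df = (df
    .withColumn("input_file_name", F.input_file_name())
    .withColumn("current_timestamp", F.current_timestamp()))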
For large datasets, it's possible that your approach of using the input_file_name and current_timestamp functions from pyspark.sql.functions is causing issues, since these functions may not scale well. To address this, you can try a different approach using the mapPartitions transformation in PySpark, which can be more efficient and scalable for larger datasets.
Here's an example of how you can achieve this:
Define a function that processes each partition of the DataFrame and adds the input file name, the current timestamp, and any other custom calculations.
from pyspark.sql import Row
from datetime import datetime

def process_partition(rows):
    # Runs once per partition on the executors; rows is an iterator of Row objects.
    for row in rows:
        data = row.asDict()
        # Keep any file name already present on the row, defaulting to ''.
        data["input_file_name"] = data.get("input_file_name", "")
        # Evaluated per row on the executor.
        data["current_timestamp"] = datetime.now()
        # Perform custom calculations here
        yield Row(**data)
Apply the process_partition function to your DataFrame using mapPartitions.
select1_rdd = select1.toDF().rdd.mapPartitions(process_partition)
Convert the RDD back to a DataFrame, and add the new columns to the schema.
from pyspark.sql.types import StringType, TimestampType

select1_schema = select1.toDF().schema
select1_schema = select1_schema.add("input_file_name", StringType(), nullable=True)
select1_schema = select1_schema.add("current_timestamp", TimestampType(), nullable=True)
select1_enriched_df = glueContext.createDataFrame(select1_rdd, schema=select1_schema)
Continue with your ETL process using the select1_enriched_df DataFrame. This approach should be more efficient and scalable, allowing you to handle large datasets and perform custom calculations based on the contents of the input file. Let me know if I answered your question.
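Since you're operating on DynamicFrame nodes, you can convert the enriched DataFrame back into a DynamicFrame for the rest of the job; the node name "select1_enriched" is just a placeholder:

from awsglue.dynamicframe import DynamicFrame

# Convert the enriched Spark DataFrame back into a DynamicFrame so it can
# feed the remaining Glue transforms; the name argument is arbitrary.
select1_enriched = DynamicFrame.fromDF(select1_enriched_df, glueContext, "select1_enriched")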
Hey, thanks for the answer. How can I use these format_options? I'm operating on DynamicFrame nodes.
Okay, I found that. You mean as part of the input node definition. Unfortunately I'm using the Data Catalogue as the input node, not an S3 connection. My data in S3 is partitioned, and I also need to persist that in the DB.
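One thing that might be worth trying (an untested assumption on my part): create_dynamic_frame.from_catalog accepts an additional_options dictionary, and some read options can be passed through it. Whether attachFilename is honored for a Data Catalogue source may depend on the table's format, so treat this only as a sketch; the database and table names are placeholders:

# Untested assumption: passing attachFilename via additional_options for a
# catalog source; database and table names are placeholders.
dyf = glueContext.create_dynamic_frame.from_catalog(
    database="my_database",
    table_name="my_table",
    additional_options={"attachFilename": "filenameCol"},
)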