You're doing most of the right things already, but this issue typically stems from how AWS Glue jobs write Parquet files and how the S3/Glue table structure handles partitioning. Let's break this down and refine your approach to ensure true deduplication in the Processed table.
🔍 Root Cause

The key problem here is likely Glue writing multiple versions of the same TransportId into the same or different output partitions without cleaning up the previous versions. Even if your DataFrame is deduplicated, the default append behavior in a Glue job means old Parquet files are never deleted, so duplicates appear at the table level.
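Before changing the write path, it helps to confirm that the duplicates really live at the table/file level rather than in the incoming DataFrame. A minimal sketch, assuming the processed data sits under s3://your-bucket/processed/ and carries a transportid column (adjust both to your setup); it can run inside the Glue job or any Spark session with access to the bucket:

```python
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

# Read the processed Parquet output directly from S3 (path is an assumption).
processed_df = spark.read.parquet("s3://your-bucket/processed/")

# Count rows per transportid; anything above 1 is a duplicate at the table level.
dupes_df = (
    processed_df
    .groupBy("transportid")
    .count()
    .filter(F.col("count") > 1)
)

dupes_df.show(20, truncate=False)
```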
✅ Steps to Fully Deduplicate and Prevent Recurrence
1. ❗ Ensure You Overwrite, Not Append, When Writing

Even if your DataFrame is deduplicated, your Glue job may be configured like this:

```python
datasink = glueContext.write_dynamic_frame.from_options(
    frame=deduped_dynamic,
    connection_type="s3",
    connection_options={
        "path": "s3://your-bucket/processed/",
        "partitionKeys": ["org"]
    },
    format="parquet"
)
```

This appends new data by default, so old files remain unless you explicitly overwrite.

✅ Use partitionKeys and mode=overwrite carefully:

```python
datasink = glueContext.write_dynamic_frame.from_options(
    frame=deduped_dynamic,
    connection_type="s3",
    connection_options={
        "path": "s3://your-bucket/processed/",
        "partitionKeys": ["org"],
        "compression": "snappy",
        "overwrite": True  # Only works if dynamic partition overwrite mode is enabled
    },
    format="parquet"
)
```

🔸 But Glue doesn't support dynamic partition overwrite by default. You need to switch the Spark session to use partitionOverwriteMode=dynamic.
2. ✅ Configure Spark Session for Dynamic Partition Overwrite

Before any writes, in your script:

```python
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
```

This lets you overwrite only the affected partitions, avoiding a full dataset rewrite (see the writer sketch after this list).
3. ✅ Use coalesce(1) or repartition(partitionColumns) carefully

If you're writing many small files (the default behavior), it can complicate deduplication. You can reduce the number of files per partition:

```python
deduped_df = deduped_df.repartition("org")  # better than coalesce(1)
```
4. ✅ Optional: Clean Old Files Before Write

If dynamic overwrite mode is unreliable or you're working with few partitions, explicitly delete the partition(s) from S3 before writing new files. You can use boto3 in a pre-job step:

```python
import boto3

s3 = boto3.resource('s3')
bucket = s3.Bucket('your-bucket')

# Delete a specific partition
prefix = 'processed/org=23/'
bucket.objects.filter(Prefix=prefix).delete()
```

5. ✅ Add De-duplication Guard in S3 Itself (Advanced)

Use versioning + lifecycle policies in your processed/ bucket (see the boto3 sketch after this list), or add a cleanup job that:
- Uses Athena to find the latest rows
- Rewrites only the latest data
- Deletes older Parquet files
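As referenced in step 2, spark.sql.sources.partitionOverwriteMode is a Spark SQL setting, so the most reliable way to take advantage of it is to write through Spark's native DataFrame writer rather than the Glue sink. A minimal sketch under that assumption, reusing deduped_df and the example output path from the steps above; with dynamic mode enabled, mode("overwrite") should replace only the org partitions present in the DataFrame instead of wiping the whole prefix:

```python
# Enable dynamic partition overwrite (Spark >= 2.3) before any write.
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

# deduped_df is the deduplicated DataFrame from the steps above.
(
    deduped_df
    .repartition("org")                      # fewer, larger files per partition
    .write
    .mode("overwrite")
    .partitionBy("org")
    .parquet("s3://your-bucket/processed/")  # example path, adjust to your bucket
)
```

Note that writing Parquet directly like this does not update the Glue Data Catalog; new partitions still need a crawler run or an explicit partition update.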
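For the versioning-plus-lifecycle option in step 5, here is a minimal boto3 sketch, assuming a bucket named your-bucket and the processed/ prefix from the earlier examples. It turns on bucket versioning and expires noncurrent object versions a few days after they are replaced, so Parquet files that get overwritten under the same key do not pile up:

```python
import boto3

s3 = boto3.client("s3")
BUCKET = "your-bucket"  # assumption: replace with your processed bucket

# Keep old object versions around when a key is overwritten.
s3.put_bucket_versioning(
    Bucket=BUCKET,
    VersioningConfiguration={"Status": "Enabled"},
)

# Expire noncurrent versions under processed/ seven days after replacement.
s3.put_bucket_lifecycle_configuration(
    Bucket=BUCKET,
    LifecycleConfiguration={
        "Rules": [
            {
                "ID": "expire-old-processed-versions",
                "Filter": {"Prefix": "processed/"},
                "Status": "Enabled",
                "NoncurrentVersionExpiration": {"NoncurrentDays": 7},
            }
        ]
    },
)
```

This only cleans up objects that are overwritten in place; duplicates written to new keys still need the Athena-based cleanup job described in step 5.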
✅ Final Glue Job Summary (Best Practice)

- Read JSON using the crawler-generated schema
- Deduplicate using row_number() over partitionBy(transportid)
- Use partitionOverwriteMode=dynamic
- Repartition by org
- Write with overwrite=True
- Use partitionKeys=["org"]
Thank you very much. Based on your answer I wrote the code below, but every time I run the ETL job I get more and more duplicates. Maybe I missed something important somewhere?
```python
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from awsglue.dynamicframe import DynamicFrame
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
# Script generated for node AWS Glue Data Catalog
raw_data_frame = glueContext.create_dynamic_frame.from_catalog(
database="demo", table_name="demo", transformation_ctx="raw_data_frame")
raw_df = raw_data_frame.toDF()
# Keep only the most recent record per transportid, ordered by unixtimestamp descending
window_spec = Window.partitionBy("transportid").orderBy(F.col("unixtimestamp").desc())
deduped_df = raw_df.withColumn("row_num", F.row_number().over(window_spec)).filter(F.col("row_num") == 1).drop("row_num")
deduped_df = deduped_df.repartition("org")
deduped_dynamic_frame = DynamicFrame.fromDF(deduped_df, glueContext, "deduped_dynamic_frame")
# First write: Glue S3 writer with partition keys
glueContext.write_dynamic_frame.from_options(
frame=deduped_dynamic_frame,
connection_type="s3",
connection_options={
"path": "s3://my-play-bucket/demo-result/",
"partitionKeys": ["org"],
"compression": "snappy",
"overwrite": True # dynamic overwrite
},
format="parquet",
)
# Second write of the same frame: Glue sink that also updates the Data Catalog
processed_data_sink = glueContext.getSink(
path="s3://my-play-bucket/demo-result/",
connection_type="s3",
updateBehavior="UPDATE_IN_DATABASE",
partitionKeys=["org"],
enableUpdateCatalog=True,
connection_options={
"mode": "overwrite",
"partitionOverwriteMode": "dynamic",
},
transformation_ctx="processed_data_sink",
)
processed_data_sink.setCatalogInfo(
catalogDatabase="demo",
catalogTableName="demo-result",
)
processed_data_sink.setFormat("glueparquet", compression="snappy")
processed_data_sink.writeFrame(deduped_dynamic_frame)
job.commit()
```