Duplicate Records in Parquet (Processed) Table after AWS Glue Job execution.

We have an AWS Glue pipeline where:

A crawler populates a raw database table from partitioned JSON files in S3.

S3 structure:

raw/  
├── org=21/  
│   └── 221.json  
└── org=23/  
    └── 654.json

Each JSON file contains an entity with a unique TransportId property, and the file name matches the value of that TransportId. Example payload for file org=23/654.json:

{  
    "OrgId": 23,  
    "OrgName": "org23",  
    "TransportId": 654,  
    "TransportStatus": "New",  
    "UnixTimeStamp": 1744625534  
}

Nightly updates overwrite the JSON files by updating fields such as TransportStatus and UnixTimeStamp, while keeping TransportId unchanged.

After updating the raw S3 bucket, we run an ETL job that converts the raw data into Parquet files, which are then represented as a Glue table (the 'Processed' table).

Issue: When a file (e.g., org=23/654.json) is updated, the Processed table contains duplicates (both old and new records for the same TransportId).

Example Scenario:

Day 1: org=23/654.json → TransportStatus: "New", UnixTimeStamp: 1744625534

Day 2: Same file updated → TransportStatus: "InExecution", UnixTimeStamp: 1744629999

Result: Processed contains two entries for TransportId=654.
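
A quick way we confirm the duplication is to count rows per TransportId in the Processed table. This is just a diagnostic sketch; the database and table names ("demo", "processed") are placeholders for our actual catalog entries:

from pyspark.context import SparkContext
from awsglue.context import GlueContext
from pyspark.sql import functions as F

glueContext = GlueContext(SparkContext.getOrCreate())

# Read the Processed table from the Glue Data Catalog (placeholder names)
processed_df = glueContext.create_dynamic_frame.from_catalog(
    database="demo", table_name="processed").toDF()

# Any TransportId appearing more than once is a duplicate
processed_df.groupBy("transportid").count().filter(F.col("count") > 1).show()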


Attempted Solutions:

We tried deduplication logic in the Glue ETL job using:

1. Spark window functions:

from pyspark.sql import functions as F
from pyspark.sql.window import Window
from awsglue.dynamicframe import DynamicFrame

# glueContext and the catalog source node come from the generated job script
df = AWSGlueDataCatalog_node1744383413520.toDF()

# Partition by TransportId and keep only the row with the latest UnixTimeStamp
window = Window.partitionBy("transportid").orderBy(F.desc("unixtimestamp"))
deduped_df = df.withColumn("rn", F.row_number().over(window)).filter("rn == 1").drop("rn")

deduped_dynamic = DynamicFrame.fromDF(deduped_df, glueContext, "deduped")

2. SQL Query for Latest Records:

SELECT ds.*  
FROM myDataSource ds  
WHERE (transportid, unixtimestamp) IN (  
    SELECT transportid, MAX(unixtimestamp)  
    FROM myDataSource  
    GROUP BY transportid  
)  

3. Drop Duplicates Node:

Used the built-in "Drop Duplicates" node with transportid as the primary key.

Expected: Based on UnixTimeStamp, only the latest record per TransportId is written to the Processed table.

Actual: Duplicates persist in the Glue Processed table.

We also ran a separate ETL job to deduplicate the final Processed data by TransportId, but duplicates still remain.


Does anyone know what the issue is, or can you suggest how we could solve it? Has anyone come across a similar problem or a solution for this scenario? Happy to provide more information if required.

Thank you.

asked a month ago · 100 views
2 Answers

You're doing most of the right things already, but this issue typically stems from how AWS Glue jobs write Parquet files and how S3/Glue table structure handles partitioning. Let's break this down and refine your approach to ensure true deduplication in the Processed table.

🔍 Root Cause

The key problem here is most likely that Glue writes multiple versions of the same TransportId into the same or different output partitions without cleaning up the previous versions. Even if your DataFrame is deduplicated, the default write behavior of a Glue job is to append, so the old Parquet files are never deleted and duplicates show up at the table level.
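
To confirm this, you can simply list what sits under the output prefix after two runs — if the job only appends, the Parquet files from the first run are still there next to the new ones. A rough sketch (the bucket name and processed/ prefix are placeholders):

import boto3

s3 = boto3.client("s3")

# Stale Parquet files from earlier runs will still show up here if the job appends
resp = s3.list_objects_v2(Bucket="your-bucket", Prefix="processed/org=23/")
for obj in resp.get("Contents", []):
    print(obj["Key"], obj["LastModified"])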

✅ Steps to Fully Deduplicate and Prevent Recurrence

1. ❗ Ensure You Overwrite, Not Append, When Writing

Even if your dataframe is deduplicated, if your Glue job is configured like this:

datasink = glueContext.write_dynamic_frame.from_options(
    frame=deduped_dynamic,
    connection_type="s3",
    connection_options={
        "path": "s3://your-bucket/processed/",
        "partitionKeys": ["org"]
    },
    format="parquet"
)

it appends new data by default, so old files remain unless you explicitly overwrite.

✅ Use partitionKeys and mode=overwrite carefully:

datasink = glueContext.write_dynamic_frame.from_options(
    frame=deduped_dynamic,
    connection_type="s3",
    connection_options={
        "path": "s3://your-bucket/processed/",
        "partitionKeys": ["org"],
        "compression": "snappy",
        "overwrite": True  # Only works if dynamic partition overwrite mode is enabled
    },
    format="parquet"
)

🔸 But Glue doesn't support dynamic partition overwrite by default. You need to switch the Spark session to use partitionOverwriteMode=dynamic.

2. ✅ Configure Spark Session for Dynamic Partition Overwrite

Before any writes, in your script:

spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

This lets you overwrite only the affected partitions, avoiding a full dataset rewrite.

3. ✅ Use coalesce(1) or repartition(partitionColumns) carefully

If you're writing many small files (the default behavior), it can complicate deduplication. You can reduce the number of files per partition:

deduped_df = deduped_df.repartition("org")  # better than coalesce(1)

4. ✅ Optional: Clean Old Files Before Write

If dynamic overwrite mode is unreliable or you're working with few partitions, explicitly delete the affected partition(s) from S3 before writing new files. You can use boto3 in a pre-job step (an alternative using GlueContext's purge_s3_path is sketched after step 5):

import boto3

s3 = boto3.resource('s3')
bucket = s3.Bucket('your-bucket')

# Delete a specific partition
prefix = 'processed/org=23/'
bucket.objects.filter(Prefix=prefix).delete()

5. ✅ Add a De-duplication Guard in S3 Itself (Advanced)

Use versioning + lifecycle policies in your processed/ bucket, or add a cleanup job that:

Uses Athena to find latest rows

Rewrites only latest data

Deletes older Parquet files
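
As the alternative mentioned in step 4, GlueContext also exposes a purge_s3_path method you can call from the job itself before writing, instead of a separate boto3 step. A minimal sketch (the path is a placeholder; retentionPeriod=0 removes objects regardless of age):

# Purge the target partition before writing new Parquet files
glueContext.purge_s3_path(
    "s3://your-bucket/processed/org=23/",
    options={"retentionPeriod": 0}
)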

✅ Final Glue Job Summary (Best Practice)

Read JSON using the crawler-generated schema

Deduplicate using row_number() over partitionBy(transportid)

Use partitionOverwriteMode=dynamic

Repartition by org

Write with overwrite=True

Use partitionKeys=["org"]
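
To tie the summary together, here is a minimal sketch of that write path. It uses the plain Spark DataFrame writer for the final write so that spark.sql.sources.partitionOverwriteMode=dynamic takes effect; the catalog names and S3 path are placeholders for your own:

from pyspark.context import SparkContext
from awsglue.context import GlueContext
from pyspark.sql import functions as F
from pyspark.sql.window import Window

glueContext = GlueContext(SparkContext.getOrCreate())
spark = glueContext.spark_session

# Overwrite only the partitions present in the incoming data
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

# Read the raw table (placeholder database/table names)
raw_df = glueContext.create_dynamic_frame.from_catalog(
    database="demo", table_name="demo").toDF()

# Keep only the latest record per TransportId
w = Window.partitionBy("transportid").orderBy(F.col("unixtimestamp").desc())
latest_df = (raw_df
             .withColumn("rn", F.row_number().over(w))
             .filter(F.col("rn") == 1)
             .drop("rn"))

# Repartition by the partition column and overwrite only the affected org partitions
(latest_df
    .repartition("org")
    .write
    .mode("overwrite")
    .partitionBy("org")
    .parquet("s3://your-bucket/processed/"))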

answered a month ago

Thank you very much. Based on your answer I wrote the code below, but every time I run the ETL job I get more and more duplicates. Maybe I missed something important somewhere?

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from awsglue.dynamicframe import DynamicFrame

args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

# Script generated for node AWS Glue Data Catalog
raw_data_frame = glueContext.create_dynamic_frame.from_catalog(
    database="demo", table_name="demo", transformation_ctx="raw_data_frame")

raw_df = raw_data_frame.toDF()
window_spec = Window.partitionBy("transportid").orderBy(F.col("unixtimestamp").desc())
deduped_df = raw_df.withColumn("row_num", F.row_number().over(window_spec)).filter(F.col("row_num") == 1).drop("row_num")

deduped_df = deduped_df.repartition("org")

deduped_dynamic_frame = DynamicFrame.fromDF(deduped_df, glueContext, "deduped_dynamic_frame")

# First write: Parquet files straight to S3 via from_options with "overwrite": True
glueContext.write_dynamic_frame.from_options(
    frame=deduped_dynamic_frame,
    connection_type="s3",
    connection_options={
        "path": "s3://my-play-bucket/demo-result/",
        "partitionKeys": ["org"],
        "compression": "snappy",
        "overwrite": True  # dynamic overwrite
    },
    format="parquet",
)


# Second write of the same frame to the same S3 path, this time through a
# catalog-updating sink
processed_data_sink = glueContext.getSink(
    path="s3://my-play-bucket/demo-result/",
    connection_type="s3",
    updateBehavior="UPDATE_IN_DATABASE",
    partitionKeys=["org"],
    enableUpdateCatalog=True,
    connection_options={
        "mode": "overwrite",
        "partitionOverwriteMode": "dynamic",
    },
    transformation_ctx="processed_data_sink",
)
processed_data_sink.setCatalogInfo(
    catalogDatabase="demo",
    catalogTableName="demo-result",
)
processed_data_sink.setFormat("glueparquet", compression="snappy")
processed_data_sink.writeFrame(deduped_dynamic_frame)

job.commit()


answered a month ago
