Handle de-dup in Glue Job PySpark


Hello

I am using PySpark in a Glue job to do ETL on a table sourced from S3; the S3 data comes from MySQL via DMS (table schema as below; the columns 'op', 'row_updated_timestamp' and 'row_commit_timestamp' are added by DMS).

Every time this table is updated, two rows with 'op' = 'U' are sent: the first resets the 'is_in_use' value to 'false' regardless of its existing value (the second row in the table below), and the second applies the new value (the last row of the table below). My original code keeps rownum == 1 (# approach 1), but what I actually need is the last row whenever there is more than one row with the same "id", "op" = 'U' and the same "row_updated_timestamp"; otherwise the final output is not accurate. Hence I tried # approach 2, yet it doesn't work. How can I fix this? I also considered looping over the rows with the same id, op and row_updated_timestamp and taking the last one, but I couldn't find a similar code reference for this case.

Please help.

table example 1:

op | id | created_at | is_enabled | is_in_use | updated_at | row_updated_timestamp | row_commit_timestamp

I | 1 | 2023-12-25 00:00:00 | FALSE | FALSE | 2023-12-25 00:00:00 | 2023-12-25 00:00:10 | 2023-12-25 00:00:10
U | 1 | 2023-12-25 00:00:00 | FALSE | FALSE | 2023-12-25 00:00:00 | 2023-12-27 00:00:10 | 2023-12-27 00:00:10
U | 1 | 2023-12-25 00:00:00 | TRUE | TRUE | 2023-12-25 00:00:00 | 2023-12-27 00:00:10 | 2023-12-27 00:00:10

expected result:

op | id | created_at | is_enabled | is_in_use | updated_at | row_updated_timestamp | row_commit_timestamp

U | 1 | 2023-12-25 00:00:00 | TRUE | TRUE | 2023-12-25 00:00:00 | 2023-12-27 00:00:10 | 2023-12-27 00:00:10

from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.functions import col, max, row_number
from pyspark.sql.window import Window

# Read the table from the Glue Data Catalog and convert to a Spark DataFrame
dyfTable = glueContext.create_dynamic_frame_from_catalog(
    database=db,
    table_name="table"
).toDF()


# approach 1: keep the first row per (id, row_updated_timestamp)
w = Window.partitionBy("id", "row_updated_timestamp").orderBy(dyfTable["row_updated_timestamp"].desc())
dyfTableRow = dyfTable.withColumn("rownum", row_number().over(w))
dyfTableResult = dyfTableRow.filter(dyfTableRow["rownum"] == 1).drop("rownum")

# approach 2: try to keep the last row per id (this is the part that doesn't work)
w = Window.partitionBy("id").orderBy(dyfTable["row_updated_timestamp"].desc())
dyfTableRow = dyfTable.withColumn("rownum", row_number().over(w))
dyfTableResult = dyfTableRow.filter(max(col("rownum")))

# Filter out rows deleted at the source ('op' = 'D')
dyfTableFiltered = dyfTableResult.filter(col("op") != "D")

dyfTableConvert = DynamicFrame.fromDF(dyfTableFiltered, glueContext, "convert")

# Write the data in the converted DynamicFrame to an S3 location
glueContext.write_dynamic_frame.from_options(
    frame=dyfTableConvert,
    connection_type="s3",
    connection_options={"path": f"s3://{curated_bucket}/{dataset_folder}/result/"},
    format="parquet",
    transformation_ctx="datasink1"
)
job.commit()
asked 3 months ago · 109 views
1 Answer

In approach 1, it doesn't make sense to include row_updated_timestamp in the window partition if you are also sorting by it; everything will be row 1.
In approach 2, the filter is incomplete (you are not comparing max to anything). I suggest you stick with rownum == 1, even if you need to invert the sort order.
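
For reference, a minimal sketch of the rownum == 1 approach described above, under the assumption that a tie-breaker is needed because both 'U' rows share the same row_updated_timestamp. The arrival_order column is a hypothetical tie-breaker built with monotonically_increasing_id(), which only helps if the order the rows are read in reflects the order they were written.

from pyspark.sql.functions import col, monotonically_increasing_id, row_number
from pyspark.sql.window import Window

# Hypothetical tie-breaker: capture the read order before windowing.
# This only distinguishes the two 'U' rows if the DMS file order
# reflects the order in which they were written.
dfOrdered = dyfTable.withColumn("arrival_order", monotonically_increasing_id())

# Latest row per id: newest row_updated_timestamp first, then the
# tie-breaker, so rownum == 1 is the last row written.
w = Window.partitionBy("id").orderBy(
    col("row_updated_timestamp").desc(),
    col("arrival_order").desc()
)

dfLatest = (
    dfOrdered
    .withColumn("rownum", row_number().over(w))
    .filter(col("rownum") == 1)
    .drop("rownum", "arrival_order")
)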

AWS
EXPERT
answered 3 months ago
  • Thanks for your suggestion. Would it be possible to loop over and get the last row of record when 'rownum' > 1 and the 'row_updated_timestamp' is the same, based on the code below?

    w = Window.partitionBy("id").orderBy(dyfTable["row_updated_timestamp"].desc())
    dyfTableRow = dyfTable.withColumn("rownum", row_number().over(w))
    dyfTableResult = dyfTableRow.filter(dyfTableRow["rownum"] == 1).drop("rownum")

  • Not sure what you mean by "loop"; in a window you don't do that directly. If you don't want to filter, you could use "last(row_updated_timestamp)" to add a column with the last value in the window.
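
A rough sketch of the last()-over-a-window idea from the comment above: the frame has to span the whole partition, otherwise last() only sees rows up to the current one. The ordering column used here is an assumption; when the two rows carry identical timestamps, you still need something that actually distinguishes them.

from pyspark.sql.functions import col, last
from pyspark.sql.window import Window

# Frame covering the whole (id, row_updated_timestamp) group so that
# last() returns the value from the final row in the group.
wFull = (
    Window.partitionBy("id", "row_updated_timestamp")
    .orderBy(col("row_commit_timestamp"))  # assumed ordering; swap in a real tie-breaker
    .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
)

dfResolved = (
    dyfTable
    .withColumn("is_in_use", last("is_in_use").over(wFull))
    .withColumn("is_enabled", last("is_enabled").over(wFull))
    .dropDuplicates(["id", "row_updated_timestamp"])
)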
