Handle de-dup in Glue Job PySpark


Hello

I am using PySpark in a Glue Job to do ETL on a table sourced from S3, where the S3 data is sourced from MySQL via DMS (table schema as below; the columns 'op', 'row_updated_timestamp' and 'row_commit_timestamp' are added by DMS).

Every time this table is updated, two rows of record are sent ('op' = 'U'): the first one resets the 'is_in_use' value to 'false' regardless of its existing value (the second row in the table below), and the second one updates it with the new value (the last row of the table below). My original code keeps rownum == 1 (# approach 1), but I am trying to get the last row of record IF there is more than one row with the same "id", "op" = 'U' and the same "row_updated_timestamp"; otherwise, the final data output is not accurate. Hence, I tried # approach 2, yet it doesn't work. I am wondering how to fix this case? I also considered looping over the rows with the same id, same op and same row_updated_timestamp to get the last row, but I couldn't find similar code references for this case.

Please help.

table example1:

op | id | created_at | is_enabled | is_in_use | updated_at | row_updated_timestamp | row_commit_timestamp

I | 1 | 2023-12-25 00:00:00 | FALSE | FALSE | 2023-12-25 00:00:00 | 2023-12-25 00:00:10 | 2023-12-25 00:00:10
U | 1 | 2023-12-25 00:00:00 | FALSE | FALSE | 2023-12-25 00:00:00 | 2023-12-27 00:00:10 | 2023-12-27 00:00:10
U | 1 | 2023-12-25 00:00:00 | TRUE | TRUE | 2023-12-25 00:00:00 | 2023-12-27 00:00:10 | 2023-12-27 00:00:10

expected result:

op | id | created_at | is_enabled | is_in_use | updated_at | row_updated_timestamp | row_commit_timestamp

U | 1 | 2023-12-25 00:00:00 | TRUE | TRUE | 2023-12-25 00:00:00 | 2023-12-27 00:00:10 | 2023-12-27 00:00:10

from awsglue.dynamicframe import DynamicFrame
from pyspark.sql import Window
from pyspark.sql.functions import col, row_number

dyfTable = glueContext.create_dynamic_frame_from_catalog(
    database=db,
    table_name="table"
).toDF()


# approach 1
w = Window.partitionBy("id", "row_updated_timestamp").orderBy(dyfTable["row_updated_timestamp"].desc())
dyfTableRow = dyfTable.withColumn("rownum", row_number().over(w))
dyfTableResult = dyfTableRow.filter(dyfTableRow["rownum"] == 1).drop("rownum")

# approach 2
w = Window.partitionBy("id").orderBy(dyfTable["row_updated_timestamp"].desc())
dyfTableRow = dyfTable.withColumn("rownum", row_number().over(w))
dyfTableResult = dyfTableRow.filter(max(col("rownum")))

# Filter the DataFrame based on the 'op' column
dyfTableFiltered = dyfTableResult.filter(col("op") != "D")

dyfTableConvert = DynamicFrame.fromDF(dyfTableFiltered, glueContext, "convert")

# Write the data in the converted DynamicFrame to an S3 location
glueContext.write_dynamic_frame.from_options(
    frame=dyfTableConvert,
    connection_type="s3",
    connection_options={"path": f"s3://{curated_bucket}/{dataset_folder}/result/"},
    format="parquet",
    transformation_ctx="datasink1"
)
job.commit()
Asked 4 months ago · 118 views
1 Answer

In approach 1, it doesn't make sense to include row_updated_timestamp in the window partition if you are also sorting by it; every row will end up as row 1.
In approach 2, the filter is incomplete (you are not comparing max to anything). I suggest you stick with rownum == 1, even if you need to invert the order.
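For illustration, a minimal sketch of that suggestion, assuming the read order of the rows reflects the order DMS wrote them, so monotonically_increasing_id() can break the tie between the two 'U' rows that share the same row_updated_timestamp (dyfTable is the DataFrame from the question):

from pyspark.sql import Window
from pyspark.sql.functions import col, row_number, monotonically_increasing_id

# Capture the read order so ties on row_updated_timestamp can be broken deterministically
dfWithSeq = dyfTable.withColumn("seq", monotonically_increasing_id())

# Partition only by "id"; latest timestamp first, and within a tie the later row first
w = Window.partitionBy("id").orderBy(
    col("row_updated_timestamp").desc(),
    col("seq").desc()
)

# Keep the single top row per id, i.e. the last record sent for the latest update
dyfTableResult = (
    dfWithSeq
    .withColumn("rownum", row_number().over(w))
    .filter(col("rownum") == 1)
    .drop("rownum", "seq")
)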

AWS
EXPERT
Answered 4 months ago
  • Thanks for your suggestion. Would it be possible to loop over and get the last row of record when 'rownum' > 1 and the 'row_updated_timestamp' is the same, based on the code below?

    w = Window.partitionBy("id").orderBy(dyfTable["row_updated_timestamp"].desc())
    dyfTableRow = dyfTable.withColumn("rownum", row_number().over(w))
    dyfTableResult = dyfTableRow.filter(dyfTableRow["rownum"] == 1).drop("rownum")

  • Not sure what you mean by "loop"; in a window you don't do that directly. If you don't want to filter, you could use last("row_updated_timestamp") to add a column with the last value in the window.
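For reference, a minimal sketch of that last() pattern (dyfTable is the DataFrame from the question; note the explicit frame, since the default frame under an orderBy only reaches the current row, so last() would otherwise just return the current value):

from pyspark.sql import Window
from pyspark.sql.functions import col, last

# Frame spans the whole partition so last() sees the final row of each "id" group
w = (
    Window.partitionBy("id")
    .orderBy(col("row_updated_timestamp").asc())
    .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
)

# Adds a column with the last value in the window instead of dropping rows;
# the same pattern works for other columns such as "is_in_use"
dfWithLast = dyfTable.withColumn(
    "last_row_updated_timestamp", last("row_updated_timestamp").over(w)
)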
