How to detect a duplicate row and then update it in PySpark?

I have a Glue ETL Job in which some of the data has issues. There is a case where a row is duplicated, and what I need to do is increase the value by 1 hour on the duplicate.

So imagine a set of data that looks like:

| Name  | Color | Size  | Value |
| Alpha | Blue  | Large | 1     |
| Alpha | Blue  | Large | 1     |
| Bravo | Red   | Small | 5     |

So it would see that the Alpha row is a duplicate, and on the duplicate row it would increase Value to 2. Basically, it needs to find the duplicated row and update it. This should only happen once in my corner case, so there won't be more than one duplicate for any original row. In Pandas there is the .duplicated() method, but I don't see anything like that in PySpark, so I am trying to think of a way to deal with it. If I could iterate over the Spark DataFrame, I could build dictionaries with counters and, if the count was 2, do an update. I'm not sure whether that is a good way.
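The counter idea described above can be sketched in plain Python on a small list of tuples. This only illustrates the logic on assumed sample data: `bump_duplicates` is a hypothetical helper name, and looping over rows on the driver like this would likely not scale to a real Spark DataFrame.

```python
from collections import defaultdict

# Sample rows mirroring the table above: (Name, Color, Size, Value)
rows = [
    ("Alpha", "Blue", "Large", 1),
    ("Alpha", "Blue", "Large", 1),
    ("Bravo", "Red", "Small", 5),
]


def bump_duplicates(rows):
    """Return the rows with each repeated row's Value raised by how many
    identical rows came before it (0 for the first occurrence)."""
    seen = defaultdict(int)  # full row -> number of times seen so far
    out = []
    for name, color, size, value in rows:
        key = (name, color, size, value)
        out.append((name, color, size, value + seen[key]))
        seen[key] += 1
    return out


result = bump_duplicates(rows)
for row in result:
    print(row)
```

With at most one duplicate per original row, the second copy is the only one that changes.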

bfeeny
asked 2 years ago, 1649 views
1 Answer
Hi, if I understood correctly, you would like to achieve something like this:

| Name  | Color | Size  | Value | Time                |
| Alpha | Blue  | Large | 2     | 01-04-2022 09:58:30 |
| Alpha | Blue  | Large | 1     | 01-04-2022 08:58:30 |
| Bravo | Red   | Small | 5     | 01-04-2022 09:58:30 |

Is that right?

You will have only one duplicate for any original row, and the duplicate's Value should be increased by 1.

You could look into something like:

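import pyspark.sql.functions as F
from pyspark.sql.window import Window

# Identical rows share a partition; Time_col decides which copy counts as the duplicate
window_spec = Window.partitionBy("Name", "Color", "Size", "Value").orderBy("Time_col")
# Number the rows inside each group of identical rows: 1 = original, 2 = duplicate
df3 = df.withColumn("row_num", F.row_number().over(window_spec))
# Add 1 to Value on the duplicate only, then drop the helper column
df4 = df3.withColumn(
    "Value", F.when(F.col("row_num") == 2, F.col("Value") + 1).otherwise(F.col("Value"))
).drop("row_num")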

My output:

root
 |-- Name: string (nullable = true)
 |-- Color: string (nullable = true)
 |-- Size: string (nullable = true)
 |-- Value: long (nullable = true)
 |-- Time_col: string (nullable = true)

+-----+-----+-----+-----+-------------------+
|Name |Color|Size |Value|Time_col           |
+-----+-----+-----+-----+-------------------+
|Alpha|Blue |Large|1    |01-04-2022 08:58:30|
|Alpha|Blue |Large|2    |01-04-2022 09:58:30|
|Bravo|Red  |Small|5    |01-04-2022 09:58:30|
+-----+-----+-----+-----+-------------------+

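It should also work without the time column, as long as the window still has an orderBy (row_number() requires an ordered window). Since the rows in each partition are identical, ordering by any of the key columns will do.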

Hope this helps.

AWS
EXPERT
answered 2 years ago
  • I need to keep the duplicate row and just increment the "Value" column by a certain amount.

    | Name  | Color | Size  | Value |
    | Alpha | Blue  | Large | 1     |
    | Alpha | Blue  | Large | 2     |
    | Bravo | Red   | Small | 5     |

    would be my output

    Also I tried to use a function to at least start iterating rows, and it doesn't seem to work. I am basically doing this, here is a small section of my code:

    from pyspark.sql import functions as f

    if df.take(1):
        df = df.drop("col6")

        # Convert the timestamp strings to epoch milliseconds
        df = df.withColumn("time", f.unix_timestamp("time", 'dd-MM-yyyy HH:mm:ss') * 1000)
        df = df.withColumn("timegmt", f.unix_timestamp("timegmt", 'dd-MM-yyyy HH:mm:ss') * 1000)

        df = df.withColumn("value", df.VALUE.cast('int'))
        # Keep the fifth element of the '/'-split input file path
        df = df.withColumn("filename", f.split(f.input_file_name(), '/')[4])

        def increment_duplicates(row):
            print("yes")
            print(row)

        df.foreach(increment_duplicates)


    So all of this is inside the if clause. All of it executes but not my function.

  • @bfeeny, thank you for the clarification. I am going to update my answer; please check if it helps.
