How to detect a duplicate row and then update it in PySpark?

I have a Glue ETL Job in which some of the data has issues. There is a case where a row is duplicated, and what I need to do is increase the value by 1 hour on the duplicate.

So imagine a set of data that looks like:

| Name | Color | Size | Value |
| Alpha | Blue | Large | 1 |
| Alpha | Blue | Large | 1 |
| Bravo | Red | Small | 5 |

So it would see that the Alpha row is a duplicate and, on the duplicate row, increase Value to 2. Basically, it needs to find the duplicated row and update it. This should only happen once in my corner case, so there won't be more than one duplicate for any original row. In Pandas there is the .duplicated() method, but I don't see anything like that in PySpark, so I am trying to think of a way to deal with it. If I could iterate over the Spark DataFrame, I could build dictionaries with counters and do an update when the count reaches 2, but I'm not sure that is a good approach.

bfeeny
asked 2 years ago · 1664 views
1 Answer

Hi, if I understood correctly, you would like to achieve something like:

| Name | Color | Size | Value | Time |
| Alpha | Blue | Large | 2 | 01-04-2022 09:58:30 |
| Alpha | Blue | Large | 1 | 01-04-2022 08:58:30 |
| Bravo | Red | Small | 5 | 01-04-2022 09:58:30 |

Is that right?

You will only have one duplicate for any original row, and the duplicate's value should be increased by 1.

You could look into something like:

import pyspark.sql.functions as F
from pyspark.sql.window import Window

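# Rows that share Name, Color, Size and Value are treated as duplicates of each other
windowSpec = Window.partitionBy("Name", "Color", "Size", "Value").orderBy("Time_col")
# Number the rows inside each group of duplicates: 1 = original, 2 = duplicate
df3 = df.withColumn("row_num", F.row_number().over(windowSpec))
# Add 1 to Value on the second row only, then drop the helper column
df4 = df3.withColumn("Value", F.when(df3.row_num == 2, df3.Value + 1).otherwise(df3.Value)).drop("row_num")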

My output:

root
 |-- Name: string (nullable = true)
 |-- Color: string (nullable = true)
 |-- Size: string (nullable = true)
 |-- Value: long (nullable = true)
 |-- Time_col: string (nullable = true)

+-----+-----+-----+-----+-------------------+
|Name |Color|Size |Value|Time_col           |
+-----+-----+-----+-----+-------------------+
|Alpha|Blue |Large|1    |01-04-2022 08:58:30|
|Alpha|Blue |Large|2    |01-04-2022 09:58:30|
|Bravo|Red  |Small|5    |01-04-2022 09:58:30|
+-----+-----+-----+-----+-------------------+

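It should also work without the time column, as long as the window is still ordered by some other column, because row_number() requires an ordered window.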
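
For intuition about what the window logic does, here is a minimal plain-Python sketch of the same rule that runs without Spark. It only illustrates the idea on a small in-memory list (close to the dictionary-with-counters idea from the question) and is not a replacement for the DataFrame code above; bump_duplicates, key_fields and step are hypothetical names introduced for this example.

```python
from collections import defaultdict


def bump_duplicates(rows, key_fields=("Name", "Color", "Size", "Value"), step=1):
    """Return copies of rows; the 2nd occurrence of a key gets Value + step."""
    seen = defaultdict(int)  # key -> number of times this key has appeared so far
    result = []
    for row in rows:
        key = tuple(row[f] for f in key_fields)
        seen[key] += 1  # plays the role of row_number() inside one partition
        new_row = dict(row)  # copy, so the input rows are left untouched
        if seen[key] == 2:  # same rule as F.when(row_num == 2, Value + 1)
            new_row["Value"] = row["Value"] + step
        result.append(new_row)
    return result


rows = [
    {"Name": "Alpha", "Color": "Blue", "Size": "Large", "Value": 1},
    {"Name": "Alpha", "Color": "Blue", "Size": "Large", "Value": 1},
    {"Name": "Bravo", "Color": "Red", "Size": "Small", "Value": 5},
]

for r in bump_duplicates(rows):
    print(r)
```

Because the duplicated rows are identical, it does not matter which of the two is treated as the second one. On a real Spark DataFrame the window version is preferable, since it avoids collecting the rows onto the driver.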

Hope this helps.

AWS
EXPERT
answered 2 years ago
  • I need to keep the duplicate row and just increment the "Value" column by a certain amount.

    | Name | Color | Size | Value |
    | Alpha | Blue | Large | 1 |
    | Alpha | Blue | Large | 2 |
    | Bravo | Red | Small | 5 |

    would be my output

    Also I tried to use a function to at least start iterating rows, and it doesn't seem to work. I am basically doing this, here is a small section of my code:

    if df.take(1):
        df = df.drop("col6") 
    
        df = df.withColumn("time", f.unix_timestamp("time", 'dd-MM-yyyy HH:mm:ss') * 1000)
        df = df.withColumn("timegmt", f.unix_timestamp("timegmt", 'dd-MM-yyyy HH:mm:ss') * 1000)
    
        df = df.withColumn("value", df.VALUE.cast('int'))
        df = df.withColumn("filename", f.split(f.input_file_name(), '/')[4])
    
        def increment_duplicates(row):
            print("yes")
            print(row)
            
        df.foreach(increment_duplicates)
    

    So all of this is inside the if clause. All of it executes but not my function.

  • @bfeeny, thank you for the clarification. I am going to update my answer; please check whether it helps.
