How to detect a duplicate row and then update it in PySpark?


I have a Glue ETL Job in which some of the data has issues. There is a case where a row is duplicated, and what I need to do is increase the value by 1 hour on the duplicate.

So imagine a set of data that looks like:

| Name | Color | Size | Value |
| Alpha | Blue | Large | 1 |
| Alpha | Blue | Large | 1 |
| Bravo | Red | Small | 5 |

So it would see that the Alpha row is a duplicate, and on the duplicate row it would increase the value to 2. Basically, it needs to find the duplicated row and update it. This should happen only once in my corner case, so there won't be more than one duplicate for any original row. In Pandas there is the .duplicated() method, but I don't see anything like that in PySpark, so I am trying to think of a way to deal with it. If I could iterate over the Spark DataFrame, I could build dictionaries with counters and, if the count was 2, do an update. I'm not sure whether that is a good way.
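
For illustration, the counter idea described above can be sketched in plain Python over a list of tuples. This is only a sketch of the intended logic, not Spark code; the function name `bump_duplicates` and the `step` parameter are hypothetical, and it assumes at most one duplicate per original row.

```python
from collections import Counter


def bump_duplicates(rows, step=1):
    """Return rows with Value increased by `step` on the second copy of an identical row."""
    seen = Counter()
    out = []
    for name, color, size, value in rows:
        key = (name, color, size, value)
        seen[key] += 1
        # The second occurrence of an identical row is treated as the duplicate.
        if seen[key] == 2:
            value += step
        out.append((name, color, size, value))
    return out


rows = [
    ("Alpha", "Blue", "Large", 1),
    ("Alpha", "Blue", "Large", 1),
    ("Bravo", "Red", "Small", 5),
]
result = bump_duplicates(rows)
```

Iterating like this happens on a single machine, so it would only suit data small enough to collect to the driver.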

bfeeny
asked 2 years ago, 1664 views
1 Answer

Hi, if I understood correctly, you would like to achieve something like:

| Name | Color | Size | Value | Time |
| Alpha | Blue | Large | 2 | 01-04-2022 09:58:30 |
| Alpha | Blue | Large | 1 | 01-04-2022 08:58:30 |
| Bravo | Red | Small | 5 | 01-04-2022 09:58:30 |

Is that right?

You will have only one duplicate for any original row, and the duplicate's value should be increased by 1.

You could look into something like:

import pyspark.sql.functions as F
from pyspark.sql.window import Window

# Number identical rows within each group, ordered by the time column
windowSpec = Window.partitionBy("Name", "Color", "Size", "Value").orderBy("Time_col")
df3 = df.withColumn("row_num", F.row_number().over(windowSpec))
# Increment Value only on the second row of each group (the duplicate)
df4 = df3.withColumn("Value", F.when(df3.row_num == 2, df3.Value + 1).otherwise(df3.Value)).drop(df3.row_num)

My output:

root
 |-- Name: string (nullable = true)
 |-- Color: string (nullable = true)
 |-- Size: string (nullable = true)
 |-- Value: long (nullable = true)
 |-- Time_col: string (nullable = true)

+-----+-----+-----+-----+-------------------+
|Name |Color|Size |Value|Time_col           |
+-----+-----+-----+-----+-------------------+
|Alpha|Blue |Large|1    |01-04-2022 08:58:30|
|Alpha|Blue |Large|2    |01-04-2022 09:58:30|
|Bravo|Red  |Small|5    |01-04-2022 09:58:30|
+-----+-----+-----+-----+-------------------+

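It should also work without the time column, as long as the window still has an orderBy, because row_number() requires an ordered window. Ordering by one of the partition columns is enough.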

Hope this helps.

AWS
EXPERT
answered 2 years ago
  • I need to keep the duplicate row and just increment the "Value" column by a certain amount.

    | Name | Color | Size | Value |
    | Alpha | Blue | Large | 1 |
    | Alpha | Blue | Large | 2 |
    | Bravo | Red | Small | 5 |

    would be my output

    Also I tried to use a function to at least start iterating rows, and it doesn't seem to work. I am basically doing this, here is a small section of my code:

    if df.take(1):
        df = df.drop("col6") 
    
        df = df.withColumn("time", f.unix_timestamp("time", 'dd-MM-yyyy HH:mm:ss') * 1000)
        df = df.withColumn("timegmt", f.unix_timestamp("timegmt", 'dd-MM-yyyy HH:mm:ss') * 1000)
    
        df = df.withColumn("value", df.VALUE.cast('int'))
        df = df.withColumn("filename", f.split(f.input_file_name(), '/')[4])
    
        def increment_duplicates(row):
            print("yes")
            print(row)
            
        df.foreach(increment_duplicates)
    

    So all of this is inside the if clause. All of it executes but not my function.

  • @bfeeny, thank you for the clarification. I am going to update my answer; please check if it helps.
