How to detect a duplicate row and then update it in PySpark?

I have a Glue ETL Job in which some of the data has issues. There is a case where a row is duplicated, and what I need to do is increase the value by 1 hour on the duplicate.

So imagine a set of data that looks like:

| Name  | Color | Size  | Value |
| Alpha | Blue  | Large | 1     |
| Alpha | Blue  | Large | 1     |
| Bravo | Red   | Small | 5     |

So it would see that the Alpha row is a duplicate, and on the duplicate row it would increase Value to 2. Basically, it needs to find the duplicated row and update it. This should only happen once in my corner case, so there won't be more than one duplicate for any original row. In Pandas there is the .duplicated() method, but I don't see anything like that in PySpark, so I am trying to think of a way to deal with it. If I could iterate over the Spark DataFrame, I could build dictionaries with counters and, if the count was 2, do an update. I am not sure whether that is a good way.

bfeeny
asked 2 years ago, 1664 views
1 Answer

Hi, if I understood correctly, you would like to achieve something like:

| Name  | Color | Size  | Value | Time                |
| Alpha | Blue  | Large | 2     | 01-04-2022 09:58:30 |
| Alpha | Blue  | Large | 1     | 01-04-2022 08:58:30 |
| Bravo | Red   | Small | 5     | 01-04-2022 09:58:30 |

Is that right?

You will only have one duplicate for any original row, and the duplicate's value should be increased by 1.

You could look into something like:

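import pyspark.sql.functions as F
from pyspark.sql.window import Window

# Identical rows share a partition; Time_col decides which copy is numbered 2.
windowSpec = Window.partitionBy("Name", "Color", "Size", "Value").orderBy("Time_col")
# row_num is 1 for the first copy of a row and 2 for its duplicate.
df3 = df.withColumn("row_num", F.row_number().over(windowSpec))
# Increment Value on the duplicate only, then drop the helper column.
df4 = df3.withColumn(
    "Value", F.when(F.col("row_num") == 2, F.col("Value") + 1).otherwise(F.col("Value"))
).drop("row_num")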

My output:

root
 |-- Name: string (nullable = true)
 |-- Color: string (nullable = true)
 |-- Size: string (nullable = true)
 |-- Value: long (nullable = true)
 |-- Time_col: string (nullable = true)

+-----+-----+-----+-----+-------------------+
|Name |Color|Size |Value|Time_col           |
+-----+-----+-----+-----+-------------------+
|Alpha|Blue |Large|1    |01-04-2022 08:58:30|
|Alpha|Blue |Large|2    |01-04-2022 09:58:30|
|Bravo|Red  |Small|5    |01-04-2022 09:58:30|
+-----+-----+-----+-----+-------------------+

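It should also work without the time column, as long as the window still has an orderBy (for example on one of the partition columns), because row_number() requires an ordered window. Since the duplicated rows are identical, it does not matter which copy ends up numbered 2.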
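To make the logic easier to follow, here is a rough plain-Python sketch of what the partition plus row_number() step computes, run on the sample rows from the question. The function name bump_duplicates, the increment parameter, and the tuple layout are only illustrative assumptions; this is not Spark code and is not meant to replace the window approach on a real DataFrame.

```python
from collections import defaultdict


def bump_duplicates(rows, increment=1):
    """Emulate partitionBy(Name, Color, Size, Value) + row_number() on plain tuples.

    rows: iterable of (name, color, size, value) tuples (hypothetical layout).
    The second occurrence of an identical row gets value + increment.
    """
    seen = defaultdict(int)  # exact row -> number of times seen so far
    result = []
    for name, color, size, value in rows:
        key = (name, color, size, value)
        seen[key] += 1  # plays the role of row_number() within the partition
        if seen[key] == 2:
            value += increment  # only the duplicate is changed
        result.append((name, color, size, value))
    return result


rows = [
    ("Alpha", "Blue", "Large", 1),
    ("Alpha", "Blue", "Large", 1),
    ("Bravo", "Red", "Small", 5),
]
updated = bump_duplicates(rows)
print(updated)
```

This is essentially the "dictionary with counters" idea from the question. It only works on data that fits on a single machine, which is why the window version is probably the better fit inside a Glue job.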

Hope this helps.

AWS
EXPERT
answered 2 years ago
  • I need to keep the duplicate row and just increment the "Value" column by a certain amount.

    | Name  | Color | Size  | Value |
    | Alpha | Blue  | Large | 1     |
    | Alpha | Blue  | Large | 2     |
    | Bravo | Red   | Small | 5     |

    would be my output

    Also I tried to use a function to at least start iterating rows, and it doesn't seem to work. I am basically doing this, here is a small section of my code:

    if df.take(1):
        df = df.drop("col6") 
    
        df = df.withColumn("time", f.unix_timestamp("time", 'dd-MM-yyyy HH:mm:ss') * 1000)
        df = df.withColumn("timegmt", f.unix_timestamp("timegmt", 'dd-MM-yyyy HH:mm:ss') * 1000)
    
        df = df.withColumn("value", df.VALUE.cast('int'))
        df = df.withColumn("filename", f.split(f.input_file_name(), '/')[4])
    
        def increment_duplicates(row):
            print("yes")
            print(row)

        df.foreach(increment_duplicates)
    

    So all of this is inside the if clause. All of it executes but not my function.

  • @bfeeny, thank you for the clarification. I am going to update my answer; please check if it helps.
