AWS Glue ETL duplicate values


Hello, I am having issues with the ETL jobs in Glue. It looks like every time I run an ETL it is not updating the values; it loads everything as new and keeps the old rows, so after the ETL runs I end up with the same values 3 or 4 times. I already checked the databases and they are fine. Looking for answers, I found that I probably have the wrong settings for S3, but I couldn't find any option in the ETL panels to change how the data in the S3 bucket is being saved. I also changed the auto-generated script (shown below), but it is still creating duplicates in the tables.

import com.amazonaws.services.glue.GlueContext
import com.amazonaws.services.glue.MappingSpec
import com.amazonaws.services.glue.errors.CallSite
import com.amazonaws.services.glue.util.GlueArgParser
import com.amazonaws.services.glue.util.Job
import com.amazonaws.services.glue.util.JsonOptions
import org.apache.spark.SparkContext
import scala.collection.JavaConverters._
import org.apache.spark.sql.functions._
import com.amazonaws.services.glue.DynamicFrame

object GlueApp {
  def main(sysArgs: Array[String]) {
    val spark: SparkContext = new SparkContext()
    val glueContext: GlueContext = new GlueContext(spark)
    // @params: [JOB_NAME]
    val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray)
    Job.init(args("JOB_NAME"), glueContext, args.asJava)

    // Script generated for node AWS Glue Data Catalog
    val AWSGlueDataCatalog_node1698042482062 = glueContext.getCatalogSource(database = "rds-mantenimiento", tableName = "mantenimiento_mantenimiento_equipos", transformationContext = "AWSGlueDataCatalog_node1698042482062").getDynamicFrame()

    // Script generated for node Change Schema
    val ChangeSchema_node1698042500936 = AWSGlueDataCatalog_node1698042482062.applyMapping(mappings = Seq(("area", "string", "area", "string"), ("tipo", "string", "tipo", "string"), ("id", "int", "id", "int")), caseSensitive = false, transformationContext = "ChangeSchema_node1698042500936")

    // Script generated for node Drop Duplicates
    val DropDuplicates_node1698042522554 = DynamicFrame(ChangeSchema_node1698042500936.toDF().dropDuplicates(Seq("id")), glueContext)

    // Script generated for node AWS Glue Data Catalog
    val AWSGlueDataCatalog_node1698042542753 = glueContext.getCatalogSink(database = "process_mantpro", tableName = "process_mantenimiento.equipos", additionalOptions = JsonOptions("""{"enableUpdateCatalog": true, "updateBehavior": "UPDATE_IN_DATABASE"}"""), transformationContext = "AWSGlueDataCatalog_node1698042542753").writeDynamicFrame(DropDuplicates_node1698042522554)

    Job.commit()
  }
}

Could someone please help me with what to do, or where to find information on how I can change the way it is saving the files? Thanks.

Asked 7 months ago · Viewed 372 times
1 Answer
Accepted Answer

It seems you expect the DB write to do "upserts", while it only does "append".
If the source has all the data, you could wipe the output path before writing by using glueContext.purge_s3_path("s3://yourbucket/yourtable/", {"retentionPeriod": 0}). If you need to do incremental updates, then you need a format that allows that, such as Iceberg, Hudi, or Delta.
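A minimal sketch of how that could slot into the posted Scala script, assuming the Scala counterpart of purge_s3_path is GlueContext.purgeS3Path taking a JsonOptions argument, and reusing the placeholder path s3://yourbucket/yourtable/ from the answer (replace it with the real S3 location of the process_mantenimiento.equipos table). The names glueContext, JsonOptions and DropDuplicates_node1698042522554 are the ones already defined in the question's script:

// Delete whatever already sits under the output prefix so the catalog sink write
// below becomes a full refresh instead of piling new files on top of the old ones.
// "retentionPeriod": 0 removes files regardless of age; the path is a placeholder.
glueContext.purgeS3Path("s3://yourbucket/yourtable/", JsonOptions("""{"retentionPeriod": 0}"""))

// ...then write the deduplicated DynamicFrame exactly as the generated script already does.
glueContext.getCatalogSink(database = "process_mantpro", tableName = "process_mantenimiento.equipos", additionalOptions = JsonOptions("""{"enableUpdateCatalog": true, "updateBehavior": "UPDATE_IN_DATABASE"}"""), transformationContext = "AWSGlueDataCatalog_node1698042542753").writeDynamicFrame(DropDuplicates_node1698042522554)

Note that this only makes sense when every run of the source query contains the full data set; for true incremental updates the answer's other suggestion applies, i.e. writing to a table format such as Iceberg, Hudi, or Delta that supports upserts.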

AWS
EXPERT
Answered 7 months ago
• Thanks, I will change my files to Iceberg. Do you have any tutorials, documentation, or advice about working with Iceberg?
