AWS Glue ETL duplicate values


Hello, I am having issues with my ETL jobs in Glue. It looks like every time I run an ETL job it does not update the values; it loads everything as new records while keeping the old ones, so after running the ETL I end up with the same values 3 or 4 times. I already checked the databases and they are fine. Looking for answers, I found that I probably have the wrong settings for S3, but I couldn't find any option in the ETL panels to change how the data is saved to the S3 bucket. I also changed the auto-generated script, shown below, however it is still creating duplicates in the tables.

import com.amazonaws.services.glue.GlueContext
import com.amazonaws.services.glue.MappingSpec
import com.amazonaws.services.glue.errors.CallSite
import com.amazonaws.services.glue.util.GlueArgParser
import com.amazonaws.services.glue.util.Job
import com.amazonaws.services.glue.util.JsonOptions
import org.apache.spark.SparkContext
import scala.collection.JavaConverters._
import org.apache.spark.sql.functions._
import com.amazonaws.services.glue.DynamicFrame

object GlueApp {
  def main(sysArgs: Array[String]) {
    val spark: SparkContext = new SparkContext()
    val glueContext: GlueContext = new GlueContext(spark)
    // @params: [JOB_NAME]
    val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray)
    Job.init(args("JOB_NAME"), glueContext, args.asJava)

    // Script generated for node AWS Glue Data Catalog
    val AWSGlueDataCatalog_node1698042482062 = glueContext.getCatalogSource(
      database = "rds-mantenimiento",
      tableName = "mantenimiento_mantenimiento_equipos",
      transformationContext = "AWSGlueDataCatalog_node1698042482062"
    ).getDynamicFrame()

    // Script generated for node Change Schema
    val ChangeSchema_node1698042500936 = AWSGlueDataCatalog_node1698042482062.applyMapping(
      mappings = Seq(
        ("area", "string", "area", "string"),
        ("tipo", "string", "tipo", "string"),
        ("id", "int", "id", "int")
      ),
      caseSensitive = false,
      transformationContext = "ChangeSchema_node1698042500936"
    )

    // Script generated for node Drop Duplicates
    val DropDuplicates_node1698042522554 = DynamicFrame(
      ChangeSchema_node1698042500936.toDF().dropDuplicates(Seq("id")),
      glueContext
    )

    // Script generated for node AWS Glue Data Catalog
    val AWSGlueDataCatalog_node1698042542753 = glueContext.getCatalogSink(
      database = "process_mantpro",
      tableName = "process_mantenimiento.equipos",
      additionalOptions = JsonOptions("""{"enableUpdateCatalog": true, "updateBehavior": "UPDATE_IN_DATABASE"}"""),
      transformationContext = "AWSGlueDataCatalog_node1698042542753"
    ).writeDynamicFrame(DropDuplicates_node1698042522554)

    Job.commit()
  }
}

Could someone please help me figure out what to do, or point me to information on how I can change the way it is saving the files? Thanks.

Asked 7 months ago · 372 views
1 answer
Accepted Answer

It seems you expect the write to the database to do "upserts", while it only does "append".
If the source has all the data, you could wipe the output path before writing by using glueContext.purge_s3_path("s3://yourbucket/yourtable/", {"retentionPeriod": 0}). If you need to do incremental updates, then you need a format that allows that, such as Iceberg, Hudi, or Delta.
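Since the job in the question is written in Scala, a rough Scala counterpart of the purge_s3_path call above would be purgeS3Path on the GlueContext. The following is a minimal sketch, not tested: the S3 path is a placeholder for the actual location of the process_mantenimiento.equipos table, and retentionPeriod 0 means "delete all existing files regardless of age", so this permanently removes the previous output.

// Sketch: wipe the old output so each run replaces the data instead of appending.
// Call this before the getCatalogSink(...).writeDynamicFrame(...) step.
// "s3://yourbucket/yourtable/" is a placeholder for the table's real S3 location.
glueContext.purgeS3Path(
  "s3://yourbucket/yourtable/",
  JsonOptions("""{"retentionPeriod": 0}""")
)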

AWS
EXPERT
Answered 7 months ago
  • Thanks, I will change my files to Iceberg. Do you have any tutorials, documentation, or advice about working with Iceberg?
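For context on the Iceberg suggestion: the AWS Glue developer guide has a section on using the Iceberg framework in AWS Glue, and on Glue 4.0 an upsert can be expressed as a Spark SQL MERGE once the job is started with the --datalake-formats job parameter set to iceberg and an Iceberg catalog is configured through the job's Spark --conf settings. The sketch below is illustrative only: the catalog name glue_catalog, the target table, and the choice of id as the merge key are assumptions, not details from the thread.

// Sketch of an Iceberg upsert (MERGE) in a Glue 4.0 Scala job.
// Assumes --datalake-formats=iceberg and an Iceberg catalog named "glue_catalog"
// configured via the job's Spark conf; table and key names are illustrative.
val sparkSession = glueContext.sparkSession
val updates = DropDuplicates_node1698042522554.toDF()
updates.createOrReplaceTempView("updates")

sparkSession.sql("""
  MERGE INTO glue_catalog.process_mantpro.equipos AS t
  USING updates AS s
  ON t.id = s.id
  WHEN MATCHED THEN UPDATE SET *
  WHEN NOT MATCHED THEN INSERT *
""")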
