AWS Glue ETL duplicate values


Hello, I am having issues with my ETL jobs in Glue. It looks like every time I run an ETL it does not update the values: it loads everything as new and keeps the old rows, so after the ETL runs I end up with the same values 3 or 4 times. I already checked the databases and they are fine. Looking for answers, I found that I probably have the wrong settings for S3, but I didn't find any option in the ETL panels to change how the data is saved to the S3 bucket. I also changed the auto-generated script (below), but it is still creating duplicates in the tables.

import com.amazonaws.services.glue.GlueContext
import com.amazonaws.services.glue.MappingSpec
import com.amazonaws.services.glue.errors.CallSite
import com.amazonaws.services.glue.util.GlueArgParser
import com.amazonaws.services.glue.util.Job
import com.amazonaws.services.glue.util.JsonOptions
import org.apache.spark.SparkContext
import scala.collection.JavaConverters._
import org.apache.spark.sql.functions._
import com.amazonaws.services.glue.DynamicFrame

object GlueApp {
  def main(sysArgs: Array[String]) {
    val spark: SparkContext = new SparkContext()
    val glueContext: GlueContext = new GlueContext(spark)
    // @params: [JOB_NAME]
    val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray)
    Job.init(args("JOB_NAME"), glueContext, args.asJava)

    // Script generated for node AWS Glue Data Catalog
    val AWSGlueDataCatalog_node1698042482062 = glueContext.getCatalogSource(
      database = "rds-mantenimiento",
      tableName = "mantenimiento_mantenimiento_equipos",
      transformationContext = "AWSGlueDataCatalog_node1698042482062"
    ).getDynamicFrame()

    // Script generated for node Change Schema
    val ChangeSchema_node1698042500936 = AWSGlueDataCatalog_node1698042482062.applyMapping(
      mappings = Seq(("area", "string", "area", "string"), ("tipo", "string", "tipo", "string"), ("id", "int", "id", "int")),
      caseSensitive = false,
      transformationContext = "ChangeSchema_node1698042500936"
    )

    // Script generated for node Drop Duplicates
    val DropDuplicates_node1698042522554 = DynamicFrame(ChangeSchema_node1698042500936.toDF().dropDuplicates(Seq("id")), glueContext)

    // Script generated for node AWS Glue Data Catalog
    val AWSGlueDataCatalog_node1698042542753 = glueContext.getCatalogSink(
      database = "process_mantpro",
      tableName = "process_mantenimiento.equipos",
      additionalOptions = JsonOptions("""{"enableUpdateCatalog": true, "updateBehavior": "UPDATE_IN_DATABASE"}"""),
      transformationContext = "AWSGlueDataCatalog_node1698042542753"
    ).writeDynamicFrame(DropDuplicates_node1698042522554)

    Job.commit()
  }
}

Could someone please help? What should I do, or where can I find information on how to change the way it saves the files? Thanks.

asked 6 months ago · 333 views
1 Answer
Accepted Answer

It seems you expect the database write to do "upserts", while it only does "appends".
If the source has all the data, you could wipe the output path before writing by using glueContext.purge_s3_path("s3://yourbucket/yourtable/", {"retentionPeriod": 0}). If you need to do incremental updates, then you need a table format that supports that, such as Iceberg, Hudi, or Delta.
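In the Scala API (which the posted script uses), the equivalent call is GlueContext.purgeS3Path. A minimal sketch of the wipe-before-write approach, placed just before the getCatalogSink write in the script above; the S3 path is a placeholder for wherever the output table's files actually live:

// Sketch only: delete the previous run's output files so the write that follows
// replaces them instead of appending to them.
// A retentionPeriod of 0 deletes everything, regardless of file age.
glueContext.purgeS3Path(
  "s3://yourbucket/yourtable/",  // placeholder: the S3 location of process_mantenimiento.equipos
  JsonOptions("""{"retentionPeriod": 0}""")
)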

AWS
EXPERT
answered 6 months ago
  • Thanks, I will change my files to Iceberg. Do you have any tutorials, documentation, or advice about working with Iceberg?
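For reference, the AWS Glue documentation covers using the Iceberg framework in Glue jobs, and an upsert from a Glue Scala job looks roughly like the sketch below. This assumes Glue 4.0 with the --datalake-formats job parameter set to iceberg; the bucket and the "updates" view name are placeholders, while the database, table, and key come from the question's script:

// Sketch only: configure the Spark session to expose the Glue Data Catalog
// as an Iceberg catalog named "glue_catalog".
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
  .config("spark.sql.catalog.glue_catalog", "org.apache.iceberg.spark.SparkCatalog")
  .config("spark.sql.catalog.glue_catalog.warehouse", "s3://yourbucket/warehouse/")
  .config("spark.sql.catalog.glue_catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog")
  .config("spark.sql.catalog.glue_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
  .getOrCreate()

// Expose the deduplicated rows as a temp view, then MERGE on the key column,
// so existing ids are updated in place and only new ids are inserted.
DropDuplicates_node1698042522554.toDF().createOrReplaceTempView("updates")
spark.sql("""
  MERGE INTO glue_catalog.process_mantpro.equipos AS t
  USING updates AS u
  ON t.id = u.id
  WHEN MATCHED THEN UPDATE SET *
  WHEN NOT MATCHED THEN INSERT *
""")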
