Hello, I am having issues with my ETL jobs in AWS Glue. Every time I run an ETL job it does not update the existing values; instead it loads everything as new records and keeps the old ones, so after running the ETL a few times I end up with the same values 3 or 4 times. I have already checked the source databases and they are fine. Looking for answers, I found that I probably have the wrong settings for S3, but I could not find any option in the ETL panel to change how the data is saved to the S3 bucket. I also changed the auto-generated script (pasted below), but it still creates duplicates in the tables.
import com.amazonaws.services.glue.GlueContext
import com.amazonaws.services.glue.MappingSpec
import com.amazonaws.services.glue.errors.CallSite
import com.amazonaws.services.glue.util.GlueArgParser
import com.amazonaws.services.glue.util.Job
import com.amazonaws.services.glue.util.JsonOptions
import org.apache.spark.SparkContext
import scala.collection.JavaConverters._
import org.apache.spark.sql.functions._
import com.amazonaws.services.glue.DynamicFrame
object GlueApp {
  def main(sysArgs: Array[String]) {
    val spark: SparkContext = new SparkContext()
    val glueContext: GlueContext = new GlueContext(spark)
    // @params: [JOB_NAME]
    val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray)
    Job.init(args("JOB_NAME"), glueContext, args.asJava)

    // Script generated for node AWS Glue Data Catalog
    val AWSGlueDataCatalog_node1698042482062 = glueContext
      .getCatalogSource(
        database = "rds-mantenimiento",
        tableName = "mantenimiento_mantenimiento_equipos",
        transformationContext = "AWSGlueDataCatalog_node1698042482062"
      )
      .getDynamicFrame()

    // Script generated for node Change Schema
    val ChangeSchema_node1698042500936 = AWSGlueDataCatalog_node1698042482062.applyMapping(
      mappings = Seq(
        ("area", "string", "area", "string"),
        ("tipo", "string", "tipo", "string"),
        ("id", "int", "id", "int")
      ),
      caseSensitive = false,
      transformationContext = "ChangeSchema_node1698042500936"
    )

    // Script generated for node Drop Duplicates
    val DropDuplicates_node1698042522554 =
      DynamicFrame(ChangeSchema_node1698042500936.toDF().dropDuplicates(Seq("id")), glueContext)

    // Script generated for node AWS Glue Data Catalog
    val AWSGlueDataCatalog_node1698042542753 = glueContext
      .getCatalogSink(
        database = "process_mantpro",
        tableName = "process_mantenimiento.equipos",
        additionalOptions = JsonOptions("""{"enableUpdateCatalog": true, "updateBehavior": "UPDATE_IN_DATABASE"}"""),
        transformationContext = "AWSGlueDataCatalog_node1698042542753"
      )
      .writeDynamicFrame(DropDuplicates_node1698042522554)

    Job.commit()
  }
}
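To illustrate what I think is happening (plain Scala, hypothetical rows, not my real schema): if I understand correctly, `dropDuplicates` only deduplicates the batch being loaded in the current run, while the S3 sink appends, so rows written by earlier runs are never touched. An update would need to deduplicate across the old and new data together by key:

```scala
// Hypothetical simplified row; my real table has more columns.
case class Row(id: Int, area: String)

val existing = Seq(Row(1, "A"), Row(2, "B"))               // rows written by run 1
val incoming = Seq(Row(1, "A"), Row(2, "B"), Row(3, "C"))  // input of run 2

// What the job effectively does today: dedupe only the incoming
// batch, then append it to what is already in the target.
val appended = existing ++ incoming.distinct
// ids 1 and 2 now appear twice in the target.

// What an upsert would need: dedupe across old + new by key,
// keeping the latest version of each id.
val merged = (existing ++ incoming).groupBy(_.id).map(_._2.last).toSeq.sortBy(_.id)
// merged holds exactly one row per id.
```

So as far as I can tell, the Drop Duplicates node cannot fix this by itself; the write itself would have to overwrite or merge.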
Could someone please help me with what to do, or point me to information on how I can change the way the job saves the files? Thanks.
Thanks, I will change my files to Iceberg. Do you have any tutorials, documentation, or advice about working with Iceberg?
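For anyone finding this later: my current understanding from reading the AWS Glue documentation (please correct me if any of these keys are wrong) is that Iceberg is enabled on a Glue job through the `--datalake-formats` job parameter plus Spark catalog settings passed via `--conf`, roughly like this (the bucket path is a placeholder):

```
--datalake-formats iceberg
--conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
       --conf spark.sql.catalog.glue_catalog=org.apache.iceberg.spark.SparkCatalog
       --conf spark.sql.catalog.glue_catalog.warehouse=s3://your-bucket/warehouse/
       --conf spark.sql.catalog.glue_catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog
       --conf spark.sql.catalog.glue_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO
```

With that in place, Iceberg tables support row-level updates (e.g. `MERGE INTO` from Spark SQL), which seems to be exactly what my append-only job was missing.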