AWS Glue Interactive Notebooks: default df.saveAsTable() behavior overwrites the entire table instead of only the partitions present in the dataframe


In AWS Glue Interactive Notebooks, the default behavior when saving data via df.saveAsTable() is to overwrite the entire table regardless of the dataframe values. How can I enable the Spark setting so that only the table partitions present in the dataframe are overwritten dynamically?

Generic example:

  • "table1" has the partition column, event_date, with the values, "2023-01-01" and "2023-01-02"
  • The spark dataframe, df, has only the event_date value, "2023-01-03"
from awsglue.context import GlueContext
from pyspark.context import SparkContext

spark_context = SparkContext.getOrCreate()
glue_context = GlueContext(spark_context)

spark_session = (
    glue_context.spark_session.builder.appName(session_name)
    .config("spark.logConf", "true")
    .config("spark.sql.sources.partitionOverwriteMode", "dynamic")
    .enableHiveSupport()
    .getOrCreate()
)

(
    df.write.option("compression", "snappy")
    # including the path creates the table, if it does not exist,
    # as external instead of managed
    .option("path", target_s3_path)
    .saveAsTable(
        name=target_db_table,
        mode="overwrite",
        format="parquet",
        partitionBy="event_date",
    )
)
Jaime
asked a year ago, 420 views
1 Answer

By default, when you do an overwrite without specifying which partitions to update, Spark overwrites the whole table.
Changing spark.sql.sources.partitionOverwriteMode to dynamic (I think you can also pass that as a write option) should make Spark detect which partitions are affected by your data and overwrite only those.
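
For illustration, a minimal sketch of passing it as a write option, reusing the df, target_db_table, and target_s3_path from the question (the option key partitionOverwriteMode comes from the Spark SQL configuration docs and has not been verified against the Glue Data Catalog here):

(
    df.write.option("compression", "snappy")
    # per the Spark docs, this per-write key takes precedence over the
    # spark.sql.sources.partitionOverwriteMode session setting
    .option("partitionOverwriteMode", "dynamic")
    .option("path", target_s3_path)
    .saveAsTable(
        name=target_db_table,
        mode="overwrite",
        format="parquet",
        partitionBy="event_date",
    )
)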

AWS
EXPERT
answered a year ago
  • I tried both of the following methods and see the same issue in a Glue notebook, while simultaneously reading from the table in Athena for testing purposes. The table definition should not be completely recreated and the partitions re-added when "overwriting" a new partition; in my test, the data contains only a new partition value.

    The biggest issue with this behavior is that, if a long-running job fails while writing data or is killed, the table may not be readable until it is manually recreated.

    I also wonder if this behavior occurs because I am using Glue as the data catalog.

    # sequence observed in Athena while the overwrite runs:
    Table <table name> does not exist
    # then the table exists but no data shows up
    # then the previous and the new data show up
    
        (
            df.write.option("compression", "snappy")
            .saveAsTable(
                name=target_db_table,
                mode="overwrite",
                format="parquet",
                partitionBy=partition_col_names,
                partitionOverwriteMode="dynamic",
            )
        )
    
        (
            df.write.option("compression", "snappy")
            .option("partitionOverwriteMode", "dynamic")
            .saveAsTable(
                name=target_db_table,
                mode="overwrite",
                format="parquet",
                partitionBy=partition_col_names,
            )
        )
    
  • Update: this is not ideal, but I was able to get new partitions appended dynamically, without dropping and recreating the table, by using the following:

    # imports needed by the snippet below
    from pyspark.sql import SparkSession
    from pyspark.sql.utils import AnalysisException

    def table_exists(
        spark_session: SparkSession,
        db_table: str,
    ) -> bool:
        try:
            spark_session.sql(f"desc {db_table}")
            exists = True
        except AnalysisException:
            exists = False

        return exists
    
    # If the table does not exist, provide the S3 path when saving to create it
    # dynamically as an external table
    if not table_exists(spark_session=spark_session, db_table=target_db_table):
        (
            df.write.option("compression", write_compression)
            .option("path", target_s3_path)
            .saveAsTable(
                name=target_db_table,
                mode=write_mode,
                format=write_format,
                partitionBy=partition_col_names,
            )
        )
    # Otherwise, dynamically overwrite existing partition value combinations and
    # append new partitions. insertInto matches columns by position, so the table's
    # column order is selected first; it honors spark.sql.sources.partitionOverwriteMode.
    else:
        (
            df.select(spark_session.table(target_db_table).columns).write.insertInto(
                tableName=target_db_table,
                overwrite=True,
            )
        )
    
  • I don't think .option("partitionOverwriteMode", "dynamic") is doing anything anyway, without specifying the full property name.
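
As a rough sketch of checking and setting the full property name on the live session right before the write (variable names reused from the snippets above; whether this avoids the drop/recreate behavior seen with the Glue Data Catalog is not verified):

# Inspect the effective value, then set the full property explicitly on the session
print(spark_session.conf.get("spark.sql.sources.partitionOverwriteMode"))
spark_session.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

(
    df.write.option("compression", "snappy")
    .saveAsTable(
        name=target_db_table,
        mode="overwrite",
        format="parquet",
        partitionBy=partition_col_names,
    )
)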
