AWS Glue Interactive Notebooks: default df.saveAsTable() behavior overwrites the entire table instead of only the partitions present in the dataframe

In AWS Glue Interactive Notebooks, the default behavior when saving data via df.saveAsTable() with mode="overwrite" is to overwrite the entire table, regardless of the values in the dataframe. How can I enable the Spark setting that dynamically overwrites only the table partitions matching the dataframe's partition values?

Generic example:

  • "table1" has the partition column, event_date, with the values, "2023-01-01" and "2023-01-02"
  • The spark dataframe, df, has only the event_date value, "2023-01-03"
from awsglue.context import GlueContext
from pyspark.context import SparkContext

spark_context = SparkContext.getOrCreate()
glue_context = GlueContext(spark_context)

# session_name can be any descriptive app name; it is assumed to be defined elsewhere in the notebook
session_name = "glue-dynamic-partition-overwrite"

spark_session = (
    glue_context.spark_session.builder.appName(session_name)
    .config("spark.logConf", "true")
    .config("spark.sql.sources.partitionOverwriteMode", "dynamic")
    .enableHiveSupport()
    .getOrCreate()
)

(
    df.write.option("compression", "snappy")
    # including the path creates the table, if it does not already exist,
    # as an external table instead of a managed one
    .option("path", target_s3_path)
    .saveAsTable(
        name=target_db_table,
        mode="overwrite",
        format="parquet",
        partitionBy="event_date",
    )
)
Jaime
asked a year ago
1 Answer

By default, when you overwrite without specifying which partitions to update, Spark overwrites the whole table.
Changing spark.sql.sources.partitionOverwriteMode to dynamic (I think you can also pass it as a write option) should make Spark detect which partitions are affected by your data and overwrite only those.
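
A minimal sketch of that suggestion, untested here and reusing df, target_db_table, and target_s3_path from the question (the follow-up comments below describe how it behaved in practice):

    # Enable dynamic partition overwrite for the session
    spark_session.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

    (
        df.write.option("compression", "snappy")
        # the short option name is the per-write form of the same setting
        .option("partitionOverwriteMode", "dynamic")
        .option("path", target_s3_path)
        .saveAsTable(
            name=target_db_table,
            mode="overwrite",
            format="parquet",
            partitionBy="event_date",
        )
    )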

AWS EXPERT
answered a year ago
  • I tried both of the following methods and hit the same issue in a Glue notebook while simultaneously reading from the table in Athena for testing purposes. The table definition should not be completely recreated and its partitions re-added when "overwriting" a new partition; in my test, the data contains only a new partition value.

    The biggest issue with this behavior is that, when a job is long-running, fails while writing data, or is killed, the table may be unreadable until it is manually recreated.

    I also wonder whether this behavior occurs because I am using Glue as the data catalog. This is what Athena returns while the write is in progress:

    Table <table name> does not exist
    # then the table exists but no data shows up
    # then both the previous and the new data show up
    
        (
            df.write.option("compression", "snappy")
            .saveAsTable(
                name=target_db_table,
                mode="overwrite",
                format="parquet",
                partitionBy=partition_col_names,
                partitionOverwriteMode="dynamic",
            )
        )
    
        (
            df.write.option("compression", "snappy")
            .option("partitionOverwriteMode", "dynamic")
            .saveAsTable(
                name=target_db_table,
                mode="overwrite",
                format="parquet",
                partitionBy=partition_col_names,
            )
        )
    
  • Update: this is not ideal, but I was able to get new partitions appended dynamically, and existing ones overwritten, without dropping and recreating the table, by using the following:

    # Imports needed by the helper below
    from pyspark.sql import SparkSession
    from pyspark.sql.utils import AnalysisException

    def table_exists(
        spark_session: SparkSession,
        db_table: str,
    ) -> bool:
        try:
            spark_session.sql(f"desc {db_table}")
            exists = True
        except AnalysisException:
            exists = False
    
        return exists
    
        # If table does not exist, provide S3 path when saving to create the table dynamically as an external table
        if not table_exists(spark_session=spark_session, db_table=target_db_table):
            (
                df.write.option("compression", write_compression)
                .option("path", target_s3_path)
                .saveAsTable(
                    name=target_db_table,
                    mode=write_mode,
                    format=write_format,
                    partitionBy=partition_col_names,
                )
            )
        # Otherwise, dynamically overwrite existing partition value combinations and append new partitions
        else:
            (
                # insertInto matches columns by position, so select them in the
                # target table's column order first
                df.select(spark_session.table(target_db_table).columns).write.insertInto(
                    tableName=target_db_table,
                    overwrite=True,
                )
            )
    
  • I don't think .option("partitionOverwriteMode", "dynamic") is doing anything without specifying the full property name, spark.sql.sources.partitionOverwriteMode; a sketch of what I mean is below.
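
    A minimal sketch of the session-level form of that setting, combined with the insertInto approach from the earlier comment (spark_session, df, and target_db_table as in the question; this assumes the table already exists):

        # Set the full property name on the session before writing
        spark_session.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

        # With the property set, an overwrite insert should replace only the
        # partitions whose values appear in df and leave the others untouched
        # (insertInto resolves columns by position, hence the explicit select)
        (
            df.select(spark_session.table(target_db_table).columns)
            .write.insertInto(tableName=target_db_table, overwrite=True)
        )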
