AWS Glue Interactive Notebooks: default df.saveAsTable() behavior overwrites the entire table instead of overwriting partitions dynamically based on dataframe values


In AWS Glue Interactive Notebooks, the default behavior when saving data via df.saveAsTable() is to overwrite the entire table regardless of the dataframe values. How can I enable the Spark setting that only overwrites table partitions dynamically based on the dataframe?

Generic example:

  • "table1" has the partition column, event_date, with the values, "2023-01-01" and "2023-01-02"
  • The spark dataframe, df, has only the event_date value, "2023-01-03"
from awsglue.context import GlueContext
from pyspark.context import SparkContext

spark_context = SparkContext.getOrCreate()
glue_context = GlueContext(spark_context)

spark_session = (
    glue_context.spark_session.builder.appName(session_name)
    .config("spark.logConf", "true")
    .config("spark.sql.sources.partitionOverwriteMode", "dynamic")
    .enableHiveSupport()
    .getOrCreate()
)

(
    df.write.option("compression", "snappy")
    # including the path creates the table if not exists
    # as external instead of managed
    .option("path", target_s3_path)
    .saveAsTable(
        name=target_db_table,
        mode="overwrite",
        format="parquet",
        partitionBy="event_date",
    )
)
Jaime
asked 1 year ago · 420 views
1 Answer

By default, when you overwrite without specifying which partitions to update, Spark overwrites the whole table.
Changing spark.sql.sources.partitionOverwriteMode to dynamic (I think you can also pass that as a write option) should make Spark detect which partitions are affected by your data and overwrite only those.
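
For reference, a minimal sketch of both places the setting can go; the toy dataframe and the table name "table1" come from the generic example in the question, not from a tested Glue job:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Toy dataframe holding only the new partition value from the question.
df = spark.createDataFrame([("a", "2023-01-03")], ["col1", "event_date"])

# Option 1: session-level setting, so every partitioned overwrite is dynamic.
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

# Option 2: the same setting passed as a per-write option (the short key is
# documented for file-based sources; see the comments below on whether
# saveAsTable honors it).
(
    df.write.option("partitionOverwriteMode", "dynamic")
    .mode("overwrite")
    .format("parquet")
    .partitionBy("event_date")
    .saveAsTable("table1")
)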

AWS
EXPERT
answered 1 year ago
  • I tried both of the following methods and got the same issue in a Glue notebook while simultaneously reading from the table in Athena for testing purposes. The table definition should not be completely recreated and the partitions re-added when "overwriting" a new partition. In my test, the data contains only a new partition value.

    The biggest issue with this behavior is that, when a job is long-running, fails while writing data, or is killed, the table may be unreadable until it is manually recreated.

    I also wonder if this type of behavior occurs because I am using Glue as the data catalog.

    Table <table name> does not exist
    # then table exists with no data showing up
    # then previous and new data shows up
    
    # Attempt 1: pass partitionOverwriteMode as a saveAsTable keyword option
    (
        df.write.option("compression", "snappy")
        .saveAsTable(
            name=target_db_table,
            mode="overwrite",
            format="parquet",
            partitionBy=partition_col_names,
            partitionOverwriteMode="dynamic",
        )
    )

    # Attempt 2: pass partitionOverwriteMode as a write option
    (
        df.write.option("compression", "snappy")
        .option("partitionOverwriteMode", "dynamic")
        .saveAsTable(
            name=target_db_table,
            mode="overwrite",
            format="parquet",
            partitionBy=partition_col_names,
        )
    )
    
  • Update: this is not ideal, but I was able to get new partitions to append dynamically, without dropping and recreating the table, by using the following:

    from pyspark.sql import SparkSession
    from pyspark.sql.utils import AnalysisException

    def table_exists(
        spark_session: SparkSession,
        db_table: str,
    ) -> bool:
        try:
            spark_session.sql(f"desc {db_table}")
            exists = True
        except AnalysisException:
            exists = False

        return exists

    # If the table does not exist, provide the S3 path when saving so the
    # table is created dynamically as an external table
    if not table_exists(spark_session=spark_session, db_table=target_db_table):
        (
            df.write.option("compression", write_compression)
            .option("path", target_s3_path)
            .saveAsTable(
                name=target_db_table,
                mode=write_mode,
                format=write_format,
                partitionBy=partition_col_names,
            )
        )
    # Otherwise, dynamically overwrite existing partition value combinations
    # and append new partitions
    else:
        (
            # insertInto resolves columns by position, so select the table's
            # columns first to match its schema order
            df.select(spark_session.table(target_db_table).columns).write.insertInto(
                tableName=target_db_table,
                overwrite=True,
            )
        )
    
  • I don't think .option("partitionOverwriteMode", "dynamic") is doing anything without specifying the full property name (a quick way to confirm the effective setting is sketched below).
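
If it helps to confirm that, here is a minimal sketch for reading back the effective value and setting the full property name on the session before the write; it assumes the spark_session variable from the snippets above and has not been verified against the Glue catalog:

    from pyspark.sql import SparkSession

    # getOrCreate() returns the active session in the notebook, so this is
    # the same spark_session used in the snippets above.
    spark_session = SparkSession.builder.getOrCreate()

    # Read back the effective value ("static" is Spark's default when unset).
    current = spark_session.conf.get(
        "spark.sql.sources.partitionOverwriteMode", "static"
    )
    print(f"partitionOverwriteMode before write: {current}")

    # Set the full property name on the session so the next overwrite only
    # replaces the partition values present in the dataframe.
    spark_session.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")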
