AWS Glue Interactive Notebooks: default df.saveAsTable() behavior overwrites the entire table instead of overwriting partitions dynamically based on dataframe values


In AWS Glue Interactive Notebooks, the default behavior when saving data via df.saveAsTable() is to overwrite the entire table regardless of the dataframe values. How can I enable the Spark setting so that only the table partitions present in the dataframe are overwritten dynamically?

Generic example:

  • "table1" has the partition column, event_date, with the values, "2023-01-01" and "2023-01-02"
  • The spark dataframe, df, has only the event_date value, "2023-01-03"
from awsglue.context import GlueContext
from pyspark.context import SparkContext

spark_context = SparkContext.getOrCreate()
glue_context = GlueContext(spark_context)

# session_name is defined elsewhere in the notebook
spark_session = (
    glue_context.spark_session.builder.appName(session_name)
    .config("spark.logConf", "true")
    .config("spark.sql.sources.partitionOverwriteMode", "dynamic")
    .enableHiveSupport()
    .getOrCreate()
)

(
    df.write.option("compression", "snappy")
    # including the path creates the table if not exists
    # as external instead of managed
    .option("path", target_s3_path)
    .saveAsTable(
        name=target_db_table,
        mode="overwrite",
        format="parquet",
        partitionBy="event_date",
    )
)
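
A quick way to confirm the setting is in effect (a minimal sketch using the spark_session created above): Glue Interactive Notebooks usually already have a running Spark session by the time getOrCreate() is called, so it can be worth reading the property back at runtime before testing the write; spark.sql.sources.partitionOverwriteMode is a runtime SQL conf and can also be set directly on the live session.

# Read the property back from the live session to confirm it took effect.
print(spark_session.conf.get("spark.sql.sources.partitionOverwriteMode"))

# If it does not read back as "dynamic", it can also be set directly on the
# existing session at runtime (it is a runtime SQL conf, not a static one).
spark_session.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")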
Jaime
asked 1 year ago · 422 views

1 Answer

By default, when you do an overwrite without specifying any partitions to update, it will overwrite the whole table.
Changing spark.sql.sources.partitionOverwriteMode to dynamic (I think you can also pass that as a write option) should detect which partitions are affected by your data and only overwrite those.
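
As a minimal sketch of the above, reusing the names from the question (spark_session, df, target_db_table) and assuming the table already exists: dynamic partition overwrite applies to INSERT OVERWRITE-style writes, so insertInto() is the write path where the setting is expected to take effect.

# Enable dynamic partition overwrite for the session (Spark >= 2.3).
spark_session.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

# insertInto() issues an INSERT OVERWRITE against the existing table; with
# "dynamic" set, only the partitions present in df (event_date "2023-01-03")
# are replaced, and "2023-01-01" / "2023-01-02" are left untouched.
# Note: insertInto() matches columns by position, so df's column order must
# match the table's (partition column last).
df.write.insertInto(tableName=target_db_table, overwrite=True)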

AWS
EXPERT
answered 1 year ago
  • I tried both of the following methods and got the same issue in a Glue notebook while simultaneously reading from the table in Athena for testing purposes. The table definition should not be completely recreated and the partitions re-added when "overwriting" a new partition; in my test, the data contains only a new partition value.

    The biggest issue with this behavior is that when a job is long-running, fails while writing data, or is killed, the table may be left unreadable until it is manually recreated.

    I also wonder whether this behavior occurs because I am using Glue as the data catalog.

    Table <table name> does not exist
    # then table exists with no data showing up
    # then previous and new data shows up
    
        # Attempt 1: pass the option as a saveAsTable() keyword argument
        (
            df.write.option("compression", "snappy")
            .saveAsTable(
                name=target_db_table,
                mode="overwrite",
                format="parquet",
                partitionBy=partition_col_names,
                partitionOverwriteMode="dynamic",
            )
        )

        # Attempt 2: set the option on the DataFrameWriter before saveAsTable()
        (
            df.write.option("compression", "snappy")
            .option("partitionOverwriteMode", "dynamic")
            .saveAsTable(
                name=target_db_table,
                mode="overwrite",
                format="parquet",
                partitionBy=partition_col_names,
            )
        )
    
  • Update: this is not ideal, but I was able to get new partitions to append dynamically without dropping and recreating the table by using the following (a quick verification sketch follows the code):

    from pyspark.sql import SparkSession
    from pyspark.sql.utils import AnalysisException


    def table_exists(
        spark_session: SparkSession,
        db_table: str,
    ) -> bool:
        """Return True if the table can be described, False otherwise."""
        try:
            spark_session.sql(f"desc {db_table}")
            exists = True
        except AnalysisException:
            exists = False

        return exists


    # write_compression, write_mode, write_format, partition_col_names, and
    # target_s3_path are defined elsewhere in the notebook.
    # If the table does not exist, provide the S3 path when saving so the table
    # is created dynamically as an external table
    if not table_exists(spark_session=spark_session, db_table=target_db_table):
        (
            df.write.option("compression", write_compression)
            .option("path", target_s3_path)
            .saveAsTable(
                name=target_db_table,
                mode=write_mode,
                format=write_format,
                partitionBy=partition_col_names,
            )
        )
    # Otherwise, dynamically overwrite existing partition value combinations
    # and append new partitions
    else:
        (
            df.select(spark_session.table(target_db_table).columns).write.insertInto(
                tableName=target_db_table,
                overwrite=True,
            )
        )
    
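    A quick way to sanity-check the workaround above (a hypothetical check, reusing spark_session and target_db_table): after the insertInto branch runs, listing the partitions should show the pre-existing event_date values alongside the newly written one, confirming the table definition was not dropped and recreated.

    # List the table's partitions after the write; with dynamic overwrite in
    # effect, the pre-existing partitions should still be present alongside
    # the newly written one.
    spark_session.sql(f"SHOW PARTITIONS {target_db_table}").show(truncate=False)
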
  • I don't think .option("partitionOverwriteMode", "dynamic") is doing anything without specifying the full property name.
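
    For reference, the Spark SQL configuration docs describe the per-write key as the short name partitionOverwriteMode (the spark.sql.sources.* form is the session-level property), with the write option taking precedence for that write. A minimal sketch of the per-write form against an existing table, reusing df and target_db_table (whether it is honored for a given table type is worth verifying):

    # Per-write form of the setting; insertInto() matches columns by position,
    # so df's column order must match the table's.
    (
        df.write.option("partitionOverwriteMode", "dynamic")
        .insertInto(tableName=target_db_table, overwrite=True)
    )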
