Delta Lake write from AWS Glue streaming job


It looks like writing to a Delta Lake table from a DynamicFrame does not work. The Glue visual interface generates a script like:

    s3 = glueContext.write_dynamic_frame.from_options(
        frame=df,
        connection_type="s3",
        format="delta",
        connection_options={
            "path": "s3://...",
            "partitionKeys": [],
        },
        format_options={"compression": "snappy"},
        transformation_ctx="s3",
    )

that fails with the following error:

An error occurred while calling o183.pyWriteDynamicFrame. : java.lang.ClassNotFoundException: Failed to load format with name delta.

The sample listed at https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-format-delta-lake.html does not work either, but for a different reason (https://stackoverflow.com/questions/76262252/creating-a-delta-table-in-s3-with-glue-delta-lake-creates-a-glue-catalog-table).

The only way I have found to save data to a Delta Lake table is to fall back to the DataFrame writer, without attempting to register the table in the Glue catalog:

    additional_options = {
        "path": "s3://....",
        "write.parquet.compression-codec": "snappy",
    }
    df = MyDynamicFrame.toDF()
    df.write.format("delta").options(**additional_options).mode("append").save()
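
For reference, the full workaround in context looks roughly like the sketch below. The bucket path is a placeholder and the tiny DataFrame stands in for my real DynamicFrame; it assumes the job runs with the --datalake-formats job parameter set to delta and the Delta session extensions configured, as in the AWS sample linked above:

    import sys
    from pyspark.conf import SparkConf
    from pyspark.context import SparkContext
    from awsglue.context import GlueContext
    from awsglue.job import Job
    from awsglue.utils import getResolvedOptions

    args = getResolvedOptions(sys.argv, ["JOB_NAME"])

    # Delta session extensions, as in the AWS sample linked above
    conf = SparkConf()
    conf.set("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    conf.set("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

    sc = SparkContext(conf=conf)
    glueContext = GlueContext(sc)
    spark = glueContext.spark_session
    job = Job(glueContext)
    job.init(args["JOB_NAME"], args)

    # Stand-in data; in the real job this is MyDynamicFrame.toDF()
    df = spark.createDataFrame([(1, "a"), (2, "b")], ["id", "value"])

    additional_options = {
        "path": "s3://my-bucket/my-delta-table/",  # placeholder path
        "write.parquet.compression-codec": "snappy",
    }
    df.write.format("delta").options(**additional_options).mode("append").save()

    job.commit()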

Am I missing something? I am using Glue 4.0 with the default Delta Lake version.

2 Answers
Accepted Answer

That sounds like a defect: when using the same Delta sink in a regular, non-streaming job, the generated code looks like the second one you describe. I will open a ticket with the streaming team.

AWS EXPERT
answered 9 months ago
  • I've run into a very similar error trying to write to an iceberg table from a Glue streaming job.


No, it is correct: in the same visual job, add a parent data node (Kafka, in my case). Once that is in place, you will see the generated script change to something like:

    s3 = glueContext.write_dynamic_frame.from_options(
        frame=df,
        connection_type="s3",
        format="delta",
        connection_options={
            "path": "s3://...",
            "partitionKeys": [],
        },
        format_options={"compression": "snappy"},
        transformation_ctx="s3",
    )

I ran the same test you did, and that is how I discovered that this version works:

S3bucket_node3_df.write.format("delta").options(**additional_options).mode("append").save()
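
Since this runs as a streaming job, that writer ultimately has to execute per micro-batch. Here is a sketch of how I would wire it with glueContext.forEachBatch; the Kafka source frame, window size, and checkpoint path below are placeholders:

    # kafka_df is the streaming DataFrame coming from the Kafka parent node
    def process_batch(data_frame, batch_id):
        # Append each non-empty micro-batch to the Delta table path
        if data_frame.count() > 0:
            data_frame.write.format("delta") \
                .options(**additional_options) \
                .mode("append") \
                .save()

    glueContext.forEachBatch(
        frame=kafka_df,
        batch_function=process_batch,
        options={
            "windowSize": "60 seconds",
            "checkpointLocation": "s3://my-bucket/checkpoints/",  # placeholder
        },
    )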

answered 9 months ago
  • The streaming path is treating delta (as well as hudi and iceberg) like one of the basic formats; in any case, the bug has been reported.
