AWS Glue: ETL struct field (DynamoDB) to Aurora Postgres json field


Several issues:

  1. AWS auto-generates code that maps the struct type as "object", which fails:

     ChangeSchema_node1685651062990 = ApplyMapping.apply(
         frame=AWSGlueDataCatalog_node1685651050820,
         mappings=[
             ("company_id", "string", "company_id", "string"),
             ("request_model", "string", "request_model", "string"),
             ("created", "string", "created", "timestamp"),
             ("response_id", "string", "response_id", "string"),
             ("usage_total_tokens", "long", "usage_total_tokens", "long"),
             ("usage_request_tokens", "long", "usage_request_tokens", "long"),
             ("usage_refinement_tokens", "long", "usage_refinement_tokens", "long"),
             ("api_key_id", "string", "api_key_id", "string"),
             ("usage_response_tokens", "long", "usage_response_tokens", "long"),
             ("user_id", "string", "user_id", "string"),
             ("response_model", "string", "response_model", "string"),
             ("response_text", "string", "response_text", "string"),
             ("id", "string", "id", "string"),
             ("request_payload", "object", "request_payload", "string"),
             ("request_id", "string", "request_id", "string"),
             ("bot_id", "string", "bot_id", "string"),
         ],
         transformation_ctx="ChangeSchema_node1685651062990",
     )

To clear this error, I have to change "object" to "string" in the mapping.

  2. But when attempting to write to the PostgreSQL database, the job fails because the target field "request_payload" is a json column, so I get the following error: "An error occurred while calling o135.pyWriteDynamicFrame. ERROR: column "request_payload" is of type json but expression is of type character varying"

Is there a way to cast the field as json before it's written to the PostgreSQL database?

asked 10 months ago · 591 views
1 Answer

You can modify your AWS Glue script to serialize the struct to a JSON string before writing:

import pyspark.sql.functions as F
from awsglue.dynamicframe import DynamicFrame  # needed for DynamicFrame.fromDF

dfc = ChangeSchema_node1685651062990.toDF()  # convert DynamicFrame to Spark DataFrame
dfc = dfc.withColumn("request_payload", F.to_json("request_payload"))  # serialize struct to a JSON string
ChangeSchema_node1685651062990 = DynamicFrame.fromDF(dfc, glueContext, "ChangeSchema_node1685651062990")  # convert back to DynamicFrame
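For intuition, F.to_json turns each row's nested struct into a single JSON text value, much like json.dumps does for a nested dict in plain Python. A minimal sketch with hypothetical payload data (not the actual job's records):

```python
import json

# Hypothetical nested payload, standing in for the DynamoDB struct value
request_payload = {"prompt": "hello", "options": {"temperature": 0.7}}

# Serialize the nested structure to one JSON string, analogous to
# what F.to_json produces per row for the "request_payload" column
payload_str = json.dumps(request_payload)

print(payload_str)
```

After this step the column holds plain strings containing JSON text, which is why the schema then reports the field as string rather than struct.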
answered 10 months ago
  • Script:

    AWSGlueDataCatalog_node1685651050820 = glueContext.create_dynamic_frame.from_catalog(
        database="dynamodb-to-analyticsdb",
        table_name="prompthistory",
        transformation_ctx="AWSGlueDataCatalog_node1685651050820",
    )

    PromptHistory_df = AWSGlueDataCatalog_node1685651050820.toDF()
    PromptHistory_df = PromptHistory_df.withColumn("request_payload", F.to_json("request_payload"))
    PromptHistory_df = PromptHistory_df.withColumn("created", F.to_timestamp("created"))
    AWSGlueDataCatalog_node1685651050820 = DynamicFrame.fromDF(PromptHistory_df, glueContext, "AWSGlueDataCatalog_node1685651050820")

    AWSGlueDataCatalog_node1685651092780 = glueContext.write_dynamic_frame.from_catalog(
        frame=AWSGlueDataCatalog_node1685651050820,
        database="ablt-ai-analytics-12-14",
        table_name="postgres_public_prompthistory_06e0ad580a8f6d2ff0e57e074d377329",
        transformation_ctx="AWSGlueDataCatalog_node1685651092780",
    )

    job.commit()

  • This is close to what I need, but there are still several issues:

    1. This actually has to be done one level up, on "AWSGlueDataCatalog_node1685651050820" rather than on "ChangeSchema_node1685651062990", because to_json will not accept a string as an input type but will accept a struct type.

    2. So I rewrote the script accordingly, but the write still fails. Checking the schemas, both the DataFrame and the DynamicFrame list the data type as string, so when I write to Postgres I still get the error: "An error occurred while calling o110.pyWriteDynamicFrame. ERROR: column "request_payload" is of type json but expression is of type character varying"
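One workaround for the varchar-to-json error above (not given in this thread) is to let the PostgreSQL JDBC driver send strings untyped, so Postgres itself coerces the JSON text into the json column on insert. The standard JDBC parameter for this is stringtype=unspecified appended to the connection URL, which means writing with from_options instead of from_catalog. This is a sketch of the connection configuration only; the host, database, table name, and credentials are placeholders, and it cannot run outside a Glue job:

```python
# Hypothetical connection details; stringtype=unspecified tells the
# PostgreSQL JDBC driver to send string values with no declared type,
# so Postgres can implicitly cast the JSON text into the json column.
AWSGlueDataCatalog_node1685651092780 = glueContext.write_dynamic_frame.from_options(
    frame=AWSGlueDataCatalog_node1685651050820,
    connection_type="postgresql",
    connection_options={
        "url": "jdbc:postgresql://<host>:5432/<database>?stringtype=unspecified",
        "dbtable": "public.prompthistory",  # placeholder target table
        "user": "<user>",                   # placeholder credentials
        "password": "<password>",
    },
)
```

The trade-off is bypassing the catalog-defined sink, so column mappings must already line up between the DynamicFrame and the target table.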
