AWS Glue Job PySpark - Bookmarks not working as expected


I have job bookmarks enabled, with job.init and job.commit in place and the transformation_ctx property set on my DynamicFrames. The primary keys on the tables are also sequential.
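For context, besides job.init/job.commit and transformation_ctx, bookmarks also have to be switched on for the run itself via the --job-bookmark-option job parameter. A minimal sketch of passing that when starting a run with boto3 (the job name below is just a placeholder):

import boto3

glue = boto3.client("glue")

# Bookmarks are controlled per run (or via the job definition's default arguments);
# job.init/job.commit and transformation_ctx alone do not turn them on.
response = glue.start_job_run(
    JobName="corey-reporting-job",  # placeholder job name
    Arguments={"--job-bookmark-option": "job-bookmark-enable"},
)
print(response["JobRunId"])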

I expect the first run to insert customer "corey" and the next run to skip him. Instead, the next run includes "corey" again and the JDBC from_options write fails.

Error: "An error occurred while calling o122.pyWriteDynamicFrame. ERROR: duplicate key value violates unique constraint "customer_customerid_key"" --it should not be inserted the same customer here...why?!?!

Here is the Postgres table for customer:

CREATE TABLE poc.customer (
	customerid int4 NOT NULL,
	firstname varchar NOT NULL,
	lastname varchar NULL,
	myid serial PRIMARY KEY,
	CONSTRAINT customer_customerid_key UNIQUE (customerid)
);

SCRIPT

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql import DataFrame
from awsglue.dynamicframe import DynamicFrame

args = getResolvedOptions(sys.argv, ["JOB_NAME"])
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)


dynamoDbConnection = glueContext.create_dynamic_frame.from_catalog(
    database="corey-reporting-db",
    table_name="cust-corey_customer",
    transformation_ctx="dynamoDbConnection_node1",
)

#dynamoDbConnection.printSchema()
#dynamoDbConnection.show()
relationalized_json = dynamoDbConnection.relationalize(
    root_table_name = 'customer',
    staging_path = 's3://*********/data/ddb/corey-customer/job-output',
    transformation_ctx = "relationalized_json"
)
#relationalized_json.keys()
customer = relationalized_json.select('customer')
#customer.printSchema()
#customer.show()

customerAddress = relationalized_json.select('customer_addresses')
#customerAddress.printSchema()
#customerAddress.show()

customerPhoneNumbers = relationalized_json.select('customer_phonenumbers')
#customerPhoneNumbers.printSchema()
#customerPhoneNumbers.show()
customerMapping = ApplyMapping.apply(frame = customer, mappings = [
    ("customerId", "string", "customerId", "int"), 
    ("firstname", "string", "firstname", "string"), 
    ("lastname", "string", "lastname", "string")
], transformation_ctx = "customerMapping")

#customerMapping.printSchema()
#customerMapping.show()

customerAddressMapping = ApplyMapping.apply(frame = customerAddress, mappings = [
    ("id", "long", "customerId", "int"), 
    ("`addresses.val.zip`", "string", "zip", "string"), 
    ("`addresses.val.state`", "string", "state", "string"), 
    ("`addresses.val.type`", "string", "type", "string"), 
    ("`addresses.val.street`", "string", "street", "string")
], transformation_ctx = "customerAddressMapping")

#customerAddressMapping.printSchema()
#customerAddressMapping.show()
customerPhoneMapping = ApplyMapping.apply(frame = customerPhoneNumbers, mappings = [
    ("id", "long", "customerId", "int"), 
    ("`phonenumbers.val.type`", "string", "type", "string"), 
    ("`phonenumbers.val.value`", "string", "value", "string")
], transformation_ctx = "customerPhoneMapping")

#customerPhoneMapping.printSchema()
#customerPhoneMapping.show()
customerSink = glueContext.write_dynamic_frame.from_options(
    frame = customerMapping,
    connection_type = 'postgresql',
    connection_options = {
        "url": "jdbc:postgresql://********.us-east-1.rds.amazonaws.com:5432/corey_reporting",
        "dbtable": "poc.customer",
        "user": "postgres",
        "password": "********"
    },
    transformation_ctx = "customerSink"
)

customerAddressSink = glueContext.write_dynamic_frame.from_options(
    frame = customerAddressMapping,
    connection_type = 'postgresql',
    connection_options = {
        "url": "jdbc:postgresql://*******.us-east-1.rds.amazonaws.com:5432/corey_reporting",
        "dbtable": "poc.address",
        "user": "postgres",
        "password": "****"
    },
    transformation_ctx = "customerAddressSink"
)

customerPhoneSink = glueContext.write_dynamic_frame.from_options(
    frame = customerPhoneMapping,
    connection_type = 'postgresql',
    connection_options = {
        "url": "jdbc:postgresql://*********.us-east-1.rds.amazonaws.com:5432/corey_reporting",
        "dbtable": "poc.phonenumber",
        "user": "postgres",
        "password": "****"
    },
    transformation_ctx = "customerPhoneSink"
)

job.commit()
1 Answer

Bookmarks are supported only for S3 and JDBC data sources. Have you considered using DynamoDB Streams + Lambda + Kinesis Data Firehose to continuously feed the data lake in S3? This blog post describes something similar for expired items with a TTL.
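If you go that route, the Lambda piece can stay quite small. A rough sketch, assuming a Firehose delivery stream named customer-stream that lands in your S3 data lake (the stream name and the JSON-lines output format are assumptions for illustration):

import json

import boto3

firehose = boto3.client("firehose")

DELIVERY_STREAM = "customer-stream"  # placeholder Firehose delivery stream name

def handler(event, context):
    # Forward DynamoDB Stream records to Kinesis Data Firehose.
    # Each NewImage (still in DynamoDB-typed JSON) is written as one JSON line,
    # so the Glue job can later read it from S3, where bookmarks are supported.
    records = []
    for record in event.get("Records", []):
        new_image = record.get("dynamodb", {}).get("NewImage")
        if new_image is None:  # e.g. REMOVE events carry no NewImage
            continue
        records.append({"Data": (json.dumps(new_image) + "\n").encode("utf-8")})

    if records:
        # put_record_batch accepts at most 500 records per call;
        # batching beyond that is omitted here for brevity.
        firehose.put_record_batch(DeliveryStreamName=DELIVERY_STREAM, Records=records)

    return {"forwarded": len(records)}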

Reference: https://docs.aws.amazon.com/glue/latest/dg/monitor-continuations.html

AWS
answered 2 years ago
