By using AWS re:Post, you agree to the Terms of Use

Aws Glue Job PySpark - Bookmarks not working as expected

0

Aws Glue Job PySpark - Bookmarks not working as expected. I have everything enabled with Job.Init and Job.Commit along with my DataFrames using transformation_ctx property. Also the primary key on the tables are sequential.

I expect on the first run to insert customer "corey". On the next run it should not include customer "corey". Instead it does and fails the jdbc from options write.

Error: "An error occurred while calling o122.pyWriteDynamicFrame. ERROR: duplicate key value violates unique constraint "customer_customerid_key"" --it should not be inserted the same customer here...why?!?!

Here is the postgres table for customer

CREATE TABLE poc.customer (
	customerid int4 NOT NULL,
	firstname varchar NOT NULL,
	lastname varchar NULL,
	myid serial PRIMARY KEY,
	CONSTRAINT customer_customerid_key UNIQUE (customerid)
);

SCRIPT

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql import DataFrame
from awsglue.dynamicframe import DynamicFrame

args = getResolvedOptions(sys.argv, ["JOB_NAME"])
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)


dynamoDbConnection = glueContext.create_dynamic_frame.from_catalog(
    database="corey-reporting-db",
    table_name="cust-corey_customer",
    transformation_ctx="dynamoDbConnection_node1",
)

#dynamoDbConnection.printSchema()
#dynamoDbConnection.show()
relationalized_json = dynamoDbConnection.relationalize(root_table_name='customer', staging_path='s3://*********/data/ddb/corey-customer/job-output', transformation_ctx = "relationalized_json")
#relationalized_json.keys()
customer = relationalized_json.select('customer')
#customer.printSchema()
#customer.show()

customerAddress = relationalized_json.select('customer_addresses')
#customerAddress.printSchema()
#customerAddress.show()

customerPhoneNumbers = relationalized_json.select('customer_phonenumbers')
#customerPhoneNumbers.printSchema()
#customerPhoneNumbers.show()
customerMapping = ApplyMapping.apply(frame = customer, mappings = [
    ("customerId", "string", "customerId", "int"), 
    ("firstname", "string", "firstname", "string"), 
    ("lastname", "string", "lastname", "string")
], transformation_ctx = "customerMapping")

#customerMapping.printSchema()
#customerMapping.show()

customerAddressMapping = ApplyMapping.apply(frame = customerAddress, mappings = [
    ("id", "long", "customerId", "int"), 
    ("`addresses.val.zip`", "string", "zip", "string"), 
    ("`addresses.val.state`", "string", "state", "string"), 
    ("`addresses.val.type`", "string", "type", "string"), 
    ("`addresses.val.street`", "string", "street", "string")
], transformation_ctx = "customerAddressMapping")

#customerAddressMapping.printSchema()
#customerAddressMapping.show()
customerPhoneMapping = ApplyMapping.apply(frame = customerPhoneNumbers, mappings = [
    ("id", "long", "customerId", "int"), 
    ("`phonenumbers.val.type`", "string", "type", "string"), 
    ("`phonenumbers.val.value`", "string", "value", "string")
], transformation_ctx = "customerPhoneMapping")

#customerPhoneMapping.printSchema()
#customerPhoneMapping.show()
customerSink = glueContext.write_dynamic_frame.from_options(
frame = customerMapping,
connection_type = 'postgresql',
connection_options={
    "url": "jdbc:postgresql://********.us-east-1.rds.amazonaws.com:5432/corey_reporting",
    "dbtable": "poc.customer",
    "user": "postgres",
    "password": "********"

},
transformation_ctx = "customerSink"
)

customerAddressSink = glueContext.write_dynamic_frame.from_options(
frame = customerAddressMapping,
connection_type = 'postgresql',
connection_options={
    "url": "jdbc:postgresql://*******.us-east-1.rds.amazonaws.com:5432/corey_reporting",
    "dbtable": "poc.address",
    "user": "postgres",
    "password": "****"
},
transformation_ctx = "customerAddressSink"
)

customerPhoneSink = glueContext.write_dynamic_frame.from_options(
frame = customerPhoneMapping,
connection_type = 'postgresql',
connection_options={
    "url": "jdbc:postgresql://*********.us-east-1.rds.amazonaws.com:5432/corey_reporting",
    "dbtable": "poc.phonenumber",
    "user": "postgres",
    "password": "****"
},
transformation_ctx = "customerPhoneSink"
)

job.commit()
1 Answer
0

Bookmarks are supported only in S3 and in JDBC data sources. Have you considered using DynamoDB Streams + Lambda + Kinesis Data Firehose to continuously feed the data lake in S3? This blog post describes something similar for expired items with a TTL.

Reference: https://docs.aws.amazon.com/glue/latest/dg/monitor-continuations.html

profile picture
answered 11 days ago

You are not logged in. Log in to post an answer.

A good answer clearly answers the question and provides constructive feedback and encourages professional growth in the question asker.

Guidelines for Answering Questions