Aws Glue Job PySpark - Bookmarks not working as expected. I have everything enabled with Job.Init and Job.Commit along with my DataFrames using transformation_ctx property. Also the primary key on the tables are sequential.
I expect on the first run to insert customer "corey". On the next run it should not include customer "corey". Instead it does and fails the jdbc from options write.
Error: "An error occurred while calling o122.pyWriteDynamicFrame. ERROR: duplicate key value violates unique constraint "customer_customerid_key"" --it should not be inserted the same customer here...why?!?!
Here is the postgres table for customer
CREATE TABLE poc.customer (
customerid int4 NOT NULL,
firstname varchar NOT NULL,
lastname varchar NULL,
myid serial PRIMARY KEY,
CONSTRAINT customer_customerid_key UNIQUE (customerid)
);
SCRIPT
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql import DataFrame
from awsglue.dynamicframe import DynamicFrame
args = getResolvedOptions(sys.argv, ["JOB_NAME"])
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)
dynamoDbConnection = glueContext.create_dynamic_frame.from_catalog(
database="corey-reporting-db",
table_name="cust-corey_customer",
transformation_ctx="dynamoDbConnection_node1",
)
#dynamoDbConnection.printSchema()
#dynamoDbConnection.show()
relationalized_json = dynamoDbConnection.relationalize(root_table_name='customer', staging_path='s3://*********/data/ddb/corey-customer/job-output', transformation_ctx = "relationalized_json")
#relationalized_json.keys()
customer = relationalized_json.select('customer')
#customer.printSchema()
#customer.show()
customerAddress = relationalized_json.select('customer_addresses')
#customerAddress.printSchema()
#customerAddress.show()
customerPhoneNumbers = relationalized_json.select('customer_phonenumbers')
#customerPhoneNumbers.printSchema()
#customerPhoneNumbers.show()
customerMapping = ApplyMapping.apply(frame = customer, mappings = [
("customerId", "string", "customerId", "int"),
("firstname", "string", "firstname", "string"),
("lastname", "string", "lastname", "string")
], transformation_ctx = "customerMapping")
#customerMapping.printSchema()
#customerMapping.show()
customerAddressMapping = ApplyMapping.apply(frame = customerAddress, mappings = [
("id", "long", "customerId", "int"),
("`addresses.val.zip`", "string", "zip", "string"),
("`addresses.val.state`", "string", "state", "string"),
("`addresses.val.type`", "string", "type", "string"),
("`addresses.val.street`", "string", "street", "string")
], transformation_ctx = "customerAddressMapping")
#customerAddressMapping.printSchema()
#customerAddressMapping.show()
customerPhoneMapping = ApplyMapping.apply(frame = customerPhoneNumbers, mappings = [
("id", "long", "customerId", "int"),
("`phonenumbers.val.type`", "string", "type", "string"),
("`phonenumbers.val.value`", "string", "value", "string")
], transformation_ctx = "customerPhoneMapping")
#customerPhoneMapping.printSchema()
#customerPhoneMapping.show()
customerSink = glueContext.write_dynamic_frame.from_options(
frame = customerMapping,
connection_type = 'postgresql',
connection_options={
"url": "jdbc:postgresql://********.us-east-1.rds.amazonaws.com:5432/corey_reporting",
"dbtable": "poc.customer",
"user": "postgres",
"password": "********"
},
transformation_ctx = "customerSink"
)
customerAddressSink = glueContext.write_dynamic_frame.from_options(
frame = customerAddressMapping,
connection_type = 'postgresql',
connection_options={
"url": "jdbc:postgresql://*******.us-east-1.rds.amazonaws.com:5432/corey_reporting",
"dbtable": "poc.address",
"user": "postgres",
"password": "****"
},
transformation_ctx = "customerAddressSink"
)
customerPhoneSink = glueContext.write_dynamic_frame.from_options(
frame = customerPhoneMapping,
connection_type = 'postgresql',
connection_options={
"url": "jdbc:postgresql://*********.us-east-1.rds.amazonaws.com:5432/corey_reporting",
"dbtable": "poc.phonenumber",
"user": "postgres",
"password": "****"
},
transformation_ctx = "customerPhoneSink"
)
job.commit()