AWS Glue PySpark job - bookmarks not working as expected


AWS Glue PySpark job - bookmarks are not working as expected. I have everything enabled, with job.init and job.commit, and my DynamicFrames use the transformation_ctx property. The primary keys on the tables are also sequential.

I expect the first run to insert customer "corey". The next run should not include customer "corey" again. Instead it does, and the JDBC from_options write fails.

Error: "An error occurred while calling o122.pyWriteDynamicFrame. ERROR: duplicate key value violates unique constraint "customer_customerid_key"" -- the same customer should not be inserted here... why?!

Here is the Postgres table for customer:

CREATE TABLE poc.customer (
	customerid int4 NOT NULL,
	firstname varchar NOT NULL,
	lastname varchar NULL,
	myid serial PRIMARY KEY,
	CONSTRAINT customer_customerid_key UNIQUE (customerid)
);

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql import DataFrame
from awsglue.dynamicframe import DynamicFrame

args = getResolvedOptions(sys.argv, ["JOB_NAME"])
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

# The catalog database/table arguments were redacted in the original post.
dynamoDbConnection = glueContext.create_dynamic_frame.from_catalog(
    database = "********",
    table_name = "********",
    transformation_ctx = "dynamoDbConnection"
)

relationalized_json = dynamoDbConnection.relationalize(
    root_table_name = 'customer',
    staging_path = 's3://*********/data/ddb/corey-customer/job-output',
    transformation_ctx = "relationalized_json"
)

customer ='customer')

customerAddress ='customer_addresses')

customerPhoneNumbers ='customer_phonenumbers')
customerMapping = ApplyMapping.apply(frame = customer, mappings = [
    ("customerId", "string", "customerId", "int"), 
    ("firstname", "string", "firstname", "string"), 
    ("lastname", "string", "lastname", "string")
], transformation_ctx = "customerMapping")


customerAddressMapping = ApplyMapping.apply(frame = customerAddress, mappings = [
    ("id", "long", "customerId", "int"), 
    ("``", "string", "zip", "string"), 
    ("`addresses.val.state`", "string", "state", "string"), 
    ("`addresses.val.type`", "string", "type", "string"), 
    ("`addresses.val.street`", "string", "street", "string")
], transformation_ctx = "customerAddressMapping")

customerPhoneMapping = ApplyMapping.apply(frame = customerPhoneNumbers, mappings = [
    ("id", "long", "customerId", "int"), 
    ("`phonenumbers.val.type`", "string", "type", "string"), 
    ("`phonenumbers.val.value`", "string", "value", "string")
], transformation_ctx = "customerPhoneMapping")

customerSink = glueContext.write_dynamic_frame.from_options(
    frame = customerMapping,
    connection_type = 'postgresql',
    connection_options = {
        "url": "jdbc:postgresql://********",
        "dbtable": "poc.customer",
        "user": "postgres",
        "password": "********"
    },
    transformation_ctx = "customerSink"
)

customerAddressSink = glueContext.write_dynamic_frame.from_options(
    frame = customerAddressMapping,
    connection_type = 'postgresql',
    connection_options = {
        "url": "jdbc:postgresql://*******",
        "dbtable": "poc.address",
        "user": "postgres",
        "password": "****"
    },
    transformation_ctx = "customerAddressSink"
)

customerPhoneSink = glueContext.write_dynamic_frame.from_options(
    frame = customerPhoneMapping,
    connection_type = 'postgresql',
    connection_options = {
        "url": "jdbc:postgresql://*********",
        "dbtable": "poc.phonenumber",
        "user": "postgres",
        "password": "****"
    },
    transformation_ctx = "customerPhoneSink"
)

job.commit()
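Independently of bookmarks, the duplicate-key failure can be avoided by making the write idempotent: point the sink's `dbtable` at a staging table and use the JDBC connector's `postactions` option to merge into the real table with `ON CONFLICT DO NOTHING`. A minimal sketch of the merge SQL, assuming a hypothetical staging table `poc.customer_staging` (the helper name and staging-table pattern are illustrative, not from the original post):

```python
# Hypothetical helper: build the SQL passed to the sink's "postactions"
# option. Glue writes rows into the staging table, then this statement
# merges them into poc.customer, silently skipping duplicates.
def build_upsert_sql(staging_table, target_table, columns, conflict_column):
    cols = ", ".join(columns)
    return (
        f"INSERT INTO {target_table} ({cols}) "
        f"SELECT {cols} FROM {staging_table} "
        f"ON CONFLICT ({conflict_column}) DO NOTHING; "
        f"DROP TABLE {staging_table};"
    )

sql = build_upsert_sql(
    staging_table="poc.customer_staging",
    target_table="poc.customer",
    columns=["customerid", "firstname", "lastname"],
    conflict_column="customerid",
)
print(sql)
```

The generated statement would go into the sink's `connection_options` as `"postactions": sql`, with `"dbtable"` set to the staging table instead of `poc.customer`.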

1 Answer

Bookmarks are supported only for S3 and JDBC data sources, not for DynamoDB, so the DynamoDB source here is re-read in full on every run. Have you considered using DynamoDB Streams + Lambda + Kinesis Data Firehose to continuously feed the data lake in S3? This blog post describes something similar for expired items with a TTL.
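The pipeline suggested above could start with a Lambda function triggered by DynamoDB Streams that forwards newly inserted items to Firehose. A minimal sketch, assuming a hypothetical delivery stream name `customer-to-s3` (the function names and stream name are illustrative):

```python
import json

def records_to_firehose_batch(event):
    """Convert DynamoDB Streams INSERT records into the payload shape
    Firehose's put_record_batch expects (newline-delimited JSON)."""
    batch = []
    for record in event.get("Records", []):
        if record.get("eventName") != "INSERT":
            continue  # only forward newly inserted items
        new_image = record["dynamodb"]["NewImage"]
        batch.append({"Data": json.dumps(new_image) + "\n"})
    return batch

def lambda_handler(event, context):
    # boto3 client created inside the handler so the pure conversion
    # function above stays unit-testable without AWS credentials.
    import boto3
    batch = records_to_firehose_batch(event)
    if batch:
        boto3.client("firehose").put_record_batch(
            DeliveryStreamName="customer-to-s3",  # hypothetical stream name
            Records=batch,
        )
    return {"forwarded": len(batch)}
```

Firehose then buffers and delivers the records to S3, where Glue bookmarks work as expected.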


AWS
answered 2 years ago
