By using AWS re:Post, you agree to the Terms of Use

Questions tagged with Extract Transform & Load Data

Sort by most recent

Browse through the questions and answers listed below or filter and sort to narrow down your results.

AWS Glue Job PySpark - Bookmarks not working as expected

AWS Glue Job PySpark - Bookmarks not working as expected. I have everything enabled, with `job.init()` and `job.commit()` in place and my DataFrames using the `transformation_ctx` property. Also, the primary keys on the tables are sequential. On the first run, I expect customer "corey" to be inserted. On the next run, customer "corey" should not be included. Instead it is, and the JDBC `from_options` write fails. Error: "An error occurred while calling o122.pyWriteDynamicFrame. ERROR: duplicate key value violates unique constraint "customer_customerid_key"" -- the same customer should not be inserted again here... why?

Here is the Postgres table for customer:

```
CREATE TABLE poc.customer (
    customerid int4 NOT NULL,
    firstname varchar NOT NULL,
    lastname varchar NULL,
    myid serial PRIMARY KEY,
    CONSTRAINT customer_customerid_key UNIQUE (customerid)
);
```

SCRIPT

```
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql import DataFrame
from awsglue.dynamicframe import DynamicFrame

args = getResolvedOptions(sys.argv, ["JOB_NAME"])
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

dynamoDbConnection = glueContext.create_dynamic_frame.from_catalog(
    database="corey-reporting-db",
    table_name="cust-corey_customer",
    transformation_ctx="dynamoDbConnection_node1",
)
#dynamoDbConnection.printSchema()
#dynamoDbConnection.show()

relationalized_json = dynamoDbConnection.relationalize(root_table_name='customer', staging_path='s3://*********/data/ddb/corey-customer/job-output', transformation_ctx = "relationalized_json")
#relationalized_json.keys()

customer = relationalized_json.select('customer')
#customer.printSchema()
#customer.show()

customerAddress = relationalized_json.select('customer_addresses')
#customerAddress.printSchema()
#customerAddress.show()

customerPhoneNumbers = relationalized_json.select('customer_phonenumbers')
#customerPhoneNumbers.printSchema()
#customerPhoneNumbers.show()

customerMapping = ApplyMapping.apply(frame = customer, mappings = [
    ("customerId", "string", "customerId", "int"),
    ("firstname", "string", "firstname", "string"),
    ("lastname", "string", "lastname", "string")
], transformation_ctx = "customerMapping")
#customerMapping.printSchema()
#customerMapping.show()

customerAddressMapping = ApplyMapping.apply(frame = customerAddress, mappings = [
    ("id", "long", "customerId", "int"),
    ("`addresses.val.zip`", "string", "zip", "string"),
    ("`addresses.val.state`", "string", "state", "string"),
    ("`addresses.val.type`", "string", "type", "string"),
    ("`addresses.val.street`", "string", "street", "string")
], transformation_ctx = "customerAddressMapping")
#customerAddressMapping.printSchema()
#customerAddressMapping.show()

customerPhoneMapping = ApplyMapping.apply(frame = customerPhoneNumbers, mappings = [
    ("id", "long", "customerId", "int"),
    ("`phonenumbers.val.type`", "string", "type", "string"),
    ("`phonenumbers.val.value`", "string", "value", "string")
], transformation_ctx = "customerPhoneMapping")
#customerPhoneMapping.printSchema()
#customerPhoneMapping.show()

customerSink = glueContext.write_dynamic_frame.from_options(
    frame = customerMapping,
    connection_type = 'postgresql',
    connection_options={
        "url": "jdbc:postgresql://********.us-east-1.rds.amazonaws.com:5432/corey_reporting",
        "dbtable": "poc.customer",
        "user": "postgres",
        "password": "********"
    },
    transformation_ctx = "customerSink"
)

customerAddressSink = glueContext.write_dynamic_frame.from_options(
    frame = customerAddressMapping,
    connection_type = 'postgresql',
    connection_options={
        "url": "jdbc:postgresql://*******.us-east-1.rds.amazonaws.com:5432/corey_reporting",
        "dbtable": "poc.address",
        "user": "postgres",
        "password": "****"
    },
    transformation_ctx = "customerAddressSink"
)

customerPhoneSink = glueContext.write_dynamic_frame.from_options(
    frame = customerPhoneMapping,
    connection_type = 'postgresql',
    connection_options={
        "url": "jdbc:postgresql://*********.us-east-1.rds.amazonaws.com:5432/corey_reporting",
        "dbtable": "poc.phonenumber",
        "user": "postgres",
        "password": "****"
    },
    transformation_ctx = "customerPhoneSink"
)

job.commit()
```
1
answers
0
votes
30
views
asked 21 days ago