Questions tagged with AWS Glue

AWS Glue Job PySpark - Bookmarks not working as expected

Job bookmarks in my AWS Glue PySpark job are not working as expected. I have everything enabled: the script calls `job.init` and `job.commit`, and every DynamicFrame step sets the `transformation_ctx` property. The primary keys on the tables are also sequential.

On the first run I expect customer "corey" to be inserted. On the next run, "corey" should not be included. Instead the customer is picked up again, and the JDBC `from_options` write fails.

Error: "An error occurred while calling o122.pyWriteDynamicFrame. ERROR: duplicate key value violates unique constraint "customer_customerid_key""

The same customer should not be inserted a second time. Why is it?

Here is the Postgres table for customer:

```
CREATE TABLE poc.customer (
    customerid int4 NOT NULL,
    firstname varchar NOT NULL,
    lastname varchar NULL,
    myid serial PRIMARY KEY,
    CONSTRAINT customer_customerid_key UNIQUE (customerid)
);
```

SCRIPT

```
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql import DataFrame
from awsglue.dynamicframe import DynamicFrame

args = getResolvedOptions(sys.argv, ["JOB_NAME"])
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

# Read the source table through the Glue Data Catalog
dynamoDbConnection = glueContext.create_dynamic_frame.from_catalog(
    database="corey-reporting-db",
    table_name="cust-corey_customer",
    transformation_ctx="dynamoDbConnection_node1",
)
#dynamoDbConnection.printSchema()

# Flatten nested attributes into a collection of related frames
relationalized_json = dynamoDbConnection.relationalize(
    root_table_name='customer',
    staging_path='s3://*********/data/ddb/corey-customer/job-output',
    transformation_ctx="relationalized_json",
)
#relationalized_json.keys()

customer = relationalized_json.select('customer')
#customer.printSchema()
customerAddress = relationalized_json.select('customer_addresses')
#customerAddress.printSchema()
customerPhoneNumbers = relationalized_json.select('customer_phonenumbers')
#customerPhoneNumbers.printSchema()

# Map source fields to the Postgres column names and types
customerMapping = ApplyMapping.apply(
    frame=customer,
    mappings=[
        ("customerId", "string", "customerId", "int"),
        ("firstname", "string", "firstname", "string"),
        ("lastname", "string", "lastname", "string"),
    ],
    transformation_ctx="customerMapping",
)
#customerMapping.printSchema()

customerAddressMapping = ApplyMapping.apply(
    frame=customerAddress,
    mappings=[
        ("id", "long", "customerId", "int"),
        ("``", "string", "zip", "string"),
        ("`addresses.val.state`", "string", "state", "string"),
        ("`addresses.val.type`", "string", "type", "string"),
        ("`addresses.val.street`", "string", "street", "string"),
    ],
    transformation_ctx="customerAddressMapping",
)
#customerAddressMapping.printSchema()

customerPhoneMapping = ApplyMapping.apply(
    frame=customerPhoneNumbers,
    mappings=[
        ("id", "long", "customerId", "int"),
        ("`phonenumbers.val.type`", "string", "type", "string"),
        ("`phonenumbers.val.value`", "string", "value", "string"),
    ],
    transformation_ctx="customerPhoneMapping",
)
#customerPhoneMapping.printSchema()

# Write each frame to its Postgres table over JDBC
customerSink = glueContext.write_dynamic_frame.from_options(
    frame=customerMapping,
    connection_type='postgresql',
    connection_options={
        "url": "jdbc:postgresql://********",
        "dbtable": "poc.customer",
        "user": "postgres",
        "password": "********",
    },
    transformation_ctx="customerSink",
)

customerAddressSink = glueContext.write_dynamic_frame.from_options(
    frame=customerAddressMapping,
    connection_type='postgresql',
    connection_options={
        "url": "jdbc:postgresql://*******",
        "dbtable": "poc.address",
        "user": "postgres",
        "password": "****",
    },
    transformation_ctx="customerAddressSink",
)

customerPhoneSink = glueContext.write_dynamic_frame.from_options(
    frame=customerPhoneMapping,
    connection_type='postgresql',
    connection_options={
        "url": "jdbc:postgresql://*********",
        "dbtable": "poc.phonenumber",
        "user": "postgres",
        "password": "****",
    },
    transformation_ctx="customerPhoneSink",
)

job.commit()
```
asked 16 days ago
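
One possible workaround, offered as a sketch and not as a confirmed diagnosis: if the bookmark is not tracking the catalog source (the Glue documentation describes bookmark support for Amazon S3 and JDBC sources), every run would re-read all items, including "corey", and the plain JDBC insert would then hit the `customer_customerid_key` constraint. Making the write itself idempotent sidesteps that. The helper below is hypothetical (`build_idempotent_insert` is not part of Glue) and only builds a PostgreSQL `INSERT ... ON CONFLICT ... DO NOTHING` statement. The `%s` placeholders assume a psycopg2-style driver, which is not used in the original script.

```python
def build_idempotent_insert(table, columns, conflict_column):
    """Build a parameterized PostgreSQL INSERT that skips rows whose
    conflict_column value already exists in the target table.

    Hypothetical helper: table and column names are interpolated
    directly, so they must come from trusted code, never user input.
    """
    if conflict_column not in columns:
        raise ValueError("conflict_column must be one of columns")
    column_list = ", ".join(columns)
    # One driver-style placeholder per column (psycopg2 style assumed)
    placeholders = ", ".join(["%s"] * len(columns))
    return (
        f"INSERT INTO {table} ({column_list}) "
        f"VALUES ({placeholders}) "
        f"ON CONFLICT ({conflict_column}) DO NOTHING"
    )


# Table and column names taken from the CREATE TABLE in the question
sql = build_idempotent_insert(
    "poc.customer",
    ["customerid", "firstname", "lastname"],
    "customerid",
)
print(sql)
```

The statement could be executed per row or per batch from the job in place of `write_dynamic_frame.from_options`. A row whose `customerid` already exists is then skipped silently and no longer fails the whole run.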