Questions tagged with Extract Transform & Load Data

AWS Glue Job PySpark - Bookmarks not working as expected

I have bookmarks enabled, with `job.init` and `job.commit` in place and the `transformation_ctx` property set on my DynamicFrames. The primary keys on the tables are sequential. I expect the first run to insert customer "corey", and the next run to leave customer "corey" out. Instead, the next run includes it again and the JDBC `from_options` write fails.

Error: "An error occurred while calling o122.pyWriteDynamicFrame. ERROR: duplicate key value violates unique constraint "customer_customerid_key""

The same customer should not be inserted again here. Why does this happen?

Here is the Postgres table for customer:

```
CREATE TABLE poc.customer (
    customerid int4 NOT NULL,
    firstname varchar NOT NULL,
    lastname varchar NULL,
    myid serial PRIMARY KEY,
    CONSTRAINT customer_customerid_key UNIQUE (customerid)
);
```

SCRIPT

```
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql import DataFrame
from awsglue.dynamicframe import DynamicFrame

args = getResolvedOptions(sys.argv, ["JOB_NAME"])
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

dynamoDbConnection = glueContext.create_dynamic_frame.from_catalog(
    database="corey-reporting-db",
    table_name="cust-corey_customer",
    transformation_ctx="dynamoDbConnection_node1",
)
#dynamoDbConnection.printSchema()
#dynamoDbConnection.show()

relationalized_json = dynamoDbConnection.relationalize(
    root_table_name='customer',
    staging_path='s3://*********/data/ddb/corey-customer/job-output',
    transformation_ctx="relationalized_json",
)
#relationalized_json.keys()

customer = relationalized_json.select('customer')
#customer.printSchema()
#customer.show()

customerAddress = relationalized_json.select('customer_addresses')
#customerAddress.printSchema()
#customerAddress.show()

customerPhoneNumbers = relationalized_json.select('customer_phonenumbers')
#customerPhoneNumbers.printSchema()
#customerPhoneNumbers.show()

customerMapping = ApplyMapping.apply(
    frame=customer,
    mappings=[
        ("customerId", "string", "customerId", "int"),
        ("firstname", "string", "firstname", "string"),
        ("lastname", "string", "lastname", "string"),
    ],
    transformation_ctx="customerMapping",
)
#customerMapping.printSchema()
#customerMapping.show()

customerAddressMapping = ApplyMapping.apply(
    frame=customerAddress,
    mappings=[
        ("id", "long", "customerId", "int"),
        ("`addresses.val.zip`", "string", "zip", "string"),
        ("`addresses.val.state`", "string", "state", "string"),
        ("`addresses.val.type`", "string", "type", "string"),
        ("`addresses.val.street`", "string", "street", "string"),
    ],
    transformation_ctx="customerAddressMapping",
)
#customerAddressMapping.printSchema()
#customerAddressMapping.show()

customerPhoneMapping = ApplyMapping.apply(
    frame=customerPhoneNumbers,
    mappings=[
        ("id", "long", "customerId", "int"),
        ("`phonenumbers.val.type`", "string", "type", "string"),
        ("`phonenumbers.val.value`", "string", "value", "string"),
    ],
    transformation_ctx="customerPhoneMapping",
)
#customerPhoneMapping.printSchema()
#customerPhoneMapping.show()

customerSink = glueContext.write_dynamic_frame.from_options(
    frame=customerMapping,
    connection_type='postgresql',
    connection_options={
        "url": "jdbc:postgresql://********.us-east-1.rds.amazonaws.com:5432/corey_reporting",
        "dbtable": "poc.customer",
        "user": "postgres",
        "password": "********",
    },
    transformation_ctx="customerSink",
)

customerAddressSink = glueContext.write_dynamic_frame.from_options(
    frame=customerAddressMapping,
    connection_type='postgresql',
    connection_options={
        "url": "jdbc:postgresql://*******.us-east-1.rds.amazonaws.com:5432/corey_reporting",
        "dbtable": "poc.address",
        "user": "postgres",
        "password": "****",
    },
    transformation_ctx="customerAddressSink",
)

customerPhoneSink = glueContext.write_dynamic_frame.from_options(
    frame=customerPhoneMapping,
    connection_type='postgresql',
    connection_options={
        "url": "jdbc:postgresql://*********.us-east-1.rds.amazonaws.com:5432/corey_reporting",
        "dbtable": "poc.phonenumber",
        "user": "postgres",
        "password": "****",
    },
    transformation_ctx="customerPhoneSink",
)

job.commit()
```
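Whatever the reason the bookmark did not exclude the row (it may be worth checking whether bookmarks apply to the source type behind the catalog table), one defensive option is to drop rows whose key already exists in the target before writing. The following is only a plain-Python sketch of that anti-join idea, not the poster's code: `drop_already_loaded`, the sample rows, and the list of existing ids are all hypothetical. In a Glue job the same step would more likely be a Spark `left_anti` join against the keys read from Postgres.

```python
def drop_already_loaded(rows, existing_ids, key="customerId"):
    """Return only the rows whose key is not yet in the target table.

    rows: iterable of dicts (hypothetical stand-in for DynamicFrame records)
    existing_ids: keys already present in the target (assumed to be read
                  from poc.customer beforehand)
    """
    seen = set(existing_ids)
    fresh = []
    for row in rows:
        if row[key] in seen:
            continue  # already loaded, or a duplicate within this batch
        seen.add(row[key])
        fresh.append(row)
    return fresh


# Hypothetical sample data: customer 1 was inserted by an earlier run.
incoming = [
    {"customerId": 1, "firstname": "corey"},
    {"customerId": 2, "firstname": "dana"},
]
already_in_postgres = [1]

to_insert = drop_already_loaded(incoming, already_in_postgres)
```

Only the rows in `to_insert` would then be handed to the JDBC write, so a rerun that sees customer 1 again would not trip the `customer_customerid_key` constraint.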
1 answer, 0 votes, 64 views. Asked by chdev77 2 months ago.

How to reference a glue job python argument in CodeGenConfigurationNodes of create_job() function in boto3 Glue Client.

I would like to reference a Python parameter of a Glue job within the `CodeGenConfigurationNodes` of the `create_job()` function in the boto3 Glue client. For instance, I have an argument `--s3_location` which has to be referenced within the S3CsvSource node of my ETL job, as given below:

```
s3_source_node1 = glueContext.create_dynamic_frame.from_options(
    format_options={
        "quoteChar": '"',
        "withHeader": True,
        "separator": ",",
        "optimizePerformance": False,
    },
    connection_type="s3",
    format="csv",
    connection_options={"paths": [args["s3_location"]]},
    transformation_ctx="s3_source_node1",
)
```

This has to be done via the `create_job()` function available in the boto3 Glue client. However, while defining `CodeGenConfigurationNodes` within `create_job()`, I was not able to use `args["s3_location"]` in the `Paths` property of the `S3CsvSource` node. The current `CodeGenConfigurationNodes` for S3CsvSource is as below:

```
CodeGenConfigurationNodes = {
    'node-1': {
        'S3CsvSource': {
            'Name': 's3_source',
            'Paths': [
                's3://my_bucket/sample_input.csv',
            ],
            'Separator': 'comma',
            'QuoteChar': 'quote',
            'WithHeader': True,
            'WriteHeader': True,
        },
    },
}
```

My expected result is something like:

```
CodeGenConfigurationNodes = {
    'node-1': {
        'S3CsvSource': {
            'Name': 's3_source',
            'Paths': [
                args["s3_location"],
            ],
            'Separator': 'comma',
            'QuoteChar': 'quote',
            'WithHeader': True,
            'WriteHeader': True,
        },
    },
}
```

Here `args["s3_location"]` refers to the Glue job parameter `--s3_location`.
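If the S3 location happens to be known when the job is created, one partial workaround is to build the node dictionary in Python and substitute the path there. This is a minimal sketch under that assumption: `build_codegen_nodes` is a hypothetical helper, and it fixes the path at creation time rather than making the node read `--s3_location` at run time, so it does not settle whether the node definition can reference a run-time argument.

```python
def build_codegen_nodes(s3_location: str) -> dict:
    """Build the CodeGenConfigurationNodes mapping for a single S3 CSV source.

    s3_location is resolved by the caller at job-creation time
    (an assumption of this sketch, not a run-time job argument).
    """
    return {
        "node-1": {
            "S3CsvSource": {
                "Name": "s3_source",
                "Paths": [s3_location],
                "Separator": "comma",
                "QuoteChar": "quote",
                "WithHeader": True,
                "WriteHeader": True,
            },
        },
    }


nodes = build_codegen_nodes("s3://my_bucket/sample_input.csv")
# `nodes` could then be passed as CodeGenConfigurationNodes=nodes in the
# create_job() call, alongside the other arguments the job needs
# (Name, Role, Command), which are omitted here.
```

Keeping the dictionary in a function also avoids the mismatched closing bracket that is easy to introduce when the nested literal is edited by hand.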
0 answers, 0 votes, 28 views. Asked 3 months ago.