
Questions tagged with Extract Transform & Load Data



How to use a Glue job bookmark to read MongoDB data and track the last processed row using an id column

I have implemented an AWS Glue job bookmark to read data from MongoDB and write it to an S3 bucket, but every time the script runs it writes all of the data to a separate file. Below is my code:

```
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
import time
import logging
import urllib
from pymongo import MongoClient
import sys
import nad_config
from datetime import date

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# Production DB
mongo_uri = "mongodb://ip_1_2_3_4.ap-south-1.compute.internal:27017/test?replicaSet=repomongo"

list = ['glue_bookmark']
today = date.today()
folder_name = today.strftime("%d-%m-%Y")

for i in list:
    org_id = i[12:18]
    read_mongo_options = 'read_mongo_options_' + org_id
    collection_name = i
    dynamic_frame = 'dynamic_frame' + org_id
    read_mongo_options = {
        "uri": mongo_uri,
        "database": "test",
        "collection": "test",
        "username": "test",
        "password": "test",
        "partitioner": "MongoSamplePartitioner",
        "partitionerOptions.partitionSizeMB": "10",
        "partitionerOptions.partitionKey": "id"}
    sub_folder_name = org_id
    final_folder_path = folder_name + '/test/'

    datasource0 = glueContext.create_dynamic_frame_from_catalog(
        database=catalogDB,
        table_name=catalogTable,
        connection_type="mongodb",
        connection_options=read_mongo_options,
        transformation_ctx="datasource0",
        additional_options={"jobBookmarkKeys": ["id"], "jobBookmarkKeysSortOrder": "asc"})

    datasink1 = glueContext.write_dynamic_frame.from_options(
        frame=datasource0,
        connection_type="s3",
        connection_options={"path": "s3://aws-glue-assets-123456-ap-south-1/" + final_folder_path},
        format="json",
        transformation_ctx="datasink1")

job.commit()
```
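For reference, below is a minimal sketch of the job-bookmark pattern for a catalog-backed read, assuming a hypothetical catalog database `glue_catalog_db` and table `mongo_test_table`, and assuming the job runs with bookmarks enabled (`--job-bookmark-option job-bookmark-enable`). The `jobBookmarkKeys` option is documented for JDBC-style sources; whether bookmarks are honored for a MongoDB-backed table is worth verifying against the Glue documentation.

```
import sys
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

args = getResolvedOptions(sys.argv, ['JOB_NAME'])
glueContext = GlueContext(SparkContext())
job = Job(glueContext)
job.init(args['JOB_NAME'], args)  # bookmark state is loaded here

# Read through the Data Catalog; jobBookmarkKeys tells Glue which column to
# track between runs. The key should increase monotonically (e.g. an id).
# Database/table names below are placeholders, not the asker's real names.
datasource0 = glueContext.create_dynamic_frame.from_catalog(
    database="glue_catalog_db",
    table_name="mongo_test_table",
    transformation_ctx="datasource0",
    additional_options={
        "jobBookmarkKeys": ["id"],
        "jobBookmarkKeysSortOrder": "asc",
    },
)

glueContext.write_dynamic_frame.from_options(
    frame=datasource0,
    connection_type="s3",
    connection_options={"path": "s3://aws-glue-assets-123456-ap-south-1/output/"},
    format="json",
    transformation_ctx="datasink1",
)

job.commit()  # bookmark state is only persisted when commit() runs
```

The key details are that `job.init()` and `job.commit()` must both run, every bookmarked source needs a `transformation_ctx`, and the bookmark key column must only ever grow between runs.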
1 answer · 0 votes · 59 views · asked 2 months ago

[Pandas] How to write data into a JSON column of PostgreSQL

Hi, I'm trying to write a dataframe into a PostgreSQL table that has a JSON column ("details"), using the following code:

```
results = []
details_string = '{"name": "test"}'
json_object = json.loads(details_string)
results.append([1, json_object])

mySchema = StructType([
    StructField("event_id", IntegerType(), True),
    StructField("details", StructType([StructField('name', StringType(), True)]), True)
])

myResult = glueContext.createDataFrame(
    data=pd.DataFrame(results, columns=['event_id', 'details']),
    schema=mySchema)

# ... then write to DB
```

However, there seems to be an issue with the `mySchema` field for the JSON type. I've tried StructType, MapType, and ArrayType, but each time I get a different error.

This is for MapType:

> Job aborted due to stage failure: Task 4 in stage 182.0 failed 4 times, most recent failure: Lost task 4.3 in stage 182.0 (TID 1802, 172.36.213.211, executor 2): java.lang.IllegalArgumentException: Can't get JDBC type for map<string,string>

And this one is for `StructField("details", StructType([StructField('name', StringType(), True)]), True)`:

> Job aborted due to stage failure: Task 3 in stage 211.0 failed 4 times, most recent failure: Lost task 3.3 in stage 211.0 (TID 2160, 172.36.18.91, executor 4): java.lang.IllegalArgumentException: Can't get JDBC type for struct<name:string>

Does anyone have an example of how to construct the DataFrame schema so the JSON can be written into a JSON PostgreSQL column?
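For reference, a common workaround (a sketch under assumptions, not a verified answer): keep `details` as a plain JSON string (`StringType`) and append `stringtype=unspecified` to the PostgreSQL JDBC URL so the server casts the string to the `json`/`jsonb` column on insert. The host, database, table, and credentials below are placeholders.

```
import json
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

spark = SparkSession.builder.getOrCreate()

# Keep the JSON payload as a string; the JDBC writer maps StringType to a
# value PostgreSQL can cast to json/jsonb when stringtype=unspecified.
rows = [(1, json.dumps({"name": "test"}))]
schema = StructType([
    StructField("event_id", IntegerType(), True),
    StructField("details", StringType(), True),
])
df = spark.createDataFrame(rows, schema=schema)

(df.write
   .format("jdbc")
   # stringtype=unspecified lets the server infer the parameter type,
   # so the string is accepted by a json/jsonb column.
   .option("url", "jdbc:postgresql://my-host:5432/mydb?stringtype=unspecified")
   .option("dbtable", "events")           # placeholder table name
   .option("user", "my_user")             # placeholder credentials
   .option("password", "my_password")
   .option("driver", "org.postgresql.Driver")
   .mode("append")
   .save())
```

The same idea carries over to a Glue DynamicFrame sink: as long as the column reaches the JDBC driver as a string and the URL carries `stringtype=unspecified`, the cast happens on the server side.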
1 answer · 0 votes · 39 views · asked 2 months ago