Questions tagged with Extract Transform & Load Data


Set correct Table level, Include Path and Exclude path.

Hello all, I have an S3 bucket with the following path: s3://a/b/c/products. Inside the products folder I have one folder for each version (each version is a database snapshot of the products table, obtained weekly by a workflow):

1. /version_0
   1. _temporary
      1. 0_$folder$
   2. part-00000-c5... ...c000.snappy.parquet
2. /version_1
   1. _temporary
      1. 0_$folder$
   2. part-00000-29... ...c000.snappy.parquet

I have created a crawler (Include Path set to the same path mentioned above, s3://a/b/c/products) with the intention of merging all the versions into one table. The schemas of the different partitions are always the same, and so is their folder structure. I have tried different Table Levels (4, 5 and 6) in the "Grouping Behaviour for S3 Data" section of the crawler settings, but it always creates multiple tables (one per version). The _temporary folder seems to be generated automatically by the workflow; I don't know whether I have to add it to the Exclude path for this to work.

**What should the Include path, Exclude path and Table level be so that the crawler creates only ONE table merging all versions together?**

I have checked the general documentation links about this issue, but could you please provide an actual solution?
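For illustration, here is a minimal boto3 sketch of a crawler over the parent prefix with the _temporary folders excluded and table-level grouping configured. The crawler name, IAM role, catalog database, exclusion glob and the exact table level are assumptions for the sketch, not settings taken from the question:

```python
# Sketch only: names and role are hypothetical; the exclusion pattern and
# table level are assumptions used to illustrate the relevant settings.
import json
import boto3

glue = boto3.client("glue")

glue.create_crawler(
    Name="products-crawler",            # hypothetical name
    Role="GlueCrawlerRole",             # hypothetical IAM role
    DatabaseName="products_db",         # hypothetical catalog database
    Targets={
        "S3Targets": [{
            "Path": "s3://a/b/c/products",
            # Skip the workflow's temporary artifacts so they don't
            # influence schema inference.
            "Exclusions": ["**/_temporary/**"],
        }]
    },
    # Crawler Configuration JSON: combine compatible schemas and pin the
    # table to a fixed folder depth (counted from the bucket).
    Configuration=json.dumps({
        "Version": 1.0,
        "Grouping": {
            "TableGroupingPolicy": "CombineCompatibleSchemas",
            "TableLevelConfiguration": 4,   # assumed depth of the products folder
        },
    }),
)
```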
1 answer · 0 votes · 34 views · asked 3 months ago

How to use a Glue job bookmark to read MongoDB data and track the last processed row using the id column

I have implemented an AWS Glue job bookmark to read data from MongoDB and write to an S3 bucket, but every time we run the script it writes all the data again to a separate file. Below is my code:

```python
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
import time
import logging
import urllib
from pymongo import MongoClient
import sys
import nad_config
from datetime import date

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# Production DB
mongo_uri = "mongodb://ip_1_2_3_4.ap-south-1.compute.internal:27017/test?replicaSet=repomongo"

list = ['glue_bookmark']
today = date.today()
folder_name = today.strftime("%d-%m-%Y")

for i in list:
    org_id = i[12:18]
    read_mongo_options = 'read_mongo_options_' + org_id
    collection_name = i
    dynamic_frame = 'dynamic_frame' + org_id
    read_mongo_options = {
        "uri": mongo_uri,
        "database": "test",
        "collection": "test",
        "username": "test",
        "password": "test",
        "partitioner": "MongoSamplePartitioner",
        "partitionerOptions.partitionSizeMB": "10",
        "partitionerOptions.partitionKey": "id"}
    sub_folder_name = org_id
    final_folder_path = folder_name + '/test/'
    datasource0 = glueContext.create_dynamic_frame_from_catalog(
        database=catalogDB,
        table_name=catalogTable,
        connection_type="mongodb",
        connection_options=read_mongo_options,
        transformation_ctx="datasource0",
        additional_options={"jobBookmarkKeys": ["id"], "jobBookmarkKeysSortOrder": "asc"})
    datasink1 = glueContext.write_dynamic_frame.from_options(
        frame=datasource0,
        connection_type="s3",
        connection_options={"path": "s3://aws-glue-assets-123456-ap-south-1/" + final_folder_path},
        format="json",
        transformation_ctx="datasink1")

job.commit()
```
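For reference, here is a minimal sketch of the pieces a job bookmark depends on when reading from the Data Catalog; the catalog database and table names are hypothetical and this is not the asker's setup:

```python
# Minimal bookmark wiring (hypothetical catalog names): job.init() loads the
# stored bookmark state, transformation_ctx identifies the source, the
# jobBookmarkKeys options name the cursor column, and job.commit() persists
# the new high-water mark for the next run.
import sys
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext

args = getResolvedOptions(sys.argv, ["JOB_NAME"])
glueContext = GlueContext(SparkContext())
job = Job(glueContext)
job.init(args["JOB_NAME"], args)          # bookmark state is loaded here

src = glueContext.create_dynamic_frame.from_catalog(
    database="my_catalog_db",             # hypothetical
    table_name="my_mongodb_table",        # hypothetical
    transformation_ctx="src",             # bookmarks are tracked per context name
    additional_options={
        "jobBookmarkKeys": ["id"],        # column used as the incremental cursor
        "jobBookmarkKeysSortOrder": "asc",
    },
)

job.commit()                              # persists the bookmark for the next run
```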
1 answer · 0 votes · 131 views · asked 3 months ago

[Pandas] How to write data into a JSON column of PostgreSQL

Hi, I'm trying to write a DataFrame into a PostgreSQL table that has a JSON column ("details"), using the following code:

```python
results = []
details_string = '{"name": "test"}'
json_object = json.loads(details_string)
results.append([1, json_object])

mySchema = StructType([
    StructField("event_id", IntegerType(), True),
    StructField("details", StructType([StructField('name', StringType(), True)]), True)
])

myResult = glueContext.createDataFrame(data=pd.DataFrame(results, columns=['event_id', 'details']), schema=mySchema)
# ... then write to DB
```

However, there seems to be an issue with the mySchema field for the JSON type. I've tried StructType, MapType and ArrayType, but each time I get a different error.

This is for MapType:

> Job aborted due to stage failure: Task 4 in stage 182.0 failed 4 times, most recent failure: Lost task 4.3 in stage 182.0 (TID 1802, 172.36.213.211, executor 2): java.lang.IllegalArgumentException: Can't get JDBC type for map<string,string>

And this one is for StructField("details", StructType([StructField('name', StringType(), True)]), True):

> Job aborted due to stage failure: Task 3 in stage 211.0 failed 4 times, most recent failure: Lost task 3.3 in stage 211.0 (TID 2160, 172.36.18.91, executor 4): java.lang.IllegalArgumentException: Can't get JDBC type for struct<name:string>

Does anyone have an example of how to construct the DataFrame schema so the JSON can be written into a JSON PostgreSQL column?
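As a point of comparison, here is a minimal sketch (not the asker's code; table, host and credentials are made up) that sidesteps the JDBC type mapping by keeping the details column as a plain JSON string and letting the database cast it:

```python
# Sketch with hypothetical names: JDBC has no mapping for MapType/StructType,
# so "details" is kept as serialized JSON text in a StringType column.
import json
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

spark = SparkSession.builder.getOrCreate()   # in a Glue job: glueContext.spark_session

results = [[1, json.dumps({"name": "test"})]]   # details serialized to text

mySchema = StructType([
    StructField("event_id", IntegerType(), True),
    StructField("details", StringType(), True),  # string instead of struct/map
])

df = spark.createDataFrame(pd.DataFrame(results, columns=["event_id", "details"]),
                           schema=mySchema)

# stringtype=unspecified asks the Postgres JDBC driver to send the text value
# untyped so the server can cast it into the json column (assumption that this
# option fits the target setup).
(df.write
   .format("jdbc")
   .option("url", "jdbc:postgresql://dbhost:5432/mydb?stringtype=unspecified")
   .option("dbtable", "events")                 # hypothetical table
   .option("user", "dbuser")
   .option("password", "dbpassword")
   .option("driver", "org.postgresql.Driver")
   .mode("append")
   .save())
```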
1 answer · 0 votes · 44 views · Michael · asked 4 months ago