
Questions tagged with Extract Transform & Load Data



Inject a non-quoted CSV file into RDS via Glue

I have a PySpark script generated by my Glue job that reads data from a CSV file in an S3 bucket and writes it to my SQL RDS table. The CSV file contains multi-line strings. If the strings are quoted, the job passes; but in my case the multi-line strings are not quoted, so the job cannot insert the data into my table. I tried `spark.read.option("multiLine", "true").option("quoteChar", -1).option("header","true")` and it doesn't work. I also tried:

```
datasink5 = glueContext.write_dynamic_frame.from_options(
    frame = dynamic_frame_write,
    connection_type = "s3",
    connection_options = {"path": "s3://mycsvFile"},
    format = "csv",
    format_options = {"quoteChar": -1, "separator": ","},
    transformation_ctx = "datasink5")
```

but this wrote the data back to S3, not to my RDS table. This is my Glue job:

```
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.job import Job
import pyspark.sql.functions as f

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
## spark.read.option("multiLine", "true").option("quoteChar", -1).option("header","true").option("escape","\'")
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

def otherTreatment(dfa):
    ...
    return dfa

datasource0 = glueContext.create_dynamic_frame.from_catalog(
    database = "db_rds",
    table_name = "tbl_csv_extract",
    transformation_ctx = "datasource0")

applymapping1 = ApplyMapping.apply(
    frame = datasource0,
    mappings = [
        ("id", "string", "id", "string"),
        ("created", "string", "created", "timestamp"),
        ("name", "string", "name", "string"),
        ("high", "string", "high", "decimal(22,7)")],
    transformation_ctx = "applymapping1")

selectfields2 = SelectFields.apply(
    frame = applymapping1,
    paths = ["created", "name", "high", "id"],
    transformation_ctx = "selectfields2")

resolvechoice3 = ResolveChoice.apply(
    frame = selectfields2,
    choice = "MATCH_CATALOG",
    database = "db_rds_sql",
    table_name = "tbl_teststring",
    transformation_ctx = "resolvechoice3")

resolvechoice4 = ResolveChoice.apply(
    frame = resolvechoice3,
    choice = "make_cols",
    transformation_ctx = "resolvechoice4")

data_frame = resolvechoice4.toDF()
data_frame = otherTreatment(data_frame)
dynamic_frame_write = DynamicFrame.fromDF(data_frame, glueContext, "dynamic_frame_write")

datasink5 = glueContext.write_dynamic_frame.from_catalog(
    frame = dynamic_frame_write,
    database = "db_rds_sql",
    table_name = "tbl_teststring",
    transformation_ctx = "datasink5")

## the following sink writes the output back to S3, not to my SQL table
datasink5 = glueContext.write_dynamic_frame.from_options(
    frame = dynamic_frame_write,
    connection_type = "s3",
    connection_options = {"path": "s3://mycsvFile"},
    format = "csv",
    format_options = {"quoteChar": -1, "separator": ","},
    transformation_ctx = "datasink5")

job.commit()
```

Does anyone have any idea how I can write my CSV file with unquoted multi-line strings using Glue PySpark?
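A minimal sketch of how the second sink could target RDS rather than S3, continuing from the `glueContext` and `dynamic_frame_write` variables in the script above. It assumes a Glue JDBC connection to the RDS instance already exists in the catalog; the connection name `my-rds-connection` is hypothetical, and the database and table names are taken from the question's own script:

```
# Hedged sketch: write the DynamicFrame to RDS through a Glue JDBC connection
# instead of an S3 sink. "my-rds-connection" is a placeholder; substitute the
# connection, database, and table names configured in your own Glue catalog.
datasink_rds = glueContext.write_dynamic_frame.from_jdbc_conf(
    frame=dynamic_frame_write,
    catalog_connection="my-rds-connection",   # hypothetical Glue connection name
    connection_options={
        "dbtable": "tbl_teststring",           # target RDS table
        "database": "db_rds_sql",              # target RDS database
    },
    transformation_ctx="datasink_rds",
)
```

This only addresses where the data lands; it does not by itself resolve the unquoted multi-line parsing issue on the read side.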
1 answer · 0 votes · 5 views · AWS-User-5483303 · asked a month ago

XML interpret one struct as an array

I've been trying this for a week, but I'm starting to give up and need some help understanding it. I have an S3 bucket full of XML files, and I am creating a PySpark ETL job to convert them to Parquet so I can query them in Athena.

Within each XML file there is a tag called ORDER_LINE. This tag is supposed to be an array of items; however, in many files there is only one item. XML does not have the concept of arrays, so when I pass this into my ETL job, Glue interprets the field as a Choice type in the schema, where it could be either an array or a struct type. I need to coerce it into an array type at all times. Here's a list of everything I've tried:

1. Using ResolveChoice to cast to an array. This doesn't work because a struct can't be cast to an array.
2. Doing ResolveChoice to "make_struct", then a Map.apply() step to map the field so that if "struct" has data it is transformed to [struct]. This doesn't work, and the Map docs hint that it does not support the Python `map` function for arrays.
3. Converting the dynamic frame to a data frame, then using PySpark withColumn(when(struct.isNotNull, [struct]).otherwise(array)) to convert the struct to an array, or make the array the main object, depending on which one is not null. This doesn't work because Glue infers the schema inside the structs and the struct fields come out in a different order, so even though all the fields are the same, Spark can't combine the result because the schemas are not exactly the same.
4. Converting to a data frame, then using a PySpark UDF to transform the data. This worked on a small dev sample set but failed on the production dataset. The error message was extremely cryptic and I wasn't able to find the cause. Maybe this could work, but I wasn't able to fully understand how to operate on the data in PySpark.
5. Using the "withSchema" format_option when creating the dynamic frame from XML. The intention is to define the schema beforehand, but running this gives an error:

```
com.fasterxml.jackson.databind.exc.MismatchedInputException: Cannot deserialize instance of `java.util.LinkedHashMap<java.lang.Object,java.lang.Object>` out of VALUE_TRUE token
 at [Source: (String)" [...] (through reference chain: com.amazonaws.services.glue.schema.types.StructType["fields"]->java.util.ArrayList[0]->com.amazonaws.services.glue.schema.types.Field["properties"])
```

So my question is: how do I make the XML data source for Glue always interpret a tag as an array instead of a Choice, or how do I combine the two? Even Stack Overflow failed me here, and the forum post https://forums.aws.amazon.com/thread.jspa?messageID=931586&tstart=0 went unanswered.
Here's a snippet of my pyspark code:

```
import sys
import json
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.gluetypes import (
    StructType,
    Field,
    StringType,
    IntegerType,
    ArrayType,
)
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(
    sys.argv,
    [
        "JOB_NAME",
        "source_bucket_name",
        "target_bucket_name",
    ],
)

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

source_bucket_name = args["source_bucket_name"]
target_bucket_name = args["target_bucket_name"]

schema = StructType(
    [
        [fields removed as they are sensitive]
        Field(
            "ORDER_LINE",
            ArrayType(
                StructType(
                    [
                        Field("FIELD1", IntegerType(), True),
                        Field(
                            "FIELD2",
                            StructType([Field("CODE", StringType(), True)]),
                            True,
                        ),
                        Field(
                            "FIELD#",
                            StructType([Field("CODE", StringType(), True)]),
                            True,
                        ),
                        [fields removed]
                    ]
                )
            ),
            True,
        ),
    ]
)

datasource0 = glueContext.create_dynamic_frame.from_options(
    "s3",
    {"paths": [f"s3://{source_bucket_name}"]},
    format="xml",
    format_options={
        "rowTag": "ORDER",
        "withSchema": json.dumps(schema.jsonValue()),
    },
    transformation_ctx="datasource0",
)

[more steps after this]
```
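One possible reading of the `withSchema` error in attempt 5: the `MismatchedInputException` says that `Field["properties"]` received a `VALUE_TRUE` token, which suggests the third positional argument of `awsglue.gluetypes.Field` is treated as a properties map rather than a nullability flag, so passing `True` there serializes as `"properties": true`. Below is a minimal sketch of the ORDER_LINE portion of the schema with that argument left as an empty dict. This rests on an assumption about the `Field` signature and is not a confirmed fix; the sensitive fields elided in the question are left out here.

```
# Hedged sketch: build the ORDER_LINE schema passing an empty properties dict
# as Field's third argument (instead of True), on the assumption that
# Field(name, dataType, properties) expects a map in that position.
import json
from awsglue.gluetypes import StructType, Field, StringType, IntegerType, ArrayType

order_schema = StructType([
    Field(
        "ORDER_LINE",
        ArrayType(
            StructType([
                Field("FIELD1", IntegerType(), {}),
                Field("FIELD2", StructType([Field("CODE", StringType(), {})]), {}),
            ])
        ),
        {},
    ),
])

# If that assumption holds, the rest of the job can keep passing the schema
# to the XML reader unchanged:
with_schema = json.dumps(order_schema.jsonValue())
```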
3 answers · 0 votes · 5 views · Eelviny · asked a month ago

Data Pipeline stops processing files in S3 bucket

I have a Data Pipeline which reads CSV files from an S3 bucket and copies the data into an RDS database. I specify the bucket/folder name and it goes through each CSV file in the bucket/folder and processes it. When it is done, a ShellCommandActivity moves the files to another 'folder' in the S3 bucket. That's how it works in testing. With the real data it just stops after a few files. The last line in the logs is `07 Dec 2021 09:57:55,755 [INFO] (TaskRunnerService-resource:df-1234xxx1_@Ec2Instance_2021-12-07T09:53:00-0) df-1234xxx1 amazonaws.datapipeline.connector.s3.RetryableS3Reader: Reopening connection and advancing 0`. The logs show that it usually downloads the CSV file, then writes the 'Reopening connection and advancing 0' line, then deletes a temp file, then moves on to the next file. But on the seventh file it just stops on 'Reopening connection and advancing 0'. It isn't the next file that is the problem, as it processes fine on its own. I've already tried making the files smaller: originally it was stopping on the second file, but now that the file sizes are about 1.7 MB it gets through six of them before it stops. The status of each task (both DataLoadActivity and ShellCommandActivity) shows 'CANCELLED' after one attempt (3 attempts are allowed) and there is no error message. I'm guessing this is some sort of timeout. How can I make the pipeline reliable so that it processes all of the files?
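If a timeout is indeed the cause, one knob worth experimenting with is the `attemptTimeout` field on the activity objects in the pipeline definition. A hypothetical sketch using boto3 follows; the pipeline ID is reused from the question's logs as a placeholder, the activity id and timeout value are assumptions, and it is not confirmed that this resolves the CANCELLED status:

```
# Hedged sketch: raise attemptTimeout on the copy activity via boto3.
# "df-1234xxx1" and "DataLoadActivity" are placeholders taken from the
# question; adjust them to your real pipeline definition before running.
import boto3

dp = boto3.client("datapipeline")
pipeline_id = "df-1234xxx1"

definition = dp.get_pipeline_definition(pipelineId=pipeline_id)
for obj in definition["pipelineObjects"]:
    if obj["id"] == "DataLoadActivity":  # hypothetical activity id
        obj["fields"].append({"key": "attemptTimeout", "stringValue": "2 hours"})

dp.put_pipeline_definition(
    pipelineId=pipeline_id,
    pipelineObjects=definition["pipelineObjects"],
    parameterObjects=definition.get("parameterObjects", []),
    parameterValues=definition.get("parameterValues", []),
)
dp.activate_pipeline(pipelineId=pipeline_id)
```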
2 answers · 0 votes · 8 views · erc_aws · asked a month ago