
Questions tagged with Extract Transform & Load Data



[Pandas] How to write data into JSON column of Postgres SQL

Hi,

I'm trying to write a dataframe into a Postgres SQL table that has a JSON column ("details"), using the following code:

```
import json
import pandas as pd
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

results = []
details_string = '{"name": "test"}'
json_object = json.loads(details_string)
results.append([1, json_object])

mySchema = StructType([
    StructField("event_id", IntegerType(), True),
    StructField("details", StructType([StructField('name', StringType(), True)]), True)
])

myResult = glueContext.createDataFrame(data=pd.DataFrame(results, columns=['event_id', 'details']), schema=mySchema)
# ... then write to DB
```

However, there seems to be an issue with the `mySchema` field for the JSON column. I've tried StructType, MapType, and ArrayType, but each time I get a different error.

For MapType:

> Job aborted due to stage failure: Task 4 in stage 182.0 failed 4 times, most recent failure: Lost task 4.3 in stage 182.0 (TID 1802, 172.36.213.211, executor 2): java.lang.IllegalArgumentException: Can't get JDBC type for map<string,string>

And for `StructField("details", StructType([StructField('name', StringType(), True)]), True)`:

> Job aborted due to stage failure: Task 3 in stage 211.0 failed 4 times, most recent failure: Lost task 3.3 in stage 211.0 (TID 2160, 172.36.18.91, executor 4): java.lang.IllegalArgumentException: Can't get JDBC type for struct<name:string>

Does anyone have an example of how to construct the DataFrame schema so that the JSON is written into a JSON Postgres SQL column?
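The errors above indicate the JDBC writer has no type mapping for map or struct columns. One common workaround (not from the post, only a sketch) is to keep `details` as a plain JSON string in the DataFrame and add `stringtype=unspecified` to the PostgreSQL JDBC URL, so the server casts the text to the `json` column type on insert. The connection URL, table name, and credentials below are hypothetical placeholders, and a plain Spark JDBC write is assumed rather than the Glue-specific writer:

```
# Sketch only: "details" stays a JSON string (StringType); the PostgreSQL JDBC
# option stringtype=unspecified lets the server cast the text to the json column.
# Host, database, table, and credentials are hypothetical placeholders.
import json
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

spark = SparkSession.builder.getOrCreate()

mySchema = StructType([
    StructField("event_id", IntegerType(), True),
    StructField("details", StringType(), True),   # JSON serialized as text
])

rows = [(1, json.dumps({"name": "test"}))]
df = spark.createDataFrame(rows, schema=mySchema)

(df.write
   .format("jdbc")
   .option("url", "jdbc:postgresql://my-host:5432/mydb?stringtype=unspecified")  # hypothetical URL
   .option("dbtable", "events")          # hypothetical table with a json "details" column
   .option("user", "myuser")
   .option("password", "mypassword")
   .option("driver", "org.postgresql.Driver")
   .mode("append")
   .save())
```

With this approach the DataFrame schema stays simple (a StringType column), and the text-to-JSON cast happens on the Postgres side rather than in Spark.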
0 answers | 0 votes | 22 views | asked a day ago

Large amount of time spent in "com.amazonaws.services.glue.DynamicFrame.recomputeSchema" in Spark/AWS Glue

Dear Glue Community,

I'm using PySpark with AWS Glue for a large ETL job. Most of my source data sits in an AWS RDS Postgres instance. I read all my tables directly with `create_data_frame.from_catalog` because the logic is quite complex and implemented in pure PySpark; I don't use DynamicFrames at all except when writing the final data back to S3.

The first thing that puzzled me in the AWS Glue documentation was this part: https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-crawler-pyspark-extensions-glue-context.html#aws-glue-api-crawler-pyspark-extensions-glue-context-create-dataframe-from-catalog

> **create_data_frame_from_catalog**
>
> create_data_frame_from_catalog(database, table_name, transformation_ctx = "", additional_options = {})
>
> Returns a DataFrame that is created using information from a Data Catalog table. **Use this function only with AWS Glue streaming sources.**

Can someone explain why we are not supposed to use `create_data_frame_from_catalog` with non-streaming sources? In my case none of my sources are streamed, so would it change anything to use `create_dynamic_frame_from_catalog().toDF()` instead?

However, my main problem is that one of my sources sits in S3 and is quite large by my standards (1 TB as gzip CSV). I configured the crawler and added the table to my Glue database; the schema consists of only 3 columns. Everything seems in order for this table in my Glue database. However, when I try to create a DataFrame for this table with `create_data_frame.from_catalog`, I get an additional Spark stage that mostly consists of:

```
fromRDD at DynamicFrame.scala:320
org.apache.spark.sql.glue.util.SchemaUtils$.fromRDD(SchemaUtils.scala:74)
com.amazonaws.services.glue.DynamicFrame.recomputeSchema(DynamicFrame.scala:320)
com.amazonaws.services.glue.DynamicFrame.schema(DynamicFrame.scala:296)
com.amazonaws.services.glue.DynamicFrame.toDF(DynamicFrame.scala:385)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:498)
py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
py4j.Gateway.invoke(Gateway.java:282)
py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
py4j.commands.CallCommand.execute(CallCommand.java:79)
py4j.GatewayConnection.run(GatewayConnection.java:238)
java.lang.Thread.run(Thread.java:748)
```

It's unclear to me why recomputing the schema would be necessary, as the table with its schema is already incorporated into my Glue database and there is no reason to assume it has changed. At the moment, with 5 G2.X workers, this step takes around 12 hours. After reading about AWS Glue streaming sources and `create_data_frame.from_catalog`, I tried loading the DataFrame with `create_dynamic_frame_from_catalog().toDF()`, but the schema-recomputation step occurred nevertheless.

Does anyone have an idea why recomputing the schema is necessary at that step? Could I force it not to recompute in some way or another?

A more general question: is there any other way to access this terabyte of data more efficiently than in S3? I don't want to put it in an AWS RDS instance because the data would be very infrequently accessed. Should I be looking at something else, DynamoDB, etc.?

Thanks a lot for your help,
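One way to avoid the `recomputeSchema` stage entirely (a workaround not taken from the post) is to skip the DynamicFrame/catalog path for this one table and read the CSVs with plain `spark.read`, supplying the three-column schema explicitly so Spark never has to scan the data to derive it. The bucket, prefix, header option, and column names/types below are hypothetical placeholders:

```
# Sketch only: reads the gzip CSVs straight from S3 with an explicit schema,
# so no DynamicFrame is built and no schema inference/recomputation pass
# over the 1 TB dataset is triggered. The S3 location and the column
# names/types are hypothetical placeholders.
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, LongType

spark = SparkSession.builder.getOrCreate()

source_schema = StructType([
    StructField("id", LongType(), True),
    StructField("event_type", StringType(), True),
    StructField("payload", StringType(), True),
])

df = (spark.read
      .schema(source_schema)              # explicit schema: no inference pass over the data
      .option("header", "true")           # assumes the files have a header row
      .csv("s3://my-bucket/my-prefix/"))  # hypothetical S3 location
```

As a side note, gzip-compressed CSV is not splittable, so each file is processed by a single task; converting the dataset to a splittable columnar format such as Parquet on S3 is a common way to make infrequently accessed data of this size much cheaper to scan than moving it into RDS or DynamoDB.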
0 answers | 0 votes | 21 views | asked 6 days ago