Large amount of time spent in "com.amazonaws.services.glue.DynamicFrame.recomputeSchema" in Spark/AWS Glue


Dear Glue community,

I'm using PySpark with AWS Glue for a large ETL job. Most of my source data sits in an AWS RDS Postgres instance. I read all my tables directly with create_data_frame.from_catalog because the logic is quite complex and implemented in pure PySpark; I don't use DynamicFrames at all, except when writing the final data back to S3.
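For context, this is roughly how I read each Postgres-backed table today; the database and table names below are placeholders, not my real ones:

from awsglue.context import GlueContext
from pyspark.context import SparkContext

glue_context = GlueContext(SparkContext.getOrCreate())

# Load a catalog table backed by the RDS Postgres instance straight into a Spark DataFrame
orders_df = glue_context.create_data_frame.from_catalog(
    database="my_glue_database",
    table_name="orders",
)

# Everything downstream is plain PySpark transformations on orders_df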

The first thing that puzzled me in the AWS Glue documentation was this part:

https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-crawler-pyspark-extensions-glue-context.html#aws-glue-api-crawler-pyspark-extensions-glue-context-create-dataframe-from-catalog

create_data_frame_from_catalog

create_data_frame_from_catalog(database, table_name, transformation_ctx = "", additional_options = {})

Returns a DataFrame that is created using information from a Data Catalog table. Use this function only with AWS Glue streaming sources.

Can someone explain to me why we are not supposed to use create_data_frame_from_catalog with non-streaming sources? In my case none of my sources are streaming, so would it change anything to use create_dynamic_frame_from_catalog().toDF() instead?
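To make the question concrete, here is a sketch of the two call patterns I'm comparing for a batch (non-streaming) catalog table; glue_context is the same GlueContext as in the snippet above and the names are placeholders:

# Variant 1: what I use today, documented for streaming sources only
df1 = glue_context.create_data_frame.from_catalog(
    database="my_glue_database",
    table_name="my_table",
)

# Variant 2: go through a DynamicFrame first, then convert to a DataFrame
df2 = glue_context.create_dynamic_frame_from_catalog(
    database="my_glue_database",
    table_name="my_table",
).toDF()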

However, my main problem is that one of my sources sits in S3 and is quite large (by my standards: about 1 TB as gzipped CSV). I configured the crawler and added the table to my Glue database; the schema consists of only 3 columns. Everything seems in order for this table in my Glue database.

However, when I try to create a DataFrame for this table with create_data_frame.from_catalog, I get an additional Spark stage which mostly consists of:

fromRDD at DynamicFrame.scala:320
org.apache.spark.sql.glue.util.SchemaUtils$.fromRDD(SchemaUtils.scala:74)
com.amazonaws.services.glue.DynamicFrame.recomputeSchema(DynamicFrame.scala:320)
com.amazonaws.services.glue.DynamicFrame.schema(DynamicFrame.scala:296)
com.amazonaws.services.glue.DynamicFrame.toDF(DynamicFrame.scala:385)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:498)
py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
py4j.Gateway.invoke(Gateway.java:282)
py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
py4j.commands.CallCommand.execute(CallCommand.java:79)
py4j.GatewayConnection.run(GatewayConnection.java:238)
java.lang.Thread.run(Thread.java:748)

It's unclear to me why recomputing the schema would be necessary, as the table with its schema is already incorporated into my Glue database and there is no reason to assume it has changed. At the moment, with 5 G.2X workers, this step takes around 12 hours.

After reading about AWS Glue streaming sources and create_data_frame.from_catalog, I tried loading the DataFrame with create_dynamic_frame_from_catalog().toDF() instead, but the schema recomputation step occurred nevertheless.

Does someone have any idea why recomputing the schema is necessary at that step? Is there a way to force Glue to skip the recomputation?
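For what it's worth, the only workaround I can think of would be to bypass the catalog and the DynamicFrame layer entirely and read the CSVs with a hand-written schema, along the lines of the sketch below (bucket, prefix and column names are made up; my real table just has 3 columns). I'd prefer to keep going through the catalog if possible:

from awsglue.context import GlueContext
from pyspark.context import SparkContext
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

glue_context = GlueContext(SparkContext.getOrCreate())
spark = glue_context.spark_session

# Explicit schema so Spark never has to infer or recompute anything at read time
schema = StructType([
    StructField("col_a", StringType(), True),
    StructField("col_b", StringType(), True),
    StructField("col_c", DoubleType(), True),
])

# Spark handles the gzip compression transparently based on the file extension
big_df = spark.read.csv("s3://my-bucket/my-prefix/", schema=schema, header=True)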

A more general question: is there any more efficient way to access this terabyte of data than S3? I don't want to put it in an AWS RDS instance because it would be accessed very infrequently. Should I be looking at something else, such as DynamoDB?

Thanks a lot for your help,

Sacha L
asked 2 years ago · 67 views
No Answers
