AWS GLUE: Visual ETL From Kinesis into Snowflake

0

Hi there,

I am trying to establish ETL processing from our existing AWS Kinesis data stream to snowflake. We are currently using KCL which is extremely difficult to maintain as we do not have the means to manage the solution and infrastructure around it.

Glue seems like a promising approach. Using an S3 source to SQL ETL to Snowflake works perfectly. Swapping out the S3 source to our Kinesis stream results in a few issues. After creating the streaming schema for our data stream it still results in the following issue attached below.

It seems I also cannot preview the data/schema from my data stream from the visual ETL. Any assistance would be appreciated.

Traceback (most recent call last):
File "/opt/amazon/lib/python3.7/site-packages/awsglue/dataframereader.py", line 20, in from_catalog return self._glue_context.create_data_frame_from_catalog(db, table_name, redshift_tmp_dir, transformation_ctx, push_down_predicate, additional_options, catalog_id, **kwargs)

File "/opt/amazon/lib/python3.7/site-packages/awsglue/context.py", line 219, in create_data_frame_from_catalog source = StreamingDataSource(self._ssql_ctx.getCatalogSource(db, table_name, redshift_tmp_dir, transformation_ctx,

File "/opt/amazon/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1321, in __call__ return_value = get_return_value(

File "/opt/amazon/lib/python3.7/site-packages/pyspark/sql/utils.py", line 190, in deco return f(*a, **kw)

File "/opt/amazon/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/protocol.py", line 326, in get_return_value raise Py4JJavaError(

py4j.protocol.Py4JJavaError: An error occurred while calling o90.getCatalogSource. : org.antlr.v4.runtime.misc.ParseCancellationException: line 1:0 no viable alternative at input '<EOF>' at com.amazonaws.services.glue.schema.io.ThrowingErrorListener.syntaxError(ThrowingErrorListener.java:15) at org.antlr.v4.runtime.ProxyErrorListener.syntaxError(ProxyErrorListener.java:41) at org.antlr.v4.runtime.Parser.notifyErrorListeners(Parser.java:544) at org.antlr.v4.runtime.DefaultErrorStrategy.reportNoViableAlternative(DefaultErrorStrategy.java:310) at org.antlr.v4.runtime.DefaultErrorStrategy.reportError(DefaultErrorStrategy.java:136) at com.amazonaws.services.glue.schema.io.grammar.HiveSchemaParser.dataType(HiveSchemaParser.java:229) at com.amazonaws.services.glue.schema.io.HiveFormatDeserializer.deserializeDataType(HiveFormatDeserializer.java:52) at com.amazonaws.services.glue.schema.io.HiveFormatDeserializer.deserializeDataTypeFromString(HiveFormatDeserializer.java:63) at com.amazonaws.services.glue.util.DataCatalogWrapperUtils.$anonfun$getFieldsFromColumns$1(DataCatalogWrapper.scala:615) at scala.collection.immutable.List.map(List.scala:297) at com.amazonaws.services.glue.util.DataCatalogWrapperUtils.getFieldsFromColumns(DataCatalogWrapper.scala:614) at com.amazonaws.services.glue.util.DataCatalogWrapperUtils.getFieldsFromColumns$(DataCatalogWrapper.scala:614) at com.amazonaws.services.glue.util.DataCatalogWrapper.getFieldsFromColumns(DataCatalogWrapper.scala:238) at com.amazonaws.services.glue.util.DataCatalogWrapperUtils.getSchema(DataCatalogWrapper.scala:652) at com.amazonaws.services.glue.util.DataCatalogWrapperUtils.getSchema$(DataCatalogWrapper.scala:648) at com.amazonaws.services.glue.util.DataCatalogWrapper.getSchema(DataCatalogWrapper.scala:238) at com.amazonaws.services.glue.util.DataCatalogWrapperUtils.getSchema(DataCatalogWrapper.scala:635) at com.amazonaws.services.glue.util.DataCatalogWrapperUtils.getSchema$(DataCatalogWrapper.scala:631) at com.amazonaws.services.glue.util.DataCatalogWrapper.getSchema(DataCatalogWrapper.scala:238) at com.amazonaws.services.glue.util.DataCatalogWrapperUtils.catalogTableFromGlueTable(DataCatalogWrapper.scala:1065) at com.amazonaws.services.glue.util.DataCatalogWrapperUtils.catalogTableFromGlueTable$(DataCatalogWrapper.scala:1023) at com.amazonaws.services.glue.util.DataCatalogWrapper.catalogTableFromGlueTable(DataCatalogWrapper.scala:238) at com.amazonaws.services.glue.util.DataCatalogWrapper.$anonfun$getTable$1(DataCatalogWrapper.scala:288) at scala.util.Try$.apply(Try.scala:213) at com.amazonaws.services.glue.util.DataCatalogWrapper.getTable(DataCatalogWrapper.scala:242) at com.amazonaws.services.glue.GlueContext.getCatalogSource(GlueContext.scala:273) at com.amazonaws.services.glue.GlueContext.getCatalogSource(GlueContext.scala:254) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) at py4j.Gateway.invoke(Gateway.java:282) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:238) at java.lang.Thread.run(Thread.java:750)

  • You seem to to be using a catalog table but you also say are creating your "streaming schema"? It would help more details about how you are creating the DataFrame. I would say there is something wrong with that table schema

asked 2 months ago492 views
1 Answer
0

Hello Prashaan,

To answer your question, we require details that are non-public information. I kindly request you to open a support case with AWS using the following link.

https://console.aws.amazon.com/support/home#/case/create

profile pictureAWS
SUPPORT ENGINEER
AkashD
answered 2 months ago

You are not logged in. Log in to post an answer.

A good answer clearly answers the question and provides constructive feedback and encourages professional growth in the question asker.

Guidelines for Answering Questions