AWS Glue: Visual ETL From Kinesis into Snowflake


Hi there,

I am trying to set up ETL processing from our existing AWS Kinesis data stream into Snowflake. We currently use the Kinesis Client Library (KCL), which is extremely difficult for us to maintain because we lack the resources to manage the solution and the infrastructure around it.

Glue seems like a promising approach. A pipeline with an S3 source, a SQL transform, and a Snowflake target works perfectly. Swapping the S3 source for our Kinesis stream, however, causes problems: even after creating the streaming schema for our data stream, the job still fails with the error below.

I also can't seem to preview the data or schema of my data stream in the visual ETL editor. Any assistance would be appreciated.
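The trace below goes through `from_catalog`, so the Kinesis source is being read via a Data Catalog table and its stored column types. One way to sidestep a hand-written catalog schema is to read the stream directly with `glueContext.create_data_frame.from_options(connection_type="kinesis", ...)` and let Glue infer the schema. The sketch below only builds the connection options; it assumes the records are JSON, and the helper name and example ARN are hypothetical placeholders.

```python
"""Hypothetical helper: read the Kinesis stream directly instead of via a catalog table."""
from typing import Dict


def kinesis_connection_options(stream_arn: str,
                               starting_position: str = "TRIM_HORIZON") -> Dict[str, str]:
    """Build connection options for an AWS Glue Kinesis streaming source.

    inferSchema="true" asks Glue to infer the schema from the records, so no
    Data Catalog table (and no hand-written column types) is involved.
    """
    # ARN layout: arn:partition:service:region:account:resource
    parts = stream_arn.split(":")
    if len(parts) < 6 or parts[0] != "arn" or parts[2] != "kinesis":
        raise ValueError(f"not a Kinesis stream ARN: {stream_arn!r}")
    return {
        "streamARN": stream_arn,
        "startingPosition": starting_position,
        "classification": "json",  # assumption: the stream carries JSON records
        "inferSchema": "true",
    }


# Inside a Glue streaming job (needs the awsglue runtime, so not runnable locally):
#
# kinesis_df = glueContext.create_data_frame.from_options(
#     connection_type="kinesis",
#     connection_options=kinesis_connection_options(STREAM_ARN),
#     transformation_ctx="kinesis_source",
# )
```

Validating the ARN by its service field rather than a fixed `arn:aws:` prefix keeps the helper usable in other partitions such as `aws-cn`.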

Traceback (most recent call last):
  File "/opt/amazon/lib/python3.7/site-packages/awsglue/dataframereader.py", line 20, in from_catalog
    return self._glue_context.create_data_frame_from_catalog(db, table_name, redshift_tmp_dir, transformation_ctx, push_down_predicate, additional_options, catalog_id, **kwargs)
  File "/opt/amazon/lib/python3.7/site-packages/awsglue/context.py", line 219, in create_data_frame_from_catalog
    source = StreamingDataSource(self._ssql_ctx.getCatalogSource(db, table_name, redshift_tmp_dir, transformation_ctx,
  File "/opt/amazon/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1321, in __call__
    return_value = get_return_value(
  File "/opt/amazon/lib/python3.7/site-packages/pyspark/sql/utils.py", line 190, in deco
    return f(*a, **kw)
  File "/opt/amazon/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o90.getCatalogSource.
: org.antlr.v4.runtime.misc.ParseCancellationException: line 1:0 no viable alternative at input '<EOF>'
    at com.amazonaws.services.glue.schema.io.ThrowingErrorListener.syntaxError(ThrowingErrorListener.java:15)
    at org.antlr.v4.runtime.ProxyErrorListener.syntaxError(ProxyErrorListener.java:41)
    at org.antlr.v4.runtime.Parser.notifyErrorListeners(Parser.java:544)
    at org.antlr.v4.runtime.DefaultErrorStrategy.reportNoViableAlternative(DefaultErrorStrategy.java:310)
    at org.antlr.v4.runtime.DefaultErrorStrategy.reportError(DefaultErrorStrategy.java:136)
    at com.amazonaws.services.glue.schema.io.grammar.HiveSchemaParser.dataType(HiveSchemaParser.java:229)
    at com.amazonaws.services.glue.schema.io.HiveFormatDeserializer.deserializeDataType(HiveFormatDeserializer.java:52)
    at com.amazonaws.services.glue.schema.io.HiveFormatDeserializer.deserializeDataTypeFromString(HiveFormatDeserializer.java:63)
    at com.amazonaws.services.glue.util.DataCatalogWrapperUtils.$anonfun$getFieldsFromColumns$1(DataCatalogWrapper.scala:615)
    at scala.collection.immutable.List.map(List.scala:297)
    at com.amazonaws.services.glue.util.DataCatalogWrapperUtils.getFieldsFromColumns(DataCatalogWrapper.scala:614)
    at com.amazonaws.services.glue.util.DataCatalogWrapperUtils.getFieldsFromColumns$(DataCatalogWrapper.scala:614)
    at com.amazonaws.services.glue.util.DataCatalogWrapper.getFieldsFromColumns(DataCatalogWrapper.scala:238)
    at com.amazonaws.services.glue.util.DataCatalogWrapperUtils.getSchema(DataCatalogWrapper.scala:652)
    at com.amazonaws.services.glue.util.DataCatalogWrapperUtils.getSchema$(DataCatalogWrapper.scala:648)
    at com.amazonaws.services.glue.util.DataCatalogWrapper.getSchema(DataCatalogWrapper.scala:238)
    at com.amazonaws.services.glue.util.DataCatalogWrapperUtils.getSchema(DataCatalogWrapper.scala:635)
    at com.amazonaws.services.glue.util.DataCatalogWrapperUtils.getSchema$(DataCatalogWrapper.scala:631)
    at com.amazonaws.services.glue.util.DataCatalogWrapper.getSchema(DataCatalogWrapper.scala:238)
    at com.amazonaws.services.glue.util.DataCatalogWrapperUtils.catalogTableFromGlueTable(DataCatalogWrapper.scala:1065)
    at com.amazonaws.services.glue.util.DataCatalogWrapperUtils.catalogTableFromGlueTable$(DataCatalogWrapper.scala:1023)
    at com.amazonaws.services.glue.util.DataCatalogWrapper.catalogTableFromGlueTable(DataCatalogWrapper.scala:238)
    at com.amazonaws.services.glue.util.DataCatalogWrapper.$anonfun$getTable$1(DataCatalogWrapper.scala:288)
    at scala.util.Try$.apply(Try.scala:213)
    at com.amazonaws.services.glue.util.DataCatalogWrapper.getTable(DataCatalogWrapper.scala:242)
    at com.amazonaws.services.glue.GlueContext.getCatalogSource(GlueContext.scala:273)
    at com.amazonaws.services.glue.GlueContext.getCatalogSource(GlueContext.scala:254)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:750)

  • You seem to be using a catalog table, but you also say you are creating your "streaming schema"? It would help to have more details about how you are creating the DataFrame. My guess is that something is wrong with that table's schema.
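The Java trace supports the guess about the table schema: it fails in `HiveSchemaParser.dataType`, called from `getFieldsFromColumns`, with "line 1:0 no viable alternative at input '<EOF>'". That is consistent with the parser being handed an empty column type string. A minimal diagnostic sketch, assuming the catalog table is readable with boto3's `glue.get_table`, that lists columns whose `Type` is missing or blank; the function names and the database/table arguments are hypothetical.

```python
"""Hypothetical diagnostic: find Data Catalog columns whose type string is empty."""
from typing import Any, Dict, List


def columns_with_blank_type(get_table_response: Dict[str, Any]) -> List[str]:
    """Return names of columns whose "Type" is missing, None, or whitespace-only.

    The input is shaped like the response of boto3's glue.get_table().
    Both regular columns and partition keys are checked.
    """
    table = get_table_response["Table"]
    columns = list(table.get("StorageDescriptor", {}).get("Columns", []))
    columns += table.get("PartitionKeys", [])
    return [col.get("Name", "<unnamed>") for col in columns
            if not (col.get("Type") or "").strip()]


def check_catalog_table(database: str, table: str) -> List[str]:
    """Fetch a table definition from the Glue Data Catalog and run the check."""
    import boto3  # imported lazily so the pure check above has no AWS dependency

    glue = boto3.client("glue")
    return columns_with_blank_type(glue.get_table(DatabaseName=database, Name=table))
```

For example, `check_catalog_table("my_db", "my_kinesis_table")` (placeholder names) would return something like `["payload"]` if the `payload` column was saved without a type; giving that column a valid Hive type such as `string` or `struct<...>` in the streaming schema would be the next thing to try.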

Asked 4 months ago, viewed 1,322 times
1 Answer

Hello Prashaan,

Answering your question requires details that are non-public information. Please open a support case with AWS using the following link:

https://console.aws.amazon.com/support/home#/case/create

AWS
Support Engineer
AkashD
Answered 4 months ago
