AWS Glue: Visual ETL from Kinesis into Snowflake

Hi there,

I am trying to set up ETL processing from our existing AWS Kinesis data stream into Snowflake. We currently use KCL, which is extremely difficult to maintain because we do not have the means to manage the solution and the infrastructure around it.

Glue seems like a promising approach. A job with an S3 source, a SQL transform, and a Snowflake target works perfectly. Swapping the S3 source for our Kinesis stream causes problems: even after I create the streaming schema for the data stream, the job fails with the error below.

I also cannot preview the data or the schema of the data stream in the visual ETL editor. Any assistance would be appreciated.

Traceback (most recent call last):
  File "/opt/amazon/lib/python3.7/site-packages/awsglue/dataframereader.py", line 20, in from_catalog
    return self._glue_context.create_data_frame_from_catalog(db, table_name, redshift_tmp_dir, transformation_ctx, push_down_predicate, additional_options, catalog_id, **kwargs)
  File "/opt/amazon/lib/python3.7/site-packages/awsglue/context.py", line 219, in create_data_frame_from_catalog
    source = StreamingDataSource(self._ssql_ctx.getCatalogSource(db, table_name, redshift_tmp_dir, transformation_ctx,
  File "/opt/amazon/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1321, in __call__
    return_value = get_return_value(
  File "/opt/amazon/lib/python3.7/site-packages/pyspark/sql/utils.py", line 190, in deco
    return f(*a, **kw)
  File "/opt/amazon/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o90.getCatalogSource.
: org.antlr.v4.runtime.misc.ParseCancellationException: line 1:0 no viable alternative at input '<EOF>'
    at com.amazonaws.services.glue.schema.io.ThrowingErrorListener.syntaxError(ThrowingErrorListener.java:15)
    at org.antlr.v4.runtime.ProxyErrorListener.syntaxError(ProxyErrorListener.java:41)
    at org.antlr.v4.runtime.Parser.notifyErrorListeners(Parser.java:544)
    at org.antlr.v4.runtime.DefaultErrorStrategy.reportNoViableAlternative(DefaultErrorStrategy.java:310)
    at org.antlr.v4.runtime.DefaultErrorStrategy.reportError(DefaultErrorStrategy.java:136)
    at com.amazonaws.services.glue.schema.io.grammar.HiveSchemaParser.dataType(HiveSchemaParser.java:229)
    at com.amazonaws.services.glue.schema.io.HiveFormatDeserializer.deserializeDataType(HiveFormatDeserializer.java:52)
    at com.amazonaws.services.glue.schema.io.HiveFormatDeserializer.deserializeDataTypeFromString(HiveFormatDeserializer.java:63)
    at com.amazonaws.services.glue.util.DataCatalogWrapperUtils.$anonfun$getFieldsFromColumns$1(DataCatalogWrapper.scala:615)
    at scala.collection.immutable.List.map(List.scala:297)
    at com.amazonaws.services.glue.util.DataCatalogWrapperUtils.getFieldsFromColumns(DataCatalogWrapper.scala:614)
    at com.amazonaws.services.glue.util.DataCatalogWrapperUtils.getFieldsFromColumns$(DataCatalogWrapper.scala:614)
    at com.amazonaws.services.glue.util.DataCatalogWrapper.getFieldsFromColumns(DataCatalogWrapper.scala:238)
    at com.amazonaws.services.glue.util.DataCatalogWrapperUtils.getSchema(DataCatalogWrapper.scala:652)
    at com.amazonaws.services.glue.util.DataCatalogWrapperUtils.getSchema$(DataCatalogWrapper.scala:648)
    at com.amazonaws.services.glue.util.DataCatalogWrapper.getSchema(DataCatalogWrapper.scala:238)
    at com.amazonaws.services.glue.util.DataCatalogWrapperUtils.getSchema(DataCatalogWrapper.scala:635)
    at com.amazonaws.services.glue.util.DataCatalogWrapperUtils.getSchema$(DataCatalogWrapper.scala:631)
    at com.amazonaws.services.glue.util.DataCatalogWrapper.getSchema(DataCatalogWrapper.scala:238)
    at com.amazonaws.services.glue.util.DataCatalogWrapperUtils.catalogTableFromGlueTable(DataCatalogWrapper.scala:1065)
    at com.amazonaws.services.glue.util.DataCatalogWrapperUtils.catalogTableFromGlueTable$(DataCatalogWrapper.scala:1023)
    at com.amazonaws.services.glue.util.DataCatalogWrapper.catalogTableFromGlueTable(DataCatalogWrapper.scala:238)
    at com.amazonaws.services.glue.util.DataCatalogWrapper.$anonfun$getTable$1(DataCatalogWrapper.scala:288)
    at scala.util.Try$.apply(Try.scala:213)
    at com.amazonaws.services.glue.util.DataCatalogWrapper.getTable(DataCatalogWrapper.scala:242)
    at com.amazonaws.services.glue.GlueContext.getCatalogSource(GlueContext.scala:273)
    at com.amazonaws.services.glue.GlueContext.getCatalogSource(GlueContext.scala:254)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:750)

  • You seem to be using a catalog table, but you also say you are creating your "streaming schema"? It would help to have more details about how you are creating the DataFrame. I would say there is something wrong with that table's schema.
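    One way to check the table schema, offered as a rough sketch rather than a confirmed diagnosis: the trace fails in HiveSchemaParser.dataType at "line 1:0" on '<EOF>', which could mean that a column in the catalog table has an empty type string. The helper names columns_with_empty_type and check_catalog_table are made up for illustration, and the database and table names you pass in are your own. The only AWS call used is boto3's Glue get_table.

```python
from typing import Any, Dict, List


def columns_with_empty_type(table: Dict[str, Any]) -> List[str]:
    """Return names of columns whose type string is missing or blank.

    `table` is the value under the "Table" key of a Glue get_table response.
    """
    storage = table.get("StorageDescriptor") or {}
    # Check regular columns first, then partition keys.
    columns = list(storage.get("Columns") or []) + list(table.get("PartitionKeys") or [])
    return [
        col.get("Name", "<unnamed>")
        for col in columns
        if not (col.get("Type") or "").strip()
    ]


def check_catalog_table(database: str, table_name: str) -> List[str]:
    """Fetch a table from the Data Catalog and list its blank-typed columns."""
    import boto3  # imported lazily so the helper above works without boto3

    glue = boto3.client("glue")
    response = glue.get_table(DatabaseName=database, Name=table_name)
    return columns_with_empty_type(response["Table"])


if __name__ == "__main__":
    # Hypothetical table definition, shaped like a get_table response.
    sample = {
        "StorageDescriptor": {
            "Columns": [
                {"Name": "event_id", "Type": "string"},
                {"Name": "payload", "Type": ""},
            ]
        },
        "PartitionKeys": [],
    }
    print(columns_with_empty_type(sample))
```

    If check_catalog_table returns any column names for your streaming table, setting a concrete type on those columns in the Data Catalog would be the first thing to try. An empty result means the problem is probably elsewhere.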

asked 4 months ago · 1308 views
1 Answer
Hello Prashaan,

To answer your question, we need details that are not public information. Please open a support case with AWS using the following link.

https://console.aws.amazon.com/support/home#/case/create

AkashD (AWS Support Engineer)
answered 4 months ago