Writing data to Kinesis stream from PySpark


A customer would like to process streaming data in Spark Streaming using PySpark and output the results to a Kinesis stream.

While PySpark does support reading data from Kinesis (http://spark.apache.org/docs/latest/streaming-kinesis-integration.html), I do not see any support for writing data to Kinesis.

Databricks has some documentation for creating a Kinesis sink for Spark in Scala (https://docs.databricks.com/spark/latest/structured-streaming/kinesis.html), but if I understand the documentation correctly, this is based on the ForeachSink, which is not supported in PySpark (http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#using-foreach):

The foreach operation allows arbitrary operations to be computed on the output data. As of Spark 2.1, this is available only for Scala and Java. To use this, you will have to implement the interface ForeachWriter (Scala/Java docs), which has methods that get called whenever there is a sequence of rows generated as output after a trigger.

Has anybody come across a way to achieve this?

AWS
Expert
Asked 6 years ago · Viewed 2195 times
1 Answer
Accepted Answer

In PySpark you can use foreachPartition and call Kinesis (or any external API) once per partition, or you can call out to Kinesis per record. Note that map is a lazy transformation, so for a pure side effect like this, use the foreach action instead.

# Approach 1: Per partition

import boto3

def pushToKinesis(iterator):
    # Create one client per partition rather than one per record
    client = boto3.client('kinesis')
    records = [{'Data': str(rec).encode('utf-8'), 'PartitionKey': str(hash(rec))}
               for rec in iterator]
    if records:
        # 'my-stream' is a placeholder; put_records accepts at most 500 records per call
        client.put_records(StreamName='my-stream', Records=records)

rdd.foreachPartition(pushToKinesis)

# Approach 2: Per record

import boto3

def pushToKinesis(record):
    # One client and one API call per record: simpler but slower than Approach 1
    client = boto3.client('kinesis')
    # 'my-stream' is a placeholder
    client.put_record(StreamName='my-stream', Data=str(record).encode('utf-8'), PartitionKey='pk')

rdd.foreach(pushToKinesis)

The blog post below uses Approach 2 to call Amazon Comprehend:

https://aws.amazon.com/blogs/machine-learning/how-to-scale-sentiment-analysis-using-amazon-comprehend-aws-glue-and-amazon-athena/
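Since the original question is about Spark Streaming, the per-batch RDDs can be reached through foreachRDD. Below is a minimal sketch, assuming the pushToKinesis partition function from Approach 1; the socket source is a placeholder and not part of the original answer:

# Minimal sketch: wiring the Approach 1 function into Spark Streaming
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext(appName="KinesisSinkExample")
ssc = StreamingContext(sc, batchDuration=10)  # 10-second micro-batches

lines = ssc.socketTextStream("localhost", 9999)  # placeholder input source

# Each micro-batch arrives as an RDD; push every partition to Kinesis
lines.foreachRDD(lambda rdd: rdd.foreachPartition(pushToKinesis))

ssc.start()
ssc.awaitTermination()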

AWS
Answered 6 years ago
Expert
Reviewed 4 days ago
