Writing data to Kinesis stream from PySpark

A customer would like to process streaming data in Spark Streaming using PySpark and write the results to a Kinesis stream.

While PySpark does support reading data from Kinesis (http://spark.apache.org/docs/latest/streaming-kinesis-integration.html), I do not see any support for writing data to Kinesis.

Databricks has some documentation for creating a Kinesis sink for Spark (Scala) (https://docs.databricks.com/spark/latest/structured-streaming/kinesis.html), but if I understand the documentation correctly, this is based on ForeachSink, which is not supported in PySpark (http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#using-foreach):

The foreach operation allows arbitrary operations to be computed on the output data. As of Spark 2.1, this is available only for Scala and Java. To use this, you will have to implement the interface ForeachWriter (Scala/Java docs), which has methods that get called whenever there is a sequence of rows generated as output after a trigger.

Has anybody come across a way to achieve this?

AWS
EXPERT
asked 6 years ago · 2286 views
1 Answer
Accepted Answer

In PySpark you can use foreachPartition and call Kinesis (or any other external API) once per partition, or you can use foreach to call out to Kinesis per record. Note that map alone is a lazy transformation and would never execute without an action, so foreach is the right choice for a pure side effect like this.

# Approach 1: Per Partition

import boto3

def pushToKinesis(iterator):
    # "my-stream" and the region are placeholders; create one client per partition, not per record
    client = boto3.client("kinesis", region_name="us-east-1")
    for record in iterator:
        client.put_record(StreamName="my-stream", Data=str(record).encode("utf-8"), PartitionKey=str(hash(record)))

rdd.foreachPartition(pushToKinesis)
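
If partitions are large, batching cuts the number of API calls. Here is a minimal sketch of the same idea, assuming a placeholder stream named my-stream and default boto3 credentials, using put_records (which accepts at most 500 records per request):

import boto3

def pushToKinesisBatched(iterator):
    client = boto3.client("kinesis", region_name="us-east-1")
    batch = []
    for record in iterator:
        batch.append({"Data": str(record).encode("utf-8"), "PartitionKey": str(hash(record))})
        if len(batch) == 500:  # Kinesis put_records limit per request
            client.put_records(StreamName="my-stream", Records=batch)
            batch = []
    if batch:  # flush any remainder
        client.put_records(StreamName="my-stream", Records=batch)

rdd.foreachPartition(pushToKinesisBatched)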

# Approach 2: Per Record

def pushToKinesis(record):
    # "my-stream" is a placeholder; creating a client per record is simple but slow
    client = boto3.client("kinesis", region_name="us-east-1")
    client.put_record(StreamName="my-stream", Data=str(record).encode("utf-8"), PartitionKey=str(hash(record)))

# foreach is an action, so it actually runs; a bare map would stay lazy
rdd.foreach(pushToKinesis)

The blog post below uses Approach 2 to call Amazon Comprehend:

https://aws.amazon.com/blogs/machine-learning/how-to-scale-sentiment-analysis-using-amazon-comprehend-aws-glue-and-amazon-athena/
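
To run either approach from Spark Streaming, apply the function inside foreachRDD on the DStream. A short sketch, assuming a DStream named dstream and the pushToKinesis function from Approach 1:

# Each micro-batch RDD is pushed to Kinesis as it arrives
dstream.foreachRDD(lambda rdd: rdd.foreachPartition(pushToKinesis))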

AWS
EXPERT
answered 6 years ago
reviewed 1 month ago
