Writing data to Kinesis stream from PySpark


A customer would like to process streaming data in Spark Streaming using PySpark and would like to output the results on a Kinesis stream.

While PySpark does support reading data from Kinesis (http://spark.apache.org/docs/latest/streaming-kinesis-integration.html), I do not see any support for writing data to Kinesis.

DataBricks has some documentation for creating a Kinesis sink for Spark (scala) (https://docs.databricks.com/spark/latest/structured-streaming/kinesis.html), but if I understand the documentation correctly this is based on the ForeachSink which is not supported in PySpark (http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#using-foreach)

The foreach operation allows arbitrary operations to be computed on the output data. As of Spark 2.1, this is available only for Scala and Java. To use this, you will have to implement the interface ForeachWriter (Scala/Java docs), which has methods that get called whenever there is a sequence of rows generated as output after a trigger. Note the following important points.

Has anybody come across a way to achieve this?

asked 6 years ago2086 views
1 Answer
Accepted Answer

In PySpark you can use forEachPartition and call Kinesis or any external API for that partition, or you can also use map and call out to Kinesis per record.

# Approach 1: Per Partition

def pushToKinesis(iterator):
   #push to kinesis using boto3 APIs


# Approach 2:  Per record

def pushToKinesis(record):
   #push to kinesis using boto3 APIs

rdd.map(lambda l: pushToKinesis(l)) 

The blog post below uses Approach 2 to call Amazon Comprehend:


answered 6 years ago

You are not logged in. Log in to post an answer.

A good answer clearly answers the question and provides constructive feedback and encourages professional growth in the question asker.

Guidelines for Answering Questions