如何从 PySpark 将数据写入 Kinesis 流?

0

【以下的问题经过翻译处理】 客户希望使用 PySpark 在 Spark Streaming 中处理流数据,并希望将结果输出到 Kinesis 流

虽然 PySpark 确实支持从 Kinesis (http://spark.apache.org/docs/latest/streaming-kinesis-integration.html) 读取数据,但我看不到对将数据写入 Kinesis 的任何支持。

DataBricks 有一些用于为 Spark (scala) 创建 Kinesis sink的文档 (https://docs.databricks.com/spark/latest/structured-streaming/kinesis.html),但如果我正确理解文档,这是基于关于 “ForeachSink”,这在PySpark 是不支持的(http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#using-foreach

The foreach operation allows arbitrary operations to be computed on the output data. As of Spark 2.1, this is available only for Scala and Java. To use this, you will have to implement the interface ForeachWriter (Scala/Java docs), which has methods that get called whenever there is a sequence of rows generated as output after a trigger. Note the following important points.

有没有人遇到过实现这一目标的方法?

profile picture
專家
已提問 6 個月前檢視次數 14 次
1 個回答
0

【以下的回答经过翻译处理】 在 PySpark 中,您可以使用 forEachPartition 并为该分区调用 Kinesis 或任何外部 API,或者您也可以使用 map 并按记录调用 Kinesis。

# 方法 1:每个分区

def pushToKinesis(迭代器):
   打印(列表(迭代器)[0]
   #push to kinesis 使用 boto3 APIs

rdd.foreachPartition(pushToKinesis())

# 方法二:每条记录

def pushToKinesis(记录):
   #push to kinesis 使用 boto3 APIs

rdd.map(lambda l: pushToKinesis(l))

下面的博文使用方法 2 调用 Amazon Comprehend: https://aws.amazon.com/blogs/machine-learning/how-to-scale-sentiment-analysis-using-amazon-comprehend-aws-glue-and-amazon-athena/

profile picture
專家
已回答 6 個月前

您尚未登入。 登入 去張貼答案。

一個好的回答可以清楚地回答問題並提供建設性的意見回饋,同時有助於提問者的專業成長。

回答問題指南