如何从 PySpark 将数据写入 Kinesis 流?

0

【以下的问题经过翻译处理】 客户希望使用 PySpark 在 Spark Streaming 中处理流数据,并希望将结果输出到 Kinesis 流

虽然 PySpark 确实支持从 Kinesis (http://spark.apache.org/docs/latest/streaming-kinesis-integration.html) 读取数据,但我看不到对将数据写入 Kinesis 的任何支持。

DataBricks 有一些用于为 Spark (scala) 创建 Kinesis sink的文档 (https://docs.databricks.com/spark/latest/structured-streaming/kinesis.html),但如果我正确理解文档,这是基于关于 “ForeachSink”,这在PySpark 是不支持的(http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#using-foreach

The foreach operation allows arbitrary operations to be computed on the output data. As of Spark 2.1, this is available only for Scala and Java. To use this, you will have to implement the interface ForeachWriter (Scala/Java docs), which has methods that get called whenever there is a sequence of rows generated as output after a trigger. Note the following important points.

有没有人遇到过实现这一目标的方法?

profile picture
专家
已提问 5 个月前8 查看次数
1 回答
0

【以下的回答经过翻译处理】 在 PySpark 中,您可以使用 forEachPartition 并为该分区调用 Kinesis 或任何外部 API,或者您也可以使用 map 并按记录调用 Kinesis。

# 方法 1:每个分区

def pushToKinesis(迭代器):
   打印(列表(迭代器)[0]
   #push to kinesis 使用 boto3 APIs

rdd.foreachPartition(pushToKinesis())

# 方法二:每条记录

def pushToKinesis(记录):
   #push to kinesis 使用 boto3 APIs

rdd.map(lambda l: pushToKinesis(l))

下面的博文使用方法 2 调用 Amazon Comprehend: https://aws.amazon.com/blogs/machine-learning/how-to-scale-sentiment-analysis-using-amazon-comprehend-aws-glue-and-amazon-athena/

profile picture
专家
已回答 5 个月前

您未登录。 登录 发布回答。

一个好的回答可以清楚地解答问题和提供建设性反馈,并能促进提问者的职业发展。

回答问题的准则