如何从 PySpark 将数据写入 Kinesis 流?

0

【以下的问题经过翻译处理】 客户希望使用 PySpark 在 Spark Streaming 中处理流数据,并希望将结果输出到 Kinesis 流

虽然 PySpark 确实支持从 Kinesis (http://spark.apache.org/docs/latest/streaming-kinesis-integration.html) 读取数据,但我看不到对将数据写入 Kinesis 的任何支持。

DataBricks 有一些用于为 Spark (scala) 创建 Kinesis sink的文档 (https://docs.databricks.com/spark/latest/structured-streaming/kinesis.html),但如果我正确理解文档,这是基于关于 “ForeachSink”,这在PySpark 是不支持的(http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#using-foreach

The foreach operation allows arbitrary operations to be computed on the output data. As of Spark 2.1, this is available only for Scala and Java. To use this, you will have to implement the interface ForeachWriter (Scala/Java docs), which has methods that get called whenever there is a sequence of rows generated as output after a trigger. Note the following important points.

有没有人遇到过实现这一目标的方法?

profile picture
EXPERTE
gefragt vor 6 Monaten14 Aufrufe
1 Antwort
0

【以下的回答经过翻译处理】 在 PySpark 中,您可以使用 forEachPartition 并为该分区调用 Kinesis 或任何外部 API,或者您也可以使用 map 并按记录调用 Kinesis。

# 方法 1:每个分区

def pushToKinesis(迭代器):
   打印(列表(迭代器)[0]
   #push to kinesis 使用 boto3 APIs

rdd.foreachPartition(pushToKinesis())

# 方法二:每条记录

def pushToKinesis(记录):
   #push to kinesis 使用 boto3 APIs

rdd.map(lambda l: pushToKinesis(l))

下面的博文使用方法 2 调用 Amazon Comprehend: https://aws.amazon.com/blogs/machine-learning/how-to-scale-sentiment-analysis-using-amazon-comprehend-aws-glue-and-amazon-athena/

profile picture
EXPERTE
beantwortet vor 6 Monaten

Du bist nicht angemeldet. Anmelden um eine Antwort zu veröffentlichen.

Eine gute Antwort beantwortet die Frage klar, gibt konstruktives Feedback und fördert die berufliche Weiterentwicklung des Fragenstellers.

Richtlinien für die Beantwortung von Fragen