无法使用Pyspark或Python从MongoDB读取数据。

0

【以下的问题经过翻译处理】 我正在尝试使用PySpark和本地Python在AWS EMR中从3个节点的MongoDB集群(副本集)中读取数据。当在AWS EMR群集中执行代码时,我遇到了问题,如下所述,但相同的代码在我的本地Windows机器上运行良好。

  • Spark版本- 2.4.8
  • Scala版本- 2.11.12
  • MongoDB版本- 4.4.8
  • mongo-spark-connector版本- mongo-spark-connector_2.11:2.4.4
  • Python版本- 3.7.10

通过Pyspark-(问题- p yspark给出空数据框)

在本地和群集模式下运行pyspark作业时,以下是命令。

  1. 本地模式:
  2. 群集模式:

两种模式下,即使从Spark群集中的所有节点执行telnet(从所有节点)都可以正常工作,但我无法从mongoDB中读取数据(空数据框) 。

从日志中,我可以确认spark能够与mongoDB通信,并且我的pyspark作业正在提供空数据框。请查看以下屏幕截图。

pyspark日志-成功连接到mongoDB

Enter image description here

下面是相同代码的代码片段:

from pyspark import SparkConf, SparkContext
import sys
import json

sc = SparkContext()
spark = SparkSession(sc).builder.appName("MongoDbToS3").config("spark.mongodb.input.uri", "mongodb://username:password@host1,host2,host3/db.table/?replicaSet=ABCD&authSource=admin").getOrCreate()
data = spark.read.format("com.mongodb.spark.sql.DefaultSource").load()
data.show()

请让我知道我在 pyspark 代码中做错或遗漏了什么?

通过本机Python代码 - (问题 - 如果batch_size > 1并且batch_size = 1,则代码会卡住,它将打印前24个mongo文档,然后光标挂起)

我正在使用 pymongo 驱动程序通过本机 python 代码连接到 mongoDB。 问题是当我尝试获取/打印batch_size为1000的mongoDB文档时,代码永远挂起,然后给出网络超时错误。 但是,如果我使batch_size = 1,则光标能够在光标再次挂起后获取前24个文档。 我们观察到,与前 24 个文档相比,第 25 个文档非常大(大约 4kb),然后我们尝试跳过第 25 个文档,然后光标开始获取下一个文档,但它再次卡在其他位置,因此每当文档大小为 大了光标就卡住了。

你们能帮我理解这个问题吗?

网络端或 mongoDB 端有什么阻碍吗?

下面是代码片段:

from datetime import datetime
import json
#import boto3
from bson import json_util
import pymongo


client = pymongo.MongoClient("mongodb://username@host:port/?authSource=admin&socketTimeoutMS=3600000&maxIdleTimeMS=3600000")

# Database Name
db = client["database_name"]

# Collection Name
quoteinfo__collection= db["collection_name"]

results = quoteinfo__collection.find({}).batch_size(1000)
doc_count = quoteinfo__collection.count_documents({})

print("documents count from collection: ",doc_count)
print(results)
record_increment_no = 1

for record in results:
    print(record)
    print(record_increment_no)
    record_increment_no = record_increment_no + 1
results.close()

下面是输出屏幕截图 ![for batch_size = 1000 (code hangs and gives network timeout error)] (https://repost.aws/media/postImages/original/IMumguUSSGQWSkLwIqtOQ10Q) https://repost.aws/media/postImages/original/IMf14CYH8nS3u6zj9I7Ec4hA

![batch_size = 1 (prints documents only till 24th and then cursor hangs)] (https://repost.aws/media/postImages/original/IMlUdoBC6LRGqD-TJL6RUH1A)

profile picture
EXPERTE
gefragt vor 6 Monaten15 Aufrufe
1 Antwort
0

【以下的回答经过翻译处理】 您开发环境和MongoDB托管的AWS帐户之间的AWS帐户对等连接存在一些问题,具体如下:

  1. 某个路由的流量通过VPC对等连接而不是转接网关传输。
  2. MongoDB的IP地址不在路由表的CIDR范围内。

在为MongoDB IP1和MongoDB IP2添加转接网关后,我们能够正常读取任何集合的任何批量数据。

profile picture
EXPERTE
beantwortet vor 6 Monaten

Du bist nicht angemeldet. Anmelden um eine Antwort zu veröffentlichen.

Eine gute Antwort beantwortet die Frage klar, gibt konstruktives Feedback und fördert die berufliche Weiterentwicklung des Fragenstellers.

Richtlinien für die Beantwortung von Fragen