无法使用Pyspark或Python从MongoDB读取数据。

0

【以下的问题经过翻译处理】 我正在尝试使用PySpark和本地Python在AWS EMR中从3个节点的MongoDB集群(副本集)中读取数据。当在AWS EMR群集中执行代码时,我遇到了问题,如下所述,但相同的代码在我的本地Windows机器上运行良好。

  • Spark版本- 2.4.8
  • Scala版本- 2.11.12
  • MongoDB版本- 4.4.8
  • mongo-spark-connector版本- mongo-spark-connector_2.11:2.4.4
  • Python版本- 3.7.10

通过Pyspark-(问题- p yspark给出空数据框)

在本地和群集模式下运行pyspark作业时,以下是命令。

  1. 本地模式:
  2. 群集模式:

两种模式下,即使从Spark群集中的所有节点执行telnet(从所有节点)都可以正常工作,但我无法从mongoDB中读取数据(空数据框) 。

从日志中,我可以确认spark能够与mongoDB通信,并且我的pyspark作业正在提供空数据框。请查看以下屏幕截图。

pyspark日志-成功连接到mongoDB

Enter image description here

下面是相同代码的代码片段:

from pyspark import SparkConf, SparkContext
import sys
import json

sc = SparkContext()
spark = SparkSession(sc).builder.appName("MongoDbToS3").config("spark.mongodb.input.uri", "mongodb://username:password@host1,host2,host3/db.table/?replicaSet=ABCD&authSource=admin").getOrCreate()
data = spark.read.format("com.mongodb.spark.sql.DefaultSource").load()
data.show()

请让我知道我在 pyspark 代码中做错或遗漏了什么?

通过本机Python代码 - (问题 - 如果batch_size > 1并且batch_size = 1,则代码会卡住,它将打印前24个mongo文档,然后光标挂起)

我正在使用 pymongo 驱动程序通过本机 python 代码连接到 mongoDB。 问题是当我尝试获取/打印batch_size为1000的mongoDB文档时,代码永远挂起,然后给出网络超时错误。 但是,如果我使batch_size = 1,则光标能够在光标再次挂起后获取前24个文档。 我们观察到,与前 24 个文档相比,第 25 个文档非常大(大约 4kb),然后我们尝试跳过第 25 个文档,然后光标开始获取下一个文档,但它再次卡在其他位置,因此每当文档大小为 大了光标就卡住了。

你们能帮我理解这个问题吗?

网络端或 mongoDB 端有什么阻碍吗?

下面是代码片段:

from datetime import datetime
import json
#import boto3
from bson import json_util
import pymongo


client = pymongo.MongoClient("mongodb://username@host:port/?authSource=admin&socketTimeoutMS=3600000&maxIdleTimeMS=3600000")

# Database Name
db = client["database_name"]

# Collection Name
quoteinfo__collection= db["collection_name"]

results = quoteinfo__collection.find({}).batch_size(1000)
doc_count = quoteinfo__collection.count_documents({})

print("documents count from collection: ",doc_count)
print(results)
record_increment_no = 1

for record in results:
    print(record)
    print(record_increment_no)
    record_increment_no = record_increment_no + 1
results.close()

下面是输出屏幕截图 ![for batch_size = 1000 (code hangs and gives network timeout error)] (https://repost.aws/media/postImages/original/IMumguUSSGQWSkLwIqtOQ10Q) https://repost.aws/media/postImages/original/IMf14CYH8nS3u6zj9I7Ec4hA

![batch_size = 1 (prints documents only till 24th and then cursor hangs)] (https://repost.aws/media/postImages/original/IMlUdoBC6LRGqD-TJL6RUH1A)

profile picture
专家
已提问 5 个月前10 查看次数
1 回答
0

【以下的回答经过翻译处理】 您开发环境和MongoDB托管的AWS帐户之间的AWS帐户对等连接存在一些问题,具体如下:

  1. 某个路由的流量通过VPC对等连接而不是转接网关传输。
  2. MongoDB的IP地址不在路由表的CIDR范围内。

在为MongoDB IP1和MongoDB IP2添加转接网关后,我们能够正常读取任何集合的任何批量数据。

profile picture
专家
已回答 5 个月前

您未登录。 登录 发布回答。

一个好的回答可以清楚地解答问题和提供建设性反馈,并能促进提问者的职业发展。

回答问题的准则