Unable to create a Celery queue using AWS DocumentDB

0

Below are the versions I am using:

pymongo==3.7
python==3.7
celery==5.2.0

Below is sample code that can be used to reproduce the problem:

# app.py (Flask application)
from flask import Flask, jsonify
from celery import Celery
import db_connectors
import os
from urllib import parse


app = Flask(__name__)

# Connection settings are read from the environment; each one falls back to
# an empty string when the variable is not set.
mongo_host = os.environ.get("MONGO_HOST", "")
mongo_port = os.environ.get("MONGO_PORT", "")
mongo_db = ""

# NOTE(review): the replica-set / SSL / DocumentDB switches below are read
# but not applied to the broker URL in this module -- confirm whether they
# are meant to become URL query options.
mongo_replica = os.environ.get("MONGO_USE_REPLICA", "")
mongo_replicaset = os.environ.get("MONGO_REPLICASET", "")
mongo_user = os.environ.get("MONGO_USER", "")
mongo_password = os.environ.get("MONGO_PASS", "")
doc_db_ssl = os.environ.get("MONGO_DOCUMENTDB_SSL", "")
doc_db_enabled = os.environ.get("DOCUMENTDB_ENABLE", "")


def _build_mongo_url(host, port, database, user, password):
    """Return a ``mongodb://`` URL assembled from the given string parts.

    The user name and password are percent-escaped so that reserved
    characters such as ``@``, ``:`` or ``/`` cannot corrupt the URL. The
    credentials section is omitted when ``user`` is empty, and the port is
    omitted when ``port`` is empty.
    """
    credentials = ""
    if user:
        credentials = parse.quote_plus(user)
        if password:
            credentials += ":" + parse.quote_plus(password)
        credentials += "@"
    netloc = host + (":" + port if port else "")
    return "mongodb://" + credentials + netloc + "/" + database


# Previously ``mongo_url`` was referenced without ever being defined, which
# raised NameError as soon as this module was imported.
mongo_url = _build_mongo_url(
    mongo_host, mongo_port, mongo_db, mongo_user, mongo_password
)

# Configure Flask app to use Celery
connection_string = mongo_url
app.config['CELERY_BROKER_URL'] = connection_string
app.config['CELERY_RESULT_BACKEND'] = connection_string

# NOTE(review): "region" and 'visibility_timeout' look like options of other
# broker transports (SQS / Redis) -- confirm they are wanted for MongoDB.
CELERY_BROKER_TRANSPORT_OPTIONS = {
    "region": "us-east-1",
    'visibility_timeout': 360,
    'polling_interval': 1,
}

# Initialize Celery
celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
celery.conf.update(app.config)
# The options dict used to be a bare module variable assigned after the
# configuration had been copied, so Celery never saw it; apply it explicitly.
celery.conf.broker_transport_options = CELERY_BROKER_TRANSPORT_OPTIONS
# NOTE(review): the traceback in this report shows kombu creating a capped
# broadcast collection while connecting, which the server rejects with
# "Field 'size' is currently not supported". None of the settings above
# appear to skip that step.

# MongoDB configuration
# The client is created once at import time and shared by the task below.
mongo_client = db_connectors.get_mongo_connection()

# Celery task to fetch data from MongoDB
@celery.task
def fetch_data_from_mongo():
    """Return the documents matching the hard-coded ``find_key`` filter.

    Calls ``find`` on ``someDb.someTable`` through the module-level
    ``mongo_client`` and collects everything it yields into a list.
    """
    # Modify this query as needed
    criteria = {'find_key': '6523713'}
    collection = mongo_client.connection.someDb.someTable
    return list(collection.find(criteria))

@app.route('/fetch_data', methods=['GET'])
def fetch_data():
    """Queue ``fetch_data_from_mongo`` with ``.delay()`` and return JSON.

    The response carries the string form of the object returned by
    ``.delay()`` under the ``'task_id'`` key.
    """
    async_result = fetch_data_from_mongo.delay()
    payload = {'task_id': str(async_result)}
    return jsonify(payload)

@app.route('/fetch_data_without_delay', methods=['GET'])
def fetch_data_without_delay():
    """Call ``fetch_data_from_mongo`` directly, bypassing ``.delay()``.

    NOTE(review): the value placed under ``'task_id'`` is the string form of
    whatever the direct call returns (the fetched list), not a task id. The
    key is kept as-is because it is part of the response format.
    """
    direct_result = fetch_data_from_mongo()
    payload = {'task_id': str(direct_result)}
    return jsonify(payload)

# Run the Flask app on port 5000 with debug disabled when this file is
# executed directly rather than imported.
if __name__ == '__main__':
    app.run(debug=False, port=5000)

Issue: Celery is unable to create the queue using DocumentDB and raises the exception below.

There are two API calls, "/fetch_data_without_delay" and "/fetch_data", which essentially call the same method to fetch data from MongoDB.
When I deploy the code in Kubernetes, the "/fetch_data_without_delay" endpoint works fine and even fetches the data from MongoDB. However, "/fetch_data", which uses the .delay() method, throws the error below because it fails to connect to MongoDB.
Please find the error below:
[2023-11-27 14:56:13,684: ERROR/MainProcess] consumer: Cannot connect to mongodb://user_here:**@rhost_here:port_here/db_here: Field 'size' is currently not supported.
Trying again in 32.00 seconds... (16/100)

[2023-11-27 14:56:45,838: CRITICAL/MainProcess] Unrecoverable error: OperationalError("Field 'size' is currently not supported")
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/kombu/transport/virtual/base.py", line 925, in create_channel
    return self._avail_channels.pop()
IndexError: pop from empty list

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/cached_property.py", line 70, in __get__
    return obj_dict[name]
KeyError: 'client'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/kombu/connection.py", line 446, in _reraise_as_library_errors
    yield
  File "/usr/local/lib/python3.7/site-packages/kombu/connection.py", line 437, in _ensure_connection
    callback, timeout=timeout
  File "/usr/local/lib/python3.7/site-packages/kombu/utils/functional.py", line 312, in retry_over_time
    return fun(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/kombu/connection.py", line 877, in _connection_factory
    self._connection = self._establish_connection()
  File "/usr/local/lib/python3.7/site-packages/kombu/connection.py", line 812, in _establish_connection
    conn = self.transport.establish_connection()
  File "/usr/local/lib/python3.7/site-packages/kombu/transport/virtual/base.py", line 949, in establish_connection
    self._avail_channels.append(self.create_channel(self))
  File "/usr/local/lib/python3.7/site-packages/kombu/transport/virtual/base.py", line 927, in create_channel
    channel = self.Channel(connection)
  File "/usr/local/lib/python3.7/site-packages/kombu/transport/mongodb.py", line 146, in __init__
    self.client
  File "/usr/local/lib/python3.7/site-packages/kombu/utils/objects.py", line 30, in __get__
    return super().__get__(instance, owner)
  File "/usr/local/lib/python3.7/site-packages/cached_property.py", line 74, in __get__
    return obj_dict.setdefault(name, self.func(obj))
  File "/usr/local/lib/python3.7/site-packages/kombu/transport/mongodb.py", line 385, in client
    return self._create_client()
  File "/usr/local/lib/python3.7/site-packages/kombu/transport/mongodb.py", line 378, in _create_client
    self._create_broadcast(database)
  File "/usr/local/lib/python3.7/site-packages/kombu/transport/mongodb.py", line 354, in _create_broadcast
    capped=True)
  File "/usr/local/lib/python3.7/site-packages/pymongo/database.py", line 369, in create_collection
    read_concern, session=s, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/pymongo/collection.py", line 180, in __init__
    self.__create(kwargs, collation, session)
  File "/usr/local/lib/python3.7/site-packages/pymongo/collection.py", line 258, in __create
    collation=collation, session=session)
  File "/usr/local/lib/python3.7/site-packages/pymongo/collection.py", line 244, in _command
    retryable_write=retryable_write)
  File "/usr/local/lib/python3.7/site-packages/pymongo/pool.py", line 579, in command
    unacknowledged=unacknowledged)
  File "/usr/local/lib/python3.7/site-packages/pymongo/network.py", line 150, in command
    parse_write_concern_error=parse_write_concern_error)
  File "/usr/local/lib/python3.7/site-packages/pymongo/helpers.py", line 155, in _check_command_response
    raise OperationFailure(msg % errmsg, code, response)
pymongo.errors.OperationFailure: Field 'size' is currently not supported

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/celery/worker/worker.py", line 203, in start
    self.blueprint.start(self)
  File "/usr/local/lib/python3.7/site-packages/celery/bootsteps.py", line 116, in start
    step.start(parent)
  File "/usr/local/lib/python3.7/site-packages/celery/bootsteps.py", line 365, in start
    return self.obj.start()
  File "/usr/local/lib/python3.7/site-packages/celery/worker/consumer/consumer.py", line 326, in start
    blueprint.start(self)
  File "/usr/local/lib/python3.7/site-packages/celery/bootsteps.py", line 116, in start
    step.start(parent)
  File "/usr/local/lib/python3.7/site-packages/celery/worker/consumer/connection.py", line 21, in start
    c.connection = c.connect()
  File "/usr/local/lib/python3.7/site-packages/celery/worker/consumer/consumer.py", line 422, in connect
    conn = self.connection_for_read(heartbeat=self.amqheartbeat)
  File "/usr/local/lib/python3.7/site-packages/celery/worker/consumer/consumer.py", line 429, in connection_for_read
    self.app.connection_for_read(heartbeat=heartbeat))
  File "/usr/local/lib/python3.7/site-packages/celery/worker/consumer/consumer.py", line 456, in ensure_connected
    callback=maybe_shutdown,
  File "/usr/local/lib/python3.7/site-packages/kombu/connection.py", line 381, in ensure_connection
    self._ensure_connection(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/kombu/connection.py", line 437, in _ensure_connection
    callback, timeout=timeout
  File "/usr/local/lib/python3.7/contextlib.py", line 130, in __exit__
    self.gen.throw(type, value, traceback)
  File "/usr/local/lib/python3.7/site-packages/kombu/connection.py", line 450, in _reraise_as_library_errors
    raise ConnectionError(str(exc)) from exc
kombu.exceptions.OperationalError: Field 'size' is currently not supported

Please help me out with possible solutions.

vinay
gefragt vor 5 Monaten · 78 Aufrufe
Keine Antworten

Du bist nicht angemeldet. Anmelden um eine Antwort zu veröffentlichen.

Eine gute Antwort beantwortet die Frage klar, gibt konstruktives Feedback und fördert die berufliche Weiterentwicklung des Fragenstellers.

Richtlinien für die Beantwortung von Fragen