unable to form celery queue using aws document db

0

Below is the code version i am using

pymongo==3.7
python==3.7
celery==5.2.0

below is a sample code which can be tried

# app.py (Flask application)
from flask import Flask, jsonify
from celery import Celery
import db_connectors
import os
from urllib import parse


app = Flask(__name__)

mongo_host = os.environ.get("MONGO_HOST", "")
mongo_port = os.environ.get("MONGO_PORT", "")
mongo_db = ""

mongo_replica = os.environ.get("MONGO_USE_REPLICA", "")
mongo_replicaset = os.environ.get("MONGO_REPLICASET", "")
mongo_user = os.environ.get("MONGO_USER", "")
mongo_password = os.environ.get("MONGO_PASS", "")
doc_db_ssl = os.environ.get("MONGO_DOCUMENTDB_SSL", "")
doc_db_enabled = os.environ.get("DOCUMENTDB_ENABLE", "")

# Configure Flask app to use Celery
connection_string = mongo_url
app.config['CELERY_BROKER_URL'] = connection_string
app.config['CELERY_RESULT_BACKEND'] = connection_string

# Initialize Celery
celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
celery.conf.update(app.config)
CELERY_BROKER_TRANSPORT_OPTIONS = {
"region": "us-east-1",
'visibility_timeout': 360,
'polling_interval': 1
}

# MongoDB configuration
mongo_client = db_connectors.get_mongo_connection()

# Celery task to fetch data from MongoDB
@celery.task
def fetch_data_from_mongo():
    data = mongo_client.connection.someDb.someTable.find({
        'find_key': '6523713'
    }) # Modify this query as needed
    result = [entry for entry in data]
    return result

@app.route('/fetch_data', methods=['GET'])
def fetch_data():
    task = fetch_data_from_mongo.delay()
    return jsonify({'task_id': str(task)})

@app.route('/fetch_data_without_delay', methods=['GET'])
def fetch_data_without_delay():
    task = fetch_data_from_mongo()
    return jsonify({'task_id': str(task)})

if __name__ == '__main__':
    app.run(debug=False, port=5000)

issue: celery is unable to form the queue using document db and causing the below execption

There are 2 api cals "/fetch_data_without_delay" and "/fetch_data" which essentially call the same method to fetch data from mongo db.
When I deploy the code in kubernates, the "/fetch_data_without_delay" method works fine and even fetches the data from mongo. but, the "/fetch_data" which uses the .delay() method throws the below error as it fails to connect to mongodb
please find the error below:
[2023-11-27 14:56:13,684: ERROR/MainProcess] consumer: Cannot connect to mongodb://user_here:**@rhost_here:port_here/db_here: Field 'size' is currently not supported.
Trying again in 32.00 seconds... (16/100)

[2023-11-27 14:56:45,838: CRITICAL/MainProcess] Unrecoverable error: OperationalError("Field 'size' is currently not supported")
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/kombu/transport/virtual/base.py", line 925, in create_channel
    return self._avail_channels.pop()
IndexError: pop from empty list

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/cached_property.py", line 70, in __get__
    return obj_dict[name]
KeyError: 'client'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/kombu/connection.py", line 446, in _reraise_as_library_errors
    yield
  File "/usr/local/lib/python3.7/site-packages/kombu/connection.py", line 437, in _ensure_connection
    callback, timeout=timeout
  File "/usr/local/lib/python3.7/site-packages/kombu/utils/functional.py", line 312, in retry_over_time
    return fun(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/kombu/connection.py", line 877, in _connection_factory
    self._connection = self._establish_connection()
  File "/usr/local/lib/python3.7/site-packages/kombu/connection.py", line 812, in _establish_connection
    conn = self.transport.establish_connection()
  File "/usr/local/lib/python3.7/site-packages/kombu/transport/virtual/base.py", line 949, in establish_connection
    self._avail_channels.append(self.create_channel(self))
  File "/usr/local/lib/python3.7/site-packages/kombu/transport/virtual/base.py", line 927, in create_channel
    channel = self.Channel(connection)
  File "/usr/local/lib/python3.7/site-packages/kombu/transport/mongodb.py", line 146, in __init__
    self.client
  File "/usr/local/lib/python3.7/site-packages/kombu/utils/objects.py", line 30, in __get__
    return super().__get__(instance, owner)
  File "/usr/local/lib/python3.7/site-packages/cached_property.py", line 74, in __get__
    return obj_dict.setdefault(name, self.func(obj))
  File "/usr/local/lib/python3.7/site-packages/kombu/transport/mongodb.py", line 385, in client
    return self._create_client()
  File "/usr/local/lib/python3.7/site-packages/kombu/transport/mongodb.py", line 378, in _create_client
    self._create_broadcast(database)
  File "/usr/local/lib/python3.7/site-packages/kombu/transport/mongodb.py", line 354, in _create_broadcast
    capped=True)
  File "/usr/local/lib/python3.7/site-packages/pymongo/database.py", line 369, in create_collection
    read_concern, session=s, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/pymongo/collection.py", line 180, in __init__
    self.__create(kwargs, collation, session)
  File "/usr/local/lib/python3.7/site-packages/pymongo/collection.py", line 258, in __create
    collation=collation, session=session)
  File "/usr/local/lib/python3.7/site-packages/pymongo/collection.py", line 244, in _command
    retryable_write=retryable_write)
  File "/usr/local/lib/python3.7/site-packages/pymongo/pool.py", line 579, in command
    unacknowledged=unacknowledged)
  File "/usr/local/lib/python3.7/site-packages/pymongo/network.py", line 150, in command
    parse_write_concern_error=parse_write_concern_error)
  File "/usr/local/lib/python3.7/site-packages/pymongo/helpers.py", line 155, in _check_command_response
    raise OperationFailure(msg % errmsg, code, response)
pymongo.errors.OperationFailure: Field 'size' is currently not supported

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/celery/worker/worker.py", line 203, in start
    self.blueprint.start(self)
  File "/usr/local/lib/python3.7/site-packages/celery/bootsteps.py", line 116, in start
    step.start(parent)
  File "/usr/local/lib/python3.7/site-packages/celery/bootsteps.py", line 365, in start
    return self.obj.start()
  File "/usr/local/lib/python3.7/site-packages/celery/worker/consumer/consumer.py", line 326, in start
    blueprint.start(self)
  File "/usr/local/lib/python3.7/site-packages/celery/bootsteps.py", line 116, in start
    step.start(parent)
  File "/usr/local/lib/python3.7/site-packages/celery/worker/consumer/connection.py", line 21, in start
    c.connection = c.connect()
  File "/usr/local/lib/python3.7/site-packages/celery/worker/consumer/consumer.py", line 422, in connect
    conn = self.connection_for_read(heartbeat=self.amqheartbeat)
  File "/usr/local/lib/python3.7/site-packages/celery/worker/consumer/consumer.py", line 429, in connection_for_read
    self.app.connection_for_read(heartbeat=heartbeat))
  File "/usr/local/lib/python3.7/site-packages/celery/worker/consumer/consumer.py", line 456, in ensure_connected
    callback=maybe_shutdown,
  File "/usr/local/lib/python3.7/site-packages/kombu/connection.py", line 381, in ensure_connection
    self._ensure_connection(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/kombu/connection.py", line 437, in _ensure_connection
    callback, timeout=timeout
  File "/usr/local/lib/python3.7/contextlib.py", line 130, in __exit__
    self.gen.throw(type, value, traceback)
  File "/usr/local/lib/python3.7/site-packages/kombu/connection.py", line 450, in _reraise_as_library_errors
    raise ConnectionError(str(exc)) from exc
kombu.exceptions.OperationalError: Field 'size' is currently not supported

please help me out with possible solutions

vinay
asked 5 months ago74 views
No Answers

You are not logged in. Log in to post an answer.

A good answer clearly answers the question and provides constructive feedback and encourages professional growth in the question asker.

Guidelines for Answering Questions