I am currently struggling to diagnose an issue with MWAA. The behavior we are seeing is that DAGs run great right after the MWAA environment is set up or updated, but once the system has been idle for ~10 minutes, any newly queued task just sits there. These tasks will stay queued for well over an hour. If we let them stay queued overnight, they will eventually complete successfully.
We originally started with Airflow 2.2.2, and have since downgraded to 2.0.2.
Originally we had no configuration options set; after reading the various guides we added the following, with no change in behavior:
celery.autoscale 1,1
scheduler.dag_dir_list_interval 600
scheduler.min_file_process_interval 300
scheduler.parsing_processes 1
scheduler.processor_poll_interval 60
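For anyone reproducing this, the options above can be applied outside the console with the boto3 MWAA client; a minimal sketch of the call (the environment name is a placeholder):

import boto3

# Apply the Airflow configuration overrides listed above.
# "my-mwaa-environment" is a placeholder; substitute the real name.
mwaa = boto3.client("mwaa")
mwaa.update_environment(
    Name="my-mwaa-environment",
    AirflowConfigurationOptions={
        "celery.autoscale": "1,1",
        "scheduler.dag_dir_list_interval": "600",
        "scheduler.min_file_process_interval": "300",
        "scheduler.parsing_processes": "1",
        "scheduler.processor_poll_interval": "60",
    },
)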
The only error we are sometimes seeing on the workers is:
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/celery/backends/database/__init__.py", line 51, in _inner
    return fun(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/celery/backends/database/__init__.py", line 121, in _store_result
    task = list(session.query(self.task_cls).filter(self.task_cls.task_id == task_id))
  File "/usr/local/lib64/python3.7/site-packages/sqlalchemy/orm/query.py", line 3535, in __iter__
    return self._execute_and_instances(context)
  File "/usr/local/lib64/python3.7/site-packages/sqlalchemy/orm/query.py", line 3560, in _execute_and_instances
    result = conn.execute(querycontext.statement, self._params)
  File "/usr/local/lib64/python3.7/site-packages/sqlalchemy/engine/base.py", line 1011, in execute
    return meth(self, multiparams, params)
  File "/usr/local/lib64/python3.7/site-packages/sqlalchemy/sql/elements.py", line 298, in _execute_on_connection
    return connection._execute_clauseelement(self, multiparams, params)
  File "/usr/local/lib64/python3.7/site-packages/sqlalchemy/engine/base.py", line 1130, in _execute_clauseelement
    distilled_params,
  File "/usr/local/lib64/python3.7/site-packages/sqlalchemy/engine/base.py", line 1317, in _execute_context
    e, statement, parameters, cursor, context
  File "/usr/local/lib64/python3.7/site-packages/sqlalchemy/engine/base.py", line 1511, in _handle_dbapi_exception
    sqlalchemy_exception, with_traceback=exc_info[2], from_=e
  File "/usr/local/lib64/python3.7/site-packages/sqlalchemy/util/compat.py", line 182, in raise_
    raise exception
  File "/usr/local/lib64/python3.7/site-packages/sqlalchemy/engine/base.py", line 1277, in _execute_context
    cursor, statement, parameters, context
  File "/usr/local/lib64/python3.7/site-packages/sqlalchemy/engine/default.py", line 608, in do_execute
    cursor.execute(statement, parameters)
sqlalchemy.exc.DatabaseError: (psycopg2.DatabaseError) could not receive data from server: Connection timed out
[SQL: SELECT celery_taskmeta.id AS celery_taskmeta_id, celery_taskmeta.task_id AS celery_taskmeta_task_id, celery_taskmeta.status AS celery_taskmeta_status, celery_taskmeta.result AS celery_taskmeta_result, celery_taskmeta.date_done AS celery_taskmeta_date_done, celery_taskmeta.traceback AS celery_taskmeta_traceback
FROM celery_taskmeta
WHERE celery_taskmeta.task_id = %(task_id_1)s]
[parameters: {'task_id_1': '01673ec1-049c-45bc-a45c-121987ec998d'}]
(Background on this error at: http://sqlalche.me/e/13/4xp6)
We have tried changing the instance size from small to medium.
We have set both the minimum and maximum number of workers to 2, to avoid any auto-scaling issues.
We only have 3 DAGs enabled, and if we try to run a simple DAG which only writes a log line and quits, the task just sits in the queued state.
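For reference, the test DAG is essentially just the following (a minimal sketch; the dag_id and dates are illustrative):

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

def log_and_quit():
    # Write a single line to the task log, then exit.
    print("test task ran")

with DAG(
    dag_id="queued_task_test",  # illustrative name
    start_date=datetime(2021, 1, 1),
    schedule_interval=None,  # trigger manually
    catchup=False,
) as dag:
    PythonOperator(task_id="log_and_quit", python_callable=log_and_quit)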
It seems like once a worker goes idle it never comes back. We have to apply some kind of update to the MWAA environment before DAGs will run again.
Hello,
I tried the "celery.pool": "solo" option, but my tasks still get stuck in the queued state unfortunately. I wish I had known about this issue before I migrated to a 2.2.2 environment. The 2.0.2 environment never exhibited this problem...
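For anyone else debugging this, one way to confirm the option actually took effect on the workers (it only applies after the environment update completes) is to log it from inside a task; a sketch, where the conf lookup is standard Airflow and wiring it into a PythonOperator is left out:

from airflow.configuration import conf

# Prints the effective Celery pool implementation ("solo", "prefork", ...)
# in the task log when run inside a worker.
print("celery.pool =", conf.get("celery", "pool"))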