MWAA tasks stay queued after the environment has been idle


I am currently struggling to diagnose an issue with MWAA. DAGs run fine right after the MWAA environment is set up or updated, but once the system has been idle for ~10 minutes, any newly queued tasks just sit there. These tasks will stay queued for well over an hour; if we let them stay queued overnight they will eventually complete successfully.

We originally started with Airflow 2.2.2 and have since downgraded to 2.0.2. At first we had no configuration options set; after reading the various guides we added the options below, with no change in behavior:

celery.autoscale	1,1
scheduler.dag_dir_list_interval	600
scheduler.min_file_process_interval	300
scheduler.parsing_processes	1
scheduler.processor_poll_interval	60

The only error we sometimes see on the workers is:

Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/celery/backends/database/__init__.py", line 51, in _inner
    return fun(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/celery/backends/database/__init__.py", line 121, in _store_result
    task = list(session.query(self.task_cls).filter(self.task_cls.task_id == task_id))
  File "/usr/local/lib64/python3.7/site-packages/sqlalchemy/orm/query.py", line 3535, in __iter__
    return self._execute_and_instances(context)
  File "/usr/local/lib64/python3.7/site-packages/sqlalchemy/orm/query.py", line 3560, in _execute_and_instances
    result = conn.execute(querycontext.statement, self._params)
  File "/usr/local/lib64/python3.7/site-packages/sqlalchemy/engine/base.py", line 1011, in execute
    return meth(self, multiparams, params)
  File "/usr/local/lib64/python3.7/site-packages/sqlalchemy/sql/elements.py", line 298, in _execute_on_connection
    return connection._execute_clauseelement(self, multiparams, params)
  File "/usr/local/lib64/python3.7/site-packages/sqlalchemy/engine/base.py", line 1130, in _execute_clauseelement
    distilled_params,
  File "/usr/local/lib64/python3.7/site-packages/sqlalchemy/engine/base.py", line 1317, in _execute_context
    e, statement, parameters, cursor, context
  File "/usr/local/lib64/python3.7/site-packages/sqlalchemy/engine/base.py", line 1511, in _handle_dbapi_exception
    sqlalchemy_exception, with_traceback=exc_info[2], from_=e
  File "/usr/local/lib64/python3.7/site-packages/sqlalchemy/util/compat.py", line 182, in raise_
    raise exception
  File "/usr/local/lib64/python3.7/site-packages/sqlalchemy/engine/base.py", line 1277, in _execute_context
    cursor, statement, parameters, context
  File "/usr/local/lib64/python3.7/site-packages/sqlalchemy/engine/default.py", line 608, in do_execute
    cursor.execute(statement, parameters)
sqlalchemy.exc.DatabaseError: (psycopg2.DatabaseError) could not receive data from server: Connection timed out

[SQL: SELECT celery_taskmeta.id AS celery_taskmeta_id, celery_taskmeta.task_id AS celery_taskmeta_task_id, celery_taskmeta.status AS celery_taskmeta_status, celery_taskmeta.result AS celery_taskmeta_result, celery_taskmeta.date_done AS celery_taskmeta_date_done, celery_taskmeta.traceback AS celery_taskmeta_traceback 
FROM celery_taskmeta 
WHERE celery_taskmeta.task_id = %(task_id_1)s]
[parameters: {'task_id_1': '01673ec1-049c-45bc-a45c-121987ec998d'}]
(Background on this error at: http://sqlalche.me/e/13/4xp6)

We have tried changing the environment size from small to medium. We have also set both the minimum and maximum number of workers to 2 to rule out any auto-scaling issues.

We only have 3 DAGs enabled, and even a simple DAG that just writes a log entry and exits will sit there saying queued (a minimal sketch of that kind of test DAG is shown below).
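
For reference, this is roughly what such a test DAG looks like; a minimal sketch only, with a placeholder dag_id and callable rather than our actual code:

import logging
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

def write_log():
    # The task only writes a single log line and exits.
    logging.info("test task ran")

with DAG(
    dag_id="queued_test",            # placeholder name
    start_date=datetime(2022, 1, 1),
    schedule_interval=None,          # triggered manually
    catchup=False,
) as dag:
    PythonOperator(task_id="write_log", python_callable=write_log)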

It seems like once a worker goes idle it never comes back. We have to apply some kind of update to MWAA before the DAGs will run again.

asked 2 years ago · 2162 views
1 Answer

This bug has been plaguing my team for weeks (and seemingly others for over a year: https://forums.aws.amazon.com/thread.jspa?threadID=336465), but we finally seem to have gotten it under control.

The following is the full set of configuration values we used, but I suspect the key is setting celery.pool to solo. Hope this resolves the issue for you as well:

celery.pool	solo
celery.sync_parallelism	1
celery.worker_autoscale	1,1
core.dag_file_processor_timeout	300
core.dagbag_import_timeout	240
core.killed_task_cleanup_time	604800
core.min_serialized_dag_update_interval	300
scheduler.dag_dir_list_interval	600
scheduler.min_file_process_interval	300
scheduler.parsing_processes	2
scheduler.schedule_after_task_execution	false
scheduler.scheduler_idle_sleep_time	300
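
In case it helps, here is a minimal sketch of how these overrides can be pushed to an existing environment, assuming you apply them with boto3's MWAA update_environment call; the environment name is a placeholder and every value must be passed as a string:

import boto3

mwaa = boto3.client("mwaa")

mwaa.update_environment(
    Name="my-mwaa-environment",  # placeholder environment name
    AirflowConfigurationOptions={
        "celery.pool": "solo",
        "celery.sync_parallelism": "1",
        "celery.worker_autoscale": "1,1",
        "core.dag_file_processor_timeout": "300",
        "core.dagbag_import_timeout": "240",
        "core.killed_task_cleanup_time": "604800",
        "core.min_serialized_dag_update_interval": "300",
        "scheduler.dag_dir_list_interval": "600",
        "scheduler.min_file_process_interval": "300",
        "scheduler.parsing_processes": "2",
        "scheduler.schedule_after_task_execution": "false",
        "scheduler.scheduler_idle_sleep_time": "300",
    },
)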
answered 2 years ago
  • Hello,

    I tried the "celery.pool": "solo" option but my tasks still get stuck in the queued state, unfortunately. I wish I had known about this issue before I migrated to a 2.2.2 environment. The 2.0.2 environment never exhibited this problem...
