
MWAA Airflow 3.0.6 worker memory leak: 600+ orphaned watchtower _dequeue_batch threads accumulate in ForkPoolWorker when using deferrable KubernetesPodOperator, eventually causing OOM


Environment:

  • MWAA environment class: mw1.small
  • Apache Airflow version: 3.0.6
  • Executor: CeleryExecutor (default)
  • core.execute_tasks_new_python_interpreter: True (Airflow 3 default)
  • Region: eu-central-1
  • celery.worker_autoscale: 1,1
  • min_workers: 10, max_workers: 25
  • Using KubernetesPodOperator class with deferrable=True, config_file=<path>, kubernetes_conn_id=None

Symptom: AdditionalWorker containers show a consistent sawtooth memory growth pattern in CloudWatch MemoryUtilization: steady climb to ~97% followed by OOM restart, then immediate climb again. The growth is not correlated with RunningTasks (which stays flat) but is correlated with ActiveConnectionCount climbing over time. BaseWorkers show the same pattern at a lower rate.

Root cause (identified via py-spy dump): A py-spy dump of the ForkPoolWorker process (PID 113, ~6 hours old) showed 624 Python threads. More than 600 of them are idle and stuck in the same stack:

    Thread 9917 (idle): "Thread-652 (_dequeue_batch)"
        wait (threading.py:359)
        get (queue.py:180)
        _dequeue_batch (watchtower/__init__.py:483)
        run (threading.py:1012)
        _bootstrap_inner (threading.py:1075)
        _bootstrap (threading.py:1032)

I assume these are background batcher threads owned by watchtower.CloudWatchLogHandler instances; watchtower is the library MWAA uses to ship Airflow task logs to CloudWatch.

Correlation with deferrable KubernetesPodOperator: The leak manifests when running KubernetesPodOperator with deferrable=True, but not with regular PythonOperator tasks.

Mitigations attempted (none fully resolve it):

  • celery.worker_autoscale=1,1: already set.
  • celery.worker_max_tasks_per_child=50: recycles the ForkPoolWorker, but the new one inherits leaked state from the celeryd MainProcess via fork(), and the thread count rebuilds quickly. Tried with core.execute_tasks_new_python_interpreter set to both True and False.
  • workers.socket_cleanup_timeout=30: no observable effect on thread accumulation.
  • Manual SIGTERM to leaky ForkPoolWorker processes: effective, but only as a reactive measure.

Questions: Is this a known issue with how MWAA 3.0.6 configures the CloudWatchLogHandler lifecycle for deferrable tasks? The handler creation path is owned by MWAA, not user code, so we cannot cleanly patch it from a DAG. What is the recommended way to ensure watchtower.CloudWatchLogHandler instances are closed with .close() when a deferrable task's execution context tears down?

1 Answer

Based on the symptoms you describe (orphaned watchtower threads accumulating in worker processes when using deferrable KubernetesPodOperator), this appears to be a resource cleanup issue in how CloudWatch log handlers are managed during the deferrable task lifecycle.

While I don't have specific documentation confirming this exact issue with MWAA 3.0.6 and deferrable operators, your analysis points to a legitimate concern about handler lifecycle management. It is particularly telling that recycling workers with celery.worker_max_tasks_per_child=50 does not fully resolve the issue, because each replacement ForkPoolWorker inherits the leaked state through fork().

For your immediate situation, here are some approaches to consider:

Configuration Adjustments:

Lowering celery.worker_autoscale further is not an option: the mw1.small default is 5,0 (maximum, minimum task concurrency), and at 1,1 each worker already runs a single task at a time. What you can revisit is whether your worker count (min_workers: 10, max_workers: 25) is appropriately sized for your workload. More workers means more processes that can accumulate these threads, although how quickly each one leaks depends on how many deferrable task executions it handles.

Monitoring and Reactive Measures:

Since MemoryUtilization climbs to ~97% before each OOM restart, set up CloudWatch alarms on the MemoryUtilization metric for your AdditionalWorker containers. Use a 1-minute period and a threshold well below the OOM point (for example 80%) so you get early warning before an OOM event occurs. You can also keep tracking the correlation with ActiveConnectionCount that you observed.
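A minimal sketch of such an alarm using the boto3 CloudWatch put_metric_alarm call. It assumes MWAA publishes container metrics in the AWS/MWAA namespace with Cluster (for example AdditionalWorker) and Environment dimensions; confirm the exact dimension names and values in the CloudWatch console for your environment before relying on it. The environment name, the 80% threshold, the three evaluation periods, and the SNS topic ARN are illustrative placeholders.

```python
from typing import Any, Dict, Optional


def memory_alarm_params(
    environment_name: str,
    cluster: str = "AdditionalWorker",
    threshold_percent: float = 80.0,
    sns_topic_arn: Optional[str] = None,
) -> Dict[str, Any]:
    """Build keyword arguments for CloudWatch PutMetricAlarm (sketch).

    ASSUMPTION: MWAA container metrics are published in the "AWS/MWAA"
    namespace with "Cluster" and "Environment" dimensions; verify the exact
    names in the CloudWatch console before using this.
    """
    params: Dict[str, Any] = {
        "AlarmName": f"{environment_name}-{cluster}-memory-high",
        "Namespace": "AWS/MWAA",
        "MetricName": "MemoryUtilization",
        "Dimensions": [
            {"Name": "Cluster", "Value": cluster},
            {"Name": "Environment", "Value": environment_name},
        ],
        "Statistic": "Maximum",
        "Period": 60,            # 1-minute period
        "EvaluationPeriods": 3,  # alarm after 3 consecutive breaching minutes
        "Threshold": threshold_percent,
        "ComparisonOperator": "GreaterThanOrEqualToThreshold",
        "TreatMissingData": "notBreaching",
    }
    if sns_topic_arn is not None:
        params["AlarmActions"] = [sns_topic_arn]  # e.g. notify an SNS topic
    return params


def create_memory_alarm(environment_name: str, **options: Any) -> None:
    """Create or update the alarm; needs boto3 and cloudwatch:PutMetricAlarm permission."""
    import boto3  # imported lazily so the builder above has no AWS dependency

    cloudwatch = boto3.client("cloudwatch")
    cloudwatch.put_metric_alarm(**memory_alarm_params(environment_name, **options))
```

For example, create_memory_alarm("my-mwaa-env", cluster="BaseWorker") would cover the BaseWorker containers as well ("my-mwaa-env" is a hypothetical name). Keeping the parameter builder separate from the API call lets you review or test the alarm definition without AWS credentials.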

Escalation Path:

Given that CloudWatchLogHandler creation and lifecycle are managed by MWAA's infrastructure layer (not user-configurable code), and that this appears to be specific to the interaction between deferrable operators and the logging subsystem, this warrants escalation to AWS Support. When opening a case, include:

  • Your py-spy thread dumps showing the watchtower thread accumulation
  • CloudWatch metrics showing the memory sawtooth pattern correlated with ActiveConnectionCount
  • Confirmation that the issue only manifests with deferrable=True on KubernetesPodOperator
  • Details about your attempted mitigations
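To make the thread dumps easier to compare across workers and over time for the case, you could summarize each py-spy dump by thread name. This sketch assumes the text layout shown in the question (a header line such as Thread 9917 (idle): "Thread-652 (_dequeue_batch)" at column 0, followed by indented frames) and folds the Thread-N counter so all batcher threads land in one bucket. run_py_spy_dump is a hypothetical helper that shells out to py-spy dump --pid; it requires py-spy to be installed and permission to attach to the target process.

```python
import re
import subprocess
from collections import Counter

# Header line of one thread in `py-spy dump` output, e.g.
#   Thread 9917 (idle): "Thread-652 (_dequeue_batch)"
# Frame lines beneath it are indented, so anchoring at column 0 skips them.
THREAD_HEADER = re.compile(
    r'^Thread\s+\S+\s+\((?P<state>[^)]*)\)(?::\s+"(?P<name>.*)")?\s*$'
)


def summarize_py_spy_dump(text: str) -> Counter:
    """Count threads per name, folding 'Thread-652' style counters into 'Thread-N'."""
    counts: Counter = Counter()
    for line in text.splitlines():
        match = THREAD_HEADER.match(line)
        if not match:
            continue  # frame lines, process banner, blank lines
        name = match.group("name") or "<unnamed>"
        counts[re.sub(r"^Thread-\d+", "Thread-N", name)] += 1
    return counts


def run_py_spy_dump(pid: int) -> str:
    """Hypothetical helper: capture `py-spy dump --pid <pid>` output as text."""
    result = subprocess.run(
        ["py-spy", "dump", "--pid", str(pid)],
        capture_output=True,
        text=True,
        check=True,
    )
    return result.stdout
```

For example, summarize_py_spy_dump(run_py_spy_dump(113)).most_common(5) would list the dominant thread names in the ForkPoolWorker; a bucket like "Thread-N (_dequeue_batch)" growing between successive dumps is the evidence the case needs.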

Because the leak persists with core.execute_tasks_new_python_interpreter set to True (the Airflow 3 default) as well as False, this may be a deeper integration issue between MWAA's logging infrastructure and the deferrable task execution model, one that requires a platform-level fix.

In the interim, since manually sending SIGTERM to leaky processes works, you could automate it: monitor thread counts or memory usage and trigger worker recycling when they cross a threshold. This is clearly a workaround rather than a solution.
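A minimal sketch of such a watchdog, assuming a Linux host where you can read /proc and signal the worker processes (which may not be possible on MWAA-managed containers). It reads each process's thread count from the Threads: line of /proc/<pid>/status and selects processes whose command line contains ForkPoolWorker; that marker assumes Celery has set the process title (via setproctitle), and the 200-thread threshold is an arbitrary example. By default it only prints what it would do.

```python
import os
import signal
from typing import Iterable, Iterator, List, Tuple

# (pid, command line, thread count)
ProcInfo = Tuple[int, str, int]


def parse_thread_count(status_text: str) -> int:
    """Extract the thread count from the text of /proc/<pid>/status."""
    for line in status_text.splitlines():
        if line.startswith("Threads:"):
            return int(line.split()[1])
    raise ValueError("no 'Threads:' line found")


def iter_processes(proc_root: str = "/proc") -> Iterator[ProcInfo]:
    """Yield (pid, cmdline, thread_count) for each readable process (Linux only)."""
    for entry in os.listdir(proc_root):
        if not entry.isdigit():
            continue  # skip non-PID entries such as /proc/self
        try:
            with open(os.path.join(proc_root, entry, "cmdline"), "rb") as f:
                cmdline = f.read().replace(b"\x00", b" ").decode(errors="replace").strip()
            with open(os.path.join(proc_root, entry, "status")) as f:
                threads = parse_thread_count(f.read())
        except (OSError, ValueError):
            continue  # process exited meanwhile or is not readable
        yield int(entry), cmdline, threads


def select_leaky(
    processes: Iterable[ProcInfo], threshold: int = 200, marker: str = "ForkPoolWorker"
) -> List[Tuple[int, int]]:
    """Return (pid, thread_count) for matching processes above the thread threshold."""
    return [(pid, n) for pid, cmd, n in processes if marker in cmd and n > threshold]


def recycle_leaky_workers(threshold: int = 200, dry_run: bool = True) -> List[int]:
    """Send SIGTERM to leaky worker processes; with dry_run=True only report them."""
    victims = select_leaky(iter_processes(), threshold)
    for pid, n in victims:
        if dry_run:
            print(f"would SIGTERM pid {pid} ({n} threads)")
        else:
            os.kill(pid, signal.SIGTERM)
    return [pid for pid, _ in victims]
```

Matching on the process title rather than on the thread count alone keeps the celeryd MainProcess, which may also hold many threads, out of the selection; sending it SIGTERM would stop the whole worker.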