Amazon MWAA 環境で Queued 状態に留まっているタスクをトラブルシューティングする方法を教えてください。
Amazon Managed Workflows for Apache Airflow (Amazon MWAA) でワークフローを実行していますが、タスクが Queued 状態から変化せず、Running 状態に移行しません。
簡単な説明
次の要因で、Amazon MWAA のタスクが Queued 状態から移行できない場合があります。
- 環境が同時実行タスクの最大数に達した。
- Airflow 構成オプションが MWAA 環境で正しく設定されていない。
- ワーカー上のタスク用のメモリまたは CPU が不足している。
タスクを実行する通常のワークフローに障害が発生すると、タスクは Queued 状態に留まります。Apache Airflow ワーカーに過負荷がかかり、指定された時間内に応答しきれなくなる場合があります。この場合、12 時間後にデフォルトの可視性タイムアウトに達するまで、タスクは Amazon Simple Queue Service (Amazon SQS) キューに残ります。再試行を設定した場合は、Apache Airflow スケジューラーはタスクを再試行します。
解決策
トラブルシューティングを行う前に、環境リソースの負荷が最大に達しているのか、それともワーカーに関連する問題が発生しているのかを判断してください。Amazon CloudWatch を使用し、環境のワーカーログおよび、メトリクス CPUUtilization と MemoryUtilization を確認します。
環境が同時実行タスクの最大数に達したかどうかを確認する
Amazon MWAA プールに空きがなくなり、環境がキューにさらにタスクを追加した場合、環境は同時タスクの最大数に達します。この問題を解決するには、環境のワーカー数を増やすか、環境のクラスサイズを変更します。
環境のワーカー数を増やす必要があるかどうかを判断するには、次の手順を実行します。
- CloudWatch コンソールを開きます。
- ナビゲーションペインで [メトリクス] を選択し、[すべてのメトリクス] を選択します。
- [参照] タブを選択し、環境が置かれた AWS リージョンを選択してから、環境の名前を検索します。
- [AWS 名前空間] セクションで [MWAA < キュー] を選択します。
- QueuedTasks および RunningTasks を選択します。
- グラフでアクティビティが最も多い期間を特定し、両方のメトリクスの合計数を加算します。
注: 合計は、この期間のタスクの合計数を示します。 - 環境のデフォルト同時実行レベルを判断します。
注: たとえば、mw1.small 環境には、ワーカーごとに 5 つの同時タスクがあります。 - タスクの合計数を、同時実行タスクのデフォルトレベルで除算します。
- その数から、環境に設定した最大ワーカー数を減算します。
注: 結果が 0 より大きい場合は、ワーカーを追加し、現在の同時タスク数を満たす必要があります。
環境のワーカー数を増やしたり、環境のクラスサイズを変更したりするには、次の手順を実行します。
- Amazon MWAA コンソールを開きます。
- 該当する環境で [編集] を選択し、[次へ] を選択します。
- [環境クラス] セクションで次の操作を行います。
ステップ 9 で決定した最大ワーカー数を増やします。
さらに、最小ワーカー数を、アクティビティが最も少ない期間にワークロードが必要とする値に設定します。
注: 環境に追加できるワーカーには最大 25 個の制限があります。25 よりも多くワーカーが必要な場合は、[環境クラス] で大容量サイズを選択します。 - 環境のクラスサイズを増やす場合は、ワークロードに必要な最大および最小ワーカー数も設定してください。
ワーカー数を最適化してもワークロードには十分でない場合は、次の手順を実行します。
- Apache Airflow センサーの代わりに遅延可能なオペレーターを使用します。詳細については、Apache Airflow のウェブサイトで「遅延可能なオペレーターおよびトリガー」を参照してください。
- 実行開始時間をずらし、有向非巡回グラフ (DAG) の schedule_interval の間隔を小さくします。DAG をブロック単位でスケジュールします。
- 特定の外部関数を呼び出して監視するカスタムコードを使用する場合は、タスクを 2 つのタスクに分割します。呼び出し用に 1 つのタスクを作成し、もう一方を関数を監視するための遅延可能なオペレーターとして作成します。
Airflow 構成オプションに設定ミスがないか確認する
Airflow 構成オプションを確認するには、次の手順を実行します。
- MWAA コンソールを開きます。
- [環境] を選択し、該当する MWAA 環境を選択します。
- [Airflow 構成オプション] で core.parallelism および celery.worker_autoscale を確認します。
core.parallelism が設定されている場合は、手動で設定した core.parallelism オプションをすべて削除し、Amazon MWAA が動的に設定を行えるようにします。Amazon MWAA は、(maxWorkers * maxCeleryWorkers) / スケジューラー数 * 1.5 という式を使用して動的なデフォルト設定を計算します。Auto Scaling を使用して手動で値を設定した場合、最大負荷時に使用率が低くなり、問題が発生する可能性があります。
構成オプション celery.worker_autoscale の値をデフォルトの同時実行レベルと比較します。celery.worker_autoscale 設定オプションを変更していない場合は、デフォルトの同時実行レベルに、環境に設定した最大ワーカー数を乗算します。
**celery.worker_autoscale ** 値が意図せずデフォルト値より低くなっている場合は、CloudWatch メトリクスを使用してワーカーの CPU とメモリの使用状況を監視します。最大負荷時のリソース値が 20 ~ 60% の場合は、celery.worker_autoscale の値を増加させます。ワーカーコンテナの過剰使用を避けるために、少しずつ増やしてください。
celery.worker_autoscale 値が未設定の場合、またはデフォルト値をそのまま使用している場合は、ワーカーの CPU とメモリの使用状況を監視します。環境でのメトリクス値が高すぎる場合は、celery.worker_autoscale の値を下げます。最大負荷時の環境が 20 ~ 60% の場合は、最大値を増やすことができます。
過剰使用が原因でワーカーに障害が発生していないか確認する
MWAA ワーカーコンテナのすべての Celery ワーカーにタスクがあり、負荷が最大になると、ワーカーの過剰使用が原因で障害が発生する可能性があります。
MWAA Worker コンテナの Celery ワーカーは、現在使用されていないタスクをポーリングします。実行中のタスクとそれを定義するコードの複雑さによっては、ワーカーの過剰使用が原因でクラッシュする可能性があります。この問題は、MWAA ワーカーコンテナ上のすべての Celery ワーカーにタスクがあり、負荷が最大に達している場合に発生します。
ワーカーが過剰使用されており、機能不全に陥っていないかどうかを判断するには、次の手順を実行します。
- CloudWatch コンソールを開きます。
- ナビゲーションペインで [メトリクス] を選択し、[すべてのメトリクス] を選択します。
- [参照] タブを選択し、環境が置かれた AWS リージョンを選択してから、環境の名前を検索します。
- [AWS 名前空間] セクションで [MWAA < Queue] を選択し、ApproximateAgeOfOldestTask を選択します。
- 時間範囲を拡大し、期間には 4 ~ 6 週間を含めます。
注: ピークが 40,000 秒以上の場合は、そのタスクが Amazon SQS キューに滞留しており、ワーカーでは過剰使用による障害が発生していることを示しています。また、システムによる強制終了が原因で、Celery ワーカーは障害をイベントバッファに書き込むことができません。
CloudWatch Insights を使用し、タスクが Amazon SQS キューに滞留しているときにアラートを出すこともできます。
アラートを作成するには、次の手順を実行します。
-
CloudWatch コンソールを開きます。
-
ナビゲーションペインで [ログ] を選択し、[Logs Insights] を選択します。
-
期間を 4 ~ 6 週間の範囲で指定します。
-
[選択基準] メニューで MWAA 環境のスケジューラーロググループを選択します。
-
クエリセクションに次のクエリを入力します。
fields _@timestamp_, _@message_, _@logStream_, _@log_ | filter _@message_ like /Was the task terminated externally?/ | sort _@timestamp_ desc | limit 10000
以前にキューに入ったタスクを受信した際に、スケジューラーが送信するログの例を次に示します。
[[34m**2024-01-17T11:30:18.936+0000**[0m] [34mscheduler_job_runner.py:[0m771 ERROR[0m - Executor reports task instance <TaskInstance: dag_name.task_name manual__202X-XX-XXTXX:XX:XX.758774+00:00 [queued]> finished (failed) although the task says it's queued. (Info: None) Was the task terminated externally?[0m
コンピューティングやメモリを大量に消費するワークロードを軽減する
注: 次のリストを綿密に考慮してください。すべての要因がすべてのユースケースに当てはまるわけではありません。追加のサポートが必要な場合は、AWS サポートにお問い合わせください。
環境内のコンピューティングまたはメモリを大量に消費するワークロードを減らすには、次の手順を実行します。
- DAG コードに、抽出、変換、ロード (ETL) スクリプト、データ移動の指示、AI または ML パイプライン、その他のコンピューティングまたはメモリを大量に消費するワークロードが含まれていないことを確認してください。
- DAG コードを記述する際、Apache Airflow の推奨事項に従ってください。最上位のコードを最小化し、必要なものだけをインポートするようにしてください。詳細については、Apache Airflow のウェブサイトで「ベストプラクティス」を参照してください。
- DAG コードを最適化します。センサー、フック、カスタムオペレーター、拡張オペレーター、継承オペレーターのメモリフットプリントを分析し、潜在的な問題領域を特定します。
リソースの過剰使用が解消されない場合は、次の手順を実行します。
- celery.worker_autoscale の値をデフォルト値設定よりも低くします。celery.worker_autoscale の値を数桁減らし、24 ~ 48 時間環境を監視します。最適なレベルに達するまで celery.worker_autoscale の値を継続的に減らします。
注: celery.worker_autoscale の値を小さくすると、タスクプール全体が減少するため、長時間 Queue 状態に留まる項目数が増えます。これに対処するには、最低ワーカー数も増やす必要があります。 - さらに、「環境が同時実行タスクの最大数に達したかどうかを確認する」セクションの手順を再度実行し、ワーカーあたりの同時タスク数を減らします。
関連情報
Amazon MWAA での Apache Airflow のパフォーマンスを調整する
設定リファレンス (Apache Airflow のウェブサイト)
- トピック
- アプリケーション統合
- 言語
- 日本語
