Skip to content

Amazon MWAA 環境で Queued 状態に留まっているタスクをトラブルシューティングする方法を教えてください。

所要時間3分
0

Amazon Managed Workflows for Apache Airflow (Amazon MWAA) でワークフローを実行していますが、タスクが Queued 状態から変化せず、Running 状態に移行しません。

簡単な説明

次の要因で、Amazon MWAA のタスクが Queued 状態から移行できない場合があります。

  • 環境が同時実行タスクの最大数に達した。
  • Airflow 構成オプションが MWAA 環境で正しく設定されていない。
  • ワーカー上のタスク用のメモリまたは CPU が不足している。

タスクを実行する通常のワークフローに障害が発生すると、タスクは Queued 状態に留まります。Apache Airflow ワーカーに過負荷がかかり、指定された時間内に応答しきれなくなる場合があります。この場合、12 時間後にデフォルトの可視性タイムアウトに達するまで、タスクは Amazon Simple Queue Service (Amazon SQS) キューに残ります。再試行を設定した場合は、Apache Airflow スケジューラーはタスクを再試行します。

解決策

トラブルシューティングを行う前に、環境リソースの負荷が最大に達しているのか、それともワーカーに関連する問題が発生しているのかを判断してください。Amazon CloudWatch を使用し、環境のワーカーログおよび、メトリクス CPUUtilization と MemoryUtilization を確認します。

環境が同時実行タスクの最大数に達したかどうかを確認する

Amazon MWAA プールに空きがなくなり、環境がキューにさらにタスクを追加した場合、環境は同時タスクの最大数に達します。この問題を解決するには、環境のワーカー数を増やすか、環境のクラスサイズを変更します。

環境のワーカー数を増やす必要があるかどうかを判断するには、次の手順を実行します。

  1. CloudWatch コンソールを開きます。
  2. ナビゲーションペインで [メトリクス] を選択し、[すべてのメトリクス] を選択します。
  3. [参照] タブを選択し、環境が置かれた AWS リージョンを選択してから、環境の名前を検索します。
  4. [AWS 名前空間] セクションで [MWAA < キュー] を選択します。
  5. QueuedTasks および RunningTasks を選択します。
  6. グラフでアクティビティが最も多い期間を特定し、両方のメトリクスの合計数を加算します。
    注: 合計は、この期間のタスクの合計数を示します。
  7. 環境のデフォルト同時実行レベルを判断します。
    注: たとえば、mw1.small 環境には、ワーカーごとに 5 つの同時タスクがあります。
  8. タスクの合計数を、同時実行タスクのデフォルトレベルで除算します。
  9. その数から、環境に設定した最大ワーカー数を減算します。
    注: 結果が 0 より大きい場合は、ワーカーを追加し、現在の同時タスク数を満たす必要があります。

環境のワーカー数を増やしたり、環境のクラスサイズを変更したりするには、次の手順を実行します。

  1. Amazon MWAA コンソールを開きます。
  2. 該当する環境で [編集] を選択し、[次へ] を選択します。
  3. [環境クラス] セクションで次の操作を行います。
    ステップ 9 で決定した最大ワーカー数を増やします。
    さらに、最小ワーカー数を、アクティビティが最も少ない期間にワークロードが必要とする値に設定します。
    注: 環境に追加できるワーカーには最大 25 個の制限があります。25 よりも多くワーカーが必要な場合は、[環境クラス] で大容量サイズを選択します。
  4. 環境のクラスサイズを増やす場合は、ワークロードに必要な最大および最小ワーカー数も設定してください。

ワーカー数を最適化してもワークロードには十分でない場合は、次の手順を実行します。

  • Apache Airflow センサーの代わりに遅延可能なオペレーターを使用します。詳細については、Apache Airflow のウェブサイトで「遅延可能なオペレーターおよびトリガー」を参照してください。
  • 実行開始時間をずらし、有向非巡回グラフ (DAG) の schedule_interval の間隔を小さくします。DAG をブロック単位でスケジュールします。
  • 特定の外部関数を呼び出して監視するカスタムコードを使用する場合は、タスクを 2 つのタスクに分割します。呼び出し用に 1 つのタスクを作成し、もう一方を関数を監視するための遅延可能なオペレーターとして作成します。

Airflow 構成オプションに設定ミスがないか確認する

Airflow 構成オプションを確認するには、次の手順を実行します。

  1. MWAA コンソールを開きます。
  2. [環境] を選択し、該当する MWAA 環境を選択します。
  3. [Airflow 構成オプション]core.parallelism および celery.worker_autoscale を確認します。

core.parallelism が設定されている場合は、手動で設定した core.parallelism オプションをすべて削除し、Amazon MWAA が動的に設定を行えるようにします。Amazon MWAA は、(maxWorkers * maxCeleryWorkers) / スケジューラー数 * 1.5 という式を使用して動的なデフォルト設定を計算します。Auto Scaling を使用して手動で値を設定した場合、最大負荷時に使用率が低くなり、問題が発生する可能性があります。

構成オプション celery.worker_autoscale の値をデフォルトの同時実行レベルと比較します。celery.worker_autoscale 設定オプションを変更していない場合は、デフォルトの同時実行レベルに、環境に設定した最大ワーカー数を乗算します。

**celery.worker_autoscale ** 値が意図せずデフォルト値より低くなっている場合は、CloudWatch メトリクスを使用してワーカーの CPU とメモリの使用状況を監視します。最大負荷時のリソース値が 20 ~ 60% の場合は、celery.worker_autoscale の値を増加させます。ワーカーコンテナの過剰使用を避けるために、少しずつ増やしてください。

celery.worker_autoscale 値が未設定の場合、またはデフォルト値をそのまま使用している場合は、ワーカーの CPU とメモリの使用状況を監視します。環境でのメトリクス値が高すぎる場合は、celery.worker_autoscale の値を下げます。最大負荷時の環境が 20 ~ 60% の場合は、最大値を増やすことができます。

過剰使用が原因でワーカーに障害が発生していないか確認する

MWAA ワーカーコンテナのすべての Celery ワーカーにタスクがあり、負荷が最大になると、ワーカーの過剰使用が原因で障害が発生する可能性があります。

MWAA Worker コンテナの Celery ワーカーは、現在使用されていないタスクをポーリングします。実行中のタスクとそれを定義するコードの複雑さによっては、ワーカーの過剰使用が原因でクラッシュする可能性があります。この問題は、MWAA ワーカーコンテナ上のすべての Celery ワーカーにタスクがあり、負荷が最大に達している場合に発生します。

ワーカーが過剰使用されており、機能不全に陥っていないかどうかを判断するには、次の手順を実行します。

  1. CloudWatch コンソールを開きます。
  2. ナビゲーションペインで [メトリクス] を選択し、[すべてのメトリクス] を選択します。
  3. [参照] タブを選択し、環境が置かれた AWS リージョンを選択してから、環境の名前を検索します。
  4. [AWS 名前空間] セクションで [MWAA < Queue] を選択し、ApproximateAgeOfOldestTask を選択します。
  5. 時間範囲を拡大し、期間には 4 ~ 6 週間を含めます。
    注: ピークが 40,000 秒以上の場合は、そのタスクが Amazon SQS キューに滞留しており、ワーカーでは過剰使用による障害が発生していることを示しています。また、システムによる強制終了が原因で、Celery ワーカーは障害をイベントバッファに書き込むことができません。

CloudWatch Insights を使用し、タスクが Amazon SQS キューに滞留しているときにアラートを出すこともできます。

アラートを作成するには、次の手順を実行します。

  1. CloudWatch コンソールを開きます。

  2. ナビゲーションペインで [ログ] を選択し、[Logs Insights] を選択します。

  3. 期間を 4 ~ 6 週間の範囲で指定します。

  4. [選択基準] メニューで MWAA 環境のスケジューラーロググループを選択します。

  5. クエリセクションに次のクエリを入力します。

    fields _@timestamp_, _@message_, _@logStream_, _@log_  
    | filter _@message_ like /Was the task terminated externally?/  
    | sort _@timestamp_ desc  
    | limit 10000

    以前にキューに入ったタスクを受信した際に、スケジューラーが送信するログの例を次に示します。

    [[34m**2024-01-17T11:30:18.936+0000**[0m] [34mscheduler_job_runner.py:[0m771 ERROR[0m - Executor reports task instance <TaskInstance: dag_name.task_name manual__202X-XX-XXTXX:XX:XX.758774+00:00 [queued]> finished (failed) although the task says it's queued. (Info: None) Was the task terminated externally?[0m

コンピューティングやメモリを大量に消費するワークロードを軽減する

注: 次のリストを綿密に考慮してください。すべての要因がすべてのユースケースに当てはまるわけではありません。追加のサポートが必要な場合は、AWS サポートにお問い合わせください。

環境内のコンピューティングまたはメモリを大量に消費するワークロードを減らすには、次の手順を実行します。

  • DAG コードに、抽出、変換、ロード (ETL) スクリプト、データ移動の指示、AI または ML パイプライン、その他のコンピューティングまたはメモリを大量に消費するワークロードが含まれていないことを確認してください。
  • DAG コードを記述する際、Apache Airflow の推奨事項に従ってください。最上位のコードを最小化し、必要なものだけをインポートするようにしてください。詳細については、Apache Airflow のウェブサイトで「ベストプラクティス」を参照してください。
  • DAG コードを最適化します。センサー、フック、カスタムオペレーター、拡張オペレーター、継承オペレーターのメモリフットプリントを分析し、潜在的な問題領域を特定します。

リソースの過剰使用が解消されない場合は、次の手順を実行します。

  • celery.worker_autoscale の値をデフォルト値設定よりも低くします。celery.worker_autoscale の値を数桁減らし、24 ~ 48 時間環境を監視します。最適なレベルに達するまで celery.worker_autoscale の値を継続的に減らします。
    注: celery.worker_autoscale の値を小さくすると、タスクプール全体が減少するため、長時間 Queue 状態に留まる項目数が増えます。これに対処するには、最低ワーカー数も増やす必要があります。
  • さらに、「環境が同時実行タスクの最大数に達したかどうかを確認する」セクションの手順を再度実行し、ワーカーあたりの同時タスク数を減らします。

関連情報

Amazon MWAA での Apache Airflow のパフォーマンスを調整する

設定リファレンス (Apache Airflow のウェブサイト)

AWS公式更新しました 2ヶ月前
コメントはありません

関連するコンテンツ