MWAA 2.2.2 CLI Backfill skips a day


I am running into an issue on MWAA 2.2.2 where a backfill job skips a day and only runs that day at the end.

For example: I am trying to run a backfill on a DAG named dag_name. The DAG has these parameters: depends_on_past=True, max_active_runs=1

I pass the command below as a JSON config to a separate backfill DAG.

airflow dags backfill -s 2022-05-01 -e 2022-05-10 dag_name

The backfill job skips the 2022-05-02 run and only executes it at the end. I expected Airflow to run it right after the 2022-05-01 run. In other words, the backfill is not respecting the depends_on_past=True parameter set on the dag_name DAG. Another person appears to have reported the same issue: https://github.com/apache/airflow/discussions/23808.


Note: The runs execute in the expected order when I let Airflow backfill automatically with catchup=True. The problem only occurs when the backfill is triggered manually through a BashOperator.
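For clarity, here is a toy sketch of the ordering I expected (this is not Airflow's actual scheduler code, just an illustration of the strictly sequential, day-by-day order that depends_on_past=True with max_active_runs=1 should produce):

```python
from datetime import date, timedelta

def expected_backfill_order(start, end):
    """Toy illustration (not Airflow's scheduler): with depends_on_past=True
    and max_active_runs=1, each day's run is gated on the previous day's
    success, so the runs should execute strictly in date order."""
    order = []
    current = start
    while current <= end:
        order.append(current.isoformat())
        current += timedelta(days=1)
    return order

print(expected_backfill_order(date(2022, 5, 1), date(2022, 5, 3)))
# → ['2022-05-01', '2022-05-02', '2022-05-03']
```

What I observe instead is that 2022-05-02 is skipped and only executed after the rest of the range has run.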

Asked 2 years ago · Viewed 263 times

1 Answer

Hello,  

The issue mentioned in https://github.com/apache/airflow/discussions/23808 appears to be a general Airflow issue, not something specific to MWAA. However, I used the code below to try to replicate the issue on my end, and it ran without any problem. I would therefore request that you reach out to support engineering to troubleshoot your specific case.

Below is the code for the base DAG that will be triggered by the trigger DAG:

—————————————

""" Code that goes along with the Airflow located at: http://airflow.readthedocs.org/en/latest/tutorial.html """

from airflow import DAG from airflow.operators.bash_operator import BashOperator from datetime import datetime, timedelta

default_args = { "owner": "airflow", "start_date": datetime(2022, 4, 1), "email": ["airflow@airflow.com"], "email_on_failure": False, "email_on_retry": False, "retries": 1, "depends_on_past": True, "retry_delay": timedelta(minutes=5) }

dag = DAG("tutorial_depend", default_args=default_args, catchup=False, schedule_interval='@daily')

t1 = BashOperator(task_id="print_date", bash_command="sleep 5", dag=dag)

t2 = BashOperator(task_id="sleep", bash_command="sleep 5", retries=3, dag=dag)

templated_command = """ {% for i in range(5) %} echo "{{ ds }}" echo "{{ macros.ds_add(ds, 7)}}" echo "{{ params.my_param }}" {% endfor %} """

t3 = BashOperator( task_id="templated", bash_command=templated_command, params={"my_param": "Parameter I passed in"}, dag=dag, )

t2.set_upstream(t1) t3.set_upstream(t1)

—————————————

Trigger DAG code: the command was passed in the form of JSON: { "command": "airflow dags backfill -x -s '2022-04-02' -e '2022-04-12' --rerun-failed-tasks tutorial_depend" }

—————————————

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago

with DAG(
    dag_id="any_bash_command_dag",
    schedule_interval=None,
    catchup=False,
    start_date=days_ago(1),
) as dag:
    cli_command = BashOperator(
        task_id="bash_command",
        bash_command="{{ dag_run.conf['command'] }}",
    )

—————————————
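As a side note, the conf payload above can also be assembled programmatically, which avoids shell-quoting mistakes when the command string contains quoted dates. This is only a stdlib sketch, not part of the original reproduction:

```python
import json

# Sketch: build the conf payload that the wrapper DAG's BashOperator
# renders via {{ dag_run.conf['command'] }}. Illustrative only.
conf = {
    "command": (
        "airflow dags backfill -x "
        "-s '2022-04-02' -e '2022-04-12' "
        "--rerun-failed-tasks tutorial_depend"
    )
}
payload = json.dumps(conf)
print(payload)
```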


Have a nice day!

AWS
Support Engineer
Arun
Answered 2 years ago
