MWAA 2.2.2 CLI Backfill skips a day


I am experiencing an issue on MWAA 2.2.2 where a backfill job skips a day and only runs that day at the end.

For example: I am trying to run a backfill on a DAG, dag_name. The DAG has these parameters: depends_on_past=True and max_active_runs=1.
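For reference, a minimal sketch of what such a DAG might look like; only depends_on_past=True and max_active_runs=1 come from the description above, while the task, schedule, start date and catchup setting are placeholders:

from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="dag_name",
    start_date=datetime(2022, 5, 1),          # placeholder
    schedule_interval="@daily",               # placeholder
    catchup=False,                            # assumption: backfill is triggered manually
    max_active_runs=1,                        # only one DAG run at a time
    default_args={"depends_on_past": True},   # each run waits for the previous one
) as dag:
    do_work = BashOperator(task_id="do_work", bash_command="echo '{{ ds }}'")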

I pass the below command as a JSON config to my separate backfill DAG.

airflow dags backfill -s 2022-05-01 -e 2022-05-10 dag_name
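Wrapped as the JSON conf for the backfill DAG, the payload looks roughly like this (the "command" key is an assumption; use whichever key your backfill DAG reads out of dag_run.conf):

{ "command": "airflow dags backfill -s 2022-05-01 -e 2022-05-10 dag_name" }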

The backfill job skips the 2022-05-02 run and only executes it at the end, whereas I expected Airflow to run it right after the 2022-05-01 run. In other words, the backfill is not respecting the depends_on_past=True parameter set on the dag_name DAG. Another person appears to have reported the same issue: https://github.com/apache/airflow/discussions/23808.


Note: the runs execute in the expected order when I let the DAG backfill automatically with catchup=True. The problem only occurs when the backfill is triggered manually through a BashOperator.

1 Answer

Hello,  

The issue mentioned in https://github.com/apache/airflow/discussions/23808 appears to be a general Airflow issue and is not specific to MWAA. However, I used the code below to try to replicate the issue on my end, and it ran without any problem. I would therefore request that you reach out to support engineering to address your specific case.

Below is the code for the base DAG that will be backfilled via the trigger DAG:

—————————————

""" Code that goes along with the Airflow located at: http://airflow.readthedocs.org/en/latest/tutorial.html """

from airflow import DAG from airflow.operators.bash_operator import BashOperator from datetime import datetime, timedelta

default_args = { "owner": "airflow", "start_date": datetime(2022, 4, 1), "email": ["airflow@airflow.com"], "email_on_failure": False, "email_on_retry": False, "retries": 1, "depends_on_past": True, "retry_delay": timedelta(minutes=5) }

dag = DAG("tutorial_depend", default_args=default_args, catchup=False, schedule_interval='@daily')

t1 = BashOperator(task_id="print_date", bash_command="sleep 5", dag=dag)

t2 = BashOperator(task_id="sleep", bash_command="sleep 5", retries=3, dag=dag)

templated_command = """ {% for i in range(5) %} echo "{{ ds }}" echo "{{ macros.ds_add(ds, 7)}}" echo "{{ params.my_param }}" {% endfor %} """

t3 = BashOperator( task_id="templated", bash_command=templated_command, params={"my_param": "Parameter I passed in"}, dag=dag, )

t2.set_upstream(t1) t3.set_upstream(t1)

—————————————

Trigger DAG code: the backfill command was passed as a JSON conf:

{ "command": "airflow dags backfill -x -s '2022-04-02' -e '2022-04-12' --rerun-failed-tasks tutorial_depend" }

—————————————

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago

with DAG(
    dag_id="any_bash_command_dag",
    schedule_interval=None,
    catchup=False,
    start_date=days_ago(1),
) as dag:
    cli_command = BashOperator(
        task_id="bash_command",
        bash_command="{{ dag_run.conf['command'] }}",
    )

—————————————
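One way to start the trigger DAG with that conf, assuming the Airflow CLI is reachable, is shown below; on MWAA itself the same JSON can instead be passed through the "Trigger DAG w/ config" option in the Airflow UI or via the MWAA CLI endpoint. The single quotes around the dates are dropped here only to keep the shell escaping simple:

airflow dags trigger any_bash_command_dag \
  --conf '{"command": "airflow dags backfill -x -s 2022-04-02 -e 2022-04-12 --rerun-failed-tasks tutorial_depend"}'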

================  

Have a nice day!

AWS
Support Engineer
Arun
answered 2 years ago
