
MWAA 2.2.2 CLI Backfill skips a day


I am experiencing an issue on MWAA 2.2.2 where a backfill job skips a day and only runs that day at the end.

For example: I am trying to run a backfill on a DAG dag_name. The DAG has these parameters: depends_on_past=True, max_active_runs=1.

I pass the below command as a JSON config to my separate backfill DAG.

airflow dags backfill -s 2022-05-01 -e 2022-05-10 dag_name

The backfill job skips the 2022-05-02 run and only executes it at the end. I was expecting Airflow to run it right after the 2022-05-01 run. Basically, the backfill is not respecting the depends_on_past=True parameter set on the dag_name DAG. Another person appears to have reported the same issue: https://github.com/apache/airflow/discussions/23808.


Note: the runs execute in the expected order when I let the DAG backfill automatically with catchup=True. The problem only happens when the backfill is run manually through a BashOperator.
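For reference, with depends_on_past=True a backfill over this window should execute the daily runs strictly in chronological order, since each logical date must wait for the previous date's run to succeed. A minimal sketch of the expected sequence (plain Python, no Airflow required; expected_run_order is just an illustrative helper, not an Airflow API):

```python
from datetime import date, timedelta

def expected_run_order(start, end):
    """Expected run order for `airflow dags backfill -s <start> -e <end>`:
    one run per day, strictly chronological, end date inclusive."""
    days = []
    d = start
    while d <= end:
        days.append(d.isoformat())
        d += timedelta(days=1)
    return days

order = expected_run_order(date(2022, 5, 1), date(2022, 5, 10))
print(order[:3])  # the 2022-05-02 run should come second, not last
```

In the behavior described above, "2022-05-02" instead ends up at the tail of the sequence, which is what violates depends_on_past.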

1 Answer

Hello,  

The issue mentioned in https://github.com/apache/airflow/discussions/23808 appears to be a general Airflow issue and is not specific to MWAA. However, I used the code below to replicate the issue on my end, and it ran without any problem. I would therefore ask you to reach out to support engineering so they can look into your specific case.

Below is the code for the base DAG that will be triggered using the trigger DAG:

—————————————

"""
Code that goes along with the Airflow tutorial located at:
http://airflow.readthedocs.org/en/latest/tutorial.html
"""

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta

default_args = {
    "owner": "airflow",
    "start_date": datetime(2022, 4, 1),
    "email": ["airflow@airflow.com"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "depends_on_past": True,
    "retry_delay": timedelta(minutes=5),
}

dag = DAG(
    "tutorial_depend",
    default_args=default_args,
    catchup=False,
    schedule_interval="@daily",
)

t1 = BashOperator(task_id="print_date", bash_command="sleep 5", dag=dag)

t2 = BashOperator(task_id="sleep", bash_command="sleep 5", retries=3, dag=dag)

templated_command = """
{% for i in range(5) %}
    echo "{{ ds }}"
    echo "{{ macros.ds_add(ds, 7) }}"
    echo "{{ params.my_param }}"
{% endfor %}
"""

t3 = BashOperator(
    task_id="templated",
    bash_command=templated_command,
    params={"my_param": "Parameter I passed in"},
    dag=dag,
)

t2.set_upstream(t1)
t3.set_upstream(t1)

—————————————

Trigger DAG code: the command was passed in the form of the JSON config
{ "command": "airflow dags backfill -x -s '2022-04-02' -e '2022-04-12' --rerun-failed-tasks tutorial_depend" }

—————————————

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago

with DAG(
    dag_id="any_bash_command_dag",
    schedule_interval=None,
    catchup=False,
    start_date=days_ago(1),
) as dag:
    cli_command = BashOperator(
        task_id="bash_command",
        bash_command="{{ dag_run.conf['command'] }}",
    )

—————————————
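To pass that configuration to the trigger DAG, the conf string can be built programmatically. A minimal sketch (plain Python; build_backfill_conf is a hypothetical helper, not part of Airflow):

```python
import json

def build_backfill_conf(dag_id, start, end):
    """Build the JSON conf expected by the trigger DAG above,
    which reads dag_run.conf['command'] in its BashOperator."""
    command = (
        f"airflow dags backfill -x -s '{start}' -e '{end}' "
        f"--rerun-failed-tasks {dag_id}"
    )
    return json.dumps({"command": command})

conf = build_backfill_conf("tutorial_depend", "2022-04-02", "2022-04-12")
print(conf)
```

The resulting string can then be supplied as the conf when triggering any_bash_command_dag, for example via the Airflow CLI's `airflow dags trigger` with the `--conf` option, or pasted into the "Trigger DAG w/ config" dialog in the UI.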


Have a nice day!

SUPPORT ENGINEER
answered 3 months ago
