MWAA 2.2.2 CLI Backfill skips a day


I am experiencing an issue on MWAA 2.2.2 where a backfill job skips a day and only runs that day at the end.

For example: I am trying to run a backfill on a DAG named dag_name. The DAG has these parameters: depends_on_past=True, max_active_runs=1.

I pass the below command as a JSON config to my separate backfill DAG.

airflow dags backfill -s 2022-05-01 -e 2022-05-10 dag_name
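Roughly, the conf payload I pass looks like this (a minimal sketch; build_backfill_conf is a hypothetical helper shown only to make the shape of the payload explicit, not part of my DAG):

```python
import json

# Sketch: build the JSON conf consumed by the separate backfill DAG.
# build_backfill_conf is a hypothetical helper, not part of the real DAG.
def build_backfill_conf(dag_id, start, end):
    command = " ".join(["airflow", "dags", "backfill", "-s", start, "-e", end, dag_id])
    return json.dumps({"command": command})

print(build_backfill_conf("dag_name", "2022-05-01", "2022-05-10"))
# → {"command": "airflow dags backfill -s 2022-05-01 -e 2022-05-10 dag_name"}
```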

The backfill job skips the 2022-05-02 run and only executes it at the end. I expected Airflow to run it right after the 2022-05-01 run. In other words, the backfill is not respecting the depends_on_past=True parameter set on the dag_name DAG. Another person appears to have reported the same issue: https://github.com/apache/airflow/discussions/23808.
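To illustrate the order I expected, here is a pure-Python sketch (no Airflow needed): with depends_on_past=True and max_active_runs=1, each day's run should only start after the previous day's run succeeds, so the dates should go strictly in sequence.

```python
from datetime import date, timedelta

# Sketch of the expected backfill order: with depends_on_past=True and
# max_active_runs=1, the run dates should be processed strictly in sequence.
def expected_backfill_order(start, end):
    days, d = [], start
    while d <= end:
        days.append(d.isoformat())
        d += timedelta(days=1)
    return days

order = expected_backfill_order(date(2022, 5, 1), date(2022, 5, 10))
print(order[:3])  # → ['2022-05-01', '2022-05-02', '2022-05-03']
```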


Note: The runs execute in the correct order when I let Airflow backfill automatically with catchup=True. The problem only occurs when the backfill is triggered manually through a BashOperator.

asked 2 years ago · 263 views
1 answer

Hello,  

The issue mentioned in https://github.com/apache/airflow/discussions/23808 appears to be a general Airflow issue, not something specific to MWAA. However, I used the code below to try to replicate the issue on my end, and it ran without any problem. Please reach out to support engineering so they can look into your specific case.

Below is the code for the base DAG that is triggered by the trigger DAG:

```python
"""
Code that goes along with the Airflow tutorial located at:
http://airflow.readthedocs.org/en/latest/tutorial.html
"""
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta

default_args = {
    "owner": "airflow",
    "start_date": datetime(2022, 4, 1),
    "email": ["airflow@airflow.com"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "depends_on_past": True,
    "retry_delay": timedelta(minutes=5),
}

dag = DAG(
    "tutorial_depend",
    default_args=default_args,
    catchup=False,
    schedule_interval="@daily",
)

t1 = BashOperator(task_id="print_date", bash_command="sleep 5", dag=dag)

t2 = BashOperator(task_id="sleep", bash_command="sleep 5", retries=3, dag=dag)

templated_command = """
{% for i in range(5) %}
    echo "{{ ds }}"
    echo "{{ macros.ds_add(ds, 7) }}"
    echo "{{ params.my_param }}"
{% endfor %}
"""

t3 = BashOperator(
    task_id="templated",
    bash_command=templated_command,
    params={"my_param": "Parameter I passed in"},
    dag=dag,
)

t2.set_upstream(t1)
t3.set_upstream(t1)
```

Trigger DAG code: The command was passed as JSON conf: { "command" : "airflow dags backfill -x -s '2022-04-02' -e '2022-04-12' --rerun-failed-tasks tutorial_depend" }

```python
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago

with DAG(
    dag_id="any_bash_command_dag",
    schedule_interval=None,
    catchup=False,
    start_date=days_ago(1),
) as dag:
    cli_command = BashOperator(
        task_id="bash_command",
        bash_command="{{ dag_run.conf['command'] }}",
    )
```
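As a sanity check before triggering the wrapper DAG (for example with `airflow dags trigger any_bash_command_dag --conf <payload>`), you can validate the conf payload the BashOperator will receive. This is a sketch using only the standard library; the payload string mirrors the JSON shown above:

```python
import json
import shlex

# Sketch: validate the conf payload before passing it via --conf.
# The payload string mirrors the JSON conf shown above.
payload = (
    '{"command": "airflow dags backfill -x '
    "-s '2022-04-02' -e '2022-04-12' "
    '--rerun-failed-tasks tutorial_depend"}'
)
conf = json.loads(payload)           # must parse as JSON with a "command" key
argv = shlex.split(conf["command"])  # the words the BashOperator shell will see
print(argv[:3])  # → ['airflow', 'dags', 'backfill']
```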


Have a nice day!

AWS
SUPPORT ENGINEER
Arun
answered 2 years ago
