[MWAA] How do I trigger multiple DAG runs with different inputs and on different schedules from a single DAG file?


Background:

I am trying to build a system with MWAA to pull data from various databases on different schedules. For example:

  • workFlow1 pulls data from mysqldb1 every hour
  • workFlow2 pulls data from mysqldb2 every 15 minutes
  • workFlow3 pulls data from snowflakedb1 every 24 hours, and so on.

Now, I could create a DAG file for each of these workflows and let MWAA schedule and run them, but this approach will eventually run into the maximum number of DAGs supported by my environment.

Questions:

  • What will happen if I upload more DAGs than the environment's limit?
  • Is there a better way to implement the above? For example, I want to define a separate DAG script for each of MySQL, Postgres, Snowflake, etc., and have the scheduler trigger each of the above workflows on a different schedule and with a different input set.
Abhi
asked 4 months ago
2 Answers

Hello

Thank you for your post.

In my experience, there is no way to define a schedule for individual tasks inside a DAG. You can do what you mentioned: define a separate DAG script for each of MySQL, Postgres, Snowflake, etc., and have the scheduler trigger each of those workflows on its own schedule with its own input set.

Within a single DAG, the use case you describe isn't possible because of how DAGs and tasks work in Airflow, as described in the docs:

https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/tasks.html

https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/dags.html

The schedule can be defined only in the DAG definition, not on individual tasks.

What will happen if I upload more dags than the environment's limit?

I assume you are referring to the documentation's statement that an mw1.small environment supports up to 50 DAGs, and are asking what happens if you deploy more than 50 DAGs to an mw1.small environment.

Regarding environment capacity, the estimate in the documentation is a guideline for capacity planning when deploying DAGs. These estimates are based on lightweight tasks and should be treated as reference points rather than absolute values. Airflow tasks on MWAA run inside containers that execute Python code, so task performance depends primarily on the compute and memory available to the workers and the scheduler (this is also outlined in the Airflow Best Practices). A smaller environment has workers with less memory and processing power, so it cannot run as many DAGs (or tasks) as a larger one.

Treat the guideline as a rule of thumb: not all tasks need the same amount of memory and processing, and some DAGs, and by extension their tasks, use more resources than others. You therefore need to consider the complexity of your particular tasks to estimate how many your environment can handle. Because the number of tasks per DAG depends on your use case, the most reliable way to find that number is to run a benchmark with your own workload.

max_dagruns_per_loop_to_schedule

Airflow configuration options like this one can be set in MWAA: if needed, add scheduler.max_dagruns_per_loop_to_schedule as an Airflow configuration override. Note that updating the environment takes 20-30 minutes, so plan the update for a period of downtime.
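If you prefer to script the change rather than use the console, the override can also be applied through the MWAA UpdateEnvironment API, which takes Airflow configuration options as a map of "section.option" keys to string values. This is a minimal sketch under assumptions: the environment name, the value 30, and the helper names `build_update_request` and `apply_overrides` are hypothetical, and the client is expected to be a boto3 `mwaa` client passed in by the caller. It's safest to include every override you want to keep in the map, in case the call replaces the existing set rather than merging into it.

```python
def build_update_request(environment_name, overrides):
    """Build keyword arguments for the MWAA UpdateEnvironment call (hypothetical helper)."""
    return {
        "Name": environment_name,
        # MWAA expects every configuration value as a string.
        "AirflowConfigurationOptions": {key: str(value) for key, value in overrides.items()},
    }


def apply_overrides(mwaa_client, environment_name, overrides):
    """Send the update; mwaa_client is assumed to be boto3.client("mwaa")."""
    request = build_update_request(environment_name, overrides)
    # The update takes 20-30 minutes, so run it during planned downtime.
    return mwaa_client.update_environment(**request)


if __name__ == "__main__":
    # Hypothetical environment name and value, printed instead of sent.
    print(build_update_request("my-mwaa-env", {"scheduler.max_dagruns_per_loop_to_schedule": 30}))
```

In practice you would call `apply_overrides(boto3.client("mwaa"), "my-mwaa-env", {...})`. Keeping the request builder separate makes it easy to review exactly what will be sent before triggering the environment update.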

AWS
answered 4 months ago
AWS
SUPPORT ENGINEER
reviewed 3 months ago

Hi Abhi, the answer above already provides a lot of detail. Nevertheless, here is some more information on your questions:

What will happen if I upload more dags than the environment's limit?

More Python files in the DAGs folder mean more work for the DAG processing manager, which can put some resource strain on the scheduler(s). As for the number of DAG objects, a higher number doesn't necessarily mean you'll exceed the environment's capacity; that depends on the number of tasks, what those tasks do, the scheduling pattern, and so on.
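To see why the scheduling pattern matters more than the raw DAG count, here is a back-of-the-envelope calculation using the three workflows from the question. The schedules come from the question; the figure of 3 tasks per run is a made-up assumption for illustration, and the helper names are hypothetical.

```python
from datetime import timedelta

# Schedules are from the question; tasks_per_run = 3 is a made-up assumption.
WORKFLOWS = {
    "workFlow1": {"interval": timedelta(hours=1), "tasks_per_run": 3},
    "workFlow2": {"interval": timedelta(minutes=15), "tasks_per_run": 3},
    "workFlow3": {"interval": timedelta(hours=24), "tasks_per_run": 3},
}


def task_runs_per_day(interval, tasks_per_run):
    """Number of task instances one workflow creates per day."""
    runs_per_day = timedelta(days=1) // interval  # timedelta // timedelta gives an int
    return runs_per_day * tasks_per_run


def daily_load(workflows):
    """Task runs per day for each workflow."""
    return {
        name: task_runs_per_day(cfg["interval"], cfg["tasks_per_run"])
        for name, cfg in workflows.items()
    }


load = daily_load(WORKFLOWS)
for name, runs in load.items():
    print(f"{name}: {runs} task runs/day")
print(f"total: {sum(load.values())} task runs/day")
```

With one DAG per workflow in every case, the 15-minute workflow alone produces 96 runs a day versus 1 for the daily one, so it accounts for 288 of the 363 task runs. Real capacity planning would also weigh how long and how heavy each task is.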

Is there a better way to implement the above? Eg: I want to define a separate dag script for each of mysql, postgres, snowflake, etc. and want the scheduler to trigger each of the above workflows with different schedules and with different input sets.

You might already be aware of it, but your use case sounds like it would be better served by Airflow's dynamic DAG generation. With it, a single Python file can programmatically generate multiple DAG objects, each with its own definition (schedule interval, tasks, params, and so on).
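A minimal sketch of dynamic DAG generation for the workflows in the question, assuming Airflow 2.4 or later (for the `schedule` argument) and the classic `PythonOperator`. The config list, connection IDs, DAG IDs, and the `pull_data` and `build_dag_specs` functions are hypothetical placeholders; a real pipeline would use the provider hooks or operators for MySQL, Postgres, or Snowflake. The Airflow import is guarded only so the config logic can run outside an Airflow environment; in an actual DAG file you would import directly.

```python
from datetime import datetime

# Hypothetical config for the workflows in the question; it could equally be loaded
# from a YAML/JSON file shipped alongside the DAG file.
WORKFLOWS = [
    {"name": "workflow1", "source_type": "mysql", "conn_id": "mysqldb1", "schedule": "@hourly"},
    {"name": "workflow2", "source_type": "mysql", "conn_id": "mysqldb2", "schedule": "*/15 * * * *"},
    {"name": "workflow3", "source_type": "snowflake", "conn_id": "snowflakedb1", "schedule": "@daily"},
]


def build_dag_specs(workflows):
    """Expand the config into one spec per DAG: unique dag_id, own schedule, own inputs."""
    return [
        {
            "dag_id": f"pull_{wf['name']}",
            "schedule": wf["schedule"],
            "params": {"source_type": wf["source_type"], "conn_id": wf["conn_id"]},
        }
        for wf in workflows
    ]


def pull_data(params):
    # Placeholder extraction step; Airflow passes the DAG's params via the task context.
    print(f"Pulling from {params['source_type']} connection {params['conn_id']}")


try:
    from airflow import DAG
    from airflow.operators.python import PythonOperator
except ImportError:  # Only so the config logic above can run outside Airflow.
    DAG = None

if DAG is not None:
    for spec in build_dag_specs(WORKFLOWS):
        with DAG(
            dag_id=spec["dag_id"],
            schedule=spec["schedule"],  # Airflow 2.4+; older versions use schedule_interval
            start_date=datetime(2024, 1, 1),
            catchup=False,
            params=spec["params"],
        ) as dag:
            PythonOperator(task_id="pull_data", python_callable=pull_data)
        # Expose each DAG at module level so the DAG processor picks it up.
        globals()[spec["dag_id"]] = dag
```

Each generated DAG still appears as a separate DAG in the UI and is scheduled independently; what shrinks is the number of files the DAG processor has to parse. Because each DAG carries its inputs as `params`, the same callable serves every generated DAG.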

AWS
EXPERT
answered 3 months ago
