Passing params from an Apache Airflow DAG to triggered DAGs using TriggerDagRunOperator
So I was in this situation, struggling for like 5 hours yesterday (yes, the last 5 Friday work hours, the best ones to get stuck on some code) trying to pass parameters using the TriggerDagRunOperator, wanting to die, but achieving it in the end.
Maybe I just wasn't experienced enough and fell into something really easy to fix, but either way, today I'll show you how to do it so you don't have to struggle as I did 🙂 let's get into it.
Use Case
If you want to go straight to the solution you can skip this section.
I had 2 data sources: an ERP and one content environment (from now on I'll call it 'env') from a CMS (if you don't know what a CMS is, I explain a little bit about it in this post). I had 2 DAGs that ran at the same time (with the same schedule_interval) and synced data from the ERP to the CMS. Each DAG synced a specific type of data to the same env.
Until then, both DAGs ran individually, updating the CMS environment asynchronously. The sync process between the 2 data sources is not free of failures, so a new need came up: first create a backup of the env, then sync the data to a new env that is a copy of the old one. If anything goes wrong, we can just switch environments and delete the broken one.
With this, the 2 DAGs cannot run asynchronously anymore; they have to sync the data to the same environment. The proposed solution was to create a new DAG (which I'll call Wrapper from now on) that first runs a create-backup-env task and then triggers the 2 DAGs using the TriggerDagRunOperator. These DAGs can no longer be executed manually or on their own schedule, only through the Wrapper DAG: the create-backup-env task always has to run first so that the 2 DAGs always push data to the same env and never to old envs that will no longer be used.
Furthermore, the 2 DAGs can receive quite a few config parameters to execute (or skip) certain tasks via the Trigger DAG w/ config feature that Airflow provides, so these parameters also have to be available in the Wrapper DAG.
The Solution
FYI - I simplified the solution a lot but kept the main components untouched.
To use the TriggerDagRunOperator, we need to define something like this:
# Wrapper DAG
from datetime import datetime

from airflow.decorators import dag, task
from airflow.operators.python import get_current_context
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.utils.state import State


@dag(start_date=datetime(2023, 1, 7), schedule_interval='@daily', catchup=False)
def wrapper_dag():
    @task.python
    def create_backup_env():
        print('Creating backup env...')

    trigger_sync_dag_1_task = TriggerDagRunOperator(
        task_id='trigger_sync_dag_1',
        trigger_dag_id='sync_dag',
        wait_for_completion=True,
        poke_interval=60,
        failed_states=[State.FAILED],
    )

    trigger_sync_dag_2_task = TriggerDagRunOperator(...)

    @task.python
    def other_task():
        context = get_current_context()
        params = context['params']  # Access to context params
        print(params['message'])

    create_backup_env() >> [trigger_sync_dag_1_task, trigger_sync_dag_2_task] >> other_task()


wrapper_dag()
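A quick note on the TriggerDagRunOperator arguments: wait_for_completion=True makes the trigger task poll the triggered DAG run (every poke_interval seconds) until it finishes, and failed_states=[State.FAILED] makes the trigger task itself fail if the triggered run fails, so the Wrapper DAG's status reflects the status of its children.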
# Sync DAG (let's assume we have 2 like this that are pretty similar)
import logging
from datetime import datetime

from airflow.decorators import dag, task
from airflow.operators.python import get_current_context


# No schedule: this DAG should only run when triggered by the Wrapper DAG
@dag(start_date=datetime(2023, 1, 7), schedule_interval=None, catchup=False)
def sync_dag():
    @task.python
    def sync():
        logging.info('Syncing data...')
        # Access to context params in order to perform certain tasks
        context = get_current_context()
        params = context['params']
        logging.debug(f'params: {params}')
        if 'run-task-a' in params and params['run-task-a']:
            logging.info('Running task A...')
        elif 'run-task-b' in params and params['run-task-b']:
            logging.info('Running task B...')

    sync()


sync_dag()
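As a side note, if you want to test this locally, Airflow's CLI lets you pass a config object when triggering a DAG, e.g. airflow dags trigger wrapper_dag --conf '{"run-task-a": true}' (the CLI equivalent of the Trigger DAG w/ config button).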
To access the params object passed to a DAG via the Trigger DAG w/ config Airflow feature, we can use the params key inside the context we retrieve with the get_current_context function, which returns the active DAG run context. We can also use the Jinja template interpolation feature that Airflow provides out of the box, that is, using a string like {{ params }} in an operator's templated fields or properties. (For a deeper insight, check the official documentation.)
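To make the templating idea concrete, here's a minimal sketch (a hypothetical task, not part of the Wrapper or Sync DAGs) using the stock BashOperator, whose bash_command is a templated field:

from airflow.operators.bash import BashOperator

# Hypothetical task that just echoes the rendered params of the current run.
# Jinja interpolates {{ params }} at runtime; note the rendered result is a string.
print_params_task = BashOperator(
    task_id='print_params',
    bash_command='echo "{{ params }}"',
)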
The TriggerDagRunOperator supports a field called conf that receives a Python dictionary to be used as the triggered DAG's config. It's also a templated field, which means we can do the following:
trigger_dag_task = TriggerDagRunOperator(
    task_id='trigger_dag',
    trigger_dag_id='triggered_dag',
    conf='{{ params }}',
    # conf='{{ conf }}' can also be used to pass the DAG run conf object
    wait_for_completion=True,
    poke_interval=60,
    failed_states=[State.FAILED],
)
As I mentioned, the conf parameter expects a Python dictionary. If we don't pass any config object to the Wrapper DAG, this still works, because the interpolated params object is empty and no error comes up. However, if we pass some parameters (for instance, {"run-task-a": true}), the TriggerDagRunOperator task instance fails: Jinja renders the template to a string, so conf receives the string representation of the dictionary instead of an actual dictionary.
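To see what's going on, here's a quick standalone sketch using plain jinja2 (the templating engine Airflow uses), outside of Airflow, showing what the rendered conf actually looks like:

from jinja2 import Template

# Render the same expression Airflow would render for the conf field.
rendered = Template('{{ params }}').render(params={'run-task-a': True})
print(rendered)        # {'run-task-a': True}
print(type(rendered))  # <class 'str'> -- a string, not a dictionary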
So we have to rewrite our conf param:
trigger_dag_task = TriggerDagRunOperator(
    task_id='trigger_dag',
    trigger_dag_id='triggered_dag',
    # You can use whichever key you want. I used 'configuration'.
    conf={'configuration': '{{ params }}'},
    wait_for_completion=True,
    poke_interval=60,
    failed_states=[State.FAILED],
)
Doing this, we have the following context['params'] object available in our triggered DAGs: {'configuration': "{'run-task-a': True}"}.
We have 2 problems here. As you can imagine, the 2 Sync DAGs were built to read context['params'] directly, not context['params']['configuration']. Furthermore, we're receiving a string containing the Python dictionary instead of the dictionary itself.
To handle this, we'll need to modify our Sync DAGs a little bit. We can create a get_context_params util function:
# dags/utils/common.py
from ast import literal_eval

from airflow.operators.python import get_current_context


def get_context_params():
    context = get_current_context()
    params = context['params']
    if 'configuration' in params:
        # Parse the stringified dictionary and spread it into the top level
        params = {
            **params,
            **literal_eval(params['configuration']),
        }
        del params['configuration']
    return params
Here we check whether the params object has a configuration property; if so, we parse the value into a Python dictionary using the literal_eval function from the ast package and spread it into the top level of the params object. literal_eval safely evaluates a string containing a Python literal, for instance, a Python dictionary (see the official docs for a deeper insight into it).
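As a quick standalone illustration (not part of the DAG code) of what literal_eval does with the stringified config:

from ast import literal_eval

# The stringified dictionary we receive in params['configuration']...
raw = "{'run-task-a': True}"
# ...becomes an actual Python dictionary after evaluation.
parsed = literal_eval(raw)
print(parsed['run-task-a'])  # True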
Ultimately, our Sync DAG has to be rewritten as follows:
# Sync DAG (let's assume we have 2 like this that are pretty similar)
import logging
from datetime import datetime

from airflow.decorators import dag, task

from dags.utils.common import get_context_params


# No schedule: this DAG should only run when triggered by the Wrapper DAG
@dag(start_date=datetime(2023, 1, 7), schedule_interval=None, catchup=False)
def sync_dag():
    @task.python
    def sync():
        logging.info('Syncing data...')
        # Access to context params in order to perform certain tasks
        params = get_context_params()
        logging.debug(params)
        if 'run-task-a' in params and params['run-task-a']:
            logging.info('Running task A...')
        elif 'run-task-b' in params and params['run-task-b']:
            logging.info('Running task B...')

    sync()


sync_dag()
Now, if we run the Wrapper DAG passing the following config object:
{"run-task-a": true}
we'll see 'Running task A...' in the sync task logs.
With this, we're able to pass params from a parent DAG to a triggered DAG without having to change much of the logic that uses the context params (:
If you think I overcomplicated the solution (it's probably the case) I encourage you to leave a comment ^^ then all of us can continue learning (:
You can check the source code here. It includes some extra stuff like using the BranchPythonOperator to skip the syncs depending on more config parameters.
Thanks for reading!