
Joe Auty

How To Get the Most Out of Airflow's Dynamic Task Mapping

Dynamic task mapping (DTM) is a major feature that adds a lot of flexibility to how you build your DAGs. However, it does not provide infinite flexibility, nor does it free you from Airflow's patterns. For example, your task mappings are constrained by the datatypes XCom supports, namely Python dicts and lists. I have not found a way to set a Kubernetes secret for the KubernetesPodOperator dynamically, for instance.

Python allows you to pickle an object such as a Kubernetes secret, which converts it into a byte stream, and XCom does support pickled data, but I have not found a way to use this in conjunction with DTM.
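As a plain-Python illustration of what pickling does (this is not an Airflow API, and whether XCom will accept pickled data depends on your version's enable_xcom_pickling setting):

import pickle

# Serialize an arbitrary object into a byte stream...
payload = pickle.dumps({"name": "my-secret", "mount_path": "/secretpath"})

# ...and restore it later
restored = pickle.loads(payload)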

Here are some useful tips and observations collected from our work with DTM at Redactics:

Working with Global Variables

Keep in mind that code inside mapped task functions runs at execution time, whereas top-level DAG code (including assignments to global variables) runs when the file is parsed. You therefore cannot assign a value to a global variable inside a mapped task and expect that value to be available outside of these mapped functions:

from airflow.decorators import task
from airflow.kubernetes.secret import Secret
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator

secrets = []

@task()
def set_vars(**context):
    global secrets
    secrets.append(Secret('volume', "/secretpath/" + context["params"]["secretname"], context["params"]["secretname"]))
    return secrets

@task()
def init_wf(secrets, **context):
    print(secrets)
    return "hello"

init_workflow = init_wf.partial().expand(
    secrets=set_vars()
)

start_job = KubernetesPodOperator(
    task_id="start-job",
    image="postgres:12",
    cmds=["uname"],
    secrets=secrets  ### this value is going to be null
)
init_workflow >> start_job

The secrets value passed to the KubernetesPodOperator is going to be null because start_job is initialized when the DAG file is parsed, before set_vars has run.
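If the value you need can be expressed as an XCom-serializable type such as a list of strings, the runtime-safe pattern is to consume it in another mapped task rather than in top-level DAG code. A minimal sketch (the task names here are made up for illustration):

from airflow.decorators import task

@task()
def set_paths(**context):
    # Runs at execution time and returns an XCom-serializable list
    return ["/secretpath/" + context["params"]["secretname"]]

@task()
def use_path(path):
    # Also runs at execution time, after set_paths, so the value is available here
    print(path)

use_path.expand(path=set_paths())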

You Can Duplicate Your DAGs To Have Them Run Within a Different Context, e.g. a Different Schedule

Not the prettiest pattern, but you can duplicate your DAGs, for example with a Docker entrypoint script:

#!/bin/bash

# Copy the DAG template once per workflow so each copy is registered as its own DAG
arr=( "workflow1" "workflow2" )

for workflow_id in "${arr[@]}"
do
    cp /tmp/dag-template.py /opt/airflow/dags/${workflow_id}-mydag.py
done

Then, your DAGs can pull their configuration from an API, environment variables, files, or whatever makes the most sense for serving up these variations. This may seem obvious, and it certainly isn't pretty, but we had to bite this bullet because we needed certain parameters (e.g. the schedule) to be configurable, and that is not something DTM can handle.

If you elect to retrieve some values via an API, even more of the DAG can be dynamic, so that it doesn't need updating whenever you want to change values. We elected to encode the workflow_id in the filename and pass it on to the API:

import os
import requests

# Derive the workflow id from the copied DAG filename, e.g. workflow1-mydag.py -> workflow1
dag_file = os.path.basename(__file__).split('.')[0]
dag_id = dag_file.replace('-mydag','')

API_KEY = os.environ['API_KEY']
API_HOST = "https://api.yourthing.com"
headers = {'Content-type': 'application/json', 'Accept': 'text/plain', 'x-api-key': API_KEY}
apiUrl = API_HOST + '/airflowconfigs/' + dag_id
request = requests.get(apiUrl, headers=headers)
wf_config = request.json()

Running this near the top of the DAG ensures that wf_config is available as a global variable throughout your DAG. You can control how often your API is polled, and if you are concerned about how this scales, cache these configs with Redis.
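For example, a minimal caching sketch, continuing from the snippet above (it reuses dag_id, apiUrl, and headers; it assumes redis-py and a reachable Redis instance, and the key name and five-minute TTL are arbitrary choices):

import json
import os

import redis
import requests

r = redis.Redis(host=os.environ.get("REDIS_HOST", "localhost"), port=6379)
cache_key = "airflowconfig:" + dag_id

cached = r.get(cache_key)
if cached:
    wf_config = json.loads(cached)
else:
    wf_config = requests.get(apiUrl, headers=headers).json()
    r.set(cache_key, json.dumps(wf_config), ex=300)  # re-poll the API at most every 5 minutes

# wf_config can then drive parse-time settings such as the schedule,
# e.g. schedule_interval=wf_config.get("schedule")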

Accessing the Context Object, Including DagRun Params, Requires the TaskFlow API

If you are using the Airflow REST API and passing a conf object to the DAGRun endpoint, for example, you cannot access these arguments from within a classic-style operator such as the PythonOperator. Instead, use the TaskFlow API, which is designed for use with DTM. For example:

from airflow.decorators import task

@task()
def start_job(**context):
    print(context["params"]["myparam"])
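For reference, the conf might be supplied when triggering the run through Airflow's stable REST API (the host and credentials below are placeholders; authentication depends on your deployment):

import requests

resp = requests.post(
    "https://your-airflow-host/api/v1/dags/your_dag_id/dagRuns",
    auth=("username", "password"),
    json={"conf": {"myparam": "hello"}},
)
print(resp.json())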
