Dynamic task mapping (DTM) is a major feature that adds a lot of flexibility to how you build your DAGs. However, it does not provide infinite flexibility, nor does it free you from Airflow's patterns. For example, your task mappings are constrained by the datatypes supported by XCom, namely Python dicts and lists. I have not found a way to set a Kubernetes secret for the KubernetesPodOperator dynamically, for instance.
Python allows you to pickle an object such as a Kubernetes secret, converting it into a byte stream, and XCom does support pickled data, but I have not found a way to use this in conjunction with DTM.
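To make that constraint concrete, here is a minimal sketch of the kind of XCom-friendly values DTM can expand over; the table names and task names are hypothetical, and the tasks are assumed to live inside a DAG definition:

from airflow.decorators import task

@task()
def get_table_names():
    # Plain Python lists (and dicts) serialize cleanly to XCom,
    # so they can be fed to .expand()
    return ["customers", "orders", "invoices"]

@task()
def export_table(table_name):
    # One mapped task instance runs per table name
    print("exporting " + table_name)

# inside a DAG definition:
export_table.expand(table_name=get_table_names())

Anything that cannot be serialized to XCom this way (such as a Secret object) cannot be mapped over without resorting to pickling.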
Here are some useful tips and observations collected from our work with DTM at Redactics:
Working with Global Variables
Understand that values set inside mapped task functions are assigned at runtime, after values have already been assigned to variables outside of these mapped functions at DAG parse time. Therefore, you cannot, for example, assign a value to a global variable inside a mapped task and expect that this value will be available outside of these mapped functions:
from airflow.decorators import task
from airflow.kubernetes.secret import Secret
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator

secrets = []

@task()
def set_vars(**context):
    global secrets
    secrets.append(Secret('volume', "/secretpath/" + context["params"]["secretname"], context["params"]["secretname"]))
    return secrets

@task()
def init_wf(secrets, **context):
    print(secrets)
    return "hello"

init_workflow = init_wf.partial().expand(
    secrets=set_vars()
)

start_job = KubernetesPodOperator(
    task_id="start-job",
    name="start-job",
    image="postgres:12",
    cmds=["uname"],
    secrets=secrets  ### this value is going to be null
)

init_workflow >> start_job
The secrets value in the KubernetesPodOperator is going to be null because, by the time start_job is initialized, set_vars has not run.
You Can Duplicate Your DAGs To Have Them Run Within a Different Context, e.g. a Different Schedule
Not the prettiest pattern, but you can duplicate your DAGs, for example with a Docker entrypoint script:
#!/bin/bash

arr=( "workflow1" "workflow2" )

for workflow_id in "${arr[@]}"
do
  cp /tmp/dag-template.py /opt/airflow/dags/${workflow_id}-mydag.py
done
Then, your DAGs can pull a configuration from an API, environment variables, files, or whatever makes the most sense to serve up these variations to your DAG. This may seem obvious, and it certainly isn't pretty, but we had to bite this bullet because we needed certain parameters (e.g. the schedule) to vary per DAG, and this is not territory DTM covers.
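As a rough sketch of the environment-variable route (the variable name and JSON shape here are hypothetical), each copied file can derive its own settings from its filename:

import json
import os

# Derive the workflow id from the duplicated filename, e.g. "workflow1-mydag.py" -> "workflow1"
workflow_id = os.path.basename(__file__).split('.')[0].replace('-mydag', '')

# Hypothetical per-workflow variable, e.g. WORKFLOW1_CONFIG='{"schedule": "@daily"}'
wf_config = json.loads(os.environ[workflow_id.upper() + "_CONFIG"])

schedule = wf_config["schedule"]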
If you elect to retrieve some values via an API, this allows even more of the DAG to be dynamic, so it doesn't need updating whenever you want to change values. We elected to use the workflow_id in the filename to pass on to the API:
import os

import requests

dag_file = os.path.basename(__file__).split('.')[0]
dag_id = dag_file.replace('-mydag', '')

API_KEY = os.environ['API_KEY']
API_HOST = "https://api.yourthing.com"

headers = {'Content-type': 'application/json', 'Accept': 'text/plain', 'x-api-key': API_KEY}
api_url = API_HOST + '/airflowconfigs/' + dag_id
response = requests.get(api_url, headers=headers)
wf_config = response.json()
Running this near the top of the DAG ensures that wf_config is available as a global variable throughout your DAG. You can control how often your API is polled, and if you are concerned about how this scales, cache these configs with Redis.
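If you do reach for caching, a minimal sketch with the redis-py client might look like the following; it continues the snippet above (dag_id, api_url, and headers already defined), and the Redis host, key naming, and TTL are assumptions rather than part of our actual setup:

import json
import os

import redis
import requests

r = redis.Redis(host=os.environ.get("REDIS_HOST", "localhost"), port=6379, db=0)
cache_key = "airflowconfig:" + dag_id

cached = r.get(cache_key)
if cached:
    # Serve the config from Redis rather than hitting the API on every DAG parse
    wf_config = json.loads(cached)
else:
    response = requests.get(api_url, headers=headers)
    wf_config = response.json()
    # Cache for 5 minutes (hypothetical TTL) so the API is polled at most that often
    r.setex(cache_key, 300, json.dumps(wf_config))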
Accessing the Context Object, Including DagRun Params, Requires the TaskFlow API
If you are using the Airflow REST API and passing a conf object to the DAGRun endpoint, for example, you cannot access these arguments from within a classic-style operator such as the PythonOperator. Instead, you must use the TaskFlow API designed for use with DTM. For example:
from airflow.decorators import task

@task()
def start_job(**context):
    print(context["params"]["myparam"])
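For context, the conf object read above would be supplied when triggering the run through Airflow's stable REST API. Here is a minimal sketch using requests; the host, credentials, and DAG id are hypothetical, and basic auth is assumed to be enabled:

import requests

# POST /api/v1/dags/{dag_id}/dagRuns triggers a run; "conf" becomes context["params"]
response = requests.post(
    "https://your-airflow-host/api/v1/dags/my_dag/dagRuns",
    auth=("airflow_user", "airflow_password"),  # hypothetical credentials
    json={"conf": {"myparam": "some value"}},
)
print(response.json())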