I recently contributed to an open-source project called DAG Factory, a library for building Airflow DAGs declaratively with YAML files, eliminating the need for Python coding.
My contribution was to add support for Airflow's Data-aware Scheduling (Datasets) functionality, which was introduced in version 2.4 (at the time of writing, Airflow is at version 2.6.3).
The aim here is to talk about the Datasets functionality in Airflow, introduce the DAG Factory library, and create a practical example using both.
You can access the repository with the code used in this article here.
What are Datasets in Airflow?
Data-aware Scheduling allows you to create DAGs linked to files, whether local or remote, and to trigger data processing whenever one or more of these files, known as datasets, are created or modified.
Datasets help resolve the problem of data dependency between DAGs, which occurs when one DAG needs to consume data produced by another for analysis or further processing. They enable smarter, more visible scheduling with explicit dependencies between DAGs.
Basically, there are two fundamental concepts in Airflow's Datasets:
DAG Producer: this DAG creates or updates one or more datasets. This is done at the task level through a parameter called outlets, which specifies the dataset(s) a task updates.
DAG Consumer: this DAG consumes one or more datasets and is triggered as soon as all the datasets it depends on have been successfully created or updated by a producer. This is configured through the schedule parameter directly in the DAG configuration.
Currently, there are two ways to schedule DAGs in Airflow: either by a recurrent schedule (cron, timedelta, timetable, etc.) or through one or multiple datasets. It's important to note that we cannot use both scheduling methods in a single DAG, only one in each DAG.
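To make the producer/consumer relationship concrete, here is a minimal sketch of the two pieces in plain Airflow (without DAG Factory yet); the DAG ids and the S3 URI are placeholders for illustration:

from datetime import datetime

from airflow import DAG
from airflow.datasets import Dataset
from airflow.operators.bash import BashOperator

# Illustrative dataset URI; Airflow only compares this string, it never reads the file.
example_dataset = Dataset("s3://my-bucket/raw/example.json")

# Producer: any task that lists the dataset in `outlets` marks it as updated when it succeeds.
with DAG("producer_dag", start_date=datetime(2023, 1, 1), schedule="@daily"):
    BashOperator(
        task_id="write_data",
        bash_command="echo 'pretend we wrote the file'",
        outlets=[example_dataset],
    )

# Consumer: scheduled by the dataset(s) instead of a cron expression.
with DAG("consumer_dag", start_date=datetime(2023, 1, 1), schedule=[example_dataset]):
    BashOperator(task_id="process_data", bash_command="echo 'processing'")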
DAG Factory: Building DAGs with YAML
DAG Factory is a community library that lets Airflow generate DAGs from one or more YAML files.
The library aims to simplify the creation and configuration of new DAGs by using declarative parameters in YAML. It supports default configurations and, being open source, makes it easy to create and customize new functionality.
The community around this library is highly engaged, making it worth exploring =)
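For reference, this is roughly how the library is wired into Airflow: a small Python file placed in the dags folder points DAG Factory at a YAML config and generates the DAGs into the module's global namespace (the config path below is just an example):

from airflow import DAG  # noqa: F401  kept so the scheduler recognizes this file as a DAG file
import dagfactory

# Example path; point this at your own YAML configuration file.
config_file = "/opt/airflow/dags/dags_config/example_dag_config.yml"

dag_factory = dagfactory.DagFactory(config_file)

# Drop any previously generated DAGs and rebuild them from the YAML.
dag_factory.clean_dags(globals())
dag_factory.generate_dags(globals())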
Practical Use of Datasets
In this article, we'll work with the following scenario:
We need to build a pipeline that downloads data from an API and saves the results to Amazon S3. After the data is successfully extracted and saved, we need to process it. Hence, we'll have another pipeline that is triggered based on the creation or update of that data.
The infrastructure to run Airflow and reproduce the example in this article can be found here.
The first step is to build the pipeline that extracts and saves the data to S3.
Producer DAG for Data
This pipeline consists of two tasks that extract data from the public PokeAPI and another two tasks that save the data to S3.
The tasks that extract data from the API use the SimpleHttpOperator, and the tasks that save the data to S3 use the S3CreateObjectOperator.
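For comparison, here is roughly what one extract-and-save pair would look like written directly in Python, with the dataset attached through the task's outlets; the DAG id below is hypothetical and the snippet is a sketch, not the code DAG Factory generates:

from datetime import datetime

from airflow import DAG
from airflow.datasets import Dataset
from airflow.providers.amazon.aws.operators.s3 import S3CreateObjectOperator
from airflow.providers.http.operators.http import SimpleHttpOperator

dataset_poke_items = Dataset("s3://cjmm-datalake-raw/poke_api/items/*.json")

with DAG(
    "download_data_api_plain_python",  # hypothetical DAG id, for illustration only
    start_date=datetime(2023, 1, 1),
    schedule="0 5 * * *",
):
    # Calls the PokeAPI through the "poke_api" HTTP connection; the response goes to XCom.
    get_items_data = SimpleHttpOperator(
        task_id="get_items_data",
        http_conn_id="poke_api",
        method="GET",
        endpoint="item/1",
    )

    # Writes the API response to S3 and marks the dataset as updated when the task succeeds.
    save_items_data = S3CreateObjectOperator(
        task_id="save_items_data",
        aws_conn_id="aws_default",
        s3_bucket="cjmm-datalake-raw",
        s3_key="poke_api/item/data_{{ ts }}.json",
        data="{{ ti.xcom_pull(task_ids='get_items_data') }}",
        outlets=[dataset_poke_items],
    )

    get_items_data >> save_items_data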
Since we'll be using YAML to build our DAGs, the following code constructs this first DAG with all its tasks.
download_data_api_dataset_producer_dag:
  description: "Example DAG producer custom config datasets"
  schedule_interval: "0 5 * * *"
  task_groups:
    extract_data:
      tooltip: "this is a task group"
    save_data:
      tooltip: "this is a task group"
  tasks:
    start_process:
      operator: airflow.operators.dummy.DummyOperator
    get_items_data:
      operator: airflow.providers.http.operators.http.SimpleHttpOperator
      method: "GET"
      http_conn_id: "poke_api"
      endpoint: "item/1"
      task_group_name: extract_data
      dependencies: [start_process]
    save_items_data:
      operator: airflow.providers.amazon.aws.operators.s3.S3CreateObjectOperator
      aws_conn_id: aws_default
      s3_bucket: cjmm-datalake-raw
      s3_key: "poke_api/item/data_{{ ts }}.json"
      data: "{{ ti.xcom_pull(task_ids='get_items_data') }}"
      dependencies: [get_items_data]
      task_group_name: save_data
      outlets:
        file: /opt/airflow/dags/dags_config/datasets_config.yml
        datasets: ['dataset_poke_items']
    get_items_attribute_data:
      operator: airflow.providers.http.operators.http.SimpleHttpOperator
      method: "GET"
      http_conn_id: "poke_api"
      endpoint: "item-attribute/1"
      dependencies: [start_process]
      task_group_name: extract_data
    save_items_attribute_data:
      operator: airflow.providers.amazon.aws.operators.s3.S3CreateObjectOperator
      aws_conn_id: aws_default
      s3_bucket: cjmm-datalake-raw
      s3_key: "poke_api/items_attribute/data_{{ ts }}.json"
      data: "{{ ti.xcom_pull(task_ids='get_items_attribute_data') }}"
      dependencies: [get_items_attribute_data]
      task_group_name: save_data
      outlets:
        file: /opt/airflow/dags/dags_config/datasets_config.yml
        datasets: ['dataset_poke_items_attribute']
A highlight is the configuration of the Datasets, done through the outlets key added to the save_items_data and save_items_attribute_data tasks.
outlets:
  file: /opt/airflow/dags/dags_config/datasets_config.yml
  datasets: ['dataset_poke_items_attribute']
In this configuration, we specify the path to the file where all Datasets are centrally declared for reuse, along with the names of the datasets from that file that the task updates.
Below is the datasets_config.yml file used in this example, containing each Dataset's name (used only within Airflow) and its URI, which is the path where the actual file is stored, in this case on Amazon S3.
datasets:
  - name: dataset_poke_items_attribute
    uri: s3://cjmm-datalake-raw/poke_api/items_attribute/*.json
  - name: dataset_poke_items
    uri: s3://cjmm-datalake-raw/poke_api/items/*.json
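To illustrate the idea behind this indirection, here is a minimal sketch of how such a central file could be resolved into Airflow Dataset objects by name. This is not the library's actual implementation; the resolve_datasets helper is invented for the example:

from typing import List

import yaml
from airflow.datasets import Dataset


def resolve_datasets(config_path: str, names: List[str]) -> List[Dataset]:
    """Hypothetical helper: look up dataset URIs by name in a central YAML file."""
    with open(config_path) as f:
        config = yaml.safe_load(f)

    uri_by_name = {d["name"]: d["uri"] for d in config["datasets"]}
    return [Dataset(uri_by_name[name]) for name in names]


# Example: build the outlets for the save_items_attribute_data task.
outlets = resolve_datasets(
    "/opt/airflow/dags/dags_config/datasets_config.yml",
    ["dataset_poke_items_attribute"],
)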
The resulting DAG visualization in Airflow will look like this:
Consumer DAG for Data
Now, let's build the DAG that consumes the data and handles its processing.
The DAG is scheduled based on datasets, not on an execution time, so it will only be triggered when all the datasets it depends on are updated.
Currently, we cannot use two types of scheduling simultaneously; it's either through a schedule interval or datasets.
In this example, we'll keep things simple and build a DAG with a PythonOperator that simulates consuming and processing the produced data.
Below is the configuration file for the consumer DAG:
process_data_api_dataset_consumer_dag:
  description: "Example DAG consumer custom config datasets"
  schedule:
    file: /opt/airflow/dags/dags_config/datasets_config.yml
    datasets: ['dataset_poke_items', 'dataset_poke_items_attribute']
  tasks:
    start_process:
      operator: airflow.operators.dummy.DummyOperator
    process_data:
      operator: airflow.operators.python_operator.PythonOperator
      python_callable_name: process_data_function
      python_callable_file: /opt/airflow/dags/process_data.py
      task_group_name: etl_data
      provide_context: true
      dependencies: [start_process]
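The process_data task points at a callable named process_data_function inside /opt/airflow/dags/process_data.py. That file's contents aren't shown here, so the following is only a hypothetical placeholder for what the simulated processing step might look like:

# /opt/airflow/dags/process_data.py (hypothetical content, for illustration only)

def process_data_function(**context):
    """Simulate consuming and processing the data produced by the upstream DAG."""
    # With provide_context: true, the Airflow context (ds, ts, dag_run, ...) is passed in.
    print(f"Processing data for logical date {context['ds']}")
    # Real work (read from S3, transform, load) would go here.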
A highlight is the configuration of the schedule based on datasets, which is similar to the configuration of the outlets in the producer DAG:
schedule:
  file: /opt/airflow/dags/dags_config/datasets_config.yml
  datasets: ['dataset_poke_items', 'dataset_poke_items_attribute']
The resulting DAG visualization in Airflow will be as follows:
Overview of DAGs with Datasets
When we have DAGs using Airflow's datasets, we can observe some interesting points:
- In the list of all DAGs, the consumer DAG is flagged to indicate that it is scheduled based on datasets.
- There is a dedicated view in the Airflow menu called Datasets, where you can check the configured datasets, the dependencies between DAGs, and the history of dataset creation, updates, and consumption.
- The DAG Dependencies visualization shows the relationships between the DAGs, providing a helpful overview of the processing mesh and data dependencies.
Important Points about Datasets
The functionality of Datasets in Airflow is still recent, and there are many improvements in the community backlog. However, I would like to highlight some points at this moment:
Currently, Airflow's Dataset functionality does not inspect the physical file itself. Instead, it schedules the consumer pipeline through Airflow's metadata database, working almost like an implicit DAG trigger.
Considering the previous point, it's better to use a Sensor if you genuinely need to "see and access" the data when triggering the DAG Consumer (a sketch of this approach follows after these points).
The official documentation does not recommend using regular expressions in the URI of datasets. However, in my tests, I didn't encounter any issues with this, as the functionality doesn't yet look directly at the physical file.
Since the DAG Consumer doesn't have a specific schedule, it's challenging to measure if it was triggered at a planned time, making it difficult to define an SLA. A more refined monitoring approach is needed to avoid missing critical scheduling.
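On the second point above, one way to actually check for the data is to start the consumer DAG with a sensor. Below is a minimal sketch using the S3KeySensor from the Amazon provider; the DAG id is hypothetical and the bucket/key values are taken from the example, so treat it as an assumption rather than part of the DAGs shown earlier:

from datetime import datetime

from airflow import DAG
from airflow.datasets import Dataset
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor

with DAG(
    "consumer_with_sensor_example",  # hypothetical DAG id
    start_date=datetime(2023, 1, 1),
    schedule=[Dataset("s3://cjmm-datalake-raw/poke_api/items/*.json")],
):
    # Blocks until at least one matching object actually exists in S3 before processing starts.
    wait_for_items_data = S3KeySensor(
        task_id="wait_for_items_data",
        aws_conn_id="aws_default",
        bucket_name="cjmm-datalake-raw",
        bucket_key="poke_api/items/*.json",
        wildcard_match=True,
        poke_interval=60,
        timeout=60 * 30,
    )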
Conclusion
By using the DAG Factory library, we simplify the process of creating and configuring new DAGs, leveraging the extensibility provided by the library's open-source code.
Airflow's Datasets enable more efficient scheduling by triggering DAGs only when necessary data is available, avoiding unnecessary and delayed executions.
I hope this article has been useful in understanding Airflow's Datasets functionality and how to apply it to your projects. With this approach, you can build more robust and efficient pipelines, fully utilizing Airflow's potential.
Follow me:
LinkedIn: https://www.linkedin.com/in/cicero-moura/
Github: https://github.com/cicerojmm