In this blog post, we are going to take a look at how we can setup Apache Airflow on our systems and get you as a developer, started off with just the bare minimum so you can start working on it.
For detailed documentation please always refer the Airflow official Documentation
Introduction to Airflow
Airflow is a workflow management platform for data engineering pipelines. It helps define workflows with python code and provides a rich UI to manage and monitor these workflows. Airflow supports easy integration with all popular external interfaces like DBs(SQL and MongoDB), SSH, FTP, Cloud providers etc. It sounds like it does a lot and it does, and can be intimidating, but it is really easy to get started.
The biggest plus point about Airflow is its user friendly UI.
The Most Common Terms used in Airflow
- DAG (Directed acyclic graph) A DAG is a python(.py) file that defines what the steps are in our workflow. Each DAG has some common configuration and details of what task needs to be done at each step. Note: You do not need to know advanced python to start working on DAGs but should know the basics.
- Tasks - Each step of our workflow is called a Task. We can provide different relationship/dependencies between tasks. Example: Task2 should run after Task1. And Task3 should run after Task2. Task4 should run if Task1, Task2 and Task3 are successful only. Task5 should run if either Task1, Task2 or Task3 fail.
- Operators - Now tasks are executed using certain Airflow operators. There are around 50 Operators defined. Each operator has configuration that can be done to suit our requirement. We can even call python functions and use the output 2 or a bash operator and execute a specific command.
- Connections & Hooks - There is a layer between the code and the connection details. Airflow already has integration with many external interfaces so you do not need write low level code. For example if you need a DB connection, in the Airflow UI you will create a connection with the host name, port, username and password of the connection and provide a conn_id(connection id). And in your DAG(Python code) you will ONLY use the conn_id as a reference. This segregates your code from the connection configuration and makes it reusable across environments as well.
- XComs - XCom(Cross Communication) is basically the data that flows between Task1 and Task2. Note: the data passed between tasks should be very minimal and we should not pass large objects.
Installing Airflow with Docker Compose
Now that we have seen the popular concepts, let us get airflow up and running on our local machine.
You can follow the official documentation for installation
I will go through a very simple setup here.
Prerequisite: docker and docker compose should already be installed.
Download the Airflow docker compose file created by the Airflow team:
Save it on your local machine in a folder of your choice and run
docker-compose up
You should see the various images being downloaded(takes a lot of time the very first you run the command) and you will see containers starting up.
After a while all your containers will be up.
To check this open a new terminal in parallel and run
docker container ls
You should see 7 containers running
Now go to localhost:8080 and you have the below page opening up
Username: airflow
Password: airflow
This will open up your home screen
Explore features of Airflow
Let us now see the various features of Airflow.
Home Screen
Our Home Screen shows
Your home screen is auto loaded with a bunch of pre-defined DAGs. This is for your reference and you to play around with when creating your own DAG!!
DAG Views
Let's look at the first DAG - example bash operator
Tree View
Click on the DAG to bring up the details
This brings up the Tree view and shows how the steps are linked in a tree like structure.
Make sure that you Activate the DAG by clicking the switch button on the top right. This can be done for the home page as well if you noticed.
Once you click it, you can click on the Run button on the top right to execute the DAG. On clicking Run, choose 'Run DAG' option. The Run DAG w/ config option is if you have some custom configuration for the DAG which is not required in this case.
Congrats, you've run your very first DAG.
Code View
A lot of the different views are for monitoring purposes once the code is deployed. To see the code itself click on the last view the Code View.
The airflow team has given us a couple of DAGs pre built and we can now check what is written.
Below is the code for this particular DAG
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Example DAG demonstrating the usage of the BashOperator."""
import datetime
import pendulum
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.dummy import DummyOperator
with DAG(
dag_id='example_bash_operator',
schedule_interval='0 0 * * *',
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
dagrun_timeout=datetime.timedelta(minutes=60),
tags=['example', 'example2'],
params={"example_key": "example_value"},
) as dag:
run_this_last = DummyOperator(
task_id='run_this_last',
)
# [START howto_operator_bash]
run_this = BashOperator(
task_id='run_after_loop',
bash_command='echo 1',
)
# [END howto_operator_bash]
run_this >> run_this_last
for i in range(3):
task = BashOperator(
task_id='runme_' + str(i),
bash_command='echo "{{ task_instance_key_str }}" && sleep 1',
)
task >> run_this
# [START howto_operator_bash_template]
also_run_this = BashOperator(
task_id='also_run_this',
bash_command='echo "run_id={{ run_id }} | dag_run={{ dag_run }}"',
)
# [END howto_operator_bash_template]
also_run_this >> run_this_last
# [START howto_operator_bash_skip]
this_will_skip = BashOperator(
task_id='this_will_skip',
bash_command='echo "hello world"; exit 99;',
dag=dag,
)
# [END howto_operator_bash_skip]
this_will_skip >> run_this_last
if __name__ == "__main__":
dag.cli()
Well you do need to know a bit of Python to understand the syntax and coding. But just the basics is more than enough to get started with DAGs.
Note: You CANNOT change the DAG in the UI directly. You need to do modify the Python file containing the DAG code and place it in the appropriate folder to do so
Graph View
Another important view is the graph view
By clicking one of the steps, you will get a popup which gives you many step level options
Probably the most important is the Log Option. This will give you all the details of the logs on the machine when the particular step was executed
Make sure you go through the other views and see what can be used for your project.
DAGS Active Filter
On navigating back to the home screen you will notice that there are 3 tabs on the top.
Always remember that even if you try and run a DAG, it won't run until it is 'Active'
Admin
Another important place to keep an eye out for is the Admin dropdown.
Lets look at 3 of these options
Variables
Variables allow you to have global level variables used across the application
Connections
Almost always you will want your airflow workflow to talk to some external service. Connections provides an interface between the actual connection string/details and the code you need to write. In the code you can refer the connection name and the details of connection can vary in each environment of airflow. This can even be a HTTP Connection for a REST API Call.
XComs
You can view the data passed between steps in your DAG.
It provides the DAG id and task id and you can easily debug runs here instead of checking the logs of multiple runs.
With these views you probably get the most important features of Airflow from a developers point of view.
Creating and Running our own DAG
Now, lets create a simple simple DAG and run it.
Create the .py file
On navigating back to your docker-compose folder. You should see 3 folders automatically created. dags, logs and plugins.
To add a DAG to the airflow instace, create a .py file ib the dags folder. Below is a sample file
from airflow import DAG
from datetime import datetime
from airflow.operators.http_operator import SimpleHttpOperator
with DAG("http_test_dag", start_date=datetime(2021, 1, 1), schedule_interval="@daily", catchup=False) as dag:
http_call = SimpleHttpOperator(
task_id="HttpCall",
http_conn_id='springboot-http',
endpoint='',
method='GET',
headers={"Content-Type": "application/json"},
data=None,
log_response=True,
response_check=lambda response: response.ok)
http_call
This DAG uses a simple Http operator that makes a call to the connection - 'springboot-http'. This is a connection that is expected to be generated in the admin>connections.
Updating Airflow
Airflow automatically will pick up your new DAG file if placed in the folder. You do not need to do anything to deploy.
A common mistake is looking at the Code tab of a deployed DAG and refreshing and expecting to see your new code but it doesn't get updated. You might be inclined to think something is wrong with airflow and you need to stop and restart the docker containers. But the issue is with the modifications you have made in the DAG. If the code doesn't compile, the code in the code tab won't get updated.
Now you can activate the DAG and run it!!
I hope this post is helpful for anyone trying to learn about Airflow and just get quickly started so you can play around with it.
Top comments (0)