
Airflow: create your first Airflow DAG using tweepy

What is Apache Airflow?
A data pipeline is a series of data processing tasks that must run between the source system and the target system to automate data movement and transformation.

Apache Airflow is a batch-oriented tool for building data pipelines. It is used to programmatically create, schedule, and monitor data pipelines, a practice commonly known as workflow orchestration. Airflow is an open-source platform used to manage the various tasks involved in processing data in a data pipeline.

How does Apache Airflow work?
A data pipeline in Airflow is written as a Directed Acyclic Graph (DAG) in the Python programming language. By describing data pipelines as graphs, Airflow explicitly defines dependencies between tasks. In DAGs, tasks are displayed as nodes, whereas dependencies between tasks are illustrated using directed edges between different task nodes.

dag illustration

The direction of the edges depicts the direction of the dependencies, with an edge pointing from one task to another indicating which task must be completed before the next one can start.
A DAG is defined in Apache Airflow using Python code. The Python file describes the corresponding DAG's structure. As a result, each DAG file typically describes the various types of tasks for a given DAG, as well as the dependencies between those tasks. Apache Airflow parses these files to create the DAG structure. Furthermore, Airflow DAG files include additional metadata that instructs Airflow when and how to execute them.

The benefit of defining Airflow DAGs with Python code is that the programmatic approach gives users a lot of flexibility when building pipelines. Users, for example, can use Python code to generate a dynamic pipeline based on certain conditions. This adaptability allows for great workflow customization, letting users tailor Airflow to their specific requirements.
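As a quick sketch of that flexibility, the snippet below builds a small pipeline dynamically by looping over a list of sources. The DAG id, source names, and callable are placeholders used only for illustration, not part of the tutorial DAG:

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator


def process(source):
    # placeholder callable - in a real pipeline this would load or transform data
    print(f"processing {source}")


with DAG(
    "dynamic_example",  # hypothetical DAG id
    start_date=datetime(2022, 1, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    previous = None
    # generate one task per source; the list could just as well come from a config file
    for source in ["users", "orders", "payments"]:
        task = PythonOperator(
            task_id=f"process_{source}",
            python_callable=process,
            op_kwargs={"source": source},
        )
        if previous:
            previous >> task  # chain the generated tasks sequentially
        previous = task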

Scheduling and executing data pipelines in Apache Airflow
After defining the structure of a data pipeline as a DAG, Apache Airflow allows the user to specify a schedule interval for each DAG. The schedule dictates when Airflow runs a pipeline. As a result, users can instruct Airflow to run a DAG every week, day, or hour. Alternatively, you can define even more complex schedule intervals to deliver the desired workflow output.
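For instance, a schedule interval can be given as a preset string, a cron expression, or a timedelta. The snippet below is a minimal sketch; the DAG ids and start dates are placeholders:

from datetime import datetime, timedelta

from airflow import DAG

# run once a day, using the "@daily" preset
daily_preset = DAG("report_daily", start_date=datetime(2022, 1, 1), schedule_interval="@daily")

# the same daily schedule expressed as a cron string (midnight every day)
daily_cron = DAG("report_cron", start_date=datetime(2022, 1, 1), schedule_interval="0 0 * * *")

# run every 20 minutes, expressed as a timedelta
every_20_min = DAG("hashtags_sketch", start_date=datetime(2022, 1, 1), schedule_interval=timedelta(minutes=20))

Presets such as @hourly and @weekly cover the common cases, while a cron expression or a timedelta handles everything else.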

To better understand how Airflow runs DAGs, we must first examine the overall process of developing and running DAGs.

Components of Apache Airflow

  1. The Airflow Scheduler is in charge of parsing DAGs, monitoring their schedule intervals, and, once an interval has passed, scheduling the DAGs' tasks for Airflow Workers to process.

  2. The Airflow Workers are in charge of picking up and carrying out tasks.

  3. The Airflow Webserver is used to visualize pipelines that are being run by the parsed DAGs. The web server also serves as the primary Airflow UI (User Interface), allowing users to track the progress of their DAGs and results.

Airflow Operators are the foundation of Airflow DAGs. They contain the logic for how data in a pipeline is processed. A DAG task is defined by instantiating an operator.
In Airflow, there are many different types of operators. Some operators run generic code supplied by the user, such as a Python function or a Bash script, whereas others perform very specific actions, such as transferring data from one system to another (a short example follows the list of common operators below).

Some of the most commonly used Airflow operators are as follows:

  • PythonOperator: This class runs a Python function.
  • BashOperator: This class runs a bash script.
  • PythonVirtualenvOperator: This class runs a Python callable inside a new Python virtual environment. The virtualenv package needs to be installed in the environment that runs Airflow.
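Here is a minimal sketch of how DAG tasks are defined by instantiating two of these operators; the DAG id, task ids, and Bash command are illustrative, not part of the tutorial DAG:

from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator


def greet():
    print("hello from a PythonOperator")


with DAG("operator_examples", start_date=datetime(2022, 1, 1), schedule_interval=None) as dag:
    # each task is created by instantiating an operator inside the DAG
    say_hello = PythonOperator(task_id="say_hello", python_callable=greet)
    list_files = BashOperator(task_id="list_files", bash_command="ls /opt/airflow/dags")

    say_hello >> list_files  # list_files runs only after say_hello succeeds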

XComs (short for "cross-communications") are a mechanism that allows tasks to communicate with one another, as tasks are normally isolated and may run on different machines.
An XCom is identified by a key (basically its name), as well as the task id and dag id from which it originated. XComs can have any (serializable) value, but they are only intended for small amounts of data; do not use them to pass large values around, such as dataframes.
The xcom_push and xcom_pull methods on Task Instances are used to explicitly "push" and "pull" XComs to and from storage. If the do_xcom_push argument is set to True, many operators will auto-push their results into an XCom key called return_value.

"""Example DAG demonstrating the usage of XComs."""
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago

dag = DAG(
    'example_xcom',
    schedule_interval="@once",
    start_date=days_ago(2),
    default_args={'owner': 'airflow'},
    tags=['example'],
)

value_1 = [1, 2, 3]
value_2 = {'a': 'b'}


def push(**kwargs):
    """Pushes an XCom without a specific target"""
    kwargs['ti'].xcom_push(key='value from pusher 1', value=value_1)


def push_by_returning(**kwargs):
    """Pushes an XCom without a specific target, just by returning it"""
    return value_2


def puller(**kwargs):
    """Pull all previously pushed XComs and check if the pushed values match the pulled values."""
    ti = kwargs['ti']

    # get value_1
    pulled_value_1 = ti.xcom_pull(key=None, task_ids='push')
    if pulled_value_1 != value_1:
        raise ValueError(f'The two values differ {pulled_value_1} and {value_1}')

    # get value_2
    pulled_value_2 = ti.xcom_pull(task_ids='push_by_returning')
    if pulled_value_2 != value_2:
        raise ValueError(f'The two values differ {pulled_value_2} and {value_2}')
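

# Instantiate the tasks and wire up the dependencies so the example is runnable.
# The task ids match the ones pulled in puller() above.
push1 = PythonOperator(task_id='push', dag=dag, python_callable=push)

push2 = PythonOperator(task_id='push_by_returning', dag=dag, python_callable=push_by_returning)

pull = PythonOperator(task_id='puller', dag=dag, python_callable=puller)

# puller runs only after both push tasks have completed
pull << [push1, push2]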

Your first Airflow DAG using tweepy
We will use Airflow and tweepy to show trending hashtags every 20 minutes. First, create a new folder called tweepy_airflow.
Run this command: curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.3.3/docker-compose.yaml' to download the Docker Compose file. The file contains several service definitions, including airflow-scheduler, airflow-webserver, airflow-worker, airflow-init, postgres, and redis.

Add a data volume to the docker-compose file

    - ./dags:/opt/airflow/dags
    - ./logs:/opt/airflow/logs
    - ./plugins:/opt/airflow/plugins
    # volume to store data
    - ./data:/opt/airflow/data

Initialize Airflow and run the DAGs
Ensure Docker is running, then run docker-compose up airflow-init. DAGs are placed in the dags folder. After Airflow is initialized, start the services with docker-compose up. Use docker ps to confirm that the services are running, then navigate to localhost:8080.

tweepy_dag.py (trending_hashtags DAG)
This DAG contains the functions to connect to Twitter. It uses the PythonVirtualenvOperator to create and activate a virtual environment with the tweepy module. All the operations are wrapped in one callable function to avoid XCom issues.

Entire code for the DAG:

#importing libraries
from datetime import timedelta
import airflow
import os 

#from app import tweepy
from airflow import DAG
from airflow.operators.python import PythonOperator # for executing python functions
from airflow.operators.python import PythonVirtualenvOperator # for working with venvs airflow

# function to get twitter API, get trending topics for Nairobi and log them
def get_api():
    # importing a package in the function ensures that it is accessible when the venv is created
    import tweepy
    # api credentials - input yours
    consumer_key = " "
    consumer_secret = " "
    access_token = " "
    access_token_secret = " "

    # authentication of consumer key and secret
    auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
    # authentication of access token and secret
    auth.set_access_token(access_token, access_token_secret)
    api = tweepy.API(auth, wait_on_rate_limit=True)
    print("successfuly activated virtual env and connected to twitter API")
    # coordinates for Nairobi city (approx. 1.29° S, 36.82° E)
    lat = -1.2921
    long = 36.8219
    # methods to get trends- tweepy==4.6.0
    closest_loc = api.closest_trends(lat, long)
    trends = api.get_place_trends(closest_loc[0]["woeid"])
    trends_ = trends[0]["trends"]
    hashtags = [trend["name"] for trend in trends_ if "#" in trend["name"]]
    # print hashtags
    for elem in hashtags:
        print(elem) 


def message():
    print("task complete")

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': airflow.utils.dates.days_ago(7),
}

trending_hashtags = DAG(
    'trending_hashtags', #name of the dag
    default_args = default_args,
    schedule_interval = timedelta(minutes=20),
    catchup = False
)

get_api_2 = PythonVirtualenvOperator(
    task_id='connecting_to_twitter_api',
    python_callable = get_api,
    requirements = ["tweepy"],
    system_site_packages=False,
    dag = trending_hashtags,
)

message_out = PythonOperator(
    task_id = 'task_complete_message',
    python_callable = message,
    dag = trending_hashtags,
)

get_api_2 >> message_out

Airflow web UI
Using the web UI, access the trending_hashtags DAG, then unpause and trigger it. Use any of the view options to monitor the progress of the DAG. Check the logs for output and possible errors.

graph view of the dag

tree view of the dag

output of the DAG
Use docker-compose down to stop the services.

Airflow is becoming increasingly popular among software companies, financial institutions, game developers, and others. Whether or not you are a technical professional, libraries like DAGFactory make Airflow available to professionals across the enterprise, not just data engineers. Airflow DAGs are composed of tasks that execute various operations via Operators, leveraging the power of Python, Bash, HTTP, and database functionality.

You can take your data pipeline strategy to the next level with this innovative tool, orchestrating it across the most complex environments.
