We have reached a point where small blocks of code can be assembled into simple bots that provide the functionality needed to build full workflows, without writing bulky monolithic applications or microservices.
What is a Workflow?
Workflows offer significant advantages over traditional coding methods. With workflows, we can create blocks of code using different languages or libraries, string them together, and orchestrate their execution to fulfil business requirements.
There are several open-source solutions available for workflow execution.
- Apache NiFi (Java)
- Apache Airflow (Python)
Today, we'll delve into Apache Airflow and explore its real-time workflow implementation to kickstart our learning journey.
Prerequisites
- Docker and Docker Compose
- Python and pip
- VS Code (or any editor)
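The only third-party Python package used in the example below is yfinance (smtplib and the email modules ship with the standard library), so a quick local install could look like this:
> pip install yfinance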
Use case: Every morning at 9 AM, the latest prices of a selected set of stocks will be sent to users via email or SMS.
To achieve this, two functions need to be created:
Fetch Stock Prices of the Listed Stocks: This function will retrieve the latest stock prices for the selected stocks.
Send Email with the Stock Price Response: This function will compose an email containing the fetched stock prices and send it to the designated recipients.
These functions will automate the process of fetching stock prices and delivering them to users' inboxes or mobile phones, ensuring they stay updated with the latest market information.
```python
import yfinance as yf
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from datetime import datetime

# Function to fetch the latest closing prices for the given symbols
def get_stock_prices(symbols):
    stock_data = yf.download(symbols, period="1d")["Close"]
    return stock_data

# Function to send an email via Gmail's SMTP server
def send_email(subject, body, recipients):
    sender_email = "your_email@gmail.com"
    # Note: with 2FA enabled, Gmail requires an app password here,
    # not your regular account password.
    sender_password = "your_email_password"

    msg = MIMEMultipart()
    msg["From"] = sender_email
    msg["To"] = ", ".join(recipients)
    msg["Subject"] = subject
    msg.attach(MIMEText(body, "plain"))

    with smtplib.SMTP("smtp.gmail.com", 587) as server:
        server.starttls()
        server.login(sender_email, sender_password)
        server.sendmail(sender_email, recipients, msg.as_string())

# Main function
def main():
    # Define stock symbols
    symbols = ["AAPL", "MSFT", "GOOGL", "AMZN"]

    # Fetch stock prices
    stock_prices = get_stock_prices(symbols)

    # Format email message
    subject = "Daily Stock Prices - {}".format(datetime.now().strftime("%Y-%m-%d"))
    body = "Today's Stock Prices:\n\n{}".format(stock_prices)

    # Define email recipients
    recipients = ["recipient1@example.com", "recipient2@example.com"]

    # Send email
    send_email(subject, body, recipients)

# Execute main function
if __name__ == "__main__":
    main()
```
Now we need to make these functions part of an Airflow workflow. Before that, let's explore the structure and features of Apache Airflow.
Introduction:
Apache Airflow has revolutionized the way organizations manage, schedule, and monitor their data workflows by modeling them as Directed Acyclic Graphs (DAGs). With Airflow, users can define workflows as code, making it easy to manage, version-control, and collaborate on data pipelines.
Key Features of Apache Airflow:
Dynamic Workflow Definition: Airflow allows users to define workflows as code using Python. This enables dynamic and flexible workflow definitions, making it easy to create, modify, and extend pipelines.
Dependency Management: Airflow handles dependencies between tasks within a workflow, ensuring that tasks are executed in the correct order based on their dependencies.
Scheduling: Airflow provides powerful scheduling capabilities, allowing users to define complex scheduling patterns using cron-like expressions. This enables users to schedule workflows to run at specific times or intervals.
Monitoring and Alerting: Airflow comes with a built-in web interface for monitoring workflow execution, tracking task status, and viewing logs. It also supports integration with external monitoring and alerting tools.
Extensibility: Airflow is highly extensible, with a rich ecosystem of plugins and integrations. Users can easily extend Airflow's functionality by developing custom operators, sensors, and hooks.
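To make these features concrete, here is a minimal, self-contained sketch of what workflow-as-code, dependency management, and cron-style scheduling look like in a DAG file. The DAG and task names (example_pipeline, extract, load) are purely illustrative:

```python
from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator

def extract():
    # Illustrative task: pretend to pull some data
    print("extracting data")

def load():
    # Illustrative task: pretend to store the data somewhere
    print("loading data")

with DAG(
    dag_id="example_pipeline",          # the workflow is defined entirely in Python
    start_date=datetime(2024, 3, 1),
    schedule_interval="30 6 * * 1-5",   # cron-like schedule: 06:30 on weekdays
    catchup=False,
) as dag:
    extract_task = PythonOperator(task_id="extract", python_callable=extract)
    load_task = PythonOperator(task_id="load", python_callable=load)

    # Dependency management: load runs only after extract succeeds
    extract_task >> load_task
```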
Integrating the Sample with Airflow
Real-time Stock Price Checking and Email Notification Workflow with Apache Airflow
In this use case, we'll create an Apache Airflow DAG (Directed Acyclic Graph) to check real-time stock prices every morning at 9 AM and send an email notification with the latest stock prices to predefined recipients. We'll use the yfinance library for fetching stock prices and the smtplib library for sending emails.
Workflow Steps:
Fetch Stock Prices: At 9 AM every morning, the DAG will trigger a task to fetch real-time stock prices for predefined stocks using the yfinance library.
Format Email: After fetching the stock prices, the DAG will trigger a task to format the data into an email message.
Send Email: The DAG will trigger a task to send an email containing the latest stock prices to predefined recipients using the smtplib library.
Sample Code (Apache Airflow DAG):
```python
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator  # Airflow 2.x import path

import yfinance as yf
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2024, 3, 1),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1
}

dag = DAG(
    'stock_price_notification',
    default_args=default_args,
    description='Check real-time stock prices and send email notification',
    schedule_interval='0 9 * * *',  # Run every day at 9 AM
    catchup=False                   # don't backfill runs between start_date and today
)

def get_stock_prices():
    symbols = ["AAPL", "MSFT", "GOOGL", "AMZN"]
    stock_data = yf.download(symbols, period="1d")["Close"]
    # Return a plain string so the value pushed to XCom is JSON-serializable
    return stock_data.to_string()

def send_email(subject, body, recipients):
    sender_email = "your_email@gmail.com"
    sender_password = "your_email_password"  # use a Gmail app password here

    msg = MIMEMultipart()
    msg["From"] = sender_email
    msg["To"] = ", ".join(recipients)
    msg["Subject"] = subject
    msg.attach(MIMEText(body, "plain"))

    with smtplib.SMTP("smtp.gmail.com", 587) as server:
        server.starttls()
        server.login(sender_email, sender_password)
        server.sendmail(sender_email, recipients, msg.as_string())

def process_stock_prices():
    stock_prices = get_stock_prices()
    subject = "Daily Stock Prices - {}".format(datetime.now().strftime("%Y-%m-%d"))
    body = "Today's Stock Prices:\n\n{}".format(stock_prices)
    recipients = ["recipient1@example.com", "recipient2@example.com"]
    send_email(subject, body, recipients)

fetch_stock_prices_task = PythonOperator(
    task_id='fetch_stock_prices',
    python_callable=get_stock_prices,
    dag=dag
)

send_email_task = PythonOperator(
    task_id='send_email',
    python_callable=process_stock_prices,
    dag=dag
)

# The email task runs only after the price-fetching task has succeeded
fetch_stock_prices_task >> send_email_task
```
Build and Run Instructions
- Clone this repository:
> git clone https://github.com/uttesh/airflow.git
- Run the following command to build and start the Docker containers:
> docker-compose up -d --build
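To verify that the containers started correctly, the standard Docker Compose commands can be used; the airflow-webserver service name below is an assumption based on the typical Airflow compose file:
> docker-compose ps
> docker-compose logs -f airflow-webserver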
Access the Apache Airflow UI at http://localhost:8080 in your browser. The default account has the login airflow and the password airflow.
In the Airflow UI, enable the stock_price_notification DAG and trigger a manual run.
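Alternatively, the DAG can be unpaused and triggered from the command line inside one of the Airflow containers. The airflow dags commands below are standard Airflow 2.x CLI, while the airflow-webserver service name is again assumed from the typical compose setup:
> docker-compose exec airflow-webserver airflow dags unpause stock_price_notification
> docker-compose exec airflow-webserver airflow dags trigger stock_price_notification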
[Screenshot: Airflow UI home page]
Advanced Real-time Workflow Samples
Event Processing: A DAG that listens to a message queue (e.g., Apache Kafka) for incoming events, processes each event, and takes appropriate actions based on event content.
Monitoring and Alerting: A DAG that continuously monitors system metrics (e.g., CPU usage, memory usage) using monitoring tools (e.g., Prometheus, Grafana), and sends alerts via email or messaging service (e.g., Slack) when thresholds are exceeded (a minimal sketch of this pattern appears after this list).
Data Streaming and ETL: A DAG that consumes data from a streaming source (e.g., Apache Kafka, AWS Kinesis), applies real-time transformations using Apache Spark or Apache Flink, and loads the transformed data into a data store (e.g., Apache Hadoop, Apache Cassandra).
Real-time Model Inference: A DAG that listens to incoming data streams, applies pre-trained machine learning models using libraries like TensorFlow Serving or PyTorch Serve, and returns real-time predictions or classifications.
Web Scraping and Data Extraction: A DAG that periodically fetches data from web APIs, extracts relevant information using web scraping tools (e.g., BeautifulSoup, Scrapy), and stores the extracted data in a database or data warehouse for further analysis.
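As a rough illustration of the monitoring-and-alerting pattern above, here is a minimal sketch of a DAG that samples CPU and memory usage and posts a Slack message when a threshold is exceeded. It assumes the psutil and requests packages are installed in the Airflow environment, and SLACK_WEBHOOK_URL is a placeholder you would replace with a real incoming-webhook URL:

```python
from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator
import psutil    # assumed to be installed in the Airflow environment
import requests  # assumed to be installed in the Airflow environment

SLACK_WEBHOOK_URL = "https://hooks.slack.com/services/XXX/YYY/ZZZ"  # placeholder
CPU_THRESHOLD = 80.0   # percent
MEM_THRESHOLD = 85.0   # percent

def check_and_alert():
    # Sample current resource usage on the worker
    cpu = psutil.cpu_percent(interval=1)
    mem = psutil.virtual_memory().percent
    if cpu > CPU_THRESHOLD or mem > MEM_THRESHOLD:
        text = f"Resource alert: CPU {cpu:.1f}%, memory {mem:.1f}%"
        # Post a simple message to a Slack incoming webhook
        requests.post(SLACK_WEBHOOK_URL, json={"text": text}, timeout=10)

with DAG(
    dag_id="system_monitoring_alerts",
    start_date=datetime(2024, 3, 1),
    schedule_interval="*/5 * * * *",  # check every 5 minutes
    catchup=False,
) as dag:
    PythonOperator(task_id="check_and_alert", python_callable=check_and_alert)
```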
These are just a few examples of how Apache Airflow can be used for real-time workflows. Depending on your use case and requirements, you can customize and extend these sample workflows to fit your specific needs. Remember to consider scalability, fault tolerance, and resource management when designing real-time workflows in Apache Airflow.
Conclusion:
Apache Airflow is a game-changer in the world of data engineering, providing a flexible, scalable, and robust platform for orchestrating data workflows. Whether you're a data engineer, data scientist, or business analyst, Apache Airflow is an awesome tool to have in your data toolkit.
*Source Code:* https://github.com/uttesh/airflow