Introduction
My only encounter with a message broker service until this point was when I read about Amazon MQ in preparation for my AWS Solutions Architect Associate examination. Imagine my utter confusion when I read the project for stage 3.
In this stage of my HNG journey, I was required to build a project that combines the power of FastAPI and Celery to create a robust messaging system. This system not only handles user requests efficiently but also leverages asynchronous task processing for seamless email sending. Throughout this article, I'll walk you through the key components and steps involved in developing this application, highlighting the integration of RabbitMQ as the message broker and utilizing SMTP for email delivery. By the end, you'll have a comprehensive understanding of how to build and deploy a similar system.
Here are the full requirements for this task:
- Install RabbitMQ and Celery on your local machine.
- Set up a Python application with the following functionalities:
  An endpoint that can accept two parameters: ?sendmail and ?talktome.
- Endpoint Functionalities:
  ?sendmail: When this parameter is passed, the system should send an email using SMTP to the specified email address (e.g., ?sendmail=mail@example.com). Use RabbitMQ/Celery to queue the email sending task. Ensure the email-sending script retrieves and executes tasks from the queue.
  ?talktome: When this parameter is passed, the system should log the current time to /var/log/messaging_system.log.
- Nginx Configuration: Configure Nginx to serve your Python application. Ensure proper routing of requests to the application.
- Endpoint Access: Use ngrok or a similar tool to expose your local application endpoint for external access.
Prerequisites
- Python 3.8+
- SMTP Email Account
Let's Get Started
Step 1
RabbitMQ and Celery Local Installation
For this project, I'll be using a Vagrant box on my laptop running Ubuntu 22.04, but you can use any Linux distribution you want.
We were instructed not to run RabbitMQ as a Docker container, so I ran the Debian installation script from the official docs:
#!/bin/sh
sudo apt-get install curl gnupg apt-transport-https -y
## Team RabbitMQ's main signing key
curl -1sLf "https://keys.openpgp.org/vks/v1/by-fingerprint/0A9AF2115F4687BD29803A206B73A36E6026DFCA" | sudo gpg --dearmor | sudo tee /usr/share/keyrings/com.rabbitmq.team.gpg > /dev/null
## Community mirror of Cloudsmith: modern Erlang repository
curl -1sLf https://github.com/rabbitmq/signing-keys/releases/download/3.0/cloudsmith.rabbitmq-erlang.E495BB49CC4BBE5B.key | sudo gpg --dearmor | sudo tee /usr/share/keyrings/rabbitmq.E495BB49CC4BBE5B.gpg > /dev/null
## Community mirror of Cloudsmith: RabbitMQ repository
curl -1sLf https://github.com/rabbitmq/signing-keys/releases/download/3.0/cloudsmith.rabbitmq-server.9F4587F226208342.key | sudo gpg --dearmor | sudo tee /usr/share/keyrings/rabbitmq.9F4587F226208342.gpg > /dev/null
## Add apt repositories maintained by Team RabbitMQ
sudo tee /etc/apt/sources.list.d/rabbitmq.list <<EOF
## Provides modern Erlang/OTP releases
##
deb [arch=amd64 signed-by=/usr/share/keyrings/rabbitmq.E495BB49CC4BBE5B.gpg] https://ppa1.novemberain.com/rabbitmq/rabbitmq-erlang/deb/ubuntu jammy main
deb-src [signed-by=/usr/share/keyrings/rabbitmq.E495BB49CC4BBE5B.gpg] https://ppa1.novemberain.com/rabbitmq/rabbitmq-erlang/deb/ubuntu jammy main
# another mirror for redundancy
deb [arch=amd64 signed-by=/usr/share/keyrings/rabbitmq.E495BB49CC4BBE5B.gpg] https://ppa2.novemberain.com/rabbitmq/rabbitmq-erlang/deb/ubuntu jammy main
deb-src [signed-by=/usr/share/keyrings/rabbitmq.E495BB49CC4BBE5B.gpg] https://ppa2.novemberain.com/rabbitmq/rabbitmq-erlang/deb/ubuntu jammy main
## Provides RabbitMQ
##
deb [arch=amd64 signed-by=/usr/share/keyrings/rabbitmq.9F4587F226208342.gpg] https://ppa1.novemberain.com/rabbitmq/rabbitmq-server/deb/ubuntu jammy main
deb-src [signed-by=/usr/share/keyrings/rabbitmq.9F4587F226208342.gpg] https://ppa1.novemberain.com/rabbitmq/rabbitmq-server/deb/ubuntu jammy main
# another mirror for redundancy
deb [arch=amd64 signed-by=/usr/share/keyrings/rabbitmq.9F4587F226208342.gpg] https://ppa2.novemberain.com/rabbitmq/rabbitmq-server/deb/ubuntu jammy main
deb-src [signed-by=/usr/share/keyrings/rabbitmq.9F4587F226208342.gpg] https://ppa2.novemberain.com/rabbitmq/rabbitmq-server/deb/ubuntu jammy main
EOF
## Update package indices
sudo apt-get update -y
## Install Erlang packages
sudo apt-get install -y erlang-base \
erlang-asn1 erlang-crypto erlang-eldap erlang-ftp erlang-inets \
erlang-mnesia erlang-os-mon erlang-parsetools erlang-public-key \
erlang-runtime-tools erlang-snmp erlang-ssl \
erlang-syntax-tools erlang-tftp erlang-tools erlang-xmerl
## Install rabbitmq-server and its dependencies
sudo apt-get install rabbitmq-server -y --fix-missing
To confirm if RabbitMQ is installed correctly, run this command
sudo rabbitmqctl status
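If you would rather check from Python, here is a minimal sketch. It assumes RabbitMQ is listening on its default AMQP port, 5672, on localhost; broker_is_reachable is a hypothetical helper name, not part of any library. It only confirms that something accepts TCP connections on that port, not that the broker is healthy.

```python
import socket


def broker_is_reachable(host="localhost", port=5672, timeout=2.0):
    """Return True if a TCP connection to host:port succeeds."""
    try:
        # create_connection raises OSError if nothing is listening
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


if __name__ == "__main__":
    print("RabbitMQ reachable:", broker_is_reachable())
```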
Next, we need to install Celery. Before that, let's create a Python virtual environment (venv) to better handle pip dependencies across our local machine.
Ensure the venv package is installed then create a venv
sudo apt install python3.10-venv
python3 -m venv env
# start up the virtual environment
source env/bin/activate
We can now see an (env) prefix before our command prompt.
To install celery, run
pip install celery
# confirm celery installation
celery --version
Step 2
Python Application Setup
Before we start writing our Python application, let's understand Celery and RabbitMQ and the roles they play in it.
RabbitMQ is a message broker: software that facilitates communication between different applications or components within a system. It acts as an intermediary that receives messages from producers (senders) and delivers them to consumers (receivers). In this application, Celery is both the producer and the consumer: the web application uses Celery to publish tasks to the broker, and a Celery worker consumes them.
Celery is an open-source distributed task queue for Python, designed to handle asynchronous and scheduled tasks reliably and at scale. We send a task to the queue as a producer using the .delay() method, and a Celery worker process then consumes and executes it.
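To make the producer/consumer idea concrete before bringing in the real tools, here is a small standard-library sketch. It is only an analogy: queue.Queue stands in for the RabbitMQ queue and a thread stands in for the Celery worker. The names task_queue, worker and results are made up for this illustration.

```python
import queue
import threading

task_queue = queue.Queue()  # stands in for the RabbitMQ queue
results = []                # collected by the consumer


def reverse(text):
    return text[::-1]


def worker():
    """Consumer: pull tasks off the queue until a None sentinel arrives."""
    while True:
        task = task_queue.get()
        if task is None:
            break
        func, arg = task
        results.append(func(arg))


# Start the consumer, then act as the producer
consumer = threading.Thread(target=worker)
consumer.start()
task_queue.put((reverse, "first function"))  # enqueue and carry on
task_queue.put(None)                         # tell the worker to stop
consumer.join()
print(results)
```

The producer never calls reverse() itself; it only describes the work and puts it on the queue, which is the same division of labour Celery and RabbitMQ give us across processes.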
Let's use a short demo to see how Celery and RabbitMQ work together.
Create two files, main.py and tasks.py.
tasks.py
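from celery import Celery
from time import sleep

# Celery app that uses the local RabbitMQ instance as its broker
celery = Celery(
    "tasks",
    broker="pyamqp://guest:guest@localhost:5672//"
)

@celery.task
def reverse(text):
    sleep(5)
    print(text[::-1])

def echo(text):
    print(text)

# Only run the calls when the file is executed directly,
# so the worker does not repeat them when it imports this module
if __name__ == "__main__":
    reverse.delay('first function')
    echo('second function')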
This script initializes a Celery object that uses RabbitMQ as the broker. The string "pyamqp://guest:guest@localhost:5672//" connects Celery to RabbitMQ running locally on port 5672, using the default username and password, which are both guest. The script then calls two functions, reverse() and echo(), but sends reverse() to Celery using the .delay() method. The first function takes a string, sleeps for 5 seconds and then prints the reversed string. The second function simply prints the string passed into it.
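If the broker string looks cryptic, this short sketch pulls it apart with the standard library. It is only an illustration of how the URL is laid out, not how Celery parses it internally.

```python
from urllib.parse import urlsplit

broker_url = "pyamqp://guest:guest@localhost:5672//"
parts = urlsplit(broker_url)

print(parts.scheme)    # transport name
print(parts.username)  # login user
print(parts.password)  # login password
print(parts.hostname)  # broker host
print(parts.port)      # broker port

# The path is "//": the first slash separates it from the host,
# the second is the name of RabbitMQ's default virtual host, "/".
vhost = parts.path[1:]
print(vhost)
```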
Run this script using
python3 tasks.py
The string passed into the second function is printed to the terminal immediately, while the first function prints nothing there.
To see the outcome of the first function, open a new terminal, activate the same virtual environment as in the previous terminal, and start a Celery worker:
celery -A tasks worker --loglevel=info
A Celery worker starts up, connects to RabbitMQ, picks up the queued task, waits for 5 seconds and then prints the reversed string passed into the first function.
Now that we've observed how Celery enables asynchronous programming through task queues, let's proceed with the Python application. Later we will see how to use a result backend with Celery, so that we can get the output of a task executed in the worker process and use it in our application.
We first need to install the dependencies for this application: FastAPI for building the API, uvicorn for running our backend server, and python-dotenv for loading the .env file that tasks.py reads.
pip install fastapi uvicorn python-dotenv
Next we need to make sure our log file exists and the correct permissions are set in order not to encounter any permission errors. My user is vagrant, so I'll give vagrant ownership of the file.
sudo touch /var/log/messaging_system.log
sudo chown vagrant:vagrant /var/log/messaging_system.log
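To confirm the permissions took effect, a quick check like the one below can help. It is a sketch; can_append is a hypothetical helper name, and it should be run as the same user the application will run as.

```python
import os


def can_append(path):
    """Return True if path is an existing file the current user may write to."""
    return os.path.isfile(path) and os.access(path, os.W_OK)


if __name__ == "__main__":
    print(can_append("/var/log/messaging_system.log"))
```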
Create a .env file that contains your email address and SMTP password. If you are using Gmail, you can get this password using Google's App Passwords feature.
.env
email_sender=your_email
email_password=your_password
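A missing variable would otherwise only show up later as a failed SMTP login, so it can be handy to fail early. The sketch below assumes the two variable names from the .env file above; require_env is a hypothetical helper, not part of python-dotenv.

```python
import os


def require_env(*names):
    """Return the named environment variables, or raise if any is unset or empty."""
    missing = [name for name in names if not os.getenv(name)]
    if missing:
        raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")
    return [os.environ[name] for name in names]
```

After load_dotenv() has run, it could be used as email_sender, email_password = require_env("email_sender", "email_password").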
Let's write our email sending application.
tasks.py
from celery import Celery
from dotenv import load_dotenv
import os
from email.message import EmailMessage
import smtplib

# Load environment variables from a .env file
load_dotenv()

# Initialize a Celery instance with a broker and backend
celery = Celery(
    "tasks",
    broker="pyamqp://guest:guest@localhost:5672//",
    backend="rpc://"
)

# Define a Celery task to send an email
@celery.task
def send_mail(email):
    # Retrieve email sender and password from environment variables
    email_sender = os.getenv("email_sender")
    email_password = os.getenv("email_password")
    # Set the recipient email address
    email_receiver = email
    # Define the email subject and body
    subject = "Hello From HNG"
    body = "This is a test email"
    # Create an EmailMessage object and set its content
    em = EmailMessage()
    em["From"] = email_sender
    em["To"] = email_receiver
    em["Subject"] = subject
    em.set_content(body)
    try:
        # Establish a secure connection to the SMTP server
        with smtplib.SMTP_SSL('smtp.gmail.com', 465) as server:
            # Log in to the SMTP server using the sender's credentials
            server.login(email_sender, email_password)
            # Send the email
            server.sendmail(email_sender, email_receiver, em.as_string())
        # Return a success message if the email is sent
        return f"Email sent to {email_receiver}"
    except Exception as e:
        # Return an error message if the email fails to send
        return f"Failed to send email to {email_receiver}: {e}"
This code:
- Sets backend="rpc://" in the Celery configuration, which is the result backend. This setting determines where Celery keeps the results of tasks after they are executed. The rpc:// backend sends task results back through RabbitMQ as messages, following the Remote Procedure Call (RPC) pattern.
- Defines send_mail as a Celery task: an asynchronous function that composes a test email and uses Python's smtplib library to send it to the recipient over a secure (SSL) connection.
- Takes the recipient's email address as a parameter, which is passed in from main.py.
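One way to try the message-building part without touching an SMTP server is to pull it into its own function. This is a sketch of that idea, not a change to the task above; build_message and the example addresses are made up for illustration.

```python
from email.message import EmailMessage


def build_message(sender, receiver, subject="Hello From HNG", body="This is a test email"):
    """Compose the same test email that send_mail builds, without sending it."""
    em = EmailMessage()
    em["From"] = sender
    em["To"] = receiver
    em["Subject"] = subject
    em.set_content(body)
    return em


msg = build_message("sender@example.com", "mail@example.com")
print(msg["To"])
print(msg["Subject"])
print(msg.get_content())
```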
main.py
from fastapi import FastAPI
from typing import Optional
from datetime import datetime
from tasks import send_mail
from fastapi.responses import PlainTextResponse

# Initialize the FastAPI application
app = FastAPI()

# Function to log events to a file
def logger(event):
    # Open the log file in append mode
    with open("/var/log/messaging_system.log", "a") as log_file:
        # Write the event with a timestamp to the log file
        log_file.write(f"{datetime.now()}: {event}\n")

# Define a GET endpoint at the root URL
@app.get("/")
def test(sendmail: Optional[str] = None, talktome: Optional[str] = None):
    # Initialize the response dictionary
    response = {}
    # If 'sendmail' query parameter is provided
    if sendmail is not None:
        # If 'sendmail' is an empty string
        if sendmail == "":
            response["sendmail"] = "no mail provided"
        else:
            # Log the sending mail action
            logger(f"Sending mail to {sendmail} .....")
            # Call the asynchronous send_mail task
            result = send_mail.delay(sendmail)
            # Log the result of the send_mail task
            logger(f"{result.get()}")
            # Indicate that the action has been logged
            response["sendmail"] = "logged action to /logs"
    # If 'talktome' query parameter is provided
    if talktome is not None:
        # Log the talktome message
        logger(talktome)
        # Indicate that the action has been logged
        response["talktome"] = "logged action to /logs"
    # If neither 'sendmail' nor 'talktome' query parameters are provided
    if sendmail is None and talktome is None:
        response["Default"] = "no parameters provided"
    # Return the response dictionary
    return response

# Define a GET endpoint to retrieve logs
@app.get("/logs")
def logs():
    # Open the log file in read mode
    with open("/var/log/messaging_system.log", "r") as log_file:
        # Read the entire content of the log file
        log_content = log_file.read()
    # Return the log content as plain text response
    return PlainTextResponse(content=log_content)
This code:
- Imports the dependencies the script needs
- Creates a logging function that writes the current time and an event to the log file we created previously
- Initializes FastAPI
- Creates two endpoints, one for the root, /, and one for the logs, /logs. The / endpoint uses the send_mail function imported from tasks.py and calls it asynchronously using .delay(), but because we configured a result backend in tasks.py, we can retrieve the result of the asynchronous function using the .get() method. Note that .get() waits for the worker to finish, so the request returns only after the email attempt completes. The ?sendmail query parameter sends an email to the provided address. If the ?talktome query parameter is passed, the current time is written to the log file, followed by the parameter's value.
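The .delay() and .get() pair behaves much like submitting work to a thread pool and then waiting on the returned future. The sketch below shows that pattern with the standard library only. It is an analogy and not Celery code; slow_reverse is a made-up stand-in for a slow task.

```python
from concurrent.futures import ThreadPoolExecutor
from time import sleep


def slow_reverse(text):
    sleep(0.2)  # pretend this is slow, like talking to an SMTP server
    return text[::-1]


with ThreadPoolExecutor(max_workers=1) as pool:
    # Comparable to send_mail.delay(...): returns a handle straight away
    future = pool.submit(slow_reverse, "hello")
    print("the caller keeps running while the task executes")
    # Comparable to result.get(timeout=5): blocks until the value is ready
    value = future.result(timeout=5)

print(value)
```

Celery's .get() also accepts a timeout argument, which is worth considering in a web endpoint so a stuck worker cannot hold a request open indefinitely.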
Step 3
Creating systemd services for Celery and Uvicorn
Our application is ready and we can start celery using
celery -A tasks worker --loglevel=info
Start uvicorn for our backend application server using
uvicorn main:app --reload
P.S. If you wish to access the application from a browser on another machine, add --host 0.0.0.0 to the uvicorn command.
But this method requires us to open and maintain multiple terminal sessions. Instead, let's run the applications as systemd services in the background.
Open /etc/systemd/system/uvicorn.service and /etc/systemd/system/celery.service with sudo permissions.
/etc/systemd/system/uvicorn.service
[Unit]
Description=Uvicorn instance to serve FastAPI application
After=network.target
[Service]
User=vagrant
Group=www-data
WorkingDirectory=/home/vagrant/message-broker-stage-3/app
ExecStart=/home/vagrant/.local/bin/uvicorn main:app --reload --host 0.0.0.0 --port 8000
[Install]
WantedBy=multi-user.target
/etc/systemd/system/celery.service
[Unit]
Description=Celery Service
After=network.target
[Service]
User=vagrant
Group=www-data
WorkingDirectory=/home/vagrant/messaging-stage-3/app
ExecStart=/home/vagrant/.local/bin/celery -A tasks worker --loglevel=info
[Install]
WantedBy=multi-user.target
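Make sure you enter the correct user, the correct working directory (the folder that contains main.py and tasks.py) and the correct location of your uvicorn and celery binaries. If you installed them inside the virtual environment, the binaries live in its bin directory.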
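If you are not sure where the binaries ended up, a quick lookup like this can help. It is a sketch; find_binary is a hypothetical helper, and it only searches the PATH of the shell you run it from, so run it with the virtual environment activated if that is where you installed the tools.

```python
import shutil


def find_binary(name):
    """Return the absolute path of an executable found on PATH, or None."""
    return shutil.which(name)


for tool in ("uvicorn", "celery"):
    print(tool, "->", find_binary(tool))
```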
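After saving both files, reload systemd with sudo systemctl daemon-reload so it picks up the new units. You can then start and enable the two services in the background using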
sudo systemctl start uvicorn.service
sudo systemctl enable uvicorn.service
sudo systemctl start celery.service
sudo systemctl enable celery.service
Ensure they are running properly by using
sudo systemctl status uvicorn.service
sudo systemctl status celery.service
Step 4
Configure Nginx reverse proxy
In this step, we will use Nginx to route requests for localhost on port 80 to our application running on port 8000.
If you don't have Nginx installed, you can install it using
sudo apt update
sudo apt install nginx nginx-core
Open nginx.conf and replace the default http{} block.
nginx.conf
http {
    # Basic HTTP settings
    server_tokens off;
    sendfile on;
    tcp_nopush on;
    tcp_nodelay on;
    keepalive_timeout 65;
    types_hash_max_size 2048;
    server {
        listen 80;
        server_name localhost;
        location / {
            proxy_pass http://localhost:8000;
            proxy_set_header Host $host;
            proxy_set_header X-Real-IP $remote_addr;
            proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
            proxy_set_header X-Forwarded-Proto $scheme;
        }
    }
}
Check if the configuration is correct and restart Nginx using
sudo nginx -t
sudo systemctl restart nginx.service
Running curl "localhost?talktome" should respond with a JSON message informing you that the action has been logged to /logs. The quotes stop the shell from treating ? as a wildcard.
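The same check can be scripted. The sketch below assumes Nginx is serving the application at http://localhost/ as configured above; build_url and call_endpoint are hypothetical helper names. Building the query string with urlencode takes care of escaping characters such as @ in an email address.

```python
import json
from urllib.parse import urlencode
from urllib.request import urlopen


def build_url(base="http://localhost/", **params):
    """Append URL-encoded query parameters to the base URL."""
    query = urlencode(params)
    return f"{base}?{query}" if query else base


def call_endpoint(url, timeout=30):
    """Send a GET request and decode the JSON body of the response."""
    with urlopen(url, timeout=timeout) as resp:
        return json.loads(resp.read().decode("utf-8"))


print(build_url(talktome="hello"))
print(build_url(sendmail="mail@example.com"))
```

With the services running, call_endpoint(build_url(talktome="hello")) should return the JSON response as a Python dictionary.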
Step 5
Use ngrok to expose the API
The final step requires us to use ngrok to expose our API securely to the public.
Set up a free ngrok account and obtain an auth token by following the official ngrok docs.
Finally, we can expose our API, served by Nginx on localhost port 80, using
ngrok http 80
Test the email functionality by adding ?sendmail=your_email to the public URL that ngrok prints.
You can also test the ?talktome query parameter and access the logs using the /logs path.
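If you want to pick up the public URL from a script instead of copying it from the terminal, the ngrok agent exposes a small local API while a tunnel is running. The sketch below assumes that API is on its default address, 127.0.0.1:4040, and that each tunnel entry carries a public_url field; the two function names are made up for this example.

```python
import json
from urllib.request import urlopen


def extract_public_urls(payload):
    """Pull the public_url of each tunnel out of the decoded API response."""
    return [tunnel["public_url"] for tunnel in payload.get("tunnels", [])]


def ngrok_public_urls(api="http://127.0.0.1:4040/api/tunnels"):
    """Ask the local ngrok agent (assumed default address) for its tunnels."""
    with urlopen(api, timeout=5) as resp:
        return extract_public_urls(json.load(resp))
```

Calling ngrok_public_urls() only works while ngrok http 80 is running in another terminal.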
Conclusion
We have successfully developed a Python application that utilizes RabbitMQ and Celery as a message broker and task queue executor. This application deploys a public API endpoint using ngrok, providing functionality for sending emails and printing logs.
By integrating these technologies, we have created a robust and scalable messaging system. This project not only demonstrates the power of asynchronous task processing with Celery and RabbitMQ but also highlights the simplicity and efficiency of using FastAPI for building web applications. Deploying the API with ngrok allows for quick public exposure and testing, making it an excellent tool for development and debugging.
In conclusion, this project serves as a solid foundation for building more complex systems that require reliable message handling and task execution. Whether you are looking to expand its capabilities or use it as a learning experience, this messaging system showcases the potential of modern Python web development.
The tasks are getting longer and harder as the stages go by. I can't imagine what is in store in stage 4, but one thing is certain: I'll give it the best I've got.
If you made it this far, Thank you, Thank you, Thank you. ♥
Until next time.
Happy Programming 🚀