Stage 3: How to Create a Messaging System with Flask, Celery, and RabbitMQ
Introduction
Web applications provide various functions to thousands of users. To ensure high performance and low latency, applications run tasks that need immediate responses concurrently and use specific tools to offload other tasks to run in the background (asynchronously). These background tasks do not require immediate feedback to the user.
Asynchronous tasks make it possible to for applications to handle certain intensive workloads without blocking their main program flow. One example we will explore today is handling the email-sending task of a Flask web application asynchronously.
In this article, you'll learn how to build a robust Flask web application with email and logging functionalities. I will also show you how to run the email-sending task asynchronously using powerful tools like Celery and RabbitMQ.
Additionally, we'll use Ngrok to expose the application endpoint externally for testing performance and reliability and set up nginx as reverse proxy to route traffic to the application.
Prerequisites
To follow along with these projects, you will need:
An Ubuntu machine (WSL on Windows is okay).
Python 3 and Nginx installed.
A Gmail account.
Project structure
This project includes a root folder, subfolders, and their respective files. Your project structure should look like this:
The following commands will set up the project structure for you.
mkdir -p messaging_system/app messaging_system/nginx
cd messaging_system
python3 -m venv venv
touch app/app.py app/celery_config.py nginx/nginx.conf requirements.txt .env
Project overview
We will build the Python application using the Flask web framework. The application will handle sending emails. RabbitMQ and Celery will be used to offload this task and execute the email-sending process asynchronously.
Celery is a distributed queue system that allows you to run tasks in the background and distribute them across multiple workers. It will receive email-sending tasks from the Flask application, convert them into messages, and send them to RabbitMQ to be queued.
RabbitMQ is a message broker that enables communication between different parts of an application by sending messages between producers (like the Celery manager) and consumers (like Celery workers). It will receive tasks from Celery, queue them, and then Celery workers can pick up the messages and execute the tasks.
Installation
Let's proceed to install RabbitMQ and Ngrok. We will install Celery later during the application development.
To install RabbitMQ and enable its dashboard, run these commands:
sudo apt install rabbitmq-server -y
sudo systemctl enable rabbitmq-server
sudo rabbitmq-plugins enable rabbitmq_management
sudo systemctl start rabbitmq-server
sudo systemctl status rabbitmq-server # confirm rabbitMQ is running
To install Ngrok, run these commands:
wget https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip
unzip ngrok-stable-linux-amd64.zip
sudo mv ngrok /usr/local/bin/
sudo rm -f ngrok-stable-linux-amd64.zip
ngrok --version # confirm ngrok is installed
Building the Application
To get started, navigate to the root directory of this project, messaging_system/
.
We will list the dependencies needed for our code to work in the "requirements.txt" file.
Flask
python-dotenv
celery
Flask-Mail
Next, open the .env file and add the following configurations.
GMAIL_USER="yourmail@gmail.com"
GMAIL_PASSWORD="abcd efgh ijkl mnop"
CELERY_BROKER_URL="pyamqp://guest:guest@localhost:5672/"
CELERY_RESULT_BACKEND="rpc://guest:guest@localhost:5672/"
The following environment variables will be used to connect RabbitMQ with Celery and to authenticate the Gmail SMTP server.
Navigate into the app/
directory, open celery_config.py
file and let's develop our celery application.
# celery_config.py
from celery import Celery
from dotenv import load_dotenv
import os
# Load environment variables from .env file
load_dotenv()
def make_celery(app):
celery = Celery(
app.import_name,
broker=os.getenv('CELERY_BROKER_URL'),
backend=os.getenv('CELERY_RESULT_BACKEND')
)
celery.conf.update(app.config)
celery.autodiscover_tasks(['app']) # Ensure the app module is discovered
return celery
This code creates a function that sets up a Celery instance to work with our Flask app.
Celery uses the broker URL to connect to the message broker (RabbitMQ) and manage task queues.
The backend URL is used to store and retrieve the results of completed tasks from RabbitMQ. Both URLs are defined and retrieved from the
.env
file.
Now, let's move on to developing the main application.
- Open the
app.py
file and add the code below.
import os
import time
from flask import Flask, request, Response
from dotenv import load_dotenv
from celery import Celery
from celery_config import make_celery
from flask_mail import Mail, Message
# Load environment variables from .env file
load_dotenv()
app = Flask(__name__)
# Flask-Mail configuration for Gmail using SSL
app.config['MAIL_SERVER'] = 'smtp.gmail.com'
app.config['MAIL_PORT'] = 465 # Gmail SMTP SSL port
app.config['MAIL_USE_SSL'] = True # Enable SSL encryption
app.config['MAIL_USERNAME'] = os.getenv('GMAIL_USER')
app.config['MAIL_PASSWORD'] = os.getenv('GMAIL_PASSWORD')
# Celery configuration
app.config['CELERY_BROKER_URL'] = os.getenv('CELERY_BROKER_URL')
app.config['CELERY_RESULT_BACKEND'] = os.getenv('CELERY_RESULT_BACKEND')
# Initialize Flask-Mail and Celery
mail = Mail(app)
celery = make_celery(app)
@celery.task(name='app.send_email') # Register the task with a specific name
def send_email(receivermail):
msg = Message("Stage 3 Test Email", sender=os.getenv('GMAIL_USER'), recipients=[receivermail])
msg.body = "This is a test email from stage 3 Task."
try:
# Simulate a delay of 10 seconds before sending email
#time.sleep(10)
with app.app_context():
mail.send(msg)
print(f"Sent email to {receivermail}")
except Exception as e:
print(f"Error sending email: {e}")
return True # Optionally, return a value indicating success
@app.route("/")
def handle_request():
sendmail_param = request.args.get('sendmail')
talktome_param = request.args.get('talktome')
if sendmail_param:
send_email.delay(sendmail_param)
return f'Sending email to {sendmail_param}...'
elif talktome_param is not None:
current_time = time.strftime("%Y-%m-%d %H:%M:%S")
log_message = f'Logged at {current_time}\n'
log_file = '/var/log/messaging_system.log'
with open(log_file, 'a') as f:
f.write(log_message)
return 'Logging message...'
else:
return 'Welcome to the Stage 3 Task Messaging System! This was built by DrInTech'
@app.route('/logs')
def get_log():
try:
with open('/var/log/messaging_system.log', 'r') as f:
log_content = f.read()
return Response(log_content, mimetype='text/plain')
except Exception as e:
return str(e), 500
if __name__ == "__main__":
app.run(debug=True)
The code above sets up a Flask web application that uses Celery to run the email-sending function asynchronously and sets up two endpoints (routes).
The email-sending function is registered as a Celery task using
@celery.task
.Emails are sent using the
flask_mail
library and the SMTP Gmail server.The
/
route handles two functionalities, accessed by the?sendmail
or?talktome
query parameters. If the/
route receives neither parameter, it returns the default web page content.The
?sendmail
parameter triggers thesend_email
function, which is executed as a Celery task to send an email to the specified address. The?talktome
parameter logs the current timestamp to the log file at/var/log/messaging_system.log
.The
/logs
route reads the content of the log file and returns it as a plain text response.
Run the following commands to create the log file for the timestamp function.
sudo touch /var/log/messaging_system.log
sudo chmod 664 /var/log/messaging_system.log
sudo chown user:group /var/log/messaging_system.log # replace user and group with your user and group.
Set up Gmail credentials
Finally, let's set the Gmail credentials in the .env file.
Create a Gmail account or use an existing one. Set the address as the value of the variable
GMAIL_USER
.Go to your account settings, enable two-factor authentication, and then create an app password.
Set the value of
GMAIL_PASSWORD
to your newly created app password.
Running the application
To run the application, activate the virtual environment (venv) to manage the dependencies. Venv isolates the dependencies for the application and prevents conflicts with other similar packages on our system.
Navigate to the root directory of this project
messaging_system/
Run the following commands to activate venv
python3 -m venv venv
source venv/bin/activate # Activate virtual environment
- Install the dependencies
pip install -r requirements.txt
- Open another terminal and run the following commands to start RabbitMQ:
sudo rabbitmqctl stop
sudo rabbitmq-server
- Open another terminal, navigate to the root project directory, activate the venv and start your celery worker.
celery -A app.celery worker --loglevel=info
The command uses the celery instance in app.py to start a celery worker.
- On another terminal with venv activated in the root project directory, navigate into the app directory, then run:
python3 app.py
You should see the application start on port 5000 in debug mode. Since we are currently developing, the debug mode provides us with logs to understand how our application is behaving and the status of its actions.
Testing the application
The next step is to test the endpoints and functionalities of the application to understand how the different components work together.
Open your browser and go to "localhost:5000". You should see the default page content.
Test the sendmail functionality by using the
?sendmail parameter
. In your browser, enterhttp://localhost:5000?sendmail=recipientmail@gmail.com
. Replace 'recipientmail' with any Gmail address you want to test this endpoint with.If your Celery terminal returns this, then confirm that the email was successfully received on the testing Gmail..
When the "/" route is queried with the
?sendmail
parameter, thesend_mail
function is passed as a task to Celery using thedelay()
method. Celery queues the task as a message on RabbitMQ. The Celery worker then picks up the message and executes the task.
Test the talktome functionality by querying with the
?talktome parameter
. On your browser runhttp://localhost:5000?talktome
. This logs the current time to the log file and returns a message that tells us it just did that.Lastly, test the
/logs
route. This should return the contents of the log file in plain text. To do this, go tohttp://localhost:5000/logs
in your browser.
Set up Nginx as a Reverse Proxy
Nginx will be used as a reverse proxy to receive user traffic and route it to the application. This layer of interaction improves security by preventing users from directly accessing our application.
- Open nginx.conf and paste the configuration below.
user www-data;
worker_processes auto;
pid /run/nginx.pid;
events {
worker_connections 768;
# multi_accept on;
}
http {
sendfile on;
tcp_nopush on;
tcp_nodelay on;
keepalive_timeout 65;
types_hash_max_size 2048;
include /etc/nginx/mime.types;
default_type application/octet-stream;
# Logging
access_log /var/log/nginx/access.log;
error_log /var/log/nginx/error.log;
gzip on;
gzip_disable "msie6";
server {
listen 80;
server_name localhost;
location / {
proxy_pass http://127.0.0.1:5000;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
}
location /logs {
proxy_pass http://127.0.0.1:5000/logs;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
}
}
}
- "localhost" will serve as the local domain. Nginx will listen to traffic on this domain and internally proxy the traffic to the application.
- We will back up the default nginx.conf file and then replace it with our own. This way, you can restore the default nginx.conf after the project.
sudo mv /etc/nginx/nginx.conf /etc/nginx/nginx.conf.bkp
sudo cp messaging_system/nginx/nginx.conf /etc/nginx/nginx.conf
sudo nginx -t #test configuration for syntax errors
sudo systemctl reload nginx
Test all the routes and functionalities to ensure they are working, and that traffic is correctly routed to them.
localhost
localhost?sendmail=recipientmail@gmail.com
localhost?talktome
localhost/logs
Exposing the application to the internet
The last step is to expose the local application endpoint to the internet using Ngrok. This allows us to test the performance, accessibility, and reliability externally while the application runs in our local environment.
Ngrok provides an external endpoint URL. Requests to this URL are securely tunneled to the port where our application is running. We have Ngrok running, but to use it for an extended period, you need an authentication token.
Sign up on ngrok.com to get the auth token. To expose our application externally, run:
ngrok config add-authtoken <auth-token>
ngrok http 5000
Copy the external endpoint provided by Ngrok and test it in your browser.
Run applications in the background
While developing the application, we have opened several terminals to monitor each application's activity and gain insights from the logs.
Once we are confident that our application is working well, we can take away the stress of managing multiple terminals by running the applications in the background.
Stop the applications and run them in the background with the following command:
sudo rabbitmq-server -detached
celery -A app.celery worker --loglevel=info --detach
nohup python3 app.py
To run Ngrok in the background and ensure the process continues even if we close the terminal, use the following commands:
screen -S ngrok # Start a screen session for ngrok
ngrok http 80 # Run Ngrok in the Screen Session
Press "Ctrl + A then D" to detach from the screen session. This leaves Ngrok running in the background. To reattach screen run screen -r ngrok
.
Conclusion
You have learned how to build a robust messaging system using the Python web framework (Flask). This system uses powerful tools like RabbitMQ and Celery to efficiently handle email-sending tasks in the background, ensuring high performance of our application.
You can check the YouTube walkthrough here.
This is the Stage 3 project of my HNG internship, and I hope you've learned a few things and found the project helpful.
Till I write again. Adios.
Subscribe to my newsletter
Read articles from Okesanya Samuel directly inside your inbox. Subscribe to the newsletter, and don't miss out.
Written by
Okesanya Samuel
Okesanya Samuel
Welcome to my blog! My name is Okesanya Odunayo, and I'm a passionate cloud practitioner, DevOps enthusiast, and health advocate. I believe that sharing knowledge and insights is essential for driving innovation and advancing the industry as a whole. So if you're looking to learn more about the exciting world of cloud computing and DevOps, you've come to the right place.