Push VS Polling: Building a Lightweight, Real-Time Notification System with FastAPI and SSE

Building a notification system is more than just logging events and sending them to a client. A truly effective system involves architectural planning, real-time event handling, and scale considerations. But what if you're just starting out? Do you really need complex tools like message brokers and queues?
I recently took a deep dive into this problem and discovered that for many applications, the answer is no. You can build a powerful, real-time notification service without the overhead of external dependencies. This article will show you how.
The Goal: Efficiency Without Over-Engineering
I wanted to create a notification service that was efficient for a new application with minimal users and traffic. Using heavy-duty message brokers like RabbitMQ or Kafka felt like a classic case of premature optimization.
The solution? I designed a lightweight, in-memory pub/sub system using only Python's asyncio and a custom ConnectionManager class. Let's dive into how it works.
Requirements
This tutorial builds upon a standard FastAPI setup. If you're starting from scratch, you can follow my complete setup guide here: How To Set Up Basic FastAPI Project.
To follow along, you'll need:
A working FastAPI project (as described in the link above).
The sse-starlette package for handling Server-Sent Events.
You can install it with pip:
pip install sse-starlette
Step 1: Designing The Database Tables
First, we need to define how notifications will be stored. In your app/prisma/schema.prisma file, add the following models:
// ----- Previous prisma settings --------
enum NotificationType {
  ALERT
  REMINDER
  UPDATE
  SYSTEM_NOTICE
  PROMOTION
}

// --- Add these relations to your existing User model ----
model User {
  // ----- Previous user fields -------
  notifications            Notifications[] // One-to-many relationship with the Notifications table
  notification_preferences NotificationPreferences?
}

model Notifications {
  id         Int              @id @default(autoincrement())
  user_id    String           @db.Uuid
  title      String?
  content    String?
  type       NotificationType @default(ALERT)
  is_read    Boolean?         @default(false)
  read_at    DateTime?
  sent_at    DateTime?
  users      User             @relation(fields: [user_id], references: [id], onDelete: Cascade)
  created_at DateTime         @default(now())
  updated_at DateTime?        @updatedAt

  @@map("notifications")
}

model NotificationPreferences {
  id               Int       @id @default(autoincrement())
  user_id          String    @unique @db.Uuid
  is_enabled       Boolean?  @default(true)
  is_email_enabled Boolean?  @default(true)
  is_sms_enabled   Boolean?  @default(true)
  is_push_enabled  Boolean?  @default(true)
  user             User      @relation(fields: [user_id], references: [id], onDelete: Cascade)
  created_at       DateTime  @default(now())
  updated_at       DateTime? @updatedAt

  @@map("notification_preferences")
  @@index([user_id])
}
After adding the models, run your database migration:
make migrate
Notifications: The main table that stores the content of each notification and links it to a user.
NotificationPreferences: A separate table to store user-specific settings, allowing them to control which notifications they receive.
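To make the role of the preferences table concrete, here is a minimal sketch of how those flags could gate delivery. It assumes a plain dataclass that mirrors the NotificationPreferences columns. The names Preferences and enabled_channels are hypothetical and not part of the original project, and treating a missing row or a NULL column as "enabled" is an assumption based on the schema defaults.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class Preferences:
    """Hypothetical in-memory mirror of a NotificationPreferences row."""
    is_enabled: Optional[bool] = True
    is_email_enabled: Optional[bool] = True
    is_sms_enabled: Optional[bool] = True
    is_push_enabled: Optional[bool] = True


def enabled_channels(prefs: Optional[Preferences]) -> List[str]:
    """Return the channels a user has opted into.

    Assumption: a user with no preferences row gets the schema defaults
    (everything on), and a NULL column is treated like its default of true.
    """
    prefs = prefs or Preferences()
    if prefs.is_enabled is False:
        return []  # master switch off: deliver nothing
    flags = {
        "email": prefs.is_email_enabled,
        "sms": prefs.is_sms_enabled,
        "push": prefs.is_push_enabled,
    }
    return [name for name, flag in flags.items() if flag is not False]
```

A caller could check that "push" is in the returned list before handing a message to the real-time stream.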
Step 2: Creating the In-Memory Queue (The ConnectionManager)
This is the heart of our lightweight system. Instead of an external message broker, we'll use a Python class to manage active connections in memory. But before we go on with the code, let us discuss a crucial architectural choice that was made at this stage: Push vs Pull.
The Inefficient Way: Polling (Pull)
A simple approach would be to create an SSE endpoint that queries the database for new notifications every few seconds. This is called polling. The client repeatedly "pulls" data from the server. While easy to implement, it's very inefficient. It puts a constant, unnecessary load on your database and isn't truly real-time—there's always a delay.
The Efficient Way: Real-Time Push
A much better approach is a "push" system. The client opens a single, persistent connection and just waits. When a new notification is created anywhere in our application, our server pushes that single new event down the connection to the client. The client sees the event as soon as it is created, and the stream itself puts zero polling load on our database.
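The difference between the two models can be sketched with nothing but asyncio. This is an illustrative simulation, not code from the project: FakeDB, poll, and listen are hypothetical names, and the "database" is just a counter that pretends a notification shows up on the fourth query.

```python
import asyncio


class FakeDB:
    """Stand-in for a database; counts how many times it is queried."""

    def __init__(self, arrives_on_call: int):
        self.calls = 0
        self.arrives_on_call = arrives_on_call

    def fetch_new(self) -> list:
        self.calls += 1
        # Pretend the notification only exists from the Nth query onward.
        return ["hello"] if self.calls >= self.arrives_on_call else []


async def poll(db: FakeDB, interval: float = 0.0) -> list:
    """Pull: query on a schedule until something turns up."""
    while True:
        rows = db.fetch_new()
        if rows:
            return rows
        await asyncio.sleep(interval)


async def listen(queue: asyncio.Queue) -> str:
    """Push: suspend until a producer puts a message on the queue."""
    return await queue.get()


async def demo() -> dict:
    polled_db = FakeDB(arrives_on_call=4)
    polled = await poll(polled_db)

    idle_db = FakeDB(arrives_on_call=1)  # never queried in the push path
    queue: asyncio.Queue = asyncio.Queue()
    listener = asyncio.create_task(listen(queue))
    await queue.put("hello")  # the "event" happens
    pushed = await listener

    return {
        "polled": polled,
        "poll_queries": polled_db.calls,
        "pushed": pushed,
        "push_queries": idle_db.calls,
    }


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

In this simulation the polling path issues four queries to deliver one message, while the push path delivers the same message without querying at all.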
The ConnectionManager class is the tool that enables this push-based architecture without needing a heavy message broker. Let's build it.
In your app directory, create a notifications module, and inside it, a file named manager.py.
# app/notifications/manager.py
import asyncio
import logging
from typing import Dict, List

logger = logging.getLogger(__name__)


class ConnectionManager:
    """Manages active user connections for real-time notifications."""

    def __init__(self):
        # A dictionary where keys are user_ids and values are a list of asyncio Queues.
        # A list is used because a single user can have multiple browser tabs open.
        self.active_connections: Dict[str, List[asyncio.Queue]] = {}

    async def connect(self, user_id: str) -> asyncio.Queue:
        """A client connects. A new queue is created and returned for this connection."""
        queue = asyncio.Queue()
        if user_id not in self.active_connections:
            self.active_connections[user_id] = []
        self.active_connections[user_id].append(queue)
        logger.info(
            f"User {user_id} connected. "
            f"Total connections for user: {len(self.active_connections[user_id])}"
        )
        return queue

    def disconnect(self, user_id: str, queue: asyncio.Queue):
        """A client disconnects. The specific queue is removed from the user's list."""
        if user_id in self.active_connections:
            # Guard against a queue that was already removed.
            if queue in self.active_connections[user_id]:
                self.active_connections[user_id].remove(queue)
            if not self.active_connections[user_id]:
                del self.active_connections[user_id]
        logger.info(f"User {user_id} disconnected from notifications.")

    async def push_notification(self, message: str, user_id: str):
        """Pushes a message to all active connections for a specific user."""
        if user_id in self.active_connections:
            for queue in self.active_connections[user_id]:
                await queue.put(message)

    async def push_broadcast(self, message: str):
        """Pushes a message to every single active connection."""
        for user_queues in self.active_connections.values():
            for queue in user_queues:
                await queue.put(message)


# A single, shared instance. The routes and the emitter must both import this
# same object, otherwise pushed messages never reach the connected clients.
manager = ConnectionManager()
How it works:
connect: When a user opens a new tab, they connect to our stream. We create a unique asyncio.Queue for that specific connection and store it.
disconnect: When the user closes the tab, we clean up by removing their queue.
push_notification: This is how we send a targeted notification. It finds all active connections for a single user and puts the message in their queues.
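To see the fan-out behaviour in isolation, here is a small sketch that runs without FastAPI. It repeats a trimmed copy of the class so the snippet is self-contained. The user IDs "alice" and "bob" and the demo function are made up for illustration.

```python
import asyncio
from typing import Dict, List


class ConnectionManager:
    """Trimmed copy of the class above, repeated so this sketch runs on its own."""

    def __init__(self) -> None:
        self.active_connections: Dict[str, List[asyncio.Queue]] = {}

    async def connect(self, user_id: str) -> asyncio.Queue:
        queue: asyncio.Queue = asyncio.Queue()
        self.active_connections.setdefault(user_id, []).append(queue)
        return queue

    def disconnect(self, user_id: str, queue: asyncio.Queue) -> None:
        queues = self.active_connections.get(user_id, [])
        if queue in queues:
            queues.remove(queue)
        if not queues:
            self.active_connections.pop(user_id, None)

    async def push_notification(self, message: str, user_id: str) -> None:
        for queue in self.active_connections.get(user_id, []):
            await queue.put(message)


async def demo() -> dict:
    manager = ConnectionManager()
    tab_1 = await manager.connect("alice")  # first browser tab
    tab_2 = await manager.connect("alice")  # second tab, same user
    bob = await manager.connect("bob")

    await manager.push_notification("hi alice", "alice")
    after_push = (tab_1.qsize(), tab_2.qsize(), bob.qsize())

    manager.disconnect("alice", tab_1)
    manager.disconnect("alice", tab_2)
    return {
        "after_push": after_push,
        "users_left": sorted(manager.active_connections),
    }


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Both of Alice's tabs receive the message, Bob's queue stays empty, and once her last tab disconnects her entry is removed from the dictionary.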
Step 3: Defining the API Endpoints
Now, let's create the API routes. In your notifications module, create a file named routes.py.
# app/notifications/routes.py
import asyncio
import json
import logging
from datetime import datetime, timezone

from fastapi import APIRouter, Request
from sse_starlette.sse import EventSourceResponse

from app.db_config.prismaClient import prisma  # adjust to wherever your Prisma client lives
from app.notifications.manager import manager  # the single, shared ConnectionManager instance
# get_response and StatusCode are the response helpers from the base project setup;
# import them from wherever your project defines them.

logger = logging.getLogger(__name__)

# Core Notification Event Handler and Emitter
router = APIRouter(prefix="/notifications", tags=["notifications"])


# SSE Endpoint for Real-time Push Notifications
@router.get("/stream/{user_id}")
async def stream_notifications(request: Request, user_id: str):
    """
    Streams notification events for a specific user using Server-Sent Events (SSE).
    The user is connected to a queue managed by the ConnectionManager.
    """
    queue = await manager.connect(user_id)

    async def event_generator():
        try:
            while True:
                # The 'await queue.get()' line will pause here, waiting
                # patiently until a message is put into the queue.
                notification_data = await queue.get()
                if await request.is_disconnected():
                    break
                yield {
                    "event": "new_notification",
                    "data": notification_data,
                }
        except asyncio.CancelledError:
            logger.info(f"Client {user_id} disconnected, stopping event generator.")
            raise  # let the cancellation propagate after logging
        except Exception as e:
            logger.error(f"Error in event generator: {e}")
            yield {
                "event": "error",
                "data": json.dumps({"error": str(e)}),
            }
        finally:
            # Runs on every exit path, so the queue is always cleaned up.
            manager.disconnect(user_id, queue)
            logger.info(f"Cleaned up connection for user {user_id}")

    return EventSourceResponse(event_generator())


# Getting all notifications for a user
@router.get("/all/{user_id}")
async def get_all_notifications(user_id: str):
    try:
        notifications = await prisma.notifications.find_many(
            where={"user_id": user_id},
            order={"sent_at": "desc"},
        )
        return get_response(
            status_code=StatusCode.OK,
            message="Notifications fetched successfully",
            data=[json.loads(notif.model_dump_json()) for notif in notifications],
        )
    except Exception as e:
        logger.error(f"Error fetching notifications for user {user_id}: {e}")
        # Return an error response instead of silently returning None.
        return get_response(
            status_code=StatusCode.INTERNAL_SERVER_ERROR,
            message=f"Failed to fetch notifications: {str(e)}",
        )


# Update notification as read
@router.patch("/read/{notification_id}")
async def read_notification(notification_id: int):
    try:
        notification = await prisma.notifications.update(
            where={"id": notification_id},
            data={"is_read": True, "read_at": datetime.now(timezone.utc)},
        )
        logger.info(f"Notification {notification_id} marked as read.")
        return get_response(
            status_code=StatusCode.OK,
            message="Notification marked as read",
            data=json.loads(notification.model_dump_json()),
        )
    except Exception as e:
        logger.error(f"Error marking notification {notification_id} as read: {e}")
        return get_response(
            status_code=StatusCode.INTERNAL_SERVER_ERROR,
            message=f"Failed to mark notification as read: {str(e)}",
        )
The Three Key Endpoints:
/stream/{user_id}: The real-time SSE endpoint. A client connects here and just waits. It does not touch the database.
/all/{user_id}: A standard REST endpoint that the frontend calls once on page load to get the user's notification history.
/read/{notification_id}: A standard REST endpoint to update the is_read status when a user clicks a notification.
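It helps to know what the client actually receives from the stream endpoint. Each dict yielded by the generator is sent as a Server-Sent Events frame: an event: line, one or more data: lines, and a blank line that ends the frame. The sketch below encodes and decodes such a frame. format_sse and parse_sse are hypothetical helpers written for illustration, not sse-starlette functions, and they cover only a simplified subset of the format (no id, retry, or comment fields).

```python
import json
from typing import Dict


def format_sse(event: str, data: str) -> str:
    """Encode one event as an SSE frame; multi-line data becomes several data: lines."""
    lines = [f"event: {event}"]
    lines += [f"data: {chunk}" for chunk in data.split("\n")]
    return "\n".join(lines) + "\n\n"  # the blank line terminates the frame


def parse_sse(frame: str) -> Dict[str, str]:
    """Decode one frame back into its event name and data payload."""
    event, data_lines = "message", []  # "message" is the default event type
    for line in frame.splitlines():
        if line.startswith("event:"):
            event = line[len("event:"):].strip()
        elif line.startswith("data:"):
            value = line[len("data:"):]
            # A single leading space after the colon is not part of the value.
            data_lines.append(value[1:] if value.startswith(" ") else value)
    return {"event": event, "data": "\n".join(data_lines)}


# Example payload; the fields are illustrative, not a real database record.
payload = json.dumps({"id": 1, "title": "Login Successful"})
frame = format_sse("new_notification", payload)
```

Because the endpoint names its event new_notification, a browser client has to listen for that event name specifically rather than relying on the default message handler.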
Step 4: Emitting Notifications from Anywhere
The beauty of this system is its decoupling. We can now create and send a notification from anywhere in our application.
Let's create a helper function. In your notifications module, create emitter.py:
# app/notifications/emitter.py
import json
import logging
from datetime import datetime, timezone
from enum import Enum
from typing import Optional

from pydantic import BaseModel

from app.settings.prisma_client import prisma  # adjust to wherever your Prisma client lives
from app.notifications.manager import manager  # Import the shared manager instance

logger = logging.getLogger(__name__)


class NotificationType(str, Enum):
    ALERT = "ALERT"
    REMINDER = "REMINDER"
    # ---- Add other types as needed -----


class NotificationSchema(BaseModel):
    user_id: Optional[str]
    title: Optional[str]
    content: Optional[str]
    type: Optional[NotificationType] = NotificationType.ALERT


async def emit_notification_event(data: NotificationSchema):
    """
    Creates a notification in the DB and pushes it to the live user connection.
    """
    try:
        # 1. Create the record in the database
        payload = data.model_dump()
        notification_type = payload.get("type") or NotificationType.ALERT
        notification = await prisma.notifications.create(
            data={
                "user_id": payload.get("user_id"),
                "title": payload.get("title"),
                "content": payload.get("content"),
                "type": notification_type.value,
                "is_read": False,
                "sent_at": datetime.now(timezone.utc),
            }
        )
        logger.info(f"Notification created with ID: {notification.id}")

        # 2. Convert the new notification to a JSON string
        #    (default=str handles datetime fields that json cannot encode natively)
        notification_dict = notification.model_dump()
        notification_json = json.dumps(notification_dict, default=str)

        # 3. Push the JSON to the user's live connection queue
        if payload.get("user_id"):
            await manager.push_notification(notification_json, payload.get("user_id"))
        else:
            logger.warning("No user_id provided; skipping push to user queue.")
        return True
    except Exception as e:
        logger.error(f"Error creating notification: {e}")
        return False
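The default=str argument in step 2 matters because the record contains datetime values, which json.dumps cannot encode by itself. Here is a minimal sketch, using a hand-built dict with made-up values as a stand-in for notification.model_dump():

```python
import json
from datetime import datetime, timezone

# Illustrative record; a real one would come from notification.model_dump().
record = {
    "id": 1,
    "title": "Login Successful",
    "sent_at": datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc),
}

try:
    json.dumps(record)
    plain_ok = True
except TypeError:
    # Without a default handler, datetime objects are rejected.
    plain_ok = False

# default=str is called for any value json cannot encode natively.
encoded = json.dumps(record, default=str)
decoded = json.loads(encoded)
```

Here plain_ok ends up False, and decoded["sent_at"] is the string "2024-01-01 12:00:00+00:00". Note that str() separates date and time with a space rather than the "T" of strict ISO 8601, so a client that parses these timestamps may need to account for that.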
Now you can import and call the emit_notification_event function from any part of your application. For example, in your user login logic:
from app.notifications.emitter import (
    NotificationSchema,
    NotificationType,
    emit_notification_event,
)


async def login_user(...):
    # --- successful login logic here ---

    # After successful login, emit a notification
    await emit_notification_event(
        NotificationSchema(
            user_id=<user_id>,  # placeholder: the ID of the user who just logged in
            title="Login Successful",
            content="You have successfully logged in.",
            type=NotificationType.ALERT,
        )
    )
    return JSONResponse(...)
Video Playout
Watch the live demo of the notification system: https://youtu.be/WCMw7Cd7I08
Conclusion
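We've built a real-time, non-blocking notification system without adding external infrastructure like Redis or RabbitMQ. By combining FastAPI's async capabilities with a simple ConnectionManager, we created a lightweight in-memory pub/sub system that suits projects in their early stages. Because the queues live in one process's memory, this design works as long as the app runs as a single worker; once you scale out to several workers or servers, a shared broker becomes necessary.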
The "Fetch-then-Listen" pattern, where the client fetches historical data via a standard REST API and then opens an SSE connection for live updates, is a robust and scalable approach for building modern real-time UIs.
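One detail of the pattern is worth sketching: if the stream is opened while the history request is still in flight, the same notification can arrive through both channels. A minimal way to handle that is to merge by ID. merge_notifications is a hypothetical helper, and it assumes every notification dict carries the autoincrement id from the Notifications table, so a higher id means a newer notification.

```python
from typing import Dict, Iterable, List


def merge_notifications(history: Iterable[dict], live: Iterable[dict]) -> List[dict]:
    """Combine REST history with SSE events, newest first, one entry per id.

    Assumption: each dict has the integer `id` from the Notifications table,
    and when the same id appears twice the later (live) copy wins.
    """
    by_id: Dict[int, dict] = {}
    for item in list(history) + list(live):
        by_id[item["id"]] = item
    return sorted(by_id.values(), key=lambda n: n["id"], reverse=True)


# Illustrative data: notification 2 arrives through both channels.
history = [{"id": 2, "is_read": False}, {"id": 1, "is_read": True}]
live = [{"id": 2, "is_read": False}, {"id": 3, "is_read": False}]
merged = merge_notifications(history, live)
```

With the sample data, merged contains notifications 3, 2, and 1 in that order, with the duplicate collapsed into a single entry.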
Thank you for reading! If you have any questions or additions, please leave your comments below.
Written by

Onuh Chidera Theola