Push vs. Polling: Building a Lightweight, Real-Time Notification System with FastAPI and SSE

Building a notification system is more than just logging events and sending them to a client. A truly effective system involves architectural planning, real-time event handling, and scale considerations. But what if you're just starting out? Do you really need complex tools like message brokers and queues?

I recently took a deep dive into this problem and discovered that for many applications, the answer is no. You can build a powerful, real-time notification service without the overhead of external dependencies. This article will show you how.

The Goal: Efficiency Without Over-Engineering

I wanted to create a notification service that was efficient for a new application with minimal users and traffic. Using heavy-duty message brokers like RabbitMQ or Kafka felt like a classic case of premature optimization.

The solution? I designed a lightweight, in-memory pub/sub system using only Python's asyncio and a custom ConnectionManager class. Let's dive into how it works.

Requirements

This tutorial builds upon a standard FastAPI setup. If you're starting from scratch, you can follow my complete setup guide here: How To Set Up Basic FastAPI Project.

To follow along, you'll need:

  • A working FastAPI project (as described in the link above).

  • The sse-starlette package for handling Server-Sent Events.

You can install it with pip:

pip install sse-starlette

Step 1: Designing The Database Tables

First, we need to define how notifications will be stored. In your app/prisma/schema.prisma file, add the following models:

// ----- Previous prisma settings --------

enum NotificationType {
  ALERT
  REMINDER
  UPDATE
  SYSTEM_NOTICE
  PROMOTION
}

// --- Add these relations to your existing User model ----
model User {
  // ----- Previous user fields -------
  notifications            Notifications[]          // One-to-many relation to the Notifications table
  notification_preferences NotificationPreferences? // One-to-one relation to the NotificationPreferences table
}

model Notifications {
  id        Int    @id   @default(autoincrement())
  user_id   String   @db.Uuid
  title     String?
  content   String?
  type      NotificationType @default(ALERT)
  is_read   Boolean? @default(false)
  read_at   DateTime? 
  sent_at    DateTime?

  users     User   @relation(fields: [user_id], references: [id], onDelete: Cascade)

  created_at DateTime  @default(now())
  updated_at DateTime? @updatedAt

  @@map("notifications")
}

model NotificationPreferences {
  id                 Int     @id @default(autoincrement())
  user_id            String  @db.Uuid  @unique
  is_enabled         Boolean? @default(true)
  is_email_enabled   Boolean? @default(true)
  is_sms_enabled     Boolean? @default(true)
  is_push_enabled    Boolean? @default(true)

  user       User    @relation(fields: [user_id], references: [id], onDelete: Cascade)

  created_at DateTime  @default(now())
  updated_at DateTime? @updatedAt

  @@map("notification_preferences")
  @@index([user_id])
}

After adding the models, run your database migration:

make migrate

  • Notifications: The main table that stores the content of each notification and links it to a user.

  • NotificationPreferences: A separate table to store user-specific settings, allowing them to control which notifications they receive.
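To make the preferences table concrete, here is a small sketch of how a preference row could gate delivery. The field names mirror the NotificationPreferences model above, but the enabled_channels helper and the channel names are my own illustration, not code from the project:

```python
from typing import Dict, List


def enabled_channels(prefs: Dict[str, bool]) -> List[str]:
    """Return the delivery channels a user has opted into.

    Keys mirror the NotificationPreferences columns; missing keys fall
    back to the schema defaults (everything enabled).
    """
    if not prefs.get("is_enabled", True):
        return []  # master switch off: deliver nothing
    channels = []
    if prefs.get("is_email_enabled", True):
        channels.append("email")
    if prefs.get("is_sms_enabled", True):
        channels.append("sms")
    if prefs.get("is_push_enabled", True):
        channels.append("push")
    return channels


print(enabled_channels({"is_enabled": True, "is_sms_enabled": False}))
# ['email', 'push']
```

A check like this would run in the emitter, right before a notification is pushed to any channel.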

Step 2: Creating the In-Memory Queue (The ConnectionManager)

This is the heart of our lightweight system. Instead of an external message broker, we'll use a Python class to manage active connections in memory. Before we get to the code, let's discuss a crucial architectural choice made at this stage: push vs. pull.

The Inefficient Way: Polling (Pull)

A simple approach would be to create an SSE endpoint that queries the database for new notifications every few seconds. This is called polling. The client repeatedly "pulls" data from the server. While easy to implement, it's very inefficient. It puts a constant, unnecessary load on your database and isn't truly real-time—there's always a delay.
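To put rough numbers on that inefficiency, here is a quick back-of-the-envelope calculation (the client count and polling interval are illustrative assumptions, not measurements):

```python
# Database load generated by polling alone, before a single notification
# is ever delivered. Figures are illustrative assumptions.
clients = 1_000          # connected clients
poll_interval_s = 5      # each client polls every 5 seconds

queries_per_second = clients / poll_interval_s
queries_per_day = queries_per_second * 86_400  # seconds in a day

print(f"{queries_per_second:.0f} queries/s")    # 200 queries/s
print(f"{queries_per_day:,.0f} queries/day")    # 17,280,000 queries/day
```

With push, those same 1,000 idle clients generate zero database queries; the database is touched only when a notification is actually created.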

The Efficient Way: Real-Time Push

A much better approach is a "push" system. The client opens a single, persistent connection and just waits. When a new notification is created anywhere in our application, our server pushes that single new event down the connection to the client. This is instantaneous, highly efficient, and puts zero polling load on our database.

The ConnectionManager class is the tool that enables this push-based architecture without needing a heavy message broker. Let's build it.

In your app directory, create a notifications module and, inside it, a file named manager.py.

# app/notifications/manager.py
import asyncio 
from typing import Dict, List
import logging

logger = logging.getLogger(__name__)


class ConnectionManager:
    """Manages active user connections for real-time notifications."""

    def __init__(self):
        # A dictionary where keys are user_ids and values are lists of asyncio Queues.
        # A list is used because a single user can have multiple browser tabs open.
        self.active_connections: Dict[str, List[asyncio.Queue]] = {}

    async def connect(self, user_id: str) -> asyncio.Queue:
        """A client connects. A new queue is created and returned for this connection."""
        queue = asyncio.Queue()
        if user_id not in self.active_connections:
            self.active_connections[user_id] = []
        self.active_connections[user_id].append(queue)
        logger.info(f"User {user_id} connected. Total connections for user: {len(self.active_connections[user_id])}")
        return queue

    def disconnect(self, user_id: str, queue: asyncio.Queue):
        """A client disconnects. The specific queue is removed from the user's list."""
        if user_id in self.active_connections:
            self.active_connections[user_id].remove(queue)
            if not self.active_connections[user_id]:
                del self.active_connections[user_id]
                logger.info(f"User {user_id} disconnected from notifications.")

    async def push_notification(self, message: str, user_id: str):
        """Pushes a message to all active connections for a specific user."""
        if user_id in self.active_connections:
            for queue in self.active_connections[user_id]:
                await queue.put(message)

    async def push_broadcast(self, message: str):
        """Pushes a message to every single active connection."""
        for user_queues in self.active_connections.values():
            for queue in user_queues:
                await queue.put(message)

How it works:

  • connect: When a user opens a new tab, they connect to our stream. We create a unique asyncio.Queue for that specific connection and store it.

  • disconnect: When the user closes the tab, we clean up by removing their queue.

  • push_notification: This is how we send a targeted notification. It finds all active connections for a single user and puts the message in their queues.

  • push_broadcast: Puts the same message in every active connection's queue, regardless of user, which is useful for system-wide announcements.
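To see the fan-out behaviour in isolation, here is a self-contained sketch. It inlines an abridged copy of the class above (connect and push_notification only) so it runs on its own:

```python
import asyncio
from typing import Dict, List


class ConnectionManager:
    """Abridged copy of the manager above, just enough for this demo."""

    def __init__(self):
        self.active_connections: Dict[str, List[asyncio.Queue]] = {}

    async def connect(self, user_id: str) -> asyncio.Queue:
        queue: asyncio.Queue = asyncio.Queue()
        self.active_connections.setdefault(user_id, []).append(queue)
        return queue

    async def push_notification(self, message: str, user_id: str):
        for queue in self.active_connections.get(user_id, []):
            await queue.put(message)


async def demo():
    manager = ConnectionManager()
    # Simulate one user with two open browser tabs.
    tab_1 = await manager.connect("user-42")
    tab_2 = await manager.connect("user-42")

    await manager.push_notification('{"title": "hello"}', "user-42")

    # Each tab has its own queue, so both receive a copy of the message.
    return await tab_1.get(), await tab_2.get()


messages = asyncio.run(demo())
print(messages)  # ('{"title": "hello"}', '{"title": "hello"}')
```

This per-connection queue design is what lets one notification reach every open tab without any connection knowing about the others.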

Step 3: Defining the API Endpoints

Now, let's create the API routes. In your notifications module, create a file named routes.py.

# app/notifications/routes.py
import asyncio
import json
import logging
from datetime import datetime, timezone

from fastapi import APIRouter, Request
from sse_starlette.sse import EventSourceResponse

from app.db_config.prismaClient import prisma
from app.notifications.manager import ConnectionManager

# get_response and StatusCode are this project's standard response helpers;
# import them from wherever they are defined in your own codebase.

logger = logging.getLogger(__name__)

# Core Notification Event Handler and Emitter
router = APIRouter(prefix="/notifications", tags=["notifications"])

manager = ConnectionManager() # Create a single, shared instance

# SSE Endpoint for Real-time Push Notifications
@router.get("/stream/{user_id}")
async def stream_notifications(request: Request, user_id: str):
    """"
    Generator function to yield notification events for a specific user.
    This function simulates real-time notification delivery using Server-Sent Events (SSE).
    The user is connected to a queue managed by the ConnectionManager
    """
    queue = await manager.connect(user_id)
    async def event_generator():
        try:
            while True:
                # 'queue.get()' would block forever on an idle connection, so we
                # wait with a timeout and use the idle periods to check whether
                # the client has disconnected.
                try:
                    notification_data = await asyncio.wait_for(queue.get(), timeout=15.0)
                except asyncio.TimeoutError:
                    if await request.is_disconnected():
                        break
                    continue
                yield {
                    "event": "new_notification",
                    "data": notification_data,
                }
        except asyncio.CancelledError:
            logger.info(f"Client {user_id} disconnected, stopping event generator.")
        except Exception as e:
            logger.error(f"Error in event generator: {e}")
            yield {
                "event": "error",
                "data": json.dumps({"error": str(e)}),
            }
        finally:
            manager.disconnect(user_id, queue)
            logger.info(f"Cleaned up connection for user {user_id}")

    return EventSourceResponse(event_generator())


# Getting all notifications for a user
@router.get("/all/{user_id}")
async def get_all_notifications(user_id: str):
    try:
        notifications = await prisma.notifications.find_many(
            where={"user_id": user_id},
            order={"sent_at": "desc"},
        )
        return get_response(
            status_code=StatusCode.OK,
            message="Notifications fetched successfully",
            data=[json.loads(notif.model_dump_json()) for notif in notifications],
        )
    except Exception as e:
        logger.error(f"Error fetching notifications for user {user_id}: {e}")
        return get_response(
            status_code=StatusCode.INTERNAL_SERVER_ERROR,
            message=f"Failed to fetch notifications: {str(e)}",
        )


# Update notification as read
@router.patch("/read/{notification_id}")
async def read_notification(notification_id: int):
    try:
        notification = await prisma.notifications.update(
            where={"id": notification_id},
            data={"is_read": True, "read_at": datetime.now(timezone.utc)},
        )
        logger.info(f"Notification {notification_id} marked as read.")
        return get_response(
            status_code=StatusCode.OK,
            message="Notification marked as read",
            data=json.loads(notification.model_dump_json()),
        )
    except Exception as e:
        logger.error(f"Error marking notification {notification_id} as read: {e}")
        return get_response(
            status_code=StatusCode.INTERNAL_SERVER_ERROR,
            message=f"Failed to mark notification as read: {str(e)}",
        )

The Three Key Endpoints:

  1. /stream/{user_id}: The real-time SSE endpoint. A client connects here and just waits. It does not touch the database.

  2. /all/{user_id}: A standard REST endpoint that the frontend calls once on page load to get the user's notification history.

  3. /read/{notification_id}: A standard REST endpoint to update the is_read status when a user clicks a notification.
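For reference, what actually travels over the /stream connection is plain text in the text/event-stream format, which browsers parse automatically via EventSource. The frame below is a hand-written example of that wire format (the payload is illustrative, not output from the code above), parsed manually just to show its shape:

```python
import json

# One SSE frame as EventSourceResponse emits it: an "event:" line, a
# "data:" line, and a blank line terminating the frame.
raw_frame = (
    "event: new_notification\n"
    'data: {"id": 1, "title": "Login Successful"}\n'
    "\n"
)

event = {}
for line in raw_frame.splitlines():
    if ": " in line:
        field, _, value = line.partition(": ")
        event[field] = value

payload = json.loads(event["data"])
print(event["event"])    # new_notification
print(payload["title"])  # Login Successful
```

On the frontend, `new EventSource("/notifications/stream/<user_id>")` with a listener for the "new_notification" event type does this parsing for you.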

Step 4: Emitting Notifications from Anywhere

The beauty of this system is its decoupling. We can now create and send a notification from anywhere in our application.

Let's create a helper function. In your notifications module, create emitter.py:

# app/notifications/emitter.py
import json
import logging
from datetime import datetime, timezone
from enum import Enum
from typing import Optional

from pydantic import BaseModel

from app.db_config.prismaClient import prisma
from app.notifications.routes import manager  # the shared ConnectionManager instance created in routes.py

logger = logging.getLogger(__name__)

class NotificationType(str, Enum):
    ALERT = "ALERT"
    REMINDER = "REMINDER"
    # ---- Add other types as needed -----

class NotificationSchema(BaseModel):
    user_id: Optional[str] = None
    title: Optional[str] = None
    content: Optional[str] = None
    type: Optional[NotificationType] = NotificationType.ALERT


async def emit_notification_event(data: NotificationSchema):
    """
    Creates a notification in the DB and pushes it to the live user connection.
    """
    try:
        # 1. Create the record in the database
        data = data.model_dump()
        notification = await prisma.notifications.create(
            data={
                "user_id": data.get("user_id"),
                "title": data.get("title"),
                "content": data.get("content"),
                "type": data.get("type").value if data.get("type") else NotificationType.ALERT.value,
                "is_read": False,
                "sent_at": datetime.now(timezone.utc),
            }
        )
        logger.info(f"Notification created with ID: {notification.id}")

        # 2. Convert the new notification to a JSON string
        notification_dict = notification.model_dump()
        notification_json = json.dumps(notification_dict, default=str)

        # 3. Push the JSON to the user's live connection queue
        if data.get("user_id"):
            await manager.push_notification(notification_json, data.get("user_id"))
        else:
            logger.warning("No user_id provided; skipping push to user queue.")
        return True
    except Exception as e:
        logger.error(f"Error creating notification: {e}")
        return False

Now you can import and call emit_notification_event from any part of your application. For example, in your user login logic:

from app.notifications.emitter import emit_notification_event, NotificationSchema, NotificationType


async def login_user(...):
    # --- successful login logic here ---

    # After successful login, emit a notification
    await emit_notification_event(
        NotificationSchema(
            user_id=<user_id>,  # the authenticated user's id
            title="Login Successful",
            content="You have successfully logged in.",
            type=NotificationType.ALERT,
        )
    )
    return JSONResponse(...)

Video Walkthrough

%[Watch the live demo of the notification system](https://youtu.be/WCMw7Cd7I08)

Conclusion

We've successfully built a powerful, real-time, and non-blocking notification system without adding any external dependencies like Redis or RabbitMQ. By combining FastAPI's async capabilities with a simple ConnectionManager, we created a lightweight pub/sub system that is perfect for projects in their early stages.

The "Fetch-then-Listen" pattern, where the client fetches historical data via a standard REST API and then opens an SSE connection for live updates, is a robust and scalable approach for building modern real-time UIs.

Thank you for reading! If you have any questions or additions, please leave your comments below.

Written by Onuh Chidera Theola