📡 FastAPI MCP SSE Server with JWT Auth & Custom Client


đź“– Introduction
In modern AI applications, communication between clients and tools isn’t always as simple as calling an API. The Model Context Protocol (MCP) provides a standardized way for clients to exchange information, invoke tools, and maintain shared context over persistent connections — typically via Server-Sent Events (SSE).
In this post, I’ll walk you through:
Building an MCP SSE server using FastAPI
Securing it with JWT authentication
Implementing a custom Python client to connect, authenticate, and use MCP tools
We’ll also build a simpleBMI Calculator
tool to demo tool calling through MCP.
🎛️ Project Overview
We’ll build:
A FastAPI server that exposes an MCP-compliant SSE endpoint
A token-based auth system
Simple tools to get weather and time for a given location
A Python client that authenticates, connects via SSE, and invokes the tool dynamically
📦 Tech Stack
Python 3.11+
Python MCP SDK
FastAPI
aiohttp
PyJWT
Pydantic
loguru (for clean logs)
Setting up the MCP SSE Server (server.py)
Let’s first import the relevant libraries and get credentials and keys from env variables
import datetime
import os
from zoneinfo import ZoneInfo
from fastapi import FastAPI, HTTPException, Request
from pydantic import BaseModel
import requests
from starlette.applications import Starlette
from starlette.routing import Route, Mount
import jwt
from mcp.server.fastmcp import FastMCP
from mcp.server.sse import SseServerTransport
from loguru import logger
from dotenv import load_dotenv
load_dotenv()
Let’s initialize the MCP server and setup the tools
# Initialize the MCP server with your tools
mcp = FastMCP(
name="Weather and Time SSE Server"
)
@mcp.tool()
def TimeTool(input_timezone):
"Provides the current time for a given city's timezone like Asia/Kolkata, America/New_York etc. If no timezone is provided, it returns the local time."
format = "%Y-%m-%d %H:%M:%S %Z%z"
current_time = datetime.datetime.now()
if input_timezone:
print("TimeZone", input_timezone)
current_time = current_time.astimezone(ZoneInfo(input_timezone))
return f"The current time is {current_time}."
transport = SseServerTransport("/messages/")
@mcp.tool()
def weather_tool(location: str):
"""Provides weather information for a given location"""
api_key = os.getenv("OPENWEATHERMAP_API_KEY")
url = f"http://api.openweathermap.org/data/2.5/weather?q={location}&appid={api_key}&units=metric"
response = requests.get(url)
data = response.json()
if data["cod"] == 200:
temp = data["main"]["temp"]
description = data["weather"][0]["description"]
return f"The weather in {location} is currently {description} with a temperature of {temp}°C."
else:
return f"Sorry, I couldn't find weather information for {location}."
We will now setup the JWT auth system and use the starlette routes to essentially expose the app to the clients.
SECRET_KEY = "my_super_secret_key"
ALGORITHM = "HS256"
def check_auth(request: Request):
auth = request.headers.get("authorization", "")
if auth.startswith("Bearer "):
token = auth.split(" ", 1)[1]
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
return True
except jwt.ExpiredSignatureError:
raise HTTPException(status_code=401, detail="Token expired")
except jwt.InvalidTokenError:
raise HTTPException(status_code=401, detail="Invalid token")
raise HTTPException(status_code=401, detail="Unauthorized")
async def handle_sse(request):
check_auth(request=request)
# Prepare bidirectional streams over SSE
async with transport.connect_sse(
request.scope,
request.receive,
request._send
) as (in_stream, out_stream):
# Run the MCP server: read JSON-RPC from in_stream, write replies to out_stream
await mcp._mcp_server.run(
in_stream,
out_stream,
mcp._mcp_server.create_initialization_options()
)
#Build a small Starlette app for the two MCP endpoints
sse_app = Starlette(
routes=[
Route("/sse", handle_sse, methods=["GET"]),
# Note the trailing slash to avoid 307 redirects
Mount("/messages/", app=transport.handle_post_message)
]
)
Now let’s setup a FastAPI app and add a route for the clients to create a token. We will create a mock client store to simulate credential manager.
app = FastAPI()
# Mock client store
CLIENTS = {
"test_client": "secret_1234"
}
class TokenRequest(BaseModel):
client_id: str
client_secret: str
@app.post("/token")
def generate_token(request: TokenRequest):
if request.client_id in CLIENTS and CLIENTS[request.client_id] == request.client_secret:
payload = {
"sub": request.client_id,
"exp": datetime.datetime.now() + datetime.timedelta(minutes=60)
}
token = jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM)
return {"access_token": token}
else:
raise HTTPException(status_code=401, detail="Invalid credentials")
Mount the starlette app to FastAPI and setup gunicorn server to run the app.
app.mount("/", sse_app)
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8100)
đź”’ Why JWT-Based Auth?
Using client_id / client_secret via POST is scalable:
You can rotate credentials
Enforce expiration
Integrate external OAuth2/OIDC providers
Track session-level auth
Setting up the MCP SSE Client (client.py)
Setting up client is similar to what we have seen previously in this series. The only change being we generate the token and pass that as headers in the SSE Client.
import asyncio
import json
from typing import Optional
from mcp import ClientSession
from mcp.client.sse import sse_client
from openai import OpenAI
import mcp.client.sse as _sse_mod
from httpx import AsyncClient as _BaseAsyncClient
from loguru import logger
import aiohttp
from dotenv import load_dotenv
load_dotenv() # load environment variables from .env
import httpx
_orig_request = httpx.AsyncClient.request
async def _patched_request(self, method, url, *args, **kwargs):
# ensure follow_redirects is set so 307 → /messages/ works
kwargs.setdefault("follow_redirects", True)
return await _orig_request(self, method, url, *args, **kwargs)
httpx.AsyncClient.request = _patched_request
def llm_client(message: str):
client = OpenAI()
completion = client.chat.completions.create(
model="gpt-4o-mini",
messages=[
{"role": "system", "content": "You are an intelligent Assistant. You will execute tasks as instructed"},
{
"role": "user",
"content": message,
},
],
)
result = completion.choices[0].message.content
return result
def get_prompt_to_identify_tool_and_arguements(query, tools):
tools_description = "\n".join([f"{tool.name}: {tool.description}, {tool.inputSchema}" for tool in tools.tools])
return ("You are a helpful assistant with access to these tools:\n\n"
f"{tools_description}\n"
"Choose the appropriate tool based on the user's question. \n"
f"User's Question: {query}\n"
"If no tool is needed, reply directly.\n\n"
"IMPORTANT: When you need to use a tool, you must ONLY respond with "
"the exact JSON object format below, nothing else:\n"
"Keep the values in str "
"{\n"
' "tool": "tool-name",\n'
' "arguments": {\n'
' "argument-name": "value"\n'
" }\n"
"}\n\n")
TOKEN_URL = "http://localhost:8100/token"
SSE_URL = "http://localhost:8100/sse"
async def get_token():
payload = {"client_id": "test_client", "client_secret": "secret_1234"}
async with aiohttp.ClientSession() as session:
async with session.post(TOKEN_URL, json=payload) as resp:
if resp.status != 200:
logger.error(f"Failed to get token: {resp.status}")
raise Exception("Unable to authenticate. Ensure you are using valid credentials")
data = await resp.json()
logger.info("Successfully generated token")
return data["access_token"]
async def main(query:str):
try:
auth_token = await get_token()
headers = {"Authorization": f"Bearer {auth_token}"}
async with sse_client(url=SSE_URL,headers=headers) as (in_stream, out_stream):
# 2) Create an MCP session over those streams
async with ClientSession(in_stream, out_stream) as session:
# 3) Initialize
info = await session.initialize()
logger.info(f"Connected to {info.serverInfo.name} v{info.serverInfo.version}")
# 4) List tools
tools = (await session.list_tools())
logger.info(tools)
prompt = get_prompt_to_identify_tool_and_arguements(query,tools)
logger.info(f"Printing Prompt \n {prompt}")
response = llm_client(prompt)
print(response)
tool_call = json.loads(response)
result = await session.call_tool(tool_call["tool"], arguments=tool_call["arguments"])
logger.success(f"User query: {query}, Tool Response: {result.content[0].text}")
except Exception as e:
print(f"Encountered error: {e}")
if __name__ == "__main__":
queries = ["What is the time in Bengaluru?", "What is the weather like right now in Dubai?"]
for query in queries:
asyncio.run(main(query))
Response when the client is run.
2025-05-18 18:00:48.230 | SUCCESS | __main__:main:103 - User query: What is the weather like right now in Dubai?, Tool Response: The weather in Dubai is currently clear sky with a temperature of 30.37°C.
Subscribe to my newsletter
Read articles from Zahiruddin Tavargere directly inside your inbox. Subscribe to the newsletter, and don't miss out.
Written by

Zahiruddin Tavargere
Zahiruddin Tavargere
I am a Journalist-turned-Software Engineer. I love coding and the associated grind of learning every day. A firm believer in social learning, I owe my dev career to all the tech content creators I have learned from. This is my contribution back to the community.