Create your own MCP server and client based on transport protocols.

MCP (Model Context Protocol) It is based on client-server architecture. It supports three types of transport protocols. A transport is a communication layer between client and server. It handles the messages between the sending end and the receiving end.
SSE (Server Side Events)
STDIO (standard input and output)
Streamable HTTP
Studio Transport Protocol
Studio Transport is MCP's standard HTTP-based protocol designed for production environments and IDE integrations. It supports standard input and output streams, which are known for local file handling. If the server has its file locally, which is sharable to the client, we use STDIO transport. It is particularly useful for command-line end and for local file servers.
Key Characteristics of Studio Transport:
Production-Ready: Designed for stable, production deployments
IDE Integration: Primary transport for code editors and development tools
Synchronous Operations: Traditional request-response pattern
Robust Error Handling: Comprehensive error reporting and recovery
Authentication Support: Built-in support for API keys, OAuth, etc.
Caching Layer: Intelligent caching for frequently accessed resources
Rate Limiting: Built-in protection against abuse
Studio Transport Features:
Connection Pooling: Efficient HTTP connection management
Retry Logic: Automatic retry with exponential backoff
Compression: Gzip/deflate support for reduced bandwidth
Timeout Management: Configurable timeouts for different operations
Monitoring Integration: Built-in metrics and logging
Security: TLS/SSL encryption, certificate validation
Studio Transport Use Cases:
VS Code Extensions: MCP tools in development environments
CI/CD Pipelines: Automated testing and deployment
Enterprise Applications: Large-scale, multi-user systems
API Gateways: Backend service integration
Microservices: Service-to-service communication
This is an example of a studio transport layer with fastMCP as the server and langchain-mcp-adapters as the client.
# MCP Server using FastMCP
# Install: pip install fastmcp langchain-mcp-adapters
#file name mcp_server.py
from fastmcp import FastMCP
from typing import Dict, Any
import asyncio
import json
# Initialize FastMCP server
server = FastMCP("Simple MCP Server")
# Define a simple tool/function
@server.tool()
def get_weather(location: str) -> Dict[str, Any]:
"""Get weather information for a location"""
# Mock weather data
weather_data = {
"location": location,
"temperature": "22°C",
"condition": "Sunny",
"humidity": "65%"
}
return weather_data
@server.tool()
def calculate_sum(a: int, b: int) -> int:
"""Calculate the sum of two numbers"""
return a + b
Client for the above server
# Create server parameters for stdio connection
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
from langchain_mcp_adapters.tools import load_mcp_tools
from langgraph.prebuilt import create_react_agent
server_params = StdioServerParameters(
command="python",
# Make sure to update to the full absolute path to your math_server.py file
args=["/path/to/mcp_server.py"],
)
async with stdio_client(server_params) as (read, write):
async with ClientSession(read, write) as session:
# Initialize the connection
await session.initialize()
# Get tools
tools = await load_mcp_tools(session)
# Create and run the agent
agent = create_react_agent("openai:gpt-4.1", tools)
agent_response = await agent.ainvoke({"messages": "what's (3 + 5) x 12?"})
Streamable HTTP Transport Protocol
Streamable HTTP is MCP's enhanced HTTP protocol that supports bidirectional streaming while maintaining HTTP compatibility. It bridges the gap between traditional HTTP and real-time protocols.It supports multi-server configuration from the client side. And we can extend a direct connection from any server using this transport protocol.
Key Characteristics of Streamable HTTP:
Bidirectional Streaming: Both client and server can stream data
HTTP Compatible: Works with existing HTTP infrastructure
Progressive Responses: Partial results before completion
Backpressure Handling: Flow control for large data streams
Multiplexing: Multiple concurrent streams over single connection
Chunked Transfer: Efficient handling of large payloads
Streamable HTTP Features:
Progressive Loading: Results appear as they're generated
Stream Cancellation: Ability to cancel long-running operations
Flow Control: Prevent overwhelming slow clients
Compression: Per-stream compression for efficiency
Error Recovery: Graceful handling of partial failures
Resource Efficiency: Lower memory usage for large datasets
This is an example of the Streamable-HTTP transport protocol, with the server as Fastmcp and the client as langchain-mcp-adapters. A multi-server configuration was used from the client side to expose the server.
# task_server.py
from typing import List, Dict, Any
from mcp.server.fastmcp import FastMCP
from datetime import datetime
import json
mcp = FastMCP("TaskManager")
# In-memory task storage
tasks_db = []
task_counter = 0
@mcp.tool()
async def create_task(title: str, description: str = "", priority: str = "medium") -> Dict[str, Any]:
"""Create a new task with title, description, and priority."""
global task_counter
task_counter += 1
task = {
"id": task_counter,
"title": title,
"description": description,
"priority": priority,
"status": "pending",
"created_at": datetime.now().isoformat(),
"completed_at": None
}
tasks_db.append(task)
return {"success": True, "task": task, "message": f"Task '{title}' created successfully"}
@mcp.tool()
async def get_tasks(status: str = "all") -> List[Dict[str, Any]]:
"""Get all tasks or filter by status (pending, completed, all)."""
if status == "all":
return tasks_db
else:
return [task for task in tasks_db if task["status"] == status]
@mcp.tool()
async def complete_task(task_id: int) -> Dict[str, Any]:
"""Mark a task as completed by its ID."""
for task in tasks_db:
if task["id"] == task_id:
task["status"] = "completed"
task["completed_at"] = datetime.now().isoformat()
return {"success": True, "message": f"Task {task_id} marked as completed"}
return {"success": False, "message": f"Task with ID {task_id} not found"}
@mcp.tool()
async def delete_task(task_id: int) -> Dict[str, Any]:
"""Delete a task by its ID."""
global tasks_db
original_length = len(tasks_db)
tasks_db = [task for task in tasks_db if task["id"] != task_id]
if len(tasks_db) < original_length:
return {"success": True, "message": f"Task {task_id} deleted successfully"}
else:
return {"success": False, "message": f"Task with ID {task_id} not found"}
@mcp.tool()
async def get_task_stats() -> Dict[str, Any]:
"""Get statistics about tasks."""
total_tasks = len(tasks_db)
completed_tasks = len([task for task in tasks_db if task["status"] == "completed"])
pending_tasks = total_tasks - completed_tasks
priority_counts = {}
for task in tasks_db:
priority = task["priority"]
priority_counts[priority] = priority_counts.get(priority, 0) + 1
return {
"total_tasks": total_tasks,
"completed_tasks": completed_tasks,
"pending_tasks": pending_tasks,
"priority_breakdown": priority_counts
}
if __name__ == "__main__":
mcp.run(transport="streamable-http")
client endpoint
"""
Simplified LangGraph MCP Client - Just Task Manager
"""
import asyncio
from langchain_mcp_adapters.client import MultiServerMCPClient
from langgraph.prebuilt import create_react_agent
from langchain_openai import ChatOpenAI
async def simple_example():
# Initialize client with just the task manager
client = MultiServerMCPClient(
{
"task_manager": {
"url": "http://localhost:8000/",
"transport": "streamable_http",
}
}
)
try:
# Get tools from task manager server
tools = await client.get_tools()
print(f"Available tools: {[tool.name for tool in tools]}")
# Create agent (you need to set OPENAI_API_KEY environment variable)
llm = ChatOpenAI(model="gpt-4", temperature=0)
agent = create_react_agent(llm, tools)
# Simple task management example
response1 = await agent.ainvoke({
"messages": "Create a task called 'Fix bug in login system' with high priority"
})
print(f"Response 1: {response1['messages'][-1].content}")
response2 = await agent.ainvoke({
"messages": "Show me all tasks and their current status"
})
print(f"Response 2: {response2['messages'][-1].content}")
except Exception as e:
print(f"Error: {e}")
finally:
await client.close()
if __name__ == "__main__":
asyncio.run(simple_example())
SSE Protocol:
SSE (Server-Sent Events) is a real-time communication protocol that enables unidirectional streaming from MCP servers to clients. Unlike traditional request-response patterns, SSE maintains a persistent connection where the server can continuously push updates to connected clients.
Key Advantages of SSE over Streamable HTTP:
Direct Local Server URLs: SSE allows direct connection to local development servers without complex proxy configurations
Real-time Updates: Immediate notification of server-side changes (task updates, inventory changes, etc.)
Persistent Connection: Maintains long-lived connections for continuous data flow
Lower Latency: No need for client polling - server pushes updates instantly
Built-in Reconnection: Automatic reconnection handling when connections drop
Event Ordering: Guaranteed event sequence with unique event IDs
SSE Protocol Features in MCP:
Event Streaming: Real-time notifications for data changes
Event Types: Different categories of events (task_created, task_completed, etc.)
Event History: Server maintains recent event history for late-joining clients
Automatic Reconnection: Built-in retry mechanism for dropped connections
Event Filtering: Clients can subscribe to specific event types
Cross-Origin Support: CORS handling for web-based clients
Use Cases Where SSE Excels:
Live Dashboards: Real-time task/inventory/metrics updates
Collaborative Applications: Multiple users seeing live changes
Monitoring Systems: Continuous system health updates
Chat Applications: Live message streaming
Progress Tracking: Real-time progress updates for long-running tasks
Notification Systems: Instant alerts and notifications
These are transport protocols that are enabled in mcp configuration to your LLM, also It is supported with typescript to develop similar configuration.
Thanks for reading, hope this helpful.
Subscribe to my newsletter
Read articles from Raghu Charan V directly inside your inbox. Subscribe to the newsletter, and don't miss out.
Written by

Raghu Charan V
Raghu Charan V
A Professional AI Engineer, developing Agentic Workflows and Autonomous agents in Action.