Create your own MCP server and client using MCP's transport protocols.

Raghu Charan V
7 min read

MCP (Model Context Protocol) is based on a client-server architecture and supports three types of transport protocols. A transport is the communication layer between the client and the server; it carries messages between the sending end and the receiving end.

  1. SSE (Server-Sent Events)

  2. STDIO (standard input and output)

  3. Streamable HTTP
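All three transports are selected when the MCP server starts. As a quick orientation before the detailed sections below, here is a minimal sketch using FastMCP from the official MCP Python SDK; the server name and the ping tool are placeholders:

# transport_demo.py - selecting an MCP transport at startup
from mcp.server.fastmcp import FastMCP

mcp = FastMCP("Demo")

@mcp.tool()
def ping() -> str:
    """Trivial placeholder tool."""
    return "pong"

if __name__ == "__main__":
    # Pick exactly one; "stdio" is the default when no transport is given
    mcp.run(transport="stdio")              # local subprocess over stdin/stdout
    # mcp.run(transport="sse")              # HTTP with Server-Sent Events
    # mcp.run(transport="streamable-http")  # streamable HTTP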

STDIO Transport Protocol

STDIO transport runs the MCP server as a local subprocess and exchanges messages over the server's standard input and output streams. No network is involved: the client launches the server process, writes requests to its stdin, and reads responses from its stdout. If the server works with files that live locally and are shareable with the client, STDIO transport is the natural choice. It is particularly useful for command-line tools and local file servers.

Key Characteristics of STDIO Transport:

  1. Local Execution: The client launches the server as a subprocess on the same machine

  2. IDE Integration: Primary transport for code editors and desktop development tools

  3. Synchronous Operations: Traditional request-response pattern over the process pipes

  4. Simple Setup: No ports, URLs, or network configuration required

  5. Process Isolation: One dedicated server process per client connection

  6. Low Overhead: No HTTP stack or network round-trips

  7. Security by Locality: The server is never exposed on the network and runs with the local user's permissions

STDIO Transport Features:

  • Process Lifecycle: The server starts and stops together with the client

  • Message Framing: Newline-delimited JSON-RPC messages over stdin/stdout

  • Clean Logging: stderr stays free for logs, since stdout carries protocol messages

  • Timeout Management: Configurable timeouts for different operations

  • Offline Operation: No network dependencies; works behind firewalls

  • Easy Debugging: The server can be run and inspected directly from a terminal

STDIO Transport Use Cases:

  • VS Code Extensions: MCP tools in development environments

  • Desktop Clients: Locally installed assistants that launch servers on demand

  • Local File Servers: Exposing files and directories on the same machine

  • CLI Tools: Command-line utilities wrapped as MCP tools

  • Development and Testing: Quick iteration without deploying anything

This is an example of the STDIO transport, with FastMCP as the server and langchain-mcp-adapters as the client.


# MCP Server using FastMCP
# Install: pip install fastmcp langchain-mcp-adapters
# File name: mcp_server.py
from fastmcp import FastMCP
from typing import Dict, Any

# Initialize FastMCP server
server = FastMCP("Simple MCP Server")

# Define a simple tool/function
@server.tool()
def get_weather(location: str) -> Dict[str, Any]:
    """Get weather information for a location"""
    # Mock weather data
    weather_data = {
        "location": location,
        "temperature": "22°C",
        "condition": "Sunny",
        "humidity": "65%"
    }
    return weather_data

@server.tool()
def calculate_sum(a: int, b: int) -> int:
    """Calculate the sum of two numbers"""
    return a + b

if __name__ == "__main__":
    # run() defaults to the STDIO transport, so the client below
    # can launch this file as a subprocess
    server.run()

Client for the above server

# Create server parameters for a STDIO connection
import asyncio

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client

from langchain_mcp_adapters.tools import load_mcp_tools
from langgraph.prebuilt import create_react_agent

server_params = StdioServerParameters(
    command="python",
    # Make sure to update this to the full absolute path of your mcp_server.py file
    args=["/path/to/mcp_server.py"],
)

async def main():
    async with stdio_client(server_params) as (read, write):
        async with ClientSession(read, write) as session:
            # Initialize the connection
            await session.initialize()

            # Load the server's tools as LangChain tools
            tools = await load_mcp_tools(session)

            # Create and run the agent
            agent = create_react_agent("openai:gpt-4.1", tools)
            agent_response = await agent.ainvoke({"messages": "what's (3 + 5) x 12?"})
            print(agent_response["messages"][-1].content)

if __name__ == "__main__":
    asyncio.run(main())

Streamable HTTP Transport Protocol

Streamable HTTP is MCP's enhanced HTTP protocol that supports bidirectional streaming while maintaining HTTP compatibility. It bridges the gap between traditional HTTP and real-time protocols. It also supports multi-server configuration on the client side, and a client can open a direct connection to any server that exposes this transport.

Key Characteristics of Streamable HTTP:

  1. Bidirectional Streaming: Both client and server can stream data

  2. HTTP Compatible: Works with existing HTTP infrastructure

  3. Progressive Responses: Partial results before completion

  4. Backpressure Handling: Flow control for large data streams

  5. Multiplexing: Multiple concurrent streams over single connection

  6. Chunked Transfer: Efficient handling of large payloads

Streamable HTTP Features:

  • Progressive Loading: Results appear as they're generated (see the sketch after this list)

  • Stream Cancellation: Ability to cancel long-running operations

  • Flow Control: Prevent overwhelming slow clients

  • Compression: Per-stream compression for efficiency

  • Error Recovery: Graceful handling of partial failures

  • Resource Efficiency: Lower memory usage for large datasets
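To make progressive responses concrete, here is a minimal sketch of a tool that streams progress notifications while it works, assuming the Context progress API from the official MCP Python SDK; the tool name and the sleep are illustrative stand-ins for real work:

# progress_server.py - a sketch of progressive responses over streamable HTTP
import asyncio

from mcp.server.fastmcp import Context, FastMCP

mcp = FastMCP("ProgressDemo")

@mcp.tool()
async def process_items(count: int, ctx: Context) -> str:
    """Process `count` items, reporting progress as each one finishes."""
    for i in range(count):
        await asyncio.sleep(0.1)  # stand-in for real work
        # Push a progress notification to the client before the final result
        await ctx.report_progress(i + 1, count)
    return f"Processed {count} items"

if __name__ == "__main__":
    mcp.run(transport="streamable-http")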

This is an example of the Streamable HTTP transport protocol, with FastMCP as the server and langchain-mcp-adapters as the client. A multi-server configuration is used on the client side to connect to the server.

# task_server.py
from typing import List, Dict, Any
from mcp.server.fastmcp import FastMCP
from datetime import datetime

mcp = FastMCP("TaskManager")

# In-memory task storage
tasks_db = []
task_counter = 0

@mcp.tool()
async def create_task(title: str, description: str = "", priority: str = "medium") -> Dict[str, Any]:
    """Create a new task with title, description, and priority."""
    global task_counter
    task_counter += 1

    task = {
        "id": task_counter,
        "title": title,
        "description": description,
        "priority": priority,
        "status": "pending",
        "created_at": datetime.now().isoformat(),
        "completed_at": None
    }

    tasks_db.append(task)
    return {"success": True, "task": task, "message": f"Task '{title}' created successfully"}

@mcp.tool()
async def get_tasks(status: str = "all") -> List[Dict[str, Any]]:
    """Get all tasks or filter by status (pending, completed, all)."""
    if status == "all":
        return tasks_db
    else:
        return [task for task in tasks_db if task["status"] == status]

@mcp.tool()
async def complete_task(task_id: int) -> Dict[str, Any]:
    """Mark a task as completed by its ID."""
    for task in tasks_db:
        if task["id"] == task_id:
            task["status"] = "completed"
            task["completed_at"] = datetime.now().isoformat()
            return {"success": True, "message": f"Task {task_id} marked as completed"}

    return {"success": False, "message": f"Task with ID {task_id} not found"}

@mcp.tool()
async def delete_task(task_id: int) -> Dict[str, Any]:
    """Delete a task by its ID."""
    global tasks_db
    original_length = len(tasks_db)
    tasks_db = [task for task in tasks_db if task["id"] != task_id]

    if len(tasks_db) < original_length:
        return {"success": True, "message": f"Task {task_id} deleted successfully"}
    else:
        return {"success": False, "message": f"Task with ID {task_id} not found"}

@mcp.tool()
async def get_task_stats() -> Dict[str, Any]:
    """Get statistics about tasks."""
    total_tasks = len(tasks_db)
    completed_tasks = len([task for task in tasks_db if task["status"] == "completed"])
    pending_tasks = total_tasks - completed_tasks

    priority_counts = {}
    for task in tasks_db:
        priority = task["priority"]
        priority_counts[priority] = priority_counts.get(priority, 0) + 1

    return {
        "total_tasks": total_tasks,
        "completed_tasks": completed_tasks,
        "pending_tasks": pending_tasks,
        "priority_breakdown": priority_counts
    }

if __name__ == "__main__":
    mcp.run(transport="streamable-http")

Client for the above server. Start the server first (python task_server.py); FastMCP serves the streamable HTTP endpoint on port 8000 at /mcp by default.

"""
Simplified LangGraph MCP Client - Just Task Manager
"""

import asyncio
from langchain_mcp_adapters.client import MultiServerMCPClient
from langgraph.prebuilt import create_react_agent
from langchain_openai import ChatOpenAI

async def simple_example():
    # Initialize client with just the task manager
    client = MultiServerMCPClient(
        {
            "task_manager": {
                "url": "http://localhost:8000/",
                "transport": "streamable_http",
            }
        }
    )

    try:
        # Get tools from task manager server
        tools = await client.get_tools()
        print(f"Available tools: {[tool.name for tool in tools]}")

        # Create agent (you need to set OPENAI_API_KEY environment variable)
        llm = ChatOpenAI(model="gpt-4", temperature=0)
        agent = create_react_agent(llm, tools)

        # Simple task management example
        response1 = await agent.ainvoke({
            "messages": "Create a task called 'Fix bug in login system' with high priority"
        })
        print(f"Response 1: {response1['messages'][-1].content}")

        response2 = await agent.ainvoke({
            "messages": "Show me all tasks and their current status"
        })
        print(f"Response 2: {response2['messages'][-1].content}")

    except Exception as e:
        print(f"Error: {e}")
    # MultiServerMCPClient opens a fresh session per call in recent
    # versions of langchain-mcp-adapters, so no explicit cleanup is needed

if __name__ == "__main__":
    asyncio.run(simple_example())

SSE Transport Protocol

SSE (Server-Sent Events) is a real-time communication protocol that enables unidirectional streaming from MCP servers to clients. Unlike traditional request-response patterns, SSE maintains a persistent connection over which the server can continuously push updates to connected clients. It is the older of MCP's two HTTP-based transports; recent revisions of the MCP specification supersede it with Streamable HTTP, but it remains widely supported.

Key Advantages of SSE over Streamable HTTP:

  1. Direct Local Server URLs: SSE allows direct connection to local development servers without complex proxy configurations

  2. Real-time Updates: Immediate notification of server-side changes (task updates, inventory changes, etc.)

  3. Persistent Connection: Maintains long-lived connections for continuous data flow

  4. Lower Latency: No need for client polling - server pushes updates instantly

  5. Built-in Reconnection: Automatic reconnection handling when connections drop

  6. Event Ordering: Guaranteed event sequence with unique event IDs

SSE Protocol Features in MCP:

  1. Event Streaming: Real-time notifications for data changes

  2. Event Types: Different categories of events (task_created, task_completed, etc.)

  3. Event History: Server maintains recent event history for late-joining clients

  4. Automatic Reconnection: Built-in retry mechanism for dropped connections

  5. Event Filtering: Clients can subscribe to specific event types

  6. Cross-Origin Support: CORS handling for web-based clients

Use Cases Where SSE Excels:

  • Live Dashboards: Real-time task/inventory/metrics updates

  • Collaborative Applications: Multiple users seeing live changes

  • Monitoring Systems: Continuous system health updates

  • Chat Applications: Live message streaming

  • Progress Tracking: Real-time progress updates for long-running tasks

  • Notification Systems: Instant alerts and notifications
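SSE follows the same wiring as the earlier examples, so here is a minimal sketch of a FastMCP server and a langchain-mcp-adapters client connected over SSE. The file names and the placeholder tool are made up for illustration; in practice you would reuse the task tools from task_server.py. The /sse endpoint path is FastMCP's default:

# sse_server.py - the same server pattern as above, served over SSE
from mcp.server.fastmcp import FastMCP

mcp = FastMCP("TaskManager")

@mcp.tool()
async def ping() -> str:
    """Placeholder tool; add your real tools here."""
    return "pong"

if __name__ == "__main__":
    mcp.run(transport="sse")

# sse_client.py - connecting over SSE with langchain-mcp-adapters
import asyncio

from langchain_mcp_adapters.client import MultiServerMCPClient

async def main():
    client = MultiServerMCPClient(
        {
            "task_manager": {
                # FastMCP serves the SSE endpoint at /sse by default
                "url": "http://localhost:8000/sse",
                "transport": "sse",
            }
        }
    )
    tools = await client.get_tools()
    print(f"Available tools: {[tool.name for tool in tools]}")

if __name__ == "__main__":
    asyncio.run(main())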

These transport protocols are enabled through the MCP configuration you attach to your LLM client. The MCP TypeScript SDK supports the same transports, so equivalent setups can be built in TypeScript as well.

Thanks for reading, I hope this was helpful.
