Building MCP Servers: Custom Tool Integrations for AI Agents
A comprehensive guide to building Model Context Protocol (MCP) servers—from basic tool exposure to production-grade integrations with authentication, streaming, and error handling.
The Universal Tool Interface
Model Context Protocol (MCP) has become the de facto standard for connecting AI models to external tools and data sources. If you're building AI applications with Claude Code, Cursor, Windsurf, or any MCP-compatible client, understanding how to build custom MCP servers unlocks powerful integrations.
The MCP revolution of 2024-2025: Introduced by Anthropic in November 2024, MCP has achieved remarkable adoption in just over a year: 97M+ monthly SDK downloads and backing from Anthropic, OpenAI, Google, and Microsoft. In December 2025, Anthropic donated MCP to the Agentic AI Foundation (AAIF) under the Linux Foundation, ensuring its future as an open standard.
Why MCP matters for production AI systems: Before MCP, every AI platform had its own tool integration format. Claude had one API, OpenAI another, local tools a third. MCP provides a universal interface—write your tool integration once, use it everywhere. This isn't just convenient; it's economically critical for enterprise adoption. Companies can build internal MCP servers for their APIs and instantly make them available to all their AI tools.
The security imperative (2025 updates): The June 2025 specification update significantly strengthened security requirements. Critical security considerations:
- OAuth-based authorization is now standard for production servers
- Resource Indicators (RFC 8707) are required to prevent token mis-redemption attacks
- Explicit user consent required before invoking any tool (hosts must implement this)
- CVE-2025-6514 warning: A shell command injection vulnerability in mcp-remote compromised 437,000+ developer environments. Never deploy MCP servers without authentication.
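For illustration, an RFC 8707 token request names the server the token is intended for via the `resource` parameter, so the authorization server can bind the token to that audience and a stolen token cannot be redeemed elsewhere (the endpoint and values below are hypothetical):

```
POST /token HTTP/1.1
Host: auth.example.com
Content-Type: application/x-www-form-urlencoded

grant_type=authorization_code&code=AUTH_CODE&resource=https%3A%2F%2Fmcp.example.com%2F
```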
This guide takes you from basic MCP concepts to production-ready servers with authentication, streaming, resource management, and proper error handling.
Prerequisites:
- Familiarity with building agentic AI systems
- Basic understanding of MCP and A2A protocols
- Python or TypeScript experience
What you'll learn:
- MCP architecture and protocol mechanics
- Building tools, resources, and prompts
- Authentication and security patterns
- Streaming and long-running operations
- Testing and debugging MCP servers
- Production deployment strategies
MCP Architecture Overview
MCP follows a client-server architecture with three key components:
```
┌─────────────────┐     ┌─────────────────┐     ┌─────────────────┐
│    MCP Host     │     │   MCP Client    │     │   MCP Server    │
│  (Claude Code,  │────▶│   (Protocol     │────▶│  (Your custom   │
│  Cursor, etc.)  │     │    handler)     │     │   integration)  │
└─────────────────┘     └─────────────────┘     └─────────────────┘
                                 │
                                 ▼
                          JSON-RPC 2.0
                        over stdio/HTTP
```
MCP Hosts are applications where the AI model runs (Claude Desktop, IDEs, custom apps). They manage server connections and present tool results to users.
MCP Clients handle protocol-level communication—sending requests, receiving responses, managing connection lifecycle.
MCP Servers expose capabilities to clients. A server can provide:
- Tools: Functions the model can invoke (search, calculate, query database)
- Resources: Data the model can read (files, database records, API responses)
- Prompts: Pre-defined templates that help models use tools effectively
Architectural principle from MCP best practices: Each MCP server should have one clear, well-defined purpose. Don't create a monolithic server that does everything; create focused servers for specific integrations. Examples:
- ✅ Good: `database-mcp-server` handles only database queries
- ✅ Good: `github-mcp-server` handles only GitHub API calls
- ❌ Bad: `company-tools-server` handles database, GitHub, Slack, email, and the file system
Why focused servers matter: When the AI model lists available tools, it sees descriptions of every tool from every connected server. A server with 50 tools creates noise—the model struggles to find the right tool. A server with 5-10 focused tools is easy to navigate. Plus, focused servers can be updated and versioned independently.
The JSON-RPC 2.0 transport layer: MCP uses JSON-RPC 2.0 for communication. You don't typically implement this yourself—the SDK handles it. But understanding the underlying protocol helps with debugging:
```json
// Request from client to server
{"jsonrpc": "2.0", "id": 1, "method": "tools/list"}

// Response from server to client
{"jsonrpc": "2.0", "id": 1, "result": {"tools": [...]}}
```
For local servers (Claude Desktop, IDEs), transport is over stdio (standard input/output). For remote servers, it's over HTTP with Server-Sent Events (SSE) for streaming. The SDK abstracts these differences—your tool code stays the same.
Setting Up Your Development Environment
Python Setup
```shell
# Create a new project
mkdir my-mcp-server && cd my-mcp-server
python -m venv venv
source venv/bin/activate  # On Windows: venv\Scripts\activate

# Install the MCP SDK
pip install mcp
```
TypeScript Setup
```shell
# Create a new project
mkdir my-mcp-server && cd my-mcp-server
npm init -y

# Install dependencies
npm install @modelcontextprotocol/sdk zod
npm install -D typescript @types/node tsx
```
Create tsconfig.json:
```json
{
  "compilerOptions": {
    "target": "ES2022",
    "module": "Node16",
    "moduleResolution": "Node16",
    "outDir": "./dist",
    "rootDir": "./src",
    "strict": true,
    "esModuleInterop": true,
    "skipLibCheck": true
  },
  "include": ["src/**/*"]
}
```
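Optionally, add scripts to `package.json` so the server can be built and run consistently (the script names are conventional, not required by MCP):

```json
{
  "scripts": {
    "build": "tsc",
    "start": "node dist/index.js",
    "dev": "tsx src/index.ts"
  }
}
```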
Your First MCP Server
Let's build a simple server that exposes a calculator tool and a greeting resource. This example demonstrates the core MCP concepts: creating a server instance, defining tools with the @server.tool() decorator, exposing resources with URI templates, and running the server over stdio transport.
Understanding this foundational example is critical because every MCP server follows this same pattern—you create a server, register capabilities (tools, resources, prompts), and then run the server with a transport layer that handles the JSON-RPC communication.
Python Implementation
The Python MCP SDK provides a decorator-based API that makes server development intuitive. The @server.tool() decorator transforms an async function into an MCP tool, automatically generating the JSON schema from the function signature and docstring. The @server.resource() decorator works similarly for read-only data access.
Let's walk through each component:
```python
# server.py
import asyncio
import json

from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import TextContent

# Create the server
server = Server("calculator-server")

# Define a tool
@server.tool()
async def calculate(expression: str) -> list[TextContent]:
    """
    Evaluate a mathematical expression.

    Args:
        expression: A mathematical expression like "2 + 2" or "sqrt(16)"
    """
    import math

    # Safe evaluation with math functions available
    allowed_names = {
        k: v for k, v in math.__dict__.items()
        if not k.startswith("_")
    }
    allowed_names.update({"abs": abs, "round": round, "min": min, "max": max})

    try:
        result = eval(expression, {"__builtins__": {}}, allowed_names)
        return [TextContent(
            type="text",
            text=json.dumps({"expression": expression, "result": result})
        )]
    except Exception as e:
        return [TextContent(
            type="text",
            text=json.dumps({"error": str(e)})
        )]

# Define a resource
@server.resource("greeting://{name}")
async def get_greeting(name: str) -> str:
    """Get a personalized greeting."""
    return f"Hello, {name}! Welcome to the MCP server."

# Run the server
async def main():
    async with stdio_server() as (read_stream, write_stream):
        await server.run(read_stream, write_stream)

if __name__ == "__main__":
    asyncio.run(main())
```
Key concepts in this implementation:
- **Server instantiation**: `Server("calculator-server")` creates a named server instance. The name appears in client logs and helps identify which server provided a response.
- **Tool decorator**: `@server.tool()` registers the function as an MCP tool. The function's docstring becomes the tool description that the AI model reads to decide when to use it, so write clear, specific docstrings.
- **Return type**: Tools return `list[TextContent]`. This wrapper ensures consistent response formatting across all tools; here the content is JSON-serialized text.
- **Safe evaluation**: We create a restricted namespace with only math functions available. This prevents code injection; never use raw `eval()` with untrusted input. In production, consider a proper expression parser like `numexpr` or `sympy`.
- **Resource decorator**: `@server.resource("greeting://{name}")` defines a URI template. The `{name}` placeholder becomes a function parameter. Resources are for read-only data; use tools for actions that have side effects.
- **Stdio transport**: `stdio_server()` creates a transport that reads JSON-RPC from stdin and writes to stdout. This is the standard transport for local MCP servers invoked by Claude Desktop or IDE extensions.
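To try the server from a local client such as Claude Desktop, register it in the client's MCP configuration file. A minimal entry looks like this (the file path is illustrative, and the config file location varies by client):

```json
{
  "mcpServers": {
    "calculator": {
      "command": "python",
      "args": ["/absolute/path/to/server.py"]
    }
  }
}
```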
TypeScript Implementation
The TypeScript SDK takes a more explicit approach than Python. Instead of decorators, you register request handlers for specific protocol methods. This gives you fine-grained control over the protocol but requires more boilerplate. The trade-off is worth it for complex servers where you need custom routing or middleware.
The key difference from Python: TypeScript requires you to implement ListToolsRequestSchema (to advertise available tools) and CallToolRequestSchema (to handle invocations) separately. This separation follows the MCP protocol more closely and makes it easier to add features like dynamic tool registration.
```typescript
// src/index.ts
import { Server } from "@modelcontextprotocol/sdk/server/index.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import {
  CallToolRequestSchema,
  ListToolsRequestSchema,
  ListResourcesRequestSchema,
  ReadResourceRequestSchema,
} from "@modelcontextprotocol/sdk/types.js";
import { z } from "zod";

// Create server instance
const server = new Server(
  {
    name: "calculator-server",
    version: "1.0.0",
  },
  {
    capabilities: {
      tools: {},
      resources: {},
    },
  }
);

// Tool input schema
const CalculateSchema = z.object({
  expression: z.string().describe("Mathematical expression to evaluate"),
});

// List available tools
server.setRequestHandler(ListToolsRequestSchema, async () => {
  return {
    tools: [
      {
        name: "calculate",
        description:
          "Evaluate a mathematical expression. Supports basic arithmetic and Math functions.",
        inputSchema: {
          type: "object",
          properties: {
            expression: {
              type: "string",
              description: "Mathematical expression like '2 + 2' or 'Math.sqrt(16)'",
            },
          },
          required: ["expression"],
        },
      },
    ],
  };
});

// Handle tool calls
server.setRequestHandler(CallToolRequestSchema, async (request) => {
  const { name, arguments: args } = request.params;

  if (name === "calculate") {
    const { expression } = CalculateSchema.parse(args);
    try {
      // Safe-ish evaluation (in production, use a proper math parser)
      const result = Function(`"use strict"; return (${expression})`)();
      return {
        content: [
          {
            type: "text",
            text: JSON.stringify({ expression, result }),
          },
        ],
      };
    } catch (error) {
      return {
        content: [
          {
            type: "text",
            text: JSON.stringify({ error: String(error) }),
          },
        ],
        isError: true,
      };
    }
  }

  throw new Error(`Unknown tool: ${name}`);
});

// List available resources
server.setRequestHandler(ListResourcesRequestSchema, async () => {
  return {
    resources: [
      {
        uri: "greeting://world",
        name: "Default Greeting",
        description: "A greeting for the world",
        mimeType: "text/plain",
      },
    ],
  };
});

// Read resource content
server.setRequestHandler(ReadResourceRequestSchema, async (request) => {
  const { uri } = request.params;

  if (uri.startsWith("greeting://")) {
    const name = uri.replace("greeting://", "");
    return {
      contents: [
        {
          uri,
          mimeType: "text/plain",
          text: `Hello, ${name}! Welcome to the MCP server.`,
        },
      ],
    };
  }

  throw new Error(`Unknown resource: ${uri}`);
});

// Start the server
async function main() {
  const transport = new StdioServerTransport();
  await server.connect(transport);
  // Log to stderr: stdout is reserved for JSON-RPC messages
  console.error("Calculator MCP server running on stdio");
}

main().catch(console.error);
```
Understanding the TypeScript implementation:
- **Server configuration**: Unlike Python's single string, TypeScript requires explicit capability declaration. The `capabilities: { tools: {}, resources: {} }` object tells clients what this server supports; empty objects mean "supported with default settings."
- **Zod schemas**: We use Zod for runtime type validation. The `.describe()` method adds documentation that becomes part of the tool's JSON schema, which clients use to understand parameter requirements.
- **Handler pattern**: `setRequestHandler(ListToolsRequestSchema, ...)` registers a handler for the `tools/list` method. The handler returns tool definitions including name, description, and input schema; it is typically called when the client connects.
- **Tool execution**: `CallToolRequestSchema` handles actual tool invocations. Note the explicit error handling with `isError: true`, which tells the client the tool failed so the model can retry or try a different approach.
- **Safe evaluation**: `Function()` creates a function from a string. This is safer than `eval()` because it doesn't capture the surrounding scope, but it is still dangerous with untrusted input. In production, use a proper math parser library.
- **Resource handlers**: Resources follow the same pattern: list available resources, then handle read requests. The URI scheme (`greeting://`) is arbitrary; choose something meaningful for your domain.
Testing Your Server
Before integrating with a real MCP client like Claude Desktop, it's helpful to test your server directly. This test client demonstrates the JSON-RPC protocol that MCP uses under the hood. Understanding this protocol helps debug issues when things don't work as expected.
The test client spawns your server as a subprocess and communicates over stdin/stdout using newline-delimited JSON messages. Each message follows JSON-RPC 2.0 format with jsonrpc, method, params, and id fields.
```python
# test_client.py
import asyncio
import json

async def test_server():
    # Start the server process
    proc = await asyncio.create_subprocess_exec(
        "python", "server.py",
        stdin=asyncio.subprocess.PIPE,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )

    # Send initialize request
    init_request = {
        "jsonrpc": "2.0",
        "method": "initialize",
        "params": {
            "protocolVersion": "2024-11-05",
            "capabilities": {},
            "clientInfo": {"name": "test-client", "version": "1.0"},
        },
        "id": 1,
    }
    proc.stdin.write(json.dumps(init_request).encode() + b"\n")
    await proc.stdin.drain()

    response = await proc.stdout.readline()
    print("Init response:", json.loads(response))

    # Complete the handshake: the client confirms initialization
    # before issuing further requests
    initialized = {"jsonrpc": "2.0", "method": "notifications/initialized"}
    proc.stdin.write(json.dumps(initialized).encode() + b"\n")
    await proc.stdin.drain()

    # Call the calculate tool
    tool_request = {
        "jsonrpc": "2.0",
        "method": "tools/call",
        "params": {
            "name": "calculate",
            "arguments": {"expression": "2 ** 10"},
        },
        "id": 2,
    }
    proc.stdin.write(json.dumps(tool_request).encode() + b"\n")
    await proc.stdin.drain()

    response = await proc.stdout.readline()
    print("Tool response:", json.loads(response))

    proc.terminate()

asyncio.run(test_server())
```
How this test works:
- **Subprocess creation**: We spawn the server and capture its stdin/stdout. The server runs as an independent process, just as it would under Claude Desktop.
- **Initialize handshake**: Every MCP session starts with an `initialize` request. The client declares its protocol version and capabilities, and the server responds with its own. This negotiation ensures compatibility.
- **Tool invocation**: The `tools/call` method executes a tool. Note the structure: `name` identifies the tool, and `arguments` is a JSON object matching the tool's input schema. The response contains `content` with the tool's output.
- **Newline-delimited JSON**: Each message ends with `\n`. This matters: MCP stdio servers read line by line, so missing newlines cause hangs.
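The newline framing can be captured in two tiny helpers, which are handy when writing ad-hoc test clients like the one above:

```python
import json

def frame(msg: dict) -> bytes:
    """Serialize a JSON-RPC message with the trailing newline MCP expects."""
    return json.dumps(msg).encode() + b"\n"

def unframe(line: bytes) -> dict:
    """Parse one newline-delimited JSON-RPC message."""
    return json.loads(line)

req = {"jsonrpc": "2.0", "id": 1, "method": "tools/list"}
assert frame(req).endswith(b"\n")
assert unframe(frame(req)) == req
```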
Building Production Tools
The simple calculator example works for learning, but production tools need robustness. Real-world MCP tools face untrusted input from AI models (which can be manipulated by prompt injection), network failures, slow dependencies, and resource exhaustion. This section covers the patterns that make tools production-ready.
The key principles: validate everything, fail gracefully, timeout aggressively, and document thoroughly. AI models read your tool descriptions to decide when and how to use them—ambiguous documentation leads to misuse.
Structured Tool Definition
Production tools benefit from Pydantic models for input validation. This approach provides automatic type coercion, validation error messages that help the model correct its input, and self-documenting schemas. The Field() function adds constraints and descriptions that become part of the tool's JSON schema.
```python
import asyncio
import json
import os
from typing import Optional

import httpx
from mcp.server import Server
from mcp.types import TextContent
from pydantic import BaseModel, Field

server = Server("production-tools")

# Define input schemas with Pydantic
class WebSearchInput(BaseModel):
    query: str = Field(..., description="Search query", min_length=1, max_length=500)
    num_results: int = Field(default=5, ge=1, le=20, description="Number of results")
    region: Optional[str] = Field(default=None, description="Region code (us, uk, etc.)")

class DatabaseQueryInput(BaseModel):
    query: str = Field(..., description="SQL query to execute")
    database: str = Field(..., description="Database name")
    timeout_seconds: int = Field(default=30, ge=1, le=300)

# Tool with proper error handling and timeout
@server.tool()
async def web_search(
    query: str,
    num_results: int = 5,
    region: Optional[str] = None
) -> list[TextContent]:
    """
    Search the web for information.

    Use this when you need:
    - Current information (news, prices, events)
    - Facts you're uncertain about
    - Information that may have changed recently

    Args:
        query: The search query
        num_results: Number of results to return (1-20)
        region: Optional region code for localized results

    Returns:
        List of search results with title, URL, and snippet
    """
    # Validate input
    try:
        validated = WebSearchInput(query=query, num_results=num_results, region=region)
    except Exception as e:
        return [TextContent(type="text", text=json.dumps({"error": f"Invalid input: {e}"}))]

    # Execute with timeout
    try:
        async with asyncio.timeout(10):
            results = await _execute_search(
                validated.query,
                validated.num_results,
                validated.region
            )
        return [TextContent(type="text", text=json.dumps(results))]
    except asyncio.TimeoutError:
        return [TextContent(type="text", text='{"error": "Search timed out"}')]
    except Exception as e:
        return [TextContent(type="text", text=json.dumps({"error": str(e)}))]

async def _execute_search(query: str, num_results: int, region: Optional[str]) -> dict:
    """Execute the actual search API call."""
    async with httpx.AsyncClient() as client:
        response = await client.get(
            "https://api.search.example.com/search",
            params={
                "q": query,
                "count": num_results,
                "region": region or "us"
            },
            headers={"Authorization": f"Bearer {os.environ.get('SEARCH_API_KEY')}"}
        )
        response.raise_for_status()
        return response.json()
```
Production patterns demonstrated:
- **Pydantic validation**: The `WebSearchInput` model validates inputs before processing. `min_length=1` prevents empty queries, `max_length=500` prevents abuse, and `ge=1, le=20` constrains the result count. Failed validation returns a structured error message.
- **Detailed docstrings**: The docstring is critical because it is what the AI model reads. Notice the "Use this when you need:" section that guides the model on appropriate usage. Be specific about capabilities and limitations.
- **Timeout wrapper**: `asyncio.timeout(10)` prevents runaway operations. External APIs can hang indefinitely; always wrap network calls with timeouts. Choose timeout values based on typical latency plus margin.
- **Structured error responses**: Errors return JSON with an `error` field. This consistency helps the model understand what went wrong and potentially retry with corrected input.
- **Separation of concerns**: The private `_execute_search()` function handles the actual API call. This separation makes testing easier: you can mock `_execute_search` without touching the validation logic.
Database Query Tool with Safety
Database tools are among the most powerful—and most dangerous—MCP integrations. An AI model with database access can query massive datasets, but it can also accidentally (or through prompt injection) execute destructive queries. This section demonstrates defense-in-depth: regex-based query validation, read-only restrictions, connection timeouts, and result size limits.
The SQLSafetyChecker class implements a blocklist approach: we identify dangerous SQL patterns and reject queries that match. This isn't foolproof—determined attackers can bypass regex filters—but it catches accidental misuse and obvious attacks. For production systems, consider adding a SQL parser for structural validation or using a read-only database replica.
```python
# Continues the previous example: reuses `server` and `DatabaseQueryInput`.
import asyncio
import json
import os
import re

import asyncpg
from mcp.types import TextContent

class SQLSafetyChecker:
    """Validate SQL queries before execution."""

    DANGEROUS_PATTERNS = [
        r"\bDROP\s+(TABLE|DATABASE|INDEX)\b",
        r"\bTRUNCATE\b",
        r"\bDELETE\s+FROM\s+\w+(?!.*\bWHERE\b)",    # DELETE without WHERE
        r"\bUPDATE\s+\w+\s+SET\s+(?!.*\bWHERE\b)",  # UPDATE without WHERE
        r"\bALTER\s+TABLE\b",
        r";\s*--",          # SQL injection attempt
        r"UNION\s+SELECT",  # Potential injection
    ]

    ALLOWED_OPERATIONS = ["SELECT", "WITH"]  # Read-only by default

    @classmethod
    def validate(cls, query: str, allow_writes: bool = False) -> tuple[bool, str]:
        """Validate a SQL query for safety."""
        query_upper = query.upper().strip()

        # Check for dangerous patterns
        for pattern in cls.DANGEROUS_PATTERNS:
            if re.search(pattern, query_upper, re.IGNORECASE):
                return False, f"Dangerous SQL pattern detected: {pattern}"

        # Check operation type
        if not allow_writes:
            operation = query_upper.split()[0] if query_upper else ""
            if operation not in cls.ALLOWED_OPERATIONS:
                return False, f"Only SELECT queries allowed, got: {operation}"

        return True, "OK"

@server.tool()
async def query_database(
    query: str,
    database: str,
    timeout_seconds: int = 30
) -> list[TextContent]:
    """
    Execute a read-only SQL query against the database.

    Args:
        query: SQL SELECT query to execute
        database: Database name to query
        timeout_seconds: Query timeout (max 300 seconds)

    Returns:
        Query results as JSON array of objects
    """
    # Validate input
    try:
        validated = DatabaseQueryInput(
            query=query,
            database=database,
            timeout_seconds=timeout_seconds
        )
    except Exception as e:
        return [TextContent(type="text", text=json.dumps({"error": f"Invalid input: {e}"}))]

    # Safety check
    is_safe, message = SQLSafetyChecker.validate(validated.query)
    if not is_safe:
        return [TextContent(type="text", text=json.dumps({"error": message}))]

    # Execute query
    try:
        conn = await asyncpg.connect(
            database=validated.database,
            user=os.environ.get("DB_USER"),
            password=os.environ.get("DB_PASSWORD"),
            host=os.environ.get("DB_HOST", "localhost")
        )
        try:
            async with asyncio.timeout(validated.timeout_seconds):
                rows = await conn.fetch(validated.query)
            results = [dict(row) for row in rows]

            # Limit result size
            if len(results) > 1000:
                results = results[:1000]
                results.append({"_truncated": True, "_total_rows": len(rows)})

            return [TextContent(type="text", text=json.dumps(results, default=str))]
        finally:
            await conn.close()
    except asyncio.TimeoutError:
        return [TextContent(type="text", text='{"error": "Query timed out"}')]
    except Exception as e:
        return [TextContent(type="text", text=json.dumps({"error": str(e)}))]
```
Security layers in this implementation:
- **Regex blocklist**: Each pattern targets a specific threat. `DROP TABLE/DATABASE` prevents data loss, the `DELETE`/`UPDATE` patterns flag statements without a `WHERE` clause, and `UNION SELECT` blocks a common SQL injection vector. The regex approach is fast but imperfect; always combine it with other defenses.
- **Operation allowlist**: By default, only `SELECT` and `WITH` (CTEs) are allowed, which prevents writes entirely. The `allow_writes` parameter exists for admin tools that need write access after additional authorization.
- **Validation function**: Returns a `(success, message)` tuple for clear error handling. The message explains why validation failed, helping the model correct its query.
- **Connection management**: We create a fresh connection for each query and close it in a `finally` block. For high-volume servers, use connection pooling (shown later in the Database Server example).
- **Statement timeout**: `asyncio.timeout(validated.timeout_seconds)` limits execution time. Long-running queries can lock tables or consume resources. The default 30-second timeout balances allowing complex queries against preventing abuse.
- **Result truncation**: Large result sets consume memory, and tokens when the model processes them. Truncating to 1000 rows with a `_truncated` flag tells the model to refine its query if it needs more data.
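The allowlist idea can be isolated into a small standalone check. This sketch also rejects stacked statements, a case the regex patterns above don't fully cover:

```python
# Minimal standalone sketch of the read-only allowlist, plus a check
# for stacked statements like "SELECT 1; DROP TABLE t".
READ_ONLY = {"SELECT", "WITH"}

def is_read_only(query: str) -> bool:
    stripped = query.strip().rstrip(";")
    if ";" in stripped:  # internal semicolon => stacked statements
        return False
    first = stripped.split(None, 1)[0].upper() if stripped else ""
    return first in READ_ONLY

print(is_read_only("SELECT * FROM users"))         # True
print(is_read_only("DELETE FROM users"))           # False
print(is_read_only("SELECT 1; DROP TABLE users"))  # False
```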
Limitations to be aware of:
- Regex filters can be bypassed with creative SQL (e.g., comments, string concatenation, or vendor-specific syntax)
- This doesn't prevent reading sensitive data—add column-level permissions if needed
- Connection credentials in environment variables can leak through error messages
Working with Resources
Resources provide read access to data. Unlike tools (which execute actions), resources are for retrieving information. The distinction matters: tools can have side effects (send email, write files, modify databases), while resources are guaranteed read-only. This guarantee lets clients display resources safely and cache them aggressively.
MCP resources use URI schemes to identify data sources. A URI like file://config.json or db://users/123 tells the client what kind of data to expect. You define the URI scheme; there's no required format. Choose schemes that are intuitive for your domain.
Dynamic Resource Templates
Resource templates are powerful when you have parameterized data—like files in a directory, records in a database, or API responses. Instead of listing every possible resource upfront (impossible for large datasets), you provide URI templates with placeholders. The client fills in the placeholders when requesting specific resources.
This example demonstrates three patterns: listing known resources, defining templates for dynamic resources, and safely reading resource content with path traversal prevention.
```python
import os

import aiofiles
from mcp.server import Server
from mcp.types import Resource, ResourceTemplate

server = Server("file-server")

# Static resource list
@server.list_resources()
async def list_resources() -> list[Resource]:
    """List available resources."""
    resources = []

    # List files in the allowed directory
    allowed_dir = os.environ.get("ALLOWED_DIR", "/home/user/documents")
    for root, dirs, files in os.walk(allowed_dir):
        for file in files:
            path = os.path.join(root, file)
            rel_path = os.path.relpath(path, allowed_dir)
            resources.append(Resource(
                uri=f"file://{rel_path}",
                name=file,
                description=f"File: {rel_path}",
                mimeType=_get_mime_type(file)
            ))

    return resources

# Resource templates for dynamic URIs
@server.list_resource_templates()
async def list_resource_templates() -> list[ResourceTemplate]:
    """List resource templates."""
    return [
        ResourceTemplate(
            uriTemplate="file://{path}",
            name="File Reader",
            description="Read a file by path",
            mimeType="application/octet-stream"
        ),
        ResourceTemplate(
            uriTemplate="api://users/{user_id}",
            name="User Profile",
            description="Get user profile by ID",
            mimeType="application/json"
        ),
        ResourceTemplate(
            uriTemplate="db://{table}/{id}",
            name="Database Record",
            description="Get a database record by table and ID",
            mimeType="application/json"
        )
    ]

# Read resource content
@server.read_resource()
async def read_resource(uri: str) -> str:
    """Read resource content by URI."""
    if uri.startswith("file://"):
        return await _read_file(uri[7:])
    elif uri.startswith("api://users/"):
        user_id = uri.split("/")[-1]
        return await _get_user(user_id)  # analogous API handler (not shown)
    elif uri.startswith("db://"):
        parts = uri[5:].split("/")
        return await _get_db_record(parts[0], parts[1])  # analogous DB handler (not shown)
    else:
        raise ValueError(f"Unknown resource URI scheme: {uri}")

async def _read_file(path: str) -> str:
    """Read a file with safety checks."""
    # Prevent directory traversal; the trailing separator in the prefix
    # check also blocks sibling directories like /home/user/documents2
    allowed_dir = os.environ.get("ALLOWED_DIR", "/home/user/documents")
    full_path = os.path.normpath(os.path.join(allowed_dir, path))
    if not full_path.startswith(os.path.join(allowed_dir, "")):
        raise ValueError("Access denied: path outside allowed directory")

    if not os.path.exists(full_path):
        raise FileNotFoundError(f"File not found: {path}")

    async with aiofiles.open(full_path, "r") as f:
        content = await f.read()

    # Limit content size
    if len(content) > 1_000_000:  # 1MB limit
        content = content[:1_000_000] + "\n... [truncated]"

    return content

def _get_mime_type(filename: str) -> str:
    """Get MIME type from filename."""
    ext = os.path.splitext(filename)[1].lower()
    mime_types = {
        ".txt": "text/plain",
        ".md": "text/markdown",
        ".json": "application/json",
        ".py": "text/x-python",
        ".js": "text/javascript",
        ".ts": "text/typescript",
        ".html": "text/html",
        ".css": "text/css",
        ".yaml": "text/yaml",
        ".yml": "text/yaml",
    }
    return mime_types.get(ext, "application/octet-stream")
```
Understanding the resource patterns:
- **Static listing**: `@server.list_resources()` returns resources that exist right now. We walk a directory and create a `Resource` object for each file. This list can be expensive to generate for large directories; consider pagination or lazy loading.
- **URI templates**: `ResourceTemplate` defines patterns like `file://{path}` where `{path}` is a placeholder. The template includes metadata (name, description, mimeType) that helps the model understand what kind of data it will get. Templates are cheaper than full listings because they don't enumerate every possibility.
- **Content reading**: The `@server.read_resource()` handler parses URIs and routes to the appropriate handler. Note the scheme-based dispatch: `file://`, `api://users/`, and `db://` each have their own handler.
- **Path traversal prevention**: This is critical security. Without `os.path.normpath()` and the prefix check, a malicious URI like `file://../../../etc/passwd` could read sensitive files. Always validate paths against an allowlist or allowed directory.
- **Content truncation**: Large files can overwhelm the model's context window. Truncating with a clear `[truncated]` marker tells the model the content is incomplete; it might need to request a specific section or use a different approach.
Subscribing to Resource Changes
For resources that change over time—log files, database records, live metrics—clients can subscribe to updates. When the resource changes, the server sends a notification, and the client can re-fetch the content. This is more efficient than polling because updates only happen when data actually changes.
The ResourceWatcher class below implements a simple polling-based change detection. For production, you'd want to use file system watchers (watchdog library), database triggers, or pub/sub systems for real-time notifications without polling overhead.
```python
import asyncio
import os

from mcp.server import Server
from mcp.types import ResourceUpdatedNotification

class ResourceWatcher:
    """Watch for resource changes and notify clients."""

    def __init__(self, server: Server):
        self.server = server
        self.watched_resources: dict[str, float] = {}  # uri -> last_modified
        self._running = False
        self._task: asyncio.Task | None = None

    async def start(self):
        """Start watching resources."""
        self._running = True
        # Keep a reference so the task isn't garbage-collected
        self._task = asyncio.create_task(self._watch_loop())

    async def stop(self):
        """Stop watching resources."""
        self._running = False

    async def _watch_loop(self):
        """Check for resource changes periodically."""
        while self._running:
            for uri, last_modified in list(self.watched_resources.items()):
                current_modified = await self._get_modified_time(uri)
                if current_modified > last_modified:
                    # Resource changed - notify the client
                    await self.server.send_notification(
                        ResourceUpdatedNotification(uri=uri)
                    )
                    self.watched_resources[uri] = current_modified
            await asyncio.sleep(1)  # Check every second

    def watch(self, uri: str):
        """Start watching a resource."""
        self.watched_resources[uri] = 0

    def unwatch(self, uri: str):
        """Stop watching a resource."""
        self.watched_resources.pop(uri, None)

    async def _get_modified_time(self, uri: str) -> float:
        """Get the resource modification time."""
        if uri.startswith("file://"):
            path = uri[7:]
            allowed_dir = os.environ.get("ALLOWED_DIR", ".")
            full_path = os.path.join(allowed_dir, path)
            if os.path.exists(full_path):
                return os.path.getmtime(full_path)
        return 0

# Usage
watcher = ResourceWatcher(server)

@server.subscribe_resource()
async def subscribe_resource(uri: str):
    """Subscribe to resource updates."""
    watcher.watch(uri)

@server.unsubscribe_resource()
async def unsubscribe_resource(uri: str):
    """Unsubscribe from resource updates."""
    watcher.unwatch(uri)
```
How resource subscriptions work:
- Watch loop (lines 17-28): A background task polls watched resources every second. When mtime changes, it sends a notification. This is simple but not scalable—1000 watched files means 1000 stat calls per second. Use inotify/FSEvents for file watching, or database triggers for DB resources.
- Notification sending (lines 24-26): server.send_notification() pushes updates to the client. The client decides whether to re-fetch immediately or batch updates. Notifications are fire-and-forget—the server doesn't wait for acknowledgment.
- Subscription handlers (lines 49-57): These register client interest in specific URIs. Note that subscriptions are stateful—the server must track which clients are watching which resources. In multi-client scenarios, this requires careful memory management.
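The multi-client bookkeeping mentioned above can be made concrete. A minimal sketch of per-client subscription tracking (the SubscriptionRegistry class and its method names are hypothetical, not part of the MCP SDK):

```python
from collections import defaultdict

class SubscriptionRegistry:
    """Track which client is watching which URIs, so a disconnect
    releases every watch that client held."""

    def __init__(self):
        self._by_client: dict[str, set[str]] = defaultdict(set)
        self._by_uri: dict[str, set[str]] = defaultdict(set)

    def subscribe(self, client_id: str, uri: str):
        self._by_client[client_id].add(uri)
        self._by_uri[uri].add(client_id)

    def unsubscribe(self, client_id: str, uri: str):
        self._by_client[client_id].discard(uri)
        self._by_uri[uri].discard(client_id)

    def drop_client(self, client_id: str) -> set[str]:
        """Remove all of a client's subscriptions; return the URIs that
        no longer have any watcher, so the server can stop polling them."""
        uris = self._by_client.pop(client_id, set())
        orphaned = set()
        for uri in uris:
            watchers = self._by_uri[uri]
            watchers.discard(client_id)
            if not watchers:
                del self._by_uri[uri]
                orphaned.add(uri)
        return orphaned
```

Calling drop_client() on disconnect is what prevents the watch loop from polling resources nobody is listening to anymore.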
Prompts for Better Tool Usage
Prompts are the third pillar of MCP (alongside tools and resources). They're pre-defined templates that encode domain expertise and best practices. When a client requests a prompt, the server returns a structured message that the model uses as context for its next action.
Think of prompts as "expert guidance" injected into the conversation. A SQL-building prompt might include query optimization hints. An API request prompt might include authentication reminders. The model doesn't blindly follow prompts—it uses them as informed suggestions.
Prompts have arguments that customize their content. The sql-query-builder prompt below takes a table name and returns guidance tailored to that specific table. This makes prompts reusable across different contexts.
from mcp.types import Prompt, PromptMessage, PromptArgument

@server.list_prompts()
async def list_prompts() -> list[Prompt]:
    """List available prompts."""
    return [
        Prompt(
            name="sql-query-builder",
            description="Help construct safe SQL queries",
            arguments=[
                PromptArgument(
                    name="table",
                    description="The table to query",
                    required=True
                ),
                PromptArgument(
                    name="columns",
                    description="Columns to select (comma-separated)",
                    required=False
                ),
                PromptArgument(
                    name="conditions",
                    description="WHERE conditions in natural language",
                    required=False
                )
            ]
        ),
        Prompt(
            name="api-request-template",
            description="Template for making API requests",
            arguments=[
                PromptArgument(
                    name="endpoint",
                    description="API endpoint path",
                    required=True
                ),
                PromptArgument(
                    name="method",
                    description="HTTP method (GET, POST, etc.)",
                    required=True
                )
            ]
        ),
        Prompt(
            name="data-analysis-workflow",
            description="Workflow for analyzing data",
            arguments=[
                PromptArgument(
                    name="data_source",
                    description="Where the data comes from",
                    required=True
                ),
                PromptArgument(
                    name="analysis_goal",
                    description="What insights you're looking for",
                    required=True
                )
            ]
        )
    ]

@server.get_prompt()
async def get_prompt(name: str, arguments: dict[str, str] | None = None) -> list[PromptMessage]:
    """Get a prompt by name with arguments filled in."""
    arguments = arguments or {}

    if name == "sql-query-builder":
        table = arguments.get("table", "TABLE_NAME")
        columns = arguments.get("columns", "*")
        conditions = arguments.get("conditions", "")

        condition_clause = ""
        if conditions:
            condition_clause = f"\n\nThe user wants to filter by: {conditions}"

        return [
            PromptMessage(
                role="user",
                content=f"""Help me build a safe SQL query with these requirements:

Table: {table}
Columns to select: {columns}{condition_clause}

Guidelines:
1. Use parameterized queries when possible
2. Always include a LIMIT clause (max 1000 rows)
3. Avoid SELECT * in production queries
4. Use appropriate indexes by including indexed columns in WHERE

Provide the SQL query and explain any optimizations."""
            )
        ]

    elif name == "data-analysis-workflow":
        data_source = arguments.get("data_source", "the database")
        analysis_goal = arguments.get("analysis_goal", "general insights")

        return [
            PromptMessage(
                role="user",
                content=f"""I need to analyze data from {data_source} to find {analysis_goal}.

Please follow this workflow:
1. **Data Discovery**: First, query the available tables/columns to understand the schema
2. **Sample Data**: Get a small sample to understand data types and values
3. **Quality Check**: Check for nulls, outliers, and data quality issues
4. **Analysis**: Run queries to answer the analysis goal
5. **Summary**: Summarize findings with key metrics

Start with step 1 - what data is available?"""
            )
        ]

    elif name == "api-request-template":
        endpoint = arguments.get("endpoint", "/api/endpoint")
        method = arguments.get("method", "GET")

        return [
            PromptMessage(
                role="user",
                content=f"""Make an API request with these details:

Endpoint: {endpoint}
Method: {method}

Before making the request:
1. Check if authentication is required
2. Validate any required parameters
3. Set appropriate headers (Content-Type, Accept)
4. Handle potential errors gracefully

After the request:
1. Parse the response
2. Extract relevant data
3. Format for readability"""
            )
        ]

    raise ValueError(f"Unknown prompt: {name}")
Prompt design patterns:
- Argument definition (lines 8-22): Each PromptArgument has a name, description, and required flag. The description helps the model understand what values are appropriate. Required arguments must be provided; optional arguments use defaults.
- Context injection (lines 66-78): The sql-query-builder prompt returns a PromptMessage with role user. This becomes part of the conversation as if the user said it. The prompt includes guidelines ("use parameterized queries", "include LIMIT") that the model will follow when generating SQL.
- Multi-step workflows (lines 80-98): The data-analysis-workflow prompt defines a complete methodology. By breaking analysis into numbered steps, the model is more likely to follow a systematic approach rather than jumping to conclusions.
- Dynamic content (line 69): Arguments are interpolated into the template. {table} and {columns} are replaced with actual values. Validate arguments to prevent injection—if {table} contains malicious text, it becomes part of the prompt.
When to use prompts vs. tool docstrings:
- Tool docstrings: Short, focused guidance for a single tool
- Prompts: Complex workflows, multi-tool coordination, domain expertise that spans tools
Authentication and Security
MCP servers often handle sensitive operations—querying databases, sending messages, accessing files. Authentication ensures only authorized clients can use these capabilities. MCP itself doesn't mandate an authentication scheme; you implement what fits your security model.
This section covers two common patterns: API key authentication (simple, good for single-tenant scenarios) and OAuth2 (complex, required for multi-user access to third-party services like GitHub or Slack).
API Key Authentication
API keys are the simplest authentication mechanism. The client includes a key in request headers; the server validates it against a known list. This works well for internal tools or single-user deployments. For production, store keys securely (environment variables or secrets managers, never in code) and rotate them regularly.
The implementation below shows key loading, validation, and a decorator pattern that makes it easy to require authentication on specific tools.
import os
from functools import wraps

class AuthenticationError(Exception):
    """Raised when authentication fails."""
    pass

class MCPAuthenticator:
    """Handle MCP server authentication."""

    def __init__(self):
        self.api_keys = self._load_api_keys()

    def _load_api_keys(self) -> dict[str, dict]:
        """Load API keys from environment or config."""
        keys = {}

        # Load from environment
        if api_key := os.environ.get("MCP_API_KEY"):
            keys[api_key] = {
                "name": "default",
                "permissions": ["tools", "resources", "prompts"]
            }

        # Load from config file
        config_path = os.environ.get("MCP_API_KEYS_FILE")
        if config_path and os.path.exists(config_path):
            with open(config_path) as f:
                for line in f:
                    if "=" in line:
                        key, perms = line.strip().split("=", 1)
                        keys[key] = {
                            "name": key[:8] + "...",
                            "permissions": perms.split(",")
                        }

        return keys

    def authenticate(self, request_headers: dict) -> dict:
        """Authenticate a request and return permissions."""
        auth_header = request_headers.get("Authorization", "")
        if not auth_header.startswith("Bearer "):
            raise AuthenticationError("Missing or invalid Authorization header")

        api_key = auth_header[7:]  # Remove "Bearer " prefix
        if api_key not in self.api_keys:
            raise AuthenticationError("Invalid API key")

        return self.api_keys[api_key]

    def check_permission(self, permissions: list[str], required: str) -> bool:
        """Check if permissions include required permission."""
        return required in permissions or "*" in permissions

# Middleware for authenticated tools
def require_auth(permission: str):
    """Decorator to require authentication for a tool."""
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            # Get auth context from request context
            auth_context = kwargs.pop("_auth_context", None)
            if not auth_context:
                return [TextContent(
                    type="text",
                    text='{"error": "Authentication required"}'
                )]

            if not authenticator.check_permission(
                auth_context.get("permissions", []),
                permission
            ):
                return [TextContent(
                    type="text",
                    text=f'{{"error": "Permission denied: {permission} required"}}'
                )]

            return await func(*args, **kwargs)
        return wrapper
    return decorator

authenticator = MCPAuthenticator()

@server.tool()
@require_auth("admin")
async def delete_record(table: str, record_id: str) -> list[TextContent]:
    """Delete a record (requires admin permission)."""
    # Implementation...
    pass
Understanding the authentication flow:
- Key loading (lines 11-27): Keys come from environment variables or a config file. Each key has a name (for logging) and a permissions list. The file format is simple: key=permission1,permission2. For production, use a proper secrets manager.
- Authentication middleware (lines 29-42): The authenticate() method extracts the Bearer token from the Authorization header and looks it up. It returns the key's permissions if valid and raises AuthenticationError if not.
- Permission checking (lines 44-46): Permissions are strings like "tools", "resources", or "admin". The wildcard "*" grants all permissions. Check permissions at the operation level, not just at login.
- Decorator pattern (lines 49-74): @require_auth("admin") wraps a tool and checks permissions before execution. This keeps authentication logic out of tool implementations. The _auth_context is injected by middleware (not shown—this depends on how you integrate authentication with your server).
OAuth2 Integration
OAuth2 is required when your MCP server needs to access third-party services on behalf of users. For example, a GitHub integration needs the user's OAuth token to access their private repositories. The MCP server acts as an OAuth client, guiding users through the authorization flow and managing tokens.
The flow works like this: (1) Server generates an authorization URL, (2) User visits the URL and grants permissions, (3) OAuth provider redirects back with an authorization code, (4) Server exchanges the code for access tokens, (5) Server stores tokens and uses them for API calls. Token refresh happens automatically when tokens expire.
from dataclasses import dataclass
from datetime import datetime, timedelta
from urllib.parse import urlencode

import httpx

@dataclass
class OAuthConfig:
    client_id: str
    client_secret: str
    authorize_url: str
    token_url: str
    scopes: list[str]

class OAuthHandler:
    """Handle OAuth2 authentication for MCP server."""

    def __init__(self, config: OAuthConfig):
        self.config = config
        self.tokens: dict[str, dict] = {}  # user_id -> token_info

    def get_authorization_url(self, state: str, redirect_uri: str) -> str:
        """Generate OAuth authorization URL."""
        params = {
            "client_id": self.config.client_id,
            "redirect_uri": redirect_uri,
            "response_type": "code",
            "scope": " ".join(self.config.scopes),
            "state": state
        }
        # urlencode escapes reserved characters in the redirect URI and scopes
        return f"{self.config.authorize_url}?{urlencode(params)}"

    async def exchange_code(self, code: str, redirect_uri: str) -> dict:
        """Exchange authorization code for tokens."""
        async with httpx.AsyncClient() as client:
            response = await client.post(
                self.config.token_url,
                data={
                    "grant_type": "authorization_code",
                    "client_id": self.config.client_id,
                    "client_secret": self.config.client_secret,
                    "code": code,
                    "redirect_uri": redirect_uri
                }
            )
            response.raise_for_status()
            return response.json()

    async def refresh_token(self, refresh_token: str) -> dict:
        """Refresh an access token."""
        async with httpx.AsyncClient() as client:
            response = await client.post(
                self.config.token_url,
                data={
                    "grant_type": "refresh_token",
                    "client_id": self.config.client_id,
                    "client_secret": self.config.client_secret,
                    "refresh_token": refresh_token
                }
            )
            response.raise_for_status()
            return response.json()

    def store_tokens(self, user_id: str, token_response: dict):
        """Store tokens for a user."""
        self.tokens[user_id] = {
            "access_token": token_response["access_token"],
            "refresh_token": token_response.get("refresh_token"),
            "expires_at": datetime.now() + timedelta(
                seconds=token_response.get("expires_in", 3600)
            )
        }

    async def get_access_token(self, user_id: str) -> str:
        """Get a valid access token for a user."""
        if user_id not in self.tokens:
            raise AuthenticationError("User not authenticated")

        token_info = self.tokens[user_id]

        # Refresh if expired
        if datetime.now() >= token_info["expires_at"]:
            if token_info.get("refresh_token"):
                new_tokens = await self.refresh_token(token_info["refresh_token"])
                self.store_tokens(user_id, new_tokens)
                return new_tokens["access_token"]
            else:
                raise AuthenticationError("Token expired and no refresh token")

        return token_info["access_token"]

# Example: GitHub OAuth integration
github_oauth = OAuthHandler(OAuthConfig(
    client_id=os.environ.get("GITHUB_CLIENT_ID", ""),
    client_secret=os.environ.get("GITHUB_CLIENT_SECRET", ""),
    authorize_url="https://github.com/login/oauth/authorize",
    token_url="https://github.com/login/oauth/access_token",
    scopes=["repo", "user"]
))

@server.tool()
async def github_list_repos(user_id: str) -> list[TextContent]:
    """List GitHub repositories for authenticated user."""
    try:
        access_token = await github_oauth.get_access_token(user_id)

        async with httpx.AsyncClient() as client:
            response = await client.get(
                "https://api.github.com/user/repos",
                headers={
                    "Authorization": f"Bearer {access_token}",
                    "Accept": "application/vnd.github.v3+json"
                }
            )
            response.raise_for_status()
            repos = response.json()

        summary = [
            {"name": r["full_name"], "url": r["html_url"], "stars": r["stargazers_count"]}
            for r in repos[:20]  # Limit to 20
        ]
        return [TextContent(type="text", text=json.dumps(summary))]

    except AuthenticationError as e:
        return [TextContent(type="text", text=f'{{"error": "{str(e)}", "auth_required": true}}')]
OAuth2 implementation details:
- Configuration (lines 6-12): OAuthConfig holds provider-specific settings. Different providers (GitHub, Google, Slack) have different URLs but follow the same OAuth2 standard. Scopes determine what permissions you're requesting.
- Authorization URL (lines 18-26): The state parameter prevents CSRF attacks—generate a random string, include it in the URL, and verify it comes back unchanged. The redirect_uri must exactly match what you registered with the provider.
- Token exchange (lines 28-42): The authorization code is single-use and expires quickly (typically 10 minutes). Exchange it immediately for access and refresh tokens. Store the refresh token securely—it's long-lived and can generate new access tokens.
- Token refresh (lines 44-58): Access tokens expire (usually 1 hour). Before making API calls, check expiration and refresh if needed. The get_access_token() method handles this transparently.
- Token storage (lines 60-68): This example uses in-memory storage—tokens are lost on restart. For production, store tokens encrypted in a database, keyed by user ID. Never log tokens.
- Error handling (lines 100-101): The auth_required: true flag tells the client to initiate a new OAuth flow. The model can prompt the user to authenticate.
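The CSRF protection described above can be sketched entirely with the standard library. Generating and verifying the state parameter (the StateStore class and its 10-minute TTL are illustrative assumptions):

```python
import secrets
import time

class StateStore:
    """Issue single-use OAuth state values and verify them on callback."""

    def __init__(self, ttl_seconds: int = 600):
        self.ttl = ttl_seconds
        self._pending: dict[str, float] = {}  # state -> issued_at

    def issue(self) -> str:
        """Create an unguessable state value to embed in the auth URL."""
        state = secrets.token_urlsafe(32)
        self._pending[state] = time.monotonic()
        return state

    def verify(self, state: str) -> bool:
        """True only if this state was issued by us, is unexpired, and
        has not been used before (pop makes each value single-use)."""
        issued_at = self._pending.pop(state, None)
        if issued_at is None:
            return False
        return (time.monotonic() - issued_at) <= self.ttl
```

A forged or replayed callback fails verify(), so an attacker cannot bind their authorization code to a victim's session.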
Streaming and Long-Running Operations
Some tools take minutes to complete—large file processing, complex queries, batch operations. Without feedback, users (and the model) don't know if the operation is progressing or stuck. MCP's progress notification system provides real-time updates during long-running operations.
The key concept is progressToken: the client includes this token when calling a tool that might take time. The server uses the token to send progress updates back to the client. Progress notifications include current progress, total (if known), and an optional message describing the current stage.
This pattern is essential for maintaining user trust during operations like "analyze all files in this directory" or "run these 50 database migrations." The client can display a progress bar, and the user knows exactly what's happening.
from mcp.types import ProgressToken, ProgressNotification
import asyncio

class StreamingToolHandler:
    """Handle streaming responses for long-running tools."""

    def __init__(self, server: Server):
        self.server = server
        self.active_operations: dict[str, asyncio.Task] = {}

    async def stream_progress(
        self,
        progress_token: ProgressToken,
        total: int,
        description: str
    ):
        """Send progress updates to client."""
        for i in range(total):
            await self.server.send_notification(
                ProgressNotification(
                    progressToken=progress_token,
                    progress=i + 1,
                    total=total,
                    message=f"{description}: {i + 1}/{total}"
                )
            )
            await asyncio.sleep(0)  # Yield to event loop

@server.tool()
async def batch_process(
    items: list[str],
    operation: str,
    _meta: dict | None = None  # Contains progressToken
) -> list[TextContent]:
    """
    Process a batch of items with progress reporting.

    Args:
        items: List of items to process
        operation: Operation to perform on each item
    """
    progress_token = _meta.get("progressToken") if _meta else None
    results = []

    for i, item in enumerate(items):
        # Process item
        result = await _process_single_item(item, operation)
        results.append(result)

        # Report progress
        if progress_token:
            await server.send_notification({
                "method": "notifications/progress",
                "params": {
                    "progressToken": progress_token,
                    "progress": i + 1,
                    "total": len(items)
                }
            })

    return [TextContent(type="text", text=json.dumps(results))]

@server.tool()
async def long_running_analysis(
    data_source: str,
    analysis_type: str,
    _meta: dict | None = None
) -> list[TextContent]:
    """
    Perform long-running data analysis with streaming results.

    This tool streams intermediate results as they become available.
    """
    progress_token = _meta.get("progressToken") if _meta else None

    stages = [
        ("Loading data", _load_data),
        ("Preprocessing", _preprocess),
        ("Running analysis", _analyze),
        ("Generating report", _generate_report)
    ]

    context = {"data_source": data_source, "analysis_type": analysis_type}

    for i, (stage_name, stage_func) in enumerate(stages):
        # Report stage start
        if progress_token:
            await server.send_notification({
                "method": "notifications/progress",
                "params": {
                    "progressToken": progress_token,
                    "progress": i,
                    "total": len(stages),
                    "message": f"Stage: {stage_name}"
                }
            })

        # Execute stage
        try:
            context = await stage_func(context)
        except Exception as e:
            return [TextContent(
                type="text",
                text=json.dumps({
                    "error": f"Failed at stage '{stage_name}': {str(e)}",
                    "completed_stages": i
                })
            )]

    return [TextContent(type="text", text=json.dumps(context.get("report", {})))]

async def _load_data(context: dict) -> dict:
    await asyncio.sleep(1)  # Simulate work
    context["data"] = {"loaded": True, "rows": 10000}
    return context

async def _preprocess(context: dict) -> dict:
    await asyncio.sleep(0.5)
    context["preprocessed"] = True
    return context

async def _analyze(context: dict) -> dict:
    await asyncio.sleep(2)
    context["analysis_results"] = {"insights": ["insight1", "insight2"]}
    return context

async def _generate_report(context: dict) -> dict:
    await asyncio.sleep(0.5)
    context["report"] = {
        "summary": "Analysis complete",
        "data_rows": context["data"]["rows"],
        "insights": context["analysis_results"]["insights"]
    }
    return context
Streaming patterns explained:
- Progress token extraction (lines 33, 56): The _meta parameter contains request metadata, including the progressToken. If the client didn't request progress updates, this is None. Always check before sending notifications.
- Atomic updates (lines 40-48): Each progress notification includes progress (current), total (expected), and message (human-readable status). Send updates at meaningful intervals—not every iteration, but enough to show movement.
- Batch processing (lines 30-52): For batch operations, report progress after each item. The model and user can see "Processing item 15 of 50" rather than wondering if the tool is frozen.
- Stage-based progress (lines 54-99): Complex operations have distinct stages. Report when entering each stage with a descriptive message. If a stage fails, the error response includes completed_stages so the model knows what succeeded.
- Event loop yielding (line 23): await asyncio.sleep(0) yields control to the event loop without delay. This ensures progress notifications are sent immediately rather than buffered.
When to use streaming:
- Operations taking >5 seconds
- Batch processing with predictable iteration count
- Multi-stage workflows where each stage is distinct
- Any operation where "stuck vs. working" is ambiguous
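The "meaningful intervals" advice above is easy to enforce with a small throttle around whatever send function you use. ThrottledProgress is a hypothetical wrapper, not an SDK class:

```python
import time

class ThrottledProgress:
    """Forward progress updates at most once per `min_interval` seconds,
    but always forward the final update so the bar reaches 100%."""

    def __init__(self, send, min_interval: float = 0.5):
        self.send = send              # callable(progress, total)
        self.min_interval = min_interval
        self._last_sent = float("-inf")  # so the first update always sends

    def update(self, progress: int, total: int):
        now = time.monotonic()
        is_final = progress >= total
        if is_final or (now - self._last_sent) >= self.min_interval:
            self.send(progress, total)
            self._last_sent = now
```

On a 10,000-item batch this keeps the notification stream to a few updates per second regardless of how fast items complete, while the first and last updates are never dropped.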
Error Handling Best Practices
from enum import Enum
from dataclasses import dataclass
from typing import Optional

class ErrorCode(Enum):
    INVALID_INPUT = "INVALID_INPUT"
    AUTHENTICATION_FAILED = "AUTHENTICATION_FAILED"
    PERMISSION_DENIED = "PERMISSION_DENIED"
    RESOURCE_NOT_FOUND = "RESOURCE_NOT_FOUND"
    RATE_LIMITED = "RATE_LIMITED"
    TIMEOUT = "TIMEOUT"
    INTERNAL_ERROR = "INTERNAL_ERROR"
    SERVICE_UNAVAILABLE = "SERVICE_UNAVAILABLE"

@dataclass
class MCPError:
    code: ErrorCode
    message: str
    details: Optional[dict] = None
    retryable: bool = False
    retry_after_seconds: Optional[int] = None

    def to_response(self) -> list[TextContent]:
        error_dict = {
            "error": {
                "code": self.code.value,
                "message": self.message,
                "retryable": self.retryable
            }
        }
        if self.details:
            error_dict["error"]["details"] = self.details
        if self.retry_after_seconds:
            error_dict["error"]["retry_after_seconds"] = self.retry_after_seconds
        return [TextContent(type="text", text=json.dumps(error_dict))]

class ErrorHandler:
    """Centralized error handling for MCP server."""

    @staticmethod
    def handle_exception(e: Exception) -> MCPError:
        """Convert exception to MCPError."""
        if isinstance(e, ValueError):
            return MCPError(
                code=ErrorCode.INVALID_INPUT,
                message=str(e),
                retryable=False
            )
        if isinstance(e, AuthenticationError):
            return MCPError(
                code=ErrorCode.AUTHENTICATION_FAILED,
                message=str(e),
                retryable=False
            )
        if isinstance(e, PermissionError):
            return MCPError(
                code=ErrorCode.PERMISSION_DENIED,
                message=str(e),
                retryable=False
            )
        if isinstance(e, FileNotFoundError):
            return MCPError(
                code=ErrorCode.RESOURCE_NOT_FOUND,
                message=str(e),
                retryable=False
            )
        if isinstance(e, asyncio.TimeoutError):
            return MCPError(
                code=ErrorCode.TIMEOUT,
                message="Operation timed out",
                retryable=True,
                retry_after_seconds=5
            )
        if isinstance(e, httpx.HTTPStatusError):
            if e.response.status_code == 429:
                retry_after = int(e.response.headers.get("Retry-After", 60))
                return MCPError(
                    code=ErrorCode.RATE_LIMITED,
                    message="Rate limit exceeded",
                    retryable=True,
                    retry_after_seconds=retry_after
                )
            elif e.response.status_code >= 500:
                return MCPError(
                    code=ErrorCode.SERVICE_UNAVAILABLE,
                    message=f"Service returned {e.response.status_code}",
                    retryable=True,
                    retry_after_seconds=30
                )

        # Default to internal error
        return MCPError(
            code=ErrorCode.INTERNAL_ERROR,
            message="An unexpected error occurred",
            details={"exception_type": type(e).__name__},
            retryable=False
        )

def with_error_handling(func):
    """Decorator to add standardized error handling to tools."""
    @wraps(func)
    async def wrapper(*args, **kwargs):
        try:
            return await func(*args, **kwargs)
        except Exception as e:
            error = ErrorHandler.handle_exception(e)
            return error.to_response()
    return wrapper

# Usage
@server.tool()
@with_error_handling
async def risky_operation(param: str) -> list[TextContent]:
    """Operation that might fail."""
    if not param:
        raise ValueError("param is required")

    # Do something that might fail
    result = await some_external_call(param)
    return [TextContent(type="text", text=json.dumps(result))]
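On the consuming side, the retryable and retry_after_seconds fields make a simple retry policy possible. A sketch of a caller that honors them, parsing the same error envelope that to_response() emits (the retry_call helper itself is illustrative, not part of any SDK):

```python
import asyncio
import json

async def retry_call(call, max_attempts: int = 3) -> dict:
    """Invoke `call()` (an async function returning a tool's JSON text)
    and retry only when the error envelope says the failure is retryable."""
    for attempt in range(1, max_attempts + 1):
        payload = json.loads(await call())
        error = payload.get("error")
        if error is None:
            return payload  # success
        if not error.get("retryable") or attempt == max_attempts:
            return payload  # permanent failure, or out of attempts
        # honor the server's suggested backoff before trying again
        await asyncio.sleep(error.get("retry_after_seconds", 1))
    return payload
```

Non-retryable codes like INVALID_INPUT fail fast on the first attempt, while TIMEOUT or RATE_LIMITED get the server-suggested backoff.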
Testing MCP Servers
Unit Testing Tools
import pytest
from unittest.mock import AsyncMock, patch

@pytest.fixture
def mock_http_client():
    """Mock HTTP client for testing."""
    with patch("httpx.AsyncClient") as mock:
        client = AsyncMock()
        mock.return_value.__aenter__.return_value = client
        yield client

@pytest.mark.asyncio
async def test_web_search_success(mock_http_client):
    """Test successful web search."""
    mock_http_client.get.return_value.json.return_value = {
        "results": [
            {"title": "Test Result", "url": "https://example.com", "snippet": "Test snippet"}
        ]
    }
    mock_http_client.get.return_value.raise_for_status = AsyncMock()

    result = await web_search("test query", num_results=1)

    assert len(result) == 1
    assert result[0].type == "text"
    data = json.loads(result[0].text)
    assert "results" in data
    assert len(data["results"]) == 1

@pytest.mark.asyncio
async def test_web_search_timeout(mock_http_client):
    """Test web search timeout handling."""
    mock_http_client.get.side_effect = asyncio.TimeoutError()

    result = await web_search("test query")

    data = json.loads(result[0].text)
    assert "error" in data
    assert "timeout" in data["error"].lower()

@pytest.mark.asyncio
async def test_calculate_valid_expression():
    """Test calculator with valid expression."""
    result = await calculate("2 + 2")
    data = json.loads(result[0].text)
    assert data["result"] == 4

@pytest.mark.asyncio
async def test_calculate_invalid_expression():
    """Test calculator with invalid expression."""
    result = await calculate("import os")
    data = json.loads(result[0].text)
    assert "error" in data

@pytest.mark.asyncio
async def test_sql_safety_checker():
    """Test SQL safety validation."""
    checker = SQLSafetyChecker()

    # Safe queries
    assert checker.validate("SELECT * FROM users WHERE id = 1")[0] is True
    assert checker.validate("SELECT name, email FROM users LIMIT 10")[0] is True

    # Dangerous queries
    assert checker.validate("DROP TABLE users")[0] is False
    assert checker.validate("DELETE FROM users")[0] is False
    assert checker.validate("SELECT * FROM users; -- DROP TABLE")[0] is False
Integration Testing
import asyncio
import json

class MCPTestClient:
    """Test client for MCP servers."""

    def __init__(self, server_command: list[str]):
        self.server_command = server_command
        self.process = None
        self.request_id = 0

    async def start(self):
        """Start the server process."""
        self.process = await asyncio.create_subprocess_exec(
            *self.server_command,
            stdin=asyncio.subprocess.PIPE,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE
        )

    async def stop(self):
        """Stop the server process."""
        if self.process:
            self.process.terminate()
            await self.process.wait()

    async def send_request(self, method: str, params: dict | None = None) -> dict:
        """Send a JSON-RPC request and get response."""
        self.request_id += 1
        request = {
            "jsonrpc": "2.0",
            "method": method,
            "params": params or {},
            "id": self.request_id
        }
        self.process.stdin.write(json.dumps(request).encode() + b"\n")
        await self.process.stdin.drain()

        response_line = await self.process.stdout.readline()
        return json.loads(response_line)

    async def initialize(self) -> dict:
        """Initialize the MCP connection."""
        return await self.send_request("initialize", {
            "protocolVersion": "2024-11-05",
            "capabilities": {},
            "clientInfo": {"name": "test-client", "version": "1.0"}
        })

    async def list_tools(self) -> list[dict]:
        """List available tools."""
        response = await self.send_request("tools/list")
        return response.get("result", {}).get("tools", [])

    async def call_tool(self, name: str, arguments: dict) -> dict:
        """Call a tool."""
        response = await self.send_request("tools/call", {
            "name": name,
            "arguments": arguments
        })
        return response.get("result", {})

@pytest.fixture
async def mcp_client():
    """Create and start MCP test client."""
    client = MCPTestClient(["python", "server.py"])
    await client.start()
    await client.initialize()
    yield client
    await client.stop()

@pytest.mark.asyncio
async def test_server_initialization(mcp_client):
    """Test server initializes correctly."""
    tools = await mcp_client.list_tools()
    assert len(tools) > 0
    assert any(t["name"] == "calculate" for t in tools)

@pytest.mark.asyncio
async def test_tool_call_integration(mcp_client):
    """Test full tool call flow."""
    result = await mcp_client.call_tool("calculate", {"expression": "10 * 5"})
    content = result.get("content", [])
    assert len(content) > 0
    data = json.loads(content[0]["text"])
    assert data["result"] == 50
Deploying MCP Servers
Docker Deployment
# Dockerfile
FROM python:3.11-slim
WORKDIR /app
# Install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy server code
COPY server.py .
COPY tools/ tools/
# Create non-root user
RUN useradd -m -u 1000 mcpuser
USER mcpuser
# Health check
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
CMD python -c "import socket; s=socket.socket(); s.connect(('localhost', 8080)); s.close()"
# Run server
CMD ["python", "server.py"]
# docker-compose.yml
version: '3.8'
services:
  mcp-server:
    build: .
    environment:
      - MCP_API_KEY=${MCP_API_KEY}
      - DATABASE_URL=${DATABASE_URL}
      - SEARCH_API_KEY=${SEARCH_API_KEY}
    volumes:
      - ./data:/app/data:ro  # Read-only data volume
    ports:
      - "8080:8080"
    restart: unless-stopped
    deploy:
      resources:
        limits:
          memory: 512M
          cpus: '0.5'
HTTP Transport for Remote Servers
from aiohttp import web
import json

class HTTPMCPServer:
    """MCP server with HTTP transport for remote deployment."""

    def __init__(self, server: Server):
        self.server = server
        self.app = web.Application()
        self.app.router.add_post("/mcp", self.handle_request)
        self.app.router.add_get("/health", self.health_check)

    async def handle_request(self, request: web.Request) -> web.Response:
        """Handle incoming MCP requests."""
        try:
            body = await request.json()

            # Validate JSON-RPC format
            if body.get("jsonrpc") != "2.0":
                return web.json_response(
                    {"jsonrpc": "2.0", "error": {"code": -32600, "message": "Invalid request"}},
                    status=400
                )

            # Route to appropriate handler
            method = body.get("method")
            params = body.get("params", {})
            request_id = body.get("id")

            result = await self._dispatch(method, params)

            return web.json_response({
                "jsonrpc": "2.0",
                "result": result,
                "id": request_id
            })

        except json.JSONDecodeError:
            return web.json_response(
                {"jsonrpc": "2.0", "error": {"code": -32700, "message": "Parse error"}},
                status=400
            )
        except Exception as e:
            return web.json_response(
                {"jsonrpc": "2.0", "error": {"code": -32603, "message": str(e)}},
                status=500
            )

    async def _dispatch(self, method: str, params: dict) -> dict:
        """Dispatch request to appropriate handler."""
        handlers = {
            "initialize": self._handle_initialize,
            "tools/list": self._handle_list_tools,
            "tools/call": self._handle_call_tool,
            "resources/list": self._handle_list_resources,
            "resources/read": self._handle_read_resource,
            "prompts/list": self._handle_list_prompts,
            "prompts/get": self._handle_get_prompt,
        }
        handler = handlers.get(method)
        if not handler:
            raise ValueError(f"Unknown method: {method}")
        return await handler(params)

    async def health_check(self, request: web.Request) -> web.Response:
        """Health check endpoint."""
        return web.json_response({"status": "healthy"})

    def run(self, host: str = "0.0.0.0", port: int = 8080):
        """Run the HTTP server."""
        web.run_app(self.app, host=host, port=port)

# Usage
if __name__ == "__main__":
    http_server = HTTPMCPServer(server)
    http_server.run(port=8080)
Configuring Claude Desktop
Add to ~/Library/Application Support/Claude/claude_desktop_config.json (macOS) or %APPDATA%\Claude\claude_desktop_config.json (Windows):
{
"mcpServers": {
"my-custom-server": {
"command": "python",
"args": ["/path/to/server.py"],
"env": {
"MCP_API_KEY": "your-api-key",
"DATABASE_URL": "postgresql://..."
}
},
"remote-server": {
"url": "https://mcp.example.com/mcp",
"headers": {
"Authorization": "Bearer your-token"
}
}
}
}
Real-World Integration Examples
GitHub Integration Server
A complete MCP server for GitHub operations:
from mcp.server import Server
from mcp.types import TextContent
import httpx
import json
import os
from typing import Optional
server = Server("github-mcp")
class GitHubClient:
"""GitHub API client with rate limit handling."""
def __init__(self):
self.token = os.environ.get("GITHUB_TOKEN")
self.base_url = "https://api.github.com"
self._rate_limit_remaining = None
self._rate_limit_reset = None
async def request(self, method: str, endpoint: str, **kwargs) -> dict:
"""Make authenticated GitHub API request."""
headers = {
"Authorization": f"Bearer {self.token}",
"Accept": "application/vnd.github.v3+json",
"X-GitHub-Api-Version": "2022-11-28"
}
async with httpx.AsyncClient() as client:
response = await client.request(
method,
f"{self.base_url}{endpoint}",
headers=headers,
**kwargs
)
# Track rate limits
self._rate_limit_remaining = int(
response.headers.get("X-RateLimit-Remaining", 0)
)
self._rate_limit_reset = int(
response.headers.get("X-RateLimit-Reset", 0)
)
if response.status_code == 403 and self._rate_limit_remaining == 0:
raise Exception(f"Rate limited. Resets at {self._rate_limit_reset}")
response.raise_for_status()
return response.json()
github = GitHubClient()
@server.tool()
async def search_repositories(
query: str,
language: Optional[str] = None,
sort: str = "stars",
max_results: int = 10
) -> list[TextContent]:
"""
Search GitHub repositories.
Args:
query: Search query (e.g., "machine learning", "react framework")
language: Filter by programming language
sort: Sort by: stars, forks, updated, help-wanted-issues
max_results: Maximum results to return (1-100)
Returns:
List of matching repositories with details
"""
search_query = query
if language:
search_query += f" language:{language}"
result = await github.request(
"GET",
"/search/repositories",
params={
"q": search_query,
"sort": sort,
"per_page": min(max_results, 100)
}
)
repos = [
{
"name": r["full_name"],
"description": r["description"],
"stars": r["stargazers_count"],
"language": r["language"],
"url": r["html_url"],
"updated": r["updated_at"]
}
for r in result.get("items", [])
]
return [TextContent(type="text", text=json.dumps(repos, indent=2))]
@server.tool()
async def get_repository_info(owner: str, repo: str) -> list[TextContent]:
"""
Get detailed information about a repository.
Args:
owner: Repository owner (username or organization)
repo: Repository name
Returns:
Repository details including README, languages, and recent activity
"""
# Get repo info
repo_info = await github.request("GET", f"/repos/{owner}/{repo}")
# Get languages
languages = await github.request("GET", f"/repos/{owner}/{repo}/languages")
# Get recent commits
commits = await github.request(
"GET",
f"/repos/{owner}/{repo}/commits",
params={"per_page": 5}
)
result = {
"name": repo_info["full_name"],
"description": repo_info["description"],
"stars": repo_info["stargazers_count"],
"forks": repo_info["forks_count"],
"open_issues": repo_info["open_issues_count"],
"default_branch": repo_info["default_branch"],
"created_at": repo_info["created_at"],
"updated_at": repo_info["updated_at"],
"languages": languages,
"topics": repo_info.get("topics", []),
"recent_commits": [
{
"sha": c["sha"][:7],
"message": c["commit"]["message"].split("\n")[0],
"author": c["commit"]["author"]["name"],
"date": c["commit"]["author"]["date"]
}
for c in commits
]
}
return [TextContent(type="text", text=json.dumps(result, indent=2))]
@server.tool()
async def create_issue(
owner: str,
repo: str,
title: str,
body: str,
    labels: Optional[list[str]] = None
) -> list[TextContent]:
"""
Create a new issue in a repository.
Args:
owner: Repository owner
repo: Repository name
title: Issue title
body: Issue body (markdown supported)
labels: Optional list of label names
Returns:
Created issue details with URL
"""
payload = {
"title": title,
"body": body
}
if labels:
payload["labels"] = labels
result = await github.request(
"POST",
f"/repos/{owner}/{repo}/issues",
json=payload
)
return [TextContent(type="text", text=json.dumps({
"number": result["number"],
"url": result["html_url"],
"state": result["state"],
"created_at": result["created_at"]
}))]
@server.tool()
async def get_pull_request_diff(
owner: str,
repo: str,
pr_number: int
) -> list[TextContent]:
"""
Get the diff for a pull request.
Args:
owner: Repository owner
repo: Repository name
pr_number: Pull request number
Returns:
PR details and file changes
"""
# Get PR info
pr = await github.request("GET", f"/repos/{owner}/{repo}/pulls/{pr_number}")
# Get files changed
files = await github.request(
"GET",
f"/repos/{owner}/{repo}/pulls/{pr_number}/files"
)
result = {
"title": pr["title"],
"state": pr["state"],
"author": pr["user"]["login"],
"base": pr["base"]["ref"],
"head": pr["head"]["ref"],
"commits": pr["commits"],
"additions": pr["additions"],
"deletions": pr["deletions"],
"changed_files": pr["changed_files"],
"files": [
{
"filename": f["filename"],
"status": f["status"],
"additions": f["additions"],
"deletions": f["deletions"],
"patch": f.get("patch", "")[:1000] # Truncate large patches
}
for f in files[:20] # Limit files
]
}
return [TextContent(type="text", text=json.dumps(result, indent=2))]
Slack Integration Server
from mcp.server import Server
from mcp.types import TextContent
from slack_sdk.web.async_client import AsyncWebClient
from slack_sdk.errors import SlackApiError
import json
import os
server = Server("slack-mcp")
slack = AsyncWebClient(token=os.environ.get("SLACK_BOT_TOKEN"))
@server.tool()
async def send_message(
channel: str,
text: str,
thread_ts: str = None
) -> list[TextContent]:
"""
Send a message to a Slack channel.
Args:
channel: Channel name (without #) or channel ID
text: Message text (supports Slack markdown)
thread_ts: Optional thread timestamp to reply in thread
Returns:
Message details including timestamp
"""
try:
# Resolve channel name to ID if needed
if not channel.startswith("C"):
channels = await slack.conversations_list()
channel_map = {c["name"]: c["id"] for c in channels["channels"]}
channel = channel_map.get(channel, channel)
result = await slack.chat_postMessage(
channel=channel,
text=text,
thread_ts=thread_ts
)
return [TextContent(type="text", text=json.dumps({
"ok": True,
"channel": result["channel"],
"ts": result["ts"],
"message": result["message"]["text"]
}))]
except SlackApiError as e:
return [TextContent(type="text", text=json.dumps({
"ok": False,
"error": e.response["error"]
}))]
@server.tool()
async def search_messages(
query: str,
channel: str = None,
count: int = 10
) -> list[TextContent]:
"""
Search Slack messages.
Args:
query: Search query
channel: Optional channel to search in
count: Number of results (max 100)
Returns:
Matching messages with context
"""
search_query = query
if channel:
search_query += f" in:{channel}"
try:
result = await slack.search_messages(
query=search_query,
count=min(count, 100)
)
messages = [
{
"text": m["text"],
"user": m.get("username", m.get("user")),
"channel": m["channel"]["name"],
"ts": m["ts"],
"permalink": m["permalink"]
}
for m in result["messages"]["matches"]
]
return [TextContent(type="text", text=json.dumps(messages, indent=2))]
except SlackApiError as e:
return [TextContent(type="text", text=json.dumps({
"error": e.response["error"]
}))]
@server.tool()
async def get_channel_history(
channel: str,
limit: int = 50
) -> list[TextContent]:
"""
Get recent messages from a channel.
Args:
channel: Channel name or ID
limit: Number of messages (max 200)
Returns:
Recent messages with user info
"""
try:
# Resolve channel name if needed
if not channel.startswith("C"):
channels = await slack.conversations_list()
channel_map = {c["name"]: c["id"] for c in channels["channels"]}
channel = channel_map.get(channel, channel)
result = await slack.conversations_history(
channel=channel,
limit=min(limit, 200)
)
# Get user info for messages
user_ids = set(m.get("user") for m in result["messages"] if m.get("user"))
users = {}
for user_id in user_ids:
try:
user_info = await slack.users_info(user=user_id)
users[user_id] = user_info["user"]["real_name"]
            except Exception:
                users[user_id] = user_id
messages = [
{
"text": m["text"],
"user": users.get(m.get("user"), "Unknown"),
"ts": m["ts"],
"reactions": [
{"name": r["name"], "count": r["count"]}
for r in m.get("reactions", [])
]
}
for m in result["messages"]
]
return [TextContent(type="text", text=json.dumps(messages, indent=2))]
except SlackApiError as e:
return [TextContent(type="text", text=json.dumps({
"error": e.response["error"]
}))]
@server.tool()
async def create_channel(
name: str,
is_private: bool = False,
description: str = None
) -> list[TextContent]:
"""
Create a new Slack channel.
Args:
name: Channel name (lowercase, no spaces)
is_private: Whether channel should be private
description: Optional channel description
Returns:
Created channel details
"""
try:
result = await slack.conversations_create(
name=name.lower().replace(" ", "-"),
is_private=is_private
)
channel_id = result["channel"]["id"]
if description:
await slack.conversations_setTopic(
channel=channel_id,
topic=description
)
return [TextContent(type="text", text=json.dumps({
"ok": True,
"channel_id": channel_id,
"name": result["channel"]["name"]
}))]
except SlackApiError as e:
return [TextContent(type="text", text=json.dumps({
"ok": False,
"error": e.response["error"]
}))]
Database Server with Connection Pooling
from mcp.server import Server
from mcp.types import TextContent
import asyncpg
import json
import os
from contextlib import asynccontextmanager
server = Server("database-mcp")
class DatabasePool:
"""Managed database connection pool."""
def __init__(self):
self.pool = None
self._databases = {}
async def initialize(self):
"""Initialize the default connection pool."""
self.pool = await asyncpg.create_pool(
host=os.environ.get("DB_HOST", "localhost"),
port=int(os.environ.get("DB_PORT", 5432)),
user=os.environ.get("DB_USER", "postgres"),
password=os.environ.get("DB_PASSWORD"),
database=os.environ.get("DB_NAME", "postgres"),
min_size=2,
max_size=10,
command_timeout=60
)
async def add_database(self, name: str, connection_string: str):
"""Add a named database connection."""
self._databases[name] = await asyncpg.create_pool(
connection_string,
min_size=1,
max_size=5
)
@asynccontextmanager
async def acquire(self, database: str = None):
"""Acquire a connection from the pool."""
pool = self._databases.get(database, self.pool)
async with pool.acquire() as conn:
yield conn
async def close(self):
"""Close all pools."""
if self.pool:
await self.pool.close()
for pool in self._databases.values():
await pool.close()
db = DatabasePool()
@server.tool()
async def list_tables(
schema: str = "public",
database: str = None
) -> list[TextContent]:
"""
List all tables in a schema.
Args:
schema: Schema name (default: public)
database: Optional named database connection
Returns:
List of tables with column counts
"""
query = """
SELECT
t.table_name,
COUNT(c.column_name) as column_count,
pg_size_pretty(pg_total_relation_size(quote_ident(t.table_name))) as size
FROM information_schema.tables t
LEFT JOIN information_schema.columns c
ON t.table_name = c.table_name
WHERE t.table_schema = $1
AND t.table_type = 'BASE TABLE'
GROUP BY t.table_name
ORDER BY t.table_name
"""
async with db.acquire(database) as conn:
rows = await conn.fetch(query, schema)
tables = [
{
"name": row["table_name"],
"columns": row["column_count"],
"size": row["size"]
}
for row in rows
]
return [TextContent(type="text", text=json.dumps(tables, indent=2))]
@server.tool()
async def describe_table(
table: str,
schema: str = "public",
database: str = None
) -> list[TextContent]:
"""
Get detailed schema information for a table.
Args:
table: Table name
schema: Schema name (default: public)
database: Optional named database connection
Returns:
Column definitions, constraints, and indexes
"""
columns_query = """
SELECT
column_name,
data_type,
is_nullable,
column_default,
character_maximum_length
FROM information_schema.columns
WHERE table_schema = $1 AND table_name = $2
ORDER BY ordinal_position
"""
constraints_query = """
SELECT
tc.constraint_name,
tc.constraint_type,
kcu.column_name
FROM information_schema.table_constraints tc
JOIN information_schema.key_column_usage kcu
ON tc.constraint_name = kcu.constraint_name
WHERE tc.table_schema = $1 AND tc.table_name = $2
"""
indexes_query = """
SELECT
indexname,
indexdef
FROM pg_indexes
WHERE schemaname = $1 AND tablename = $2
"""
async with db.acquire(database) as conn:
columns = await conn.fetch(columns_query, schema, table)
constraints = await conn.fetch(constraints_query, schema, table)
indexes = await conn.fetch(indexes_query, schema, table)
result = {
"table": f"{schema}.{table}",
"columns": [
{
"name": c["column_name"],
"type": c["data_type"],
"nullable": c["is_nullable"] == "YES",
"default": c["column_default"],
"max_length": c["character_maximum_length"]
}
for c in columns
],
"constraints": [
{
"name": c["constraint_name"],
"type": c["constraint_type"],
"column": c["column_name"]
}
for c in constraints
],
"indexes": [
{
"name": i["indexname"],
"definition": i["indexdef"]
}
for i in indexes
]
}
return [TextContent(type="text", text=json.dumps(result, indent=2))]
@server.tool()
async def execute_query(
query: str,
params: list = None,
database: str = None,
explain: bool = False
) -> list[TextContent]:
"""
Execute a read-only SQL query.
Args:
query: SQL query (SELECT only)
params: Optional query parameters for $1, $2, etc.
database: Optional named database connection
explain: If true, return query plan instead of results
Returns:
Query results as JSON array
"""
    # Validate read-only (prefix check only: a WITH query can still contain
    # data-modifying CTEs, so also use a read-only database role in production)
    query_upper = query.upper().strip()
    if not query_upper.startswith(("SELECT", "WITH", "EXPLAIN")):
return [TextContent(type="text", text=json.dumps({
"error": "Only SELECT queries are allowed"
}))]
if explain and not query_upper.startswith("EXPLAIN"):
query = f"EXPLAIN ANALYZE {query}"
params = params or []
async with db.acquire(database) as conn:
try:
# Set statement timeout
await conn.execute("SET statement_timeout = '30s'")
rows = await conn.fetch(query, *params)
# Convert to JSON-serializable format
results = [dict(row) for row in rows]
# Limit results
if len(results) > 1000:
results = results[:1000]
results.append({"_truncated": True, "_message": "Results limited to 1000 rows"})
return [TextContent(type="text", text=json.dumps(results, default=str, indent=2))]
except asyncpg.exceptions.QueryCanceledError:
return [TextContent(type="text", text=json.dumps({
"error": "Query timeout exceeded (30s)"
}))]
except Exception as e:
return [TextContent(type="text", text=json.dumps({
"error": str(e)
}))]
Debugging and Troubleshooting
MCP Inspector
Use the official MCP inspector for debugging:
# Install inspector (optional; npx fetches it on demand)
npm install -g @modelcontextprotocol/inspector
# Run your server through the inspector
npx @modelcontextprotocol/inspector python server.py
The inspector provides:
- Real-time message logging
- Tool and resource testing
- Schema validation
- Performance metrics
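For quick checks without the inspector, you can also poke the HTTP transport from earlier by hand with a raw JSON-RPC request. This helper is an illustrative sketch: it assumes the server from the HTTP Transport section is listening on localhost:8080, and the `build_rpc`/`call_mcp` names are not from any SDK.

```python
import json
import urllib.request

def build_rpc(method: str, params: dict, request_id: int = 1) -> bytes:
    """Encode a JSON-RPC 2.0 request body."""
    return json.dumps(
        {"jsonrpc": "2.0", "method": method, "params": params, "id": request_id}
    ).encode()

def call_mcp(method: str, params: dict,
             url: str = "http://localhost:8080/mcp") -> dict:
    """POST a JSON-RPC request to an HTTP MCP endpoint and decode the reply."""
    req = urllib.request.Request(
        url,
        data=build_rpc(method, params),
        headers={"Content-Type": "application/json"},
    )
    with urllib.request.urlopen(req) as resp:
        return json.loads(resp.read())
```

For example, `call_mcp("tools/list", {})` against the server above should come back with a `result` key listing the registered tools.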
Logging Best Practices
import logging
import json
import sys
from datetime import datetime, timezone
from functools import wraps
# Configure structured logging
class JSONFormatter(logging.Formatter):
    def format(self, record):
        log_entry = {
            "timestamp": datetime.now(timezone.utc).isoformat(),
"level": record.levelname,
"logger": record.name,
"message": record.getMessage(),
}
if hasattr(record, "tool_name"):
log_entry["tool"] = record.tool_name
if hasattr(record, "request_id"):
log_entry["request_id"] = record.request_id
if hasattr(record, "duration_ms"):
log_entry["duration_ms"] = record.duration_ms
if record.exc_info:
log_entry["exception"] = self.formatException(record.exc_info)
return json.dumps(log_entry)
# Setup logger
logger = logging.getLogger("mcp-server")
handler = logging.StreamHandler(sys.stderr) # MCP uses stdout for protocol
handler.setFormatter(JSONFormatter())
logger.addHandler(handler)
logger.setLevel(logging.INFO)
# Logging decorator for tools
def log_tool_call(func):
@wraps(func)
async def wrapper(*args, **kwargs):
import time
import uuid
request_id = str(uuid.uuid4())[:8]
tool_name = func.__name__
start = time.time()
logger.info(
f"Tool call started",
extra={
"tool_name": tool_name,
"request_id": request_id,
"args": str(kwargs)[:200]
}
)
try:
result = await func(*args, **kwargs)
duration = (time.time() - start) * 1000
logger.info(
f"Tool call completed",
extra={
"tool_name": tool_name,
"request_id": request_id,
"duration_ms": round(duration, 2)
}
)
return result
except Exception as e:
duration = (time.time() - start) * 1000
logger.error(
f"Tool call failed: {e}",
extra={
"tool_name": tool_name,
"request_id": request_id,
"duration_ms": round(duration, 2)
},
exc_info=True
)
raise
return wrapper
# Usage
@server.tool()
@log_tool_call
async def my_tool(param: str) -> list[TextContent]:
# Implementation
pass
Common Issues and Solutions
class MCPDiagnostics:
"""Diagnostic utilities for MCP servers."""
@staticmethod
async def check_connectivity():
"""Verify server can connect to dependencies."""
checks = []
# Database connectivity
try:
async with db.acquire() as conn:
await conn.fetchval("SELECT 1")
checks.append({"name": "database", "status": "ok"})
except Exception as e:
checks.append({"name": "database", "status": "error", "message": str(e)})
# External API connectivity
try:
async with httpx.AsyncClient() as client:
response = await client.get("https://api.github.com", timeout=5)
checks.append({"name": "github_api", "status": "ok"})
except Exception as e:
checks.append({"name": "github_api", "status": "error", "message": str(e)})
return checks
@staticmethod
def validate_environment():
"""Validate required environment variables."""
required = ["DB_HOST", "DB_USER", "GITHUB_TOKEN"]
missing = [var for var in required if not os.environ.get(var)]
if missing:
return {"valid": False, "missing": missing}
return {"valid": True}
@staticmethod
async def test_tool(tool_name: str, test_args: dict):
"""Test a specific tool with sample arguments."""
        # Look up the tool function (assumes tools are also exposed as
        # attributes on the server object; adjust for your MCP SDK version)
        tool_func = getattr(server, tool_name, None)
if not tool_func:
return {"error": f"Tool {tool_name} not found"}
try:
result = await tool_func(**test_args)
return {
"success": True,
"result_type": type(result).__name__,
"result_length": len(result) if hasattr(result, "__len__") else None
}
except Exception as e:
return {"success": False, "error": str(e)}
# Add diagnostic endpoint
@server.tool()
async def _diagnostics() -> list[TextContent]:
"""Run server diagnostics (internal use)."""
results = {
"environment": MCPDiagnostics.validate_environment(),
"connectivity": await MCPDiagnostics.check_connectivity(),
"server_info": {
"name": server.name,
"tools": len(server._tools),
"resources": len(server._resources)
}
}
return [TextContent(type="text", text=json.dumps(results, indent=2))]
Protocol Debugging
import json
import os
import sys
from datetime import datetime, timezone
class ProtocolDebugger:
    """Debug MCP protocol messages."""
    def __init__(self, enabled: bool = False):
        self.enabled = enabled
        self.messages = []
    def log_message(self, direction: str, message: dict):
        """Log a protocol message."""
        if not self.enabled:
            return
        entry = {
            "timestamp": datetime.now(timezone.utc).isoformat(),
"direction": direction, # "incoming" or "outgoing"
"method": message.get("method"),
"id": message.get("id"),
"has_result": "result" in message,
"has_error": "error" in message
}
self.messages.append(entry)
# Log to stderr
print(f"[MCP {direction}] {json.dumps(entry)}", file=sys.stderr)
def get_recent(self, count: int = 50) -> list[dict]:
"""Get recent messages."""
return self.messages[-count:]
def find_errors(self) -> list[dict]:
"""Find all error responses."""
return [m for m in self.messages if m.get("has_error")]
debugger = ProtocolDebugger(enabled=os.environ.get("MCP_DEBUG") == "1")
Performance Optimization
Response Caching
from functools import wraps
import hashlib
import json
import time
from typing import Any, Optional
class MCPCache:
    """Simple in-memory cache for MCP responses."""
    def __init__(self, default_ttl: int = 300):
        self.cache = {}
        self.default_ttl = default_ttl
    def _key(self, tool_name: str, args: dict) -> str:
        """Generate cache key, prefixed with the tool name so that
        invalidate() can match entries by tool."""
        args_str = json.dumps(args, sort_keys=True)
        digest = hashlib.md5(f"{tool_name}:{args_str}".encode()).hexdigest()
        return f"{tool_name}:{digest}"
    def get(self, tool_name: str, args: dict) -> Optional[Any]:
        """Get cached result."""
        key = self._key(tool_name, args)
        if key in self.cache:
            result, expiry = self.cache[key]
            if time.time() < expiry:
                return result
            del self.cache[key]
        return None
    def set(self, tool_name: str, args: dict, result: Any, ttl: Optional[int] = None):
        """Cache a result."""
        key = self._key(tool_name, args)
        ttl = ttl or self.default_ttl
        self.cache[key] = (result, time.time() + ttl)
    def invalidate(self, tool_name: Optional[str] = None):
        """Invalidate cache entries."""
        if tool_name is None:
            self.cache.clear()
        else:
            keys_to_remove = [
                k for k in self.cache.keys()
                if k.startswith(f"{tool_name}:")
            ]
            for k in keys_to_remove:
                del self.cache[k]
cache = MCPCache()
def cached(ttl: int = 300):
"""Decorator to cache tool results."""
def decorator(func):
@wraps(func)
async def wrapper(*args, **kwargs):
# Check cache
cached_result = cache.get(func.__name__, kwargs)
if cached_result is not None:
return cached_result
# Execute and cache
result = await func(*args, **kwargs)
cache.set(func.__name__, kwargs, result, ttl)
return result
return wrapper
return decorator
# Usage
@server.tool()
@cached(ttl=600) # Cache for 10 minutes
async def expensive_query(query: str) -> list[TextContent]:
# Expensive operation
pass
Request Batching
import asyncio
import httpx
from collections import defaultdict
from typing import Any, Callable
class RequestBatcher:
    """Batch multiple similar requests together."""
    def __init__(self, batch_size: int = 10, batch_delay: float = 0.1):
        self.batch_size = batch_size
        self.batch_delay = batch_delay
        self.pending = defaultdict(list)
        self.locks = defaultdict(asyncio.Lock)
    async def execute(self, batch_key: str, item: Any, executor: Callable):
        """Add item to batch and execute when ready."""
        async with self.locks[batch_key]:
            self.pending[batch_key].append(item)
            # Wait briefly to let more requests accumulate
            await asyncio.sleep(self.batch_delay)
            # Flush immediately if the batch is full
            if len(self.pending[batch_key]) >= self.batch_size:
                return await self._execute_batch(batch_key, executor)
            # Otherwise execute whatever accumulated during the delay
            if self.pending[batch_key]:
                return await self._execute_batch(batch_key, executor)
    async def _execute_batch(self, batch_key: str, executor: Callable):
"""Execute a batch of requests."""
items = self.pending[batch_key]
self.pending[batch_key] = []
# Execute batch
results = await executor(items)
return results
batcher = RequestBatcher()
# Example: Batch GitHub API calls
async def batch_get_repos(repo_names: list[str]) -> dict:
"""Get multiple repos in optimized batches."""
async with httpx.AsyncClient() as client:
tasks = [
client.get(f"https://api.github.com/repos/{name}")
for name in repo_names
]
responses = await asyncio.gather(*tasks, return_exceptions=True)
return {
name: resp.json() if not isinstance(resp, Exception) else {"error": str(resp)}
for name, resp in zip(repo_names, responses)
}
Connection Pool Tuning
class OptimizedDatabasePool:
"""Database pool with performance optimizations."""
def __init__(self):
self.pool = None
        # NOTE: the query wrappers that update these counters are not shown here
        self.stats = {
            "queries": 0,
            "errors": 0,
            "total_time_ms": 0
        }
async def initialize(self):
"""Initialize with optimized settings."""
self.pool = await asyncpg.create_pool(
dsn=os.environ.get("DATABASE_URL"),
# Pool sizing
min_size=5, # Minimum connections
max_size=20, # Maximum connections
# Timeouts
command_timeout=30, # Query timeout
max_inactive_connection_lifetime=300, # Close idle connections
# Performance
statement_cache_size=100, # Cache prepared statements
# Setup
setup=self._setup_connection
)
async def _setup_connection(self, conn):
"""Configure each new connection."""
# Set optimal settings for read-heavy workloads
await conn.execute("SET work_mem = '256MB'")
await conn.execute("SET random_page_cost = 1.1") # For SSD
async def get_stats(self) -> dict:
"""Get pool statistics."""
return {
"pool_size": self.pool.get_size(),
"pool_free": self.pool.get_idle_size(),
"queries_executed": self.stats["queries"],
"errors": self.stats["errors"],
"avg_query_time_ms": (
self.stats["total_time_ms"] / self.stats["queries"]
if self.stats["queries"] > 0 else 0
)
}
Production Checklist
Before deploying an MCP server to production:
Security
- Input validation on all tool parameters
- SQL injection prevention for database queries
- Path traversal prevention for file operations
- Rate limiting per client
- Authentication required for sensitive operations
- Secrets stored in environment variables, not code
- Audit logging for all tool invocations
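The rate-limiting item can be as small as an in-memory token bucket checked at the top of each tool call. A minimal sketch, where the `TokenBucketLimiter` name and the rate/capacity defaults are illustrative rather than from the MCP SDK:

```python
import time

class TokenBucketLimiter:
    """Per-client token bucket: `rate` requests/second, bursts up to `capacity`."""

    def __init__(self, rate: float = 5.0, capacity: int = 10):
        self.rate = rate
        self.capacity = capacity
        # client_id -> (remaining tokens, last refill time)
        self._buckets: dict[str, tuple[float, float]] = {}

    def allow(self, client_id: str) -> bool:
        now = time.monotonic()
        tokens, last = self._buckets.get(client_id, (float(self.capacity), now))
        # Refill proportionally to elapsed time, capped at capacity
        tokens = min(self.capacity, tokens + (now - last) * self.rate)
        if tokens >= 1.0:
            self._buckets[client_id] = (tokens - 1.0, now)
            return True
        self._buckets[client_id] = (tokens, now)
        return False
```

Call `limiter.allow(client_id)` before dispatching a tool and return a rate-limit error when it comes back `False`.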
Reliability
- Timeouts on all external calls
- Retry logic with exponential backoff
- Circuit breakers for failing dependencies
- Graceful shutdown handling
- Health check endpoint
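The retry item can be sketched as a small helper that re-awaits a call with doubling, jittered delays; the `with_retries` name and defaults are illustrative:

```python
import asyncio
import random

async def with_retries(make_call, max_attempts: int = 4,
                       base_delay: float = 0.5, max_delay: float = 8.0,
                       retryable: tuple = (ConnectionError, TimeoutError)):
    """Run `await make_call()` with exponential backoff and jitter."""
    for attempt in range(1, max_attempts + 1):
        try:
            return await make_call()
        except retryable:
            if attempt == max_attempts:
                raise  # Out of attempts: surface the last error
            # Double the delay each attempt, capped, with 50-100% jitter
            delay = min(max_delay, base_delay * 2 ** (attempt - 1))
            await asyncio.sleep(delay * (0.5 + random.random() / 2))
```

Keeping the retryable exception tuple narrow matters: retrying a 400-class error just burns your rate limit.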
Observability
- Structured logging (JSON format)
- Request tracing with correlation IDs
- Metrics (request count, latency, errors)
- Alerting on error rates
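A minimal in-process collector covering the request-count, latency, and error items might look like this; the `ToolMetrics` name is illustrative, and in production you would likely export these numbers to Prometheus or a similar backend instead:

```python
from collections import defaultdict

class ToolMetrics:
    """Per-tool request count, error count, and latency samples."""

    def __init__(self):
        self.counts = defaultdict(int)
        self.errors = defaultdict(int)
        self.latency_ms = defaultdict(list)

    def record(self, tool: str, duration_ms: float, error: bool = False):
        self.counts[tool] += 1
        if error:
            self.errors[tool] += 1
        self.latency_ms[tool].append(duration_ms)

    def snapshot(self) -> dict:
        """Summarize each tool; p50 here is a simple median over all samples."""
        return {
            tool: {
                "requests": self.counts[tool],
                "errors": self.errors[tool],
                "p50_ms": sorted(samples)[len(samples) // 2],
            }
            for tool, samples in self.latency_ms.items()
        }
```

The `log_tool_call` decorator from the logging section already computes the duration and error flag, so wiring in `metrics.record(...)` there is one extra line.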
Performance
- Connection pooling for databases
- Response caching where appropriate
- Pagination for large result sets
- Async operations for I/O-bound tasks
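Pagination for large result sets can use an opaque cursor that the model passes back on its next call. A simple offset-based sketch (function and field names are illustrative):

```python
from typing import Optional

def paginate(items: list, cursor: Optional[str] = None, page_size: int = 50) -> dict:
    """Return one page of results plus a cursor for the next page.

    The caller treats next_cursor as opaque and passes it back to continue;
    None means there are no more pages.
    """
    start = int(cursor) if cursor else 0
    page = items[start:start + page_size]
    has_more = start + page_size < len(items)
    return {
        "items": page,
        "next_cursor": str(start + page_size) if has_more else None,
    }
```

Returning a cursor instead of the full result set keeps individual tool responses small enough to fit comfortably in the model's context window.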
Conclusion
MCP servers provide a standardized way to extend AI model capabilities with custom tools and data sources. By following the patterns in this guide, you can build robust, secure, and maintainable integrations that work across any MCP-compatible client.
Start with simple tools, add complexity as needed, and always prioritize security—your MCP server will be executing code based on AI model decisions, so defense in depth is essential.
Related Articles
The Rise of Agentic AI: Understanding MCP and A2A Protocols
An exploration of the emerging protocols enabling AI agents to communicate and collaborate, including Model Context Protocol (MCP) and Agent-to-Agent (A2A) communication.
Building Agentic AI Systems: A Complete Implementation Guide
A comprehensive guide to building AI agents—tool use, ReAct pattern, planning, memory, context management, MCP integration, and multi-agent orchestration. With full prompt examples and production patterns.
LLM Safety and Red Teaming: Attacks, Defenses, and Best Practices
A comprehensive guide to LLM security threats—prompt injection, jailbreaks, and adversarial attacks—plus the defense mechanisms and red teaming practices that protect production systems.