Model Context Protocol (MCP) went from "cool demo protocol" to production infrastructure in about six months. But here's the thing — most tutorials show you the happy path. Connect a server, call a tool, done.
Production is different. Production means auth failures at 3 AM, context windows exploding, tools timing out, and agents calling the wrong tool because your descriptions were ambiguous.
Here are nine patterns I've battle-tested to keep MCP-based systems alive in production. Real code. Real problems. Real fixes.
Pattern 1: The Circuit Breaker for MCP Tool Calls
Your agent calls an MCP tool. The server is down. The agent retries. And retries. And retries. Meanwhile, your context window fills with error messages and your user stares at a spinner.
The fix: wrap every MCP tool call in a circuit breaker.
```python
import time
from dataclasses import dataclass
from enum import Enum
from typing import Any, Callable


class CircuitState(Enum):
    CLOSED = "closed"          # Normal operation
    OPEN = "open"              # Failing, reject calls
    HALF_OPEN = "half_open"    # Testing if service recovered


@dataclass
class CircuitBreaker:
    failure_threshold: int = 3
    recovery_timeout: float = 30.0
    half_open_max_calls: int = 1

    state: CircuitState = CircuitState.CLOSED
    failure_count: int = 0
    last_failure_time: float = 0.0
    half_open_calls: int = 0

    async def call(self, func: Callable, *args, **kwargs) -> Any:
        if self.state == CircuitState.OPEN:
            if time.time() - self.last_failure_time >= self.recovery_timeout:
                self.state = CircuitState.HALF_OPEN
                self.half_open_calls = 0
            else:
                raise CircuitOpenError(
                    f"Circuit open. Recovery in "
                    f"{self.recovery_timeout - (time.time() - self.last_failure_time):.1f}s"
                )

        if self.state == CircuitState.HALF_OPEN:
            if self.half_open_calls >= self.half_open_max_calls:
                raise CircuitOpenError("Circuit half-open, max test calls reached")
            self.half_open_calls += 1

        try:
            result = await func(*args, **kwargs)
            self._on_success()
            return result
        except Exception:
            self._on_failure()
            raise

    def _on_success(self):
        self.failure_count = 0
        self.state = CircuitState.CLOSED

    def _on_failure(self):
        self.failure_count += 1
        self.last_failure_time = time.time()
        if self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN


class CircuitOpenError(Exception):
    pass


# Usage with MCP tool calls
class ResilientMCPClient:
    def __init__(self):
        self.breakers: dict[str, CircuitBreaker] = {}

    def _get_breaker(self, server_name: str) -> CircuitBreaker:
        if server_name not in self.breakers:
            self.breakers[server_name] = CircuitBreaker(
                failure_threshold=3,
                recovery_timeout=30.0,
            )
        return self.breakers[server_name]

    async def call_tool(
        self,
        server_name: str,
        tool_name: str,
        arguments: dict,
        mcp_session: Any,
    ) -> Any:
        breaker = self._get_breaker(server_name)

        async def _do_call():
            return await mcp_session.call_tool(tool_name, arguments)

        try:
            return await breaker.call(_do_call)
        except CircuitOpenError:
            return {
                "error": f"Server '{server_name}' is temporarily unavailable",
                "tool": tool_name,
                "fallback": True,
            }
```
The key insight: one circuit breaker per MCP server, not per tool. If the server is down, all its tools are down.
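To make the state machine concrete, here's a stripped-down, synchronous sketch of the same idea (MiniBreaker and boom are illustrative names, not part of any MCP SDK): three failures trip the circuit, and a successful probe after the recovery timeout closes it again.

```python
import time

# A stripped-down synchronous version of the breaker, just to show the
# state transitions: closed -> open -> half-open -> closed.
class MiniBreaker:
    def __init__(self, threshold: int = 3, recovery: float = 0.05):
        self.threshold, self.recovery = threshold, recovery
        self.failures, self.opened_at = 0, 0.0
        self.state = "closed"

    def call(self, fn):
        if self.state == "open":
            if time.time() - self.opened_at < self.recovery:
                raise RuntimeError("circuit open")
            self.state = "half_open"  # allow one probe through
        try:
            result = fn()
        except Exception:
            self.failures += 1
            if self.failures >= self.threshold or self.state == "half_open":
                self.state, self.opened_at = "open", time.time()
            raise
        self.failures, self.state = 0, "closed"
        return result

breaker = MiniBreaker()

def boom():
    raise ConnectionError("server down")

for _ in range(3):  # three consecutive failures trip the breaker
    try:
        breaker.call(boom)
    except ConnectionError:
        pass

print(breaker.state)               # now "open": calls are rejected instantly
time.sleep(0.06)                   # wait past the recovery timeout
print(breaker.call(lambda: "ok"))  # half-open probe succeeds
print(breaker.state)               # back to "closed"
```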
Pattern 2: Context Window Budget Manager
MCP tools return data. Sometimes a LOT of data. A database query tool might return 50KB of JSON. A file reader might dump an entire codebase. Your context window has a budget — blow it and your agent starts hallucinating or loses earlier instructions.
```typescript
interface ContextBudget {
  maxTokens: number;
  usedTokens: number;
  reservedTokens: number; // Keep free for reasoning
  toolResultBudgets: Map<string, number>;
}

class ContextBudgetManager {
  private budget: ContextBudget;

  constructor(maxContextTokens: number) {
    this.budget = {
      maxTokens: maxContextTokens,
      usedTokens: 0,
      reservedTokens: Math.floor(maxContextTokens * 0.3), // 30% for reasoning
      toolResultBudgets: new Map([
        ["database_query", 4000],
        ["file_read", 3000],
        ["web_search", 2000],
        ["code_analysis", 5000],
      ]),
    };
  }

  getAvailableTokens(): number {
    return this.budget.maxTokens - this.budget.usedTokens - this.budget.reservedTokens;
  }

  truncateToolResult(toolName: string, result: string): string {
    const maxTokens =
      this.budget.toolResultBudgets.get(toolName) ??
      Math.min(2000, this.getAvailableTokens());

    // Rough estimate: 1 token ≈ 4 chars
    const maxChars = maxTokens * 4;

    if (result.length <= maxChars) {
      this.budget.usedTokens += Math.ceil(result.length / 4);
      return result;
    }

    const truncated = this.smartTruncate(result, maxChars);
    this.budget.usedTokens += maxTokens;
    return truncated;
  }

  private smartTruncate(text: string, maxChars: number): string {
    // For JSON, try to keep structure intact
    if (text.trimStart().startsWith("{") || text.trimStart().startsWith("[")) {
      return this.truncateJSON(text, maxChars);
    }

    // For plain text, keep beginning and end
    const headSize = Math.floor(maxChars * 0.7);
    const tailSize = maxChars - headSize - 50; // 50 chars for separator
    const head = text.slice(0, headSize);
    const tail = text.slice(-tailSize);
    return `${head}\n\n... [TRUNCATED: ${text.length - maxChars} chars removed] ...\n\n${tail}`;
  }

  private truncateJSON(text: string, maxChars: number): string {
    try {
      const parsed = JSON.parse(text);
      if (Array.isArray(parsed)) {
        // Keep the first few items, report total count
        const totalItems = parsed.length;
        const keepItems = Math.min(5, totalItems);
        const sample = parsed.slice(0, keepItems);
        const result = JSON.stringify(
          {
            _truncated: true,
            _totalItems: totalItems,
            _showingFirst: keepItems,
            items: sample,
          },
          null,
          2
        );
        if (result.length <= maxChars) return result;
      }
      // Fallback: stringify with no indent, then truncate
      const compact = JSON.stringify(parsed);
      if (compact.length <= maxChars) return compact;
      return compact.slice(0, maxChars) + "... [TRUNCATED]";
    } catch {
      return text.slice(0, maxChars) + "... [TRUNCATED]";
    }
  }

  reportUsage(): { used: number; available: number; percentage: number } {
    const available = this.getAvailableTokens();
    return {
      used: this.budget.usedTokens,
      available,
      percentage: Math.round(
        (this.budget.usedTokens / (this.budget.maxTokens - this.budget.reservedTokens)) * 100
      ),
    };
  }
}

// Integration with MCP tool results
async function processToolResult(
  budgetManager: ContextBudgetManager,
  toolName: string,
  rawResult: string
): Promise<string> {
  const usage = budgetManager.reportUsage();
  if (usage.percentage > 80) {
    console.warn(
      `⚠️ Context budget at ${usage.percentage}%. ` +
        `Aggressively truncating ${toolName} result.`
    );
  }
  return budgetManager.truncateToolResult(toolName, rawResult);
}
```
The 30% reservation for reasoning is critical. Without it, your agent has all the data but no room to think about it.
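The arithmetic is worth seeing once. A quick sketch, assuming a hypothetical 128k-token window and the ~4 chars/token estimate used above:

```python
# Budget arithmetic from the pattern above, using a hypothetical 128k window
max_tokens = 128_000
reserved = int(max_tokens * 0.3)   # always kept free for reasoning
used = 0

def available() -> int:
    return max_tokens - used - reserved

print(reserved)     # 38400 tokens of guaranteed thinking room
print(available())  # 89600 tokens for history and tool results

# A 50KB JSON tool result at ~4 chars/token eats roughly 12,800 of those:
result_chars = 50 * 1024
print(result_chars // 4)  # 12800
```

A single unbudgeted database dump can therefore consume a seventh of the usable window in one call.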
Pattern 3: Tool Description Versioning
Your MCP server evolves. Tool descriptions change. But your agent's behavior depends on those descriptions — they're part of the prompt. Change a description and your agent might start calling tools differently.
```python
import hashlib
import json
from dataclasses import dataclass
from datetime import datetime, timezone


@dataclass
class ToolVersion:
    name: str
    description: str
    input_schema: dict
    version_hash: str
    registered_at: datetime


class ToolRegistry:
    def __init__(self):
        self._tools: dict[str, list[ToolVersion]] = {}
        self._active: dict[str, ToolVersion] = {}

    def _compute_hash(self, description: str, schema: dict) -> str:
        content = f"{description}|{json.dumps(schema, sort_keys=True)}"
        return hashlib.sha256(content.encode()).hexdigest()[:12]

    def register_tool(
        self,
        name: str,
        description: str,
        input_schema: dict,
    ) -> ToolVersion:
        version_hash = self._compute_hash(description, input_schema)

        # Check if this exact version already exists
        if name in self._tools:
            for existing in self._tools[name]:
                if existing.version_hash == version_hash:
                    # Re-activate: the server may have reverted to a known version
                    self._active[name] = existing
                    return existing

        version = ToolVersion(
            name=name,
            description=description,
            input_schema=input_schema,
            version_hash=version_hash,
            registered_at=datetime.now(timezone.utc),
        )
        self._tools.setdefault(name, []).append(version)
        self._active[name] = version
        return version

    def detect_drift(self, server_tools: list[dict]) -> list[dict]:
        """Compare current server tools against registered versions."""
        changes = []
        for tool in server_tools:
            name = tool["name"]
            if name not in self._active:
                changes.append({
                    "type": "new_tool",
                    "tool": name,
                    "action": "register",
                })
                continue

            current_hash = self._compute_hash(
                tool["description"],
                tool.get("inputSchema", {}),
            )
            active = self._active[name]
            if current_hash != active.version_hash:
                changes.append({
                    "type": "tool_changed",
                    "tool": name,
                    "old_hash": active.version_hash,
                    "new_hash": current_hash,
                    "old_description": active.description,
                    "new_description": tool["description"],
                    "action": "review_and_update",
                })

        # Check for removed tools
        server_tool_names = {t["name"] for t in server_tools}
        for name in self._active:
            if name not in server_tool_names:
                changes.append({
                    "type": "tool_removed",
                    "tool": name,
                    "action": "deregister",
                })

        return changes

    def get_history(self, tool_name: str) -> list[ToolVersion]:
        return self._tools.get(tool_name, [])


# Usage: on every MCP server reconnection
async def on_server_reconnect(
    registry: ToolRegistry,
    server_tools: list[dict],
    alert_callback=None,
):
    drift = registry.detect_drift(server_tools)
    if drift:
        print(f"⚠️ Tool drift detected: {len(drift)} changes")
        for change in drift:
            print(f"  - {change['type']}: {change['tool']}")
            if change["type"] == "tool_changed":
                # Log the before/after for debugging
                print(f"    Old: {change['old_description'][:80]}...")
                print(f"    New: {change['new_description'][:80]}...")
        if alert_callback:
            await alert_callback(drift)

    # Register all current tools
    for tool in server_tools:
        registry.register_tool(
            name=tool["name"],
            description=tool["description"],
            input_schema=tool.get("inputSchema", {}),
        )
```
This catches those sneaky bugs where someone updates a tool description and suddenly your agent sends parameters in a different format.
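A quick sketch of the hashing scheme in isolation shows why drift detection works: the hash is deterministic, and any edit to the description or schema produces a new one. (The tool description and schema here are made up for illustration.)

```python
import hashlib
import json

# The same hashing scheme as the registry above: any change to the
# description or schema yields a new 12-char version hash.
def version_hash(description: str, schema: dict) -> str:
    content = f"{description}|{json.dumps(schema, sort_keys=True)}"
    return hashlib.sha256(content.encode()).hexdigest()[:12]

schema = {"type": "object", "properties": {"query": {"type": "string"}}}
v1 = version_hash("Search documents by keyword", schema)
v2 = version_hash("Search documents by keyword or phrase", schema)

print(v1 != v2)  # True: a one-word edit to the description changes the hash
print(version_hash("Search documents by keyword", schema) == v1)  # True: deterministic
```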
Pattern 4: The Fallback Chain
Not every tool call needs to succeed. Sometimes you have alternatives. A database query fails? Try the cache. Cache miss? Use a default. This pattern chains MCP tools with automatic fallback.
```typescript
type ToolCall = {
  server: string;
  tool: string;
  arguments: Record<string, unknown>;
};

type FallbackResult = {
  result: unknown;
  source: string;
  fallbackUsed: boolean;
  attempts: Array<{ server: string; tool: string; error?: string }>;
};

class FallbackChain {
  private chains: Map<string, ToolCall[]> = new Map();

  register(intentName: string, tools: ToolCall[]): void {
    this.chains.set(intentName, tools);
  }

  async execute(
    intentName: string,
    mcpClients: Map<string, any>,
    overrideArgs?: Record<string, unknown>
  ): Promise<FallbackResult> {
    const chain = this.chains.get(intentName);
    if (!chain || chain.length === 0) {
      throw new Error(`No fallback chain registered for "${intentName}"`);
    }

    const attempts: FallbackResult["attempts"] = [];

    for (const toolCall of chain) {
      const client = mcpClients.get(toolCall.server);
      if (!client) {
        attempts.push({
          server: toolCall.server,
          tool: toolCall.tool,
          error: "Server not connected",
        });
        continue;
      }

      try {
        const args = { ...toolCall.arguments, ...overrideArgs };
        const result = await client.callTool(toolCall.tool, args);

        // Validate result isn't empty/null
        if (result === null || result === undefined) {
          attempts.push({
            server: toolCall.server,
            tool: toolCall.tool,
            error: "Empty result",
          });
          continue;
        }

        return {
          result,
          source: `${toolCall.server}/${toolCall.tool}`,
          fallbackUsed: attempts.length > 0,
          attempts,
        };
      } catch (error) {
        attempts.push({
          server: toolCall.server,
          tool: toolCall.tool,
          error: error instanceof Error ? error.message : String(error),
        });
      }
    }

    throw new FallbackExhaustedError(intentName, attempts);
  }
}

class FallbackExhaustedError extends Error {
  constructor(
    public intent: string,
    public attempts: FallbackResult["attempts"]
  ) {
    super(
      `All fallbacks exhausted for "${intent}". ` +
        `Tried: ${attempts.map((a) => `${a.server}/${a.tool}`).join(" → ")}`
    );
  }
}

// Setup example
const fallbacks = new FallbackChain();

fallbacks.register("get_user_data", [
  {
    server: "postgres-mcp",
    tool: "query",
    arguments: { sql: "SELECT * FROM users WHERE id = $1" },
  },
  {
    server: "redis-mcp",
    tool: "get",
    arguments: { key: "user:{id}" },
  },
  {
    server: "api-mcp",
    tool: "http_get",
    arguments: { url: "/api/users/{id}" },
  },
]);

// Usage
const userData = await fallbacks.execute("get_user_data", mcpClients, {
  id: "user-123",
});

console.log(`Got data from: ${userData.source}`);
if (userData.fallbackUsed) {
  console.warn(`Primary source failed. Used fallback after ${userData.attempts.length} attempts.`);
}
```
The attempts log is gold for debugging. You'll always know why a particular source was chosen.
Pattern 5: Request Deduplication Layer
Agents are chatty. They'll call the same tool with the same arguments multiple times in a single conversation. A dedup layer saves tokens, time, and API costs.
```python
import hashlib
import json
import time
from dataclasses import dataclass
from typing import Any


@dataclass
class CachedResult:
    result: Any
    timestamp: float
    call_count: int = 1


class MCPDeduplicator:
    def __init__(self, ttl_seconds: float = 300.0):
        self.ttl = ttl_seconds
        self._cache: dict[str, CachedResult] = {}
        self._stats = {"hits": 0, "misses": 0, "evictions": 0}

    def _make_key(self, server: str, tool: str, arguments: dict) -> str:
        """Deterministic cache key from call parameters."""
        normalized = json.dumps(
            {"server": server, "tool": tool, "args": arguments},
            sort_keys=True,
            default=str,
        )
        return hashlib.sha256(normalized.encode()).hexdigest()

    def _is_cacheable(self, tool: str) -> bool:
        """Some tools should never be cached (side effects)."""
        non_cacheable = {
            "send_email",
            "create_record",
            "update_record",
            "delete_record",
            "execute_command",
            "write_file",
            "post_message",
        }
        return tool not in non_cacheable

    async def call_tool(
        self,
        server: str,
        tool: str,
        arguments: dict,
        mcp_session: Any,
        force_refresh: bool = False,
    ) -> tuple[Any, bool]:
        """Returns (result, was_cached)."""
        if not self._is_cacheable(tool) or force_refresh:
            result = await mcp_session.call_tool(tool, arguments)
            self._stats["misses"] += 1
            return result, False

        key = self._make_key(server, tool, arguments)

        # Check cache
        if key in self._cache:
            cached = self._cache[key]
            age = time.time() - cached.timestamp
            if age < self.ttl:
                cached.call_count += 1
                self._stats["hits"] += 1
                return cached.result, True
            else:
                del self._cache[key]
                self._stats["evictions"] += 1

        # Cache miss — make the actual call
        result = await mcp_session.call_tool(tool, arguments)
        self._cache[key] = CachedResult(
            result=result,
            timestamp=time.time(),
        )
        self._stats["misses"] += 1
        return result, False

    def get_stats(self) -> dict:
        total = self._stats["hits"] + self._stats["misses"]
        hit_rate = (self._stats["hits"] / total * 100) if total > 0 else 0
        return {
            **self._stats,
            "total_calls": total,
            "hit_rate_pct": round(hit_rate, 1),
            "cache_size": len(self._cache),
        }

    def invalidate(self, server: str, tool: str, arguments: dict) -> bool:
        """Manually invalidate a specific cached result."""
        key = self._make_key(server, tool, arguments)
        if key in self._cache:
            del self._cache[key]
            return True
        return False

    def clear(self) -> int:
        """Clear entire cache. Returns number of evicted entries."""
        count = len(self._cache)
        self._cache.clear()
        return count
```
The _is_cacheable check is essential. You don't want to cache write operations — "send that email" should actually send it every time.
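Because the key is built with sort_keys=True, argument order never causes a spurious cache miss. A self-contained sketch of the same key function:

```python
import hashlib
import json

# Cache keys as in the deduplicator above: sort_keys makes argument
# order irrelevant, so logically identical calls share one entry.
def make_key(server: str, tool: str, arguments: dict) -> str:
    normalized = json.dumps(
        {"server": server, "tool": tool, "args": arguments},
        sort_keys=True,
        default=str,
    )
    return hashlib.sha256(normalized.encode()).hexdigest()

a = make_key("postgres-mcp", "query", {"sql": "SELECT 1", "timeout": 5})
b = make_key("postgres-mcp", "query", {"timeout": 5, "sql": "SELECT 1"})
c = make_key("postgres-mcp", "query", {"sql": "SELECT 2", "timeout": 5})

print(a == b)  # True: same call, different key order, same cache entry
print(a == c)  # False: different arguments, different entry
```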
Pattern 6: Structured Error Taxonomy
When an MCP tool fails, your agent needs to understand why — not just "error occurred". Different errors need different responses: retry, fallback, ask the user, or give up.
```python
import re
from enum import Enum
from dataclasses import dataclass
from typing import Optional


class ErrorCategory(Enum):
    TRANSIENT = "transient"          # Retry might work
    AUTH = "auth"                    # Credentials issue
    VALIDATION = "validation"        # Bad input from agent
    NOT_FOUND = "not_found"          # Resource doesn't exist
    RATE_LIMITED = "rate_limited"    # Too many calls
    PERMISSION = "permission"        # Allowed but can't do this
    SERVER_ERROR = "server_error"    # MCP server broken
    TIMEOUT = "timeout"              # Took too long


@dataclass
class ClassifiedError:
    category: ErrorCategory
    message: str
    original_error: Optional[Exception] = None
    retry_after_seconds: Optional[float] = None
    suggestion: Optional[str] = None


class MCPErrorClassifier:
    """Classify raw MCP errors into actionable categories."""

    TRANSIENT_PATTERNS = [
        "connection reset",
        "temporary failure",
        "service unavailable",
        "ECONNRESET",
        "ETIMEDOUT",
    ]
    AUTH_PATTERNS = [
        "unauthorized",
        "invalid token",
        "expired token",
        "authentication failed",
        "401",
    ]
    RATE_LIMIT_PATTERNS = [
        "rate limit",
        "too many requests",
        "429",
        "quota exceeded",
    ]

    def classify(self, error: Exception, tool_name: str = "") -> ClassifiedError:
        # Lowercase both sides so patterns like "ECONNRESET" still match
        msg = str(error).lower()

        # Check patterns in priority order
        if any(p.lower() in msg for p in self.RATE_LIMIT_PATTERNS):
            retry_after = self._extract_retry_after(str(error))
            return ClassifiedError(
                category=ErrorCategory.RATE_LIMITED,
                message=f"Rate limited on {tool_name}",
                original_error=error,
                retry_after_seconds=retry_after or 60.0,
                suggestion="Wait before retrying. Consider reducing call frequency.",
            )

        if any(p.lower() in msg for p in self.AUTH_PATTERNS):
            return ClassifiedError(
                category=ErrorCategory.AUTH,
                message=f"Authentication failed for {tool_name}",
                original_error=error,
                suggestion="Check API keys and token expiry. Do not retry.",
            )

        if any(p.lower() in msg for p in self.TRANSIENT_PATTERNS):
            return ClassifiedError(
                category=ErrorCategory.TRANSIENT,
                message=f"Transient error on {tool_name}",
                original_error=error,
                retry_after_seconds=5.0,
                suggestion="Retry with exponential backoff.",
            )

        if "timeout" in msg or isinstance(error, TimeoutError):
            return ClassifiedError(
                category=ErrorCategory.TIMEOUT,
                message=f"Timeout calling {tool_name}",
                original_error=error,
                retry_after_seconds=10.0,
                suggestion="Retry with longer timeout or simpler query.",
            )

        if "not found" in msg or "404" in msg:
            return ClassifiedError(
                category=ErrorCategory.NOT_FOUND,
                message=f"Resource not found for {tool_name}",
                original_error=error,
                suggestion="Verify resource ID/path. Do not retry with same args.",
            )

        if "validation" in msg or "invalid" in msg or "schema" in msg:
            return ClassifiedError(
                category=ErrorCategory.VALIDATION,
                message=f"Invalid input for {tool_name}",
                original_error=error,
                suggestion="Fix input parameters before retrying.",
            )

        # Default: server error
        return ClassifiedError(
            category=ErrorCategory.SERVER_ERROR,
            message=f"Server error on {tool_name}: {str(error)[:200]}",
            original_error=error,
            suggestion="Check MCP server logs. May need manual intervention.",
        )

    def _extract_retry_after(self, error_msg: str) -> Optional[float]:
        """Try to extract retry-after value from error message."""
        match = re.search(r"retry.after[:\s]+(\d+)", error_msg, re.IGNORECASE)
        if match:
            return float(match.group(1))
        return None

    def should_retry(self, classified: ClassifiedError) -> bool:
        retryable = {
            ErrorCategory.TRANSIENT,
            ErrorCategory.RATE_LIMITED,
            ErrorCategory.TIMEOUT,
        }
        return classified.category in retryable

    def format_for_agent(self, classified: ClassifiedError) -> str:
        """Format error for inclusion in agent context."""
        parts = [
            f"[{classified.category.value.upper()}] {classified.message}",
        ]
        if classified.suggestion:
            parts.append(f"Suggestion: {classified.suggestion}")
        if classified.retry_after_seconds and self.should_retry(classified):
            parts.append(f"Retry after: {classified.retry_after_seconds}s")
        return " | ".join(parts)
```
The format_for_agent method is key — it gives the LLM structured info to decide what to do next instead of raw stack traces.
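Here's the classifier's core idea boiled down to a few lines, with a shortened pattern table for illustration: lowercase the error text, then match substrings in priority order.

```python
# A condensed sketch of the classifier's matching logic. The pattern
# table is abbreviated; priority is the dict's insertion order.
PATTERNS = {
    "rate_limited": ["rate limit", "too many requests", "429"],
    "auth": ["unauthorized", "invalid token", "401"],
    "transient": ["connection reset", "service unavailable", "econnreset"],
}

def classify(error: Exception) -> str:
    msg = str(error).lower()  # normalize once, match case-insensitively
    for category, patterns in PATTERNS.items():
        if any(p in msg for p in patterns):
            return category
    return "server_error"  # unmatched errors fall through to the default

print(classify(ConnectionError("ECONNRESET: connection reset by peer")))  # transient
print(classify(RuntimeError("HTTP 429: Too Many Requests")))              # rate_limited
print(classify(RuntimeError("something exploded")))                       # server_error
```

Note the lowercase normalization: matching an uppercase pattern like "ECONNRESET" against a lowercased message would silently never fire.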
Pattern 7: Multi-Server Tool Router
When you have 5+ MCP servers, your agent gets overwhelmed with tool choices. A router layer maps intents to the right server/tool combination.
```typescript
interface ToolRoute {
  server: string;
  tool: string;
  priority: number;
  condition?: (args: Record<string, unknown>) => boolean;
}

interface RouteConfig {
  intent: string;
  description: string;
  routes: ToolRoute[];
  defaultArgs?: Record<string, unknown>;
}

class MCPToolRouter {
  private routes: Map<string, RouteConfig> = new Map();
  private serverHealth: Map<string, boolean> = new Map();

  register(config: RouteConfig): void {
    this.routes.set(config.intent, config);
  }

  updateServerHealth(server: string, healthy: boolean): void {
    this.serverHealth.set(server, healthy);
  }

  resolve(
    intent: string,
    args: Record<string, unknown> = {}
  ): { server: string; tool: string; args: Record<string, unknown> } | null {
    const config = this.routes.get(intent);
    if (!config) return null;

    // Sort by priority, filter by health and conditions
    const candidates = config.routes
      .filter((r) => this.serverHealth.get(r.server) !== false)
      .filter((r) => !r.condition || r.condition(args))
      .sort((a, b) => a.priority - b.priority);

    if (candidates.length === 0) return null;

    const best = candidates[0];
    return {
      server: best.server,
      tool: best.tool,
      args: { ...config.defaultArgs, ...args },
    };
  }

  // Generate a simplified tool list for the agent's system prompt
  generateToolManifest(): string {
    const lines: string[] = ["# Available Actions\n"];
    for (const [intent, config] of this.routes) {
      const healthyServers = config.routes.filter(
        (r) => this.serverHealth.get(r.server) !== false
      );
      const status = healthyServers.length > 0 ? "✅" : "❌";
      lines.push(`${status} **${intent}**: ${config.description}`);
    }
    return lines.join("\n");
  }
}

// Setup
const router = new MCPToolRouter();

router.register({
  intent: "search_documents",
  description: "Search across all document stores",
  routes: [
    {
      server: "elasticsearch-mcp",
      tool: "search",
      priority: 1,
      condition: (args) => typeof args.query === "string" && args.query.length > 0,
    },
    {
      server: "postgres-mcp",
      tool: "full_text_search",
      priority: 2,
    },
  ],
});

router.register({
  intent: "read_file",
  description: "Read a file from any connected filesystem",
  routes: [
    {
      server: "local-fs-mcp",
      tool: "read_file",
      priority: 1,
      condition: (args) =>
        typeof args.path === "string" && !args.path.startsWith("s3://"),
    },
    {
      server: "s3-mcp",
      tool: "get_object",
      priority: 1,
      condition: (args) =>
        typeof args.path === "string" && args.path.startsWith("s3://"),
    },
    {
      server: "github-mcp",
      tool: "get_file_contents",
      priority: 2,
    },
  ],
});

// In agent loop
const resolved = router.resolve("search_documents", {
  query: "quarterly revenue report",
});
if (resolved) {
  const result = await mcpClients
    .get(resolved.server)!
    .callTool(resolved.tool, resolved.args);
}
```
This keeps your agent's prompt clean. Instead of 40 tools from 8 servers, the agent sees 10 clear intents.
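The resolve() logic is simple enough to sketch in a few lines of Python. The route table below mirrors the read_file config above; all names are illustrative:

```python
# Sketch of the router's resolve() step: filter by server health and
# per-route conditions, then pick the lowest priority number.
routes = [
    {"server": "local-fs-mcp", "tool": "read_file", "priority": 1,
     "condition": lambda args: not args["path"].startswith("s3://")},
    {"server": "s3-mcp", "tool": "get_object", "priority": 1,
     "condition": lambda args: args["path"].startswith("s3://")},
    {"server": "github-mcp", "tool": "get_file_contents", "priority": 2,
     "condition": None},
]
health = {"local-fs-mcp": True, "s3-mcp": True, "github-mcp": True}

def resolve(args: dict) -> str:
    candidates = [
        r for r in routes
        if health.get(r["server"], True)
        and (r["condition"] is None or r["condition"](args))
    ]
    best = min(candidates, key=lambda r: r["priority"])
    return f'{best["server"]}/{best["tool"]}'

print(resolve({"path": "s3://bucket/report.pdf"}))  # s3-mcp/get_object
print(resolve({"path": "/etc/config.yaml"}))        # local-fs-mcp/read_file

health["local-fs-mcp"] = False                      # server goes down...
print(resolve({"path": "/etc/config.yaml"}))        # github-mcp/get_file_contents
```

The last call shows the health filter doing its job: with the local filesystem server down, the same intent silently routes to the next healthy candidate.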
Pattern 8: Observability Pipeline for Tool Calls
You can't improve what you can't measure. Every MCP tool call should emit structured telemetry.
```python
import hashlib
import json
import logging
import time
import uuid
from contextlib import asynccontextmanager
from dataclasses import dataclass, asdict
from datetime import datetime, timezone
from typing import Optional


@dataclass
class ToolCallEvent:
    event_id: str
    timestamp: str
    server: str
    tool: str
    arguments_hash: str
    duration_ms: float
    success: bool
    error_category: Optional[str] = None
    result_size_bytes: Optional[int] = None
    cached: bool = False
    fallback_used: bool = False
    context_tokens_used: Optional[int] = None


class MCPObserver:
    def __init__(self, logger: Optional[logging.Logger] = None):
        self.logger = logger or logging.getLogger("mcp.observer")
        self._events: list[ToolCallEvent] = []

    @asynccontextmanager
    async def observe(
        self,
        server: str,
        tool: str,
        arguments: dict,
    ):
        event_id = str(uuid.uuid4())[:8]
        start = time.perf_counter()

        # Hash arguments (don't log sensitive data)
        args_hash = hashlib.sha256(
            json.dumps(arguments, sort_keys=True, default=str).encode()
        ).hexdigest()[:12]

        event = ToolCallEvent(
            event_id=event_id,
            timestamp=datetime.now(timezone.utc).isoformat(),
            server=server,
            tool=tool,
            arguments_hash=args_hash,
            duration_ms=0,
            success=False,
        )

        try:
            yield event
            event.success = True
        except Exception as e:
            event.error_category = type(e).__name__
            raise
        finally:
            event.duration_ms = round((time.perf_counter() - start) * 1000, 2)
            self._events.append(event)
            # Structured log
            self.logger.info(
                "mcp_tool_call",
                extra={"event": asdict(event)},
            )

    def get_summary(self, last_n: int = 100) -> dict:
        recent = self._events[-last_n:]
        if not recent:
            return {"total_calls": 0}

        successful = [e for e in recent if e.success]
        durations = [e.duration_ms for e in successful]
        avg_duration = sum(durations) / len(durations) if durations else 0

        # Group by server
        by_server: dict[str, dict] = {}
        for e in recent:
            if e.server not in by_server:
                by_server[e.server] = {"calls": 0, "errors": 0}
            by_server[e.server]["calls"] += 1
            if not e.success:
                by_server[e.server]["errors"] += 1

        return {
            "total_calls": len(recent),
            "success_rate": round(len(successful) / len(recent) * 100, 1),
            "avg_duration_ms": round(avg_duration, 1),
            "cache_hit_rate": round(
                sum(1 for e in recent if e.cached) / len(recent) * 100, 1
            ),
            "by_server": by_server,
            "slowest_calls": sorted(
                [asdict(e) for e in recent],
                key=lambda x: x["duration_ms"],
                reverse=True,
            )[:5],
        }


# Usage
observer = MCPObserver()

async def observed_tool_call(server, tool, args, session):
    async with observer.observe(server, tool, args) as event:
        result = await session.call_tool(tool, args)
        event.result_size_bytes = len(json.dumps(result, default=str).encode())
        return result
```
After a week of data, you'll know exactly which tools are slow, which fail, and which waste context tokens.
Pattern 9: Graceful Degradation with Tool Capabilities
When an MCP server disconnects mid-conversation, your agent shouldn't crash. It should gracefully reduce its capabilities and inform the user.
```typescript
interface ServerCapability {
  server: string;
  tools: string[];
  status: "connected" | "degraded" | "disconnected";
  lastSeen: number;
  reconnectAttempts: number;
}

class CapabilityManager {
  private capabilities: Map<string, ServerCapability> = new Map();
  private listeners: Array<(event: CapabilityEvent) => void> = [];

  registerServer(server: string, tools: string[]): void {
    this.capabilities.set(server, {
      server,
      tools,
      status: "connected",
      lastSeen: Date.now(),
      reconnectAttempts: 0,
    });
    this.emit({ type: "server_connected", server, tools });
  }

  markDisconnected(server: string): void {
    const cap = this.capabilities.get(server);
    if (!cap) return;
    cap.status = "disconnected";
    cap.reconnectAttempts++;
    this.emit({
      type: "server_disconnected",
      server,
      tools: cap.tools,
      lostCapabilities: this.describeLostCapabilities(cap.tools),
    });
  }

  markDegraded(server: string, availableTools: string[]): void {
    const cap = this.capabilities.get(server);
    if (!cap) return;
    const lostTools = cap.tools.filter((t) => !availableTools.includes(t));
    cap.status = "degraded";
    cap.tools = availableTools;
    if (lostTools.length > 0) {
      this.emit({
        type: "capabilities_reduced",
        server,
        tools: lostTools,
        lostCapabilities: this.describeLostCapabilities(lostTools),
      });
    }
  }

  getSystemPromptAddendum(): string {
    const disconnected = [...this.capabilities.values()].filter(
      (c) => c.status === "disconnected"
    );
    if (disconnected.length === 0) return "";

    const lines = [
      "\n⚠️ REDUCED CAPABILITIES:",
      "The following services are currently unavailable:",
    ];
    for (const cap of disconnected) {
      lines.push(
        `- ${cap.server}: ${cap.tools.join(", ")} (disconnected, ${cap.reconnectAttempts} reconnect attempts)`
      );
    }
    lines.push(
      "\nDo not attempt to use these tools. Inform the user if they request functionality that depends on them."
    );
    return lines.join("\n");
  }

  getAvailableTools(): string[] {
    return [...this.capabilities.values()]
      .filter((c) => c.status !== "disconnected")
      .flatMap((c) => c.tools);
  }

  private describeLostCapabilities(tools: string[]): string[] {
    // Map tool names to human-readable capabilities
    const descriptions: Record<string, string> = {
      query: "Database queries",
      search: "Full-text search",
      read_file: "File system access",
      send_email: "Email sending",
      create_issue: "Issue tracking",
      get_weather: "Weather data",
    };
    return tools.map((t) => descriptions[t] || t);
  }

  private emit(event: CapabilityEvent): void {
    this.listeners.forEach((fn) => fn(event));
  }

  onEvent(listener: (event: CapabilityEvent) => void): void {
    this.listeners.push(listener);
  }
}

type CapabilityEvent =
  | {
      type: "server_connected";
      server: string;
      tools: string[];
    }
  | {
      type: "server_disconnected";
      server: string;
      tools: string[];
      lostCapabilities: string[];
    }
  | {
      type: "capabilities_reduced";
      server: string;
      tools: string[];
      lostCapabilities: string[];
    };

// Integration: inject into system prompt dynamically
const capManager = new CapabilityManager();
capManager.registerServer("postgres-mcp", ["query", "execute"]);
capManager.registerServer("github-mcp", ["create_issue", "list_repos", "get_file_contents"]);

// When a server goes down:
capManager.markDisconnected("postgres-mcp");

// In your agent loop, append to system prompt:
const systemPrompt = baseSystemPrompt + capManager.getSystemPromptAddendum();
// Agent now knows: "Database queries are unavailable. Don't try to query."
```
The dynamic system prompt addendum is the magic. Your agent doesn't waste tokens trying tools that won't work.
Putting It All Together
These patterns aren't independent — they compose. Here's how they fit in a production MCP pipeline:
```
Agent Request
      ↓
[Tool Router]       → Picks the right server/tool
      ↓
[Deduplicator]      → Returns cached if available
      ↓
[Circuit Breaker]   → Rejects if server is failing
      ↓
[Observer]          → Starts timing
      ↓
[MCP Tool Call]     → Actual call
      ↓
[Error Classifier]  → Categorizes any errors
      ↓
[Fallback Chain]    → Tries alternatives if needed
      ↓
[Budget Manager]    → Truncates result to fit context
      ↓
[Observer]          → Records metrics
      ↓
Agent Response
```
The order matters. Dedup before circuit breaker (no point checking health for cached results). Budget manager after the call (truncate what you got). Observer wraps everything (measure the full path).
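One lightweight way to get that ordering is to build each layer as a wrapper around the call beneath it, so the pipeline is plain function composition. A minimal Python sketch with just two layers (all names here are illustrative, not an MCP SDK API):

```python
import asyncio

cache: dict = {}
log: list[str] = []

def with_dedup(call):
    # Outer layer: short-circuits repeat calls before they reach anything else
    async def wrapped(tool: str, args: dict):
        key = (tool, tuple(sorted(args.items())))
        if key not in cache:
            cache[key] = await call(tool, args)
        return cache[key]
    return wrapped

def with_logging(call):
    # Inner layer: records every call that actually goes through
    async def wrapped(tool: str, args: dict):
        log.append(tool)
        return await call(tool, args)
    return wrapped

async def raw_call(tool: str, args: dict):
    # Stand-in for the real MCP session
    return {"tool": tool, "ok": True}

# Dedup wraps logging, so cached hits never reach the log (or the server)
pipeline = with_dedup(with_logging(raw_call))

async def main():
    await pipeline("query", {"sql": "SELECT 1"})
    await pipeline("query", {"sql": "SELECT 1"})  # served from cache

asyncio.run(main())
print(log)  # only one entry: the second call was deduplicated
```

Swapping the two wrappers would log the cached call too, which is exactly the kind of ordering decision the diagram encodes.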
Key Takeaways
- Circuit breakers prevent cascading failures when MCP servers go down
- Context budgets keep your agent thinking clearly by reserving reasoning space
- Tool versioning catches breaking changes before they break your agent
- Fallback chains ensure graceful degradation across data sources
- Deduplication saves tokens and money on repeated calls
- Error taxonomy gives your agent actionable recovery strategies
- Tool routing simplifies choices for the agent
- Observability reveals what's actually happening in production
- Capability management dynamically adjusts agent behavior to reality
MCP is powerful. But power without resilience is just a demo. Build for production.
Want production-ready implementations of all these patterns plus 20+ MCP server configs? Check out the AI Dev Toolkit — everything you need to ship MCP-based AI agents that actually survive production.