DEV Community

dohko

Posted on

9 MCP Sandboxing and Resilience Patterns That Stop AI Agents From Breaking in Production

Model Context Protocol (MCP) went from "cool demo protocol" to production infrastructure in about six months. But here's the thing — most tutorials show you the happy path. Connect a server, call a tool, done.

Production is different. Production means auth failures at 3 AM, context windows exploding, tools timing out, and agents calling the wrong tool because your descriptions were ambiguous.

These are 9 patterns I've battle-tested for keeping MCP-based systems alive in production. Real code. Real problems. Real fixes.


Pattern 1: The Circuit Breaker for MCP Tool Calls

Your agent calls an MCP tool. The server is down. The agent retries. And retries. And retries. Meanwhile, your context window fills with error messages and your user stares at a spinner.

The fix: wrap every MCP tool call in a circuit breaker.

import time
from dataclasses import dataclass
from enum import Enum
from typing import Any, Callable


class CircuitState(Enum):
    CLOSED = "closed"       # Normal operation
    OPEN = "open"           # Failing, reject calls
    HALF_OPEN = "half_open" # Testing if service recovered


@dataclass
class CircuitBreaker:
    failure_threshold: int = 3
    recovery_timeout: float = 30.0
    half_open_max_calls: int = 1

    state: CircuitState = CircuitState.CLOSED
    failure_count: int = 0
    last_failure_time: float = 0.0
    half_open_calls: int = 0

    async def call(self, func: Callable, *args, **kwargs) -> Any:
        if self.state == CircuitState.OPEN:
            if time.time() - self.last_failure_time >= self.recovery_timeout:
                self.state = CircuitState.HALF_OPEN
                self.half_open_calls = 0
            else:
                raise CircuitOpenError(
                    f"Circuit open. Recovery in "
                    f"{self.recovery_timeout - (time.time() - self.last_failure_time):.1f}s"
                )

        if self.state == CircuitState.HALF_OPEN:
            if self.half_open_calls >= self.half_open_max_calls:
                raise CircuitOpenError("Circuit half-open, max test calls reached")
            self.half_open_calls += 1

        try:
            result = await func(*args, **kwargs)
            self._on_success()
            return result
        except Exception:
            self._on_failure()
            raise

    def _on_success(self):
        self.failure_count = 0
        self.state = CircuitState.CLOSED

    def _on_failure(self):
        self.failure_count += 1
        self.last_failure_time = time.time()
        if self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN


class CircuitOpenError(Exception):
    pass


# Usage with MCP tool calls
class ResilientMCPClient:
    def __init__(self):
        self.breakers: dict[str, CircuitBreaker] = {}

    def _get_breaker(self, server_name: str) -> CircuitBreaker:
        if server_name not in self.breakers:
            self.breakers[server_name] = CircuitBreaker(
                failure_threshold=3,
                recovery_timeout=30.0,
            )
        return self.breakers[server_name]

    async def call_tool(
        self,
        server_name: str,
        tool_name: str,
        arguments: dict,
        mcp_session: Any,
    ) -> Any:
        breaker = self._get_breaker(server_name)

        async def _do_call():
            return await mcp_session.call_tool(tool_name, arguments)

        try:
            return await breaker.call(_do_call)
        except CircuitOpenError:
            return {
                "error": f"Server '{server_name}' is temporarily unavailable",
                "tool": tool_name,
                "fallback": True,
            }

The key insight: one circuit breaker per MCP server, not per tool. If the server is down, all its tools are down.
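To watch the state machine trip, here's a compact standalone sketch — synchronous and deliberately simplified relative to the dataclass above, with illustrative names:

```python
import time

class MiniBreaker:
    def __init__(self, threshold=3, recovery=30.0):
        self.threshold, self.recovery = threshold, recovery
        self.failures, self.opened_at = 0, 0.0
        self.state = "closed"

    def call(self, func):
        if self.state == "open":
            if time.time() - self.opened_at < self.recovery:
                raise RuntimeError("circuit open")
            self.state = "half_open"  # recovery window elapsed: allow one probe

        try:
            result = func()
        except Exception:
            self.failures += 1
            self.opened_at = time.time()
            # A failed probe in half-open reopens immediately
            if self.failures >= self.threshold or self.state == "half_open":
                self.state = "open"
            raise
        self.failures, self.state = 0, "closed"
        return result

breaker = MiniBreaker()

def flaky():
    raise ConnectionError("server down")

for _ in range(3):  # three consecutive failures trip the breaker
    try:
        breaker.call(flaky)
    except ConnectionError:
        pass

print(breaker.state)  # → open
```

Once open, calls fail fast with a local error instead of waiting on a dead server — which is exactly what keeps the context window clean.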


Pattern 2: Context Window Budget Manager

MCP tools return data. Sometimes a LOT of data. A database query tool might return 50KB of JSON. A file reader might dump an entire codebase. Your context window has a budget — blow it and your agent starts hallucinating or loses earlier instructions.

interface ContextBudget {
  maxTokens: number;
  usedTokens: number;
  reservedTokens: number; // Keep free for reasoning
  toolResultBudgets: Map<string, number>;
}

class ContextBudgetManager {
  private budget: ContextBudget;

  constructor(maxContextTokens: number) {
    this.budget = {
      maxTokens: maxContextTokens,
      usedTokens: 0,
      reservedTokens: Math.floor(maxContextTokens * 0.3), // 30% for reasoning
      toolResultBudgets: new Map([
        ["database_query", 4000],
        ["file_read", 3000],
        ["web_search", 2000],
        ["code_analysis", 5000],
      ]),
    };
  }

  getAvailableTokens(): number {
    return this.budget.maxTokens - this.budget.usedTokens - this.budget.reservedTokens;
  }

  truncateToolResult(toolName: string, result: string): string {
    const maxTokens =
      this.budget.toolResultBudgets.get(toolName) ??
      Math.min(2000, this.getAvailableTokens());

    // Rough estimate: 1 token ≈ 4 chars
    const maxChars = maxTokens * 4;

    if (result.length <= maxChars) {
      this.budget.usedTokens += Math.ceil(result.length / 4);
      return result;
    }

    const truncated = this.smartTruncate(result, maxChars);
    this.budget.usedTokens += maxTokens;
    return truncated;
  }

  private smartTruncate(text: string, maxChars: number): string {
    // For JSON, try to keep structure intact
    if (text.trimStart().startsWith("{") || text.trimStart().startsWith("[")) {
      return this.truncateJSON(text, maxChars);
    }

    // For plain text, keep beginning and end
    const headSize = Math.floor(maxChars * 0.7);
    const tailSize = maxChars - headSize - 50; // 50 chars for separator

    const head = text.slice(0, headSize);
    const tail = text.slice(-tailSize);

    return `${head}\n\n... [TRUNCATED: ${text.length - head.length - tail.length} chars removed] ...\n\n${tail}`;
  }

  private truncateJSON(text: string, maxChars: number): string {
    try {
      const parsed = JSON.parse(text);

      if (Array.isArray(parsed)) {
        // Keep a sample of the first items, report total count
        const totalItems = parsed.length;
        const keepItems = Math.min(5, totalItems);
        const sample = parsed.slice(0, keepItems);

        const result = JSON.stringify(
          {
            _truncated: true,
            _totalItems: totalItems,
            _showingFirst: keepItems,
            items: sample,
          },
          null,
          2
        );

        if (result.length <= maxChars) return result;
      }

      // Fallback: stringify with no indent, then truncate
      const compact = JSON.stringify(parsed);
      if (compact.length <= maxChars) return compact;
      return compact.slice(0, maxChars) + "... [TRUNCATED]";
    } catch {
      return text.slice(0, maxChars) + "... [TRUNCATED]";
    }
  }

  reportUsage(): { used: number; available: number; percentage: number } {
    const available = this.getAvailableTokens();
    return {
      used: this.budget.usedTokens,
      available,
      percentage: Math.round(
        (this.budget.usedTokens / (this.budget.maxTokens - this.budget.reservedTokens)) * 100
      ),
    };
  }
}

// Integration with MCP tool results
async function processToolResult(
  budgetManager: ContextBudgetManager,
  toolName: string,
  rawResult: string
): Promise<string> {
  const usage = budgetManager.reportUsage();

  if (usage.percentage > 80) {
    console.warn(
      `⚠️ Context budget at ${usage.percentage}%. ` +
      `Aggressively truncating ${toolName} result.`
    );
  }

  return budgetManager.truncateToolResult(toolName, rawResult);
}

The 30% reservation for reasoning is critical. Without it, your agent has all the data but no room to think about it.
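The budget arithmetic itself is trivial — here's the same 30% reservation as a standalone Python sketch (the 4-chars-per-token estimate is rough; real tokenizers vary):

```python
# Rough context budgeting: reserve 30% for reasoning, ~4 chars per token.
MAX_CONTEXT = 128_000

reserved = int(MAX_CONTEXT * 0.30)   # room for the model to think
available = MAX_CONTEXT - reserved   # what tool results may consume

def fits(result: str, used_tokens: int) -> bool:
    """Would this tool result fit in the remaining budget?"""
    est_tokens = len(result) // 4    # crude 4-chars-per-token estimate
    return used_tokens + est_tokens <= available

print(reserved, available)  # → 38400 89600
```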


Pattern 3: Tool Description Versioning

Your MCP server evolves. Tool descriptions change. But your agent's behavior depends on those descriptions — they're part of the prompt. Change a description and your agent might start calling tools differently.

import hashlib
import json
from dataclasses import dataclass
from datetime import datetime, timezone


@dataclass
class ToolVersion:
    name: str
    description: str
    input_schema: dict
    version_hash: str
    registered_at: datetime


class ToolRegistry:
    def __init__(self):
        self._tools: dict[str, list[ToolVersion]] = {}
        self._active: dict[str, ToolVersion] = {}

    def _compute_hash(self, description: str, schema: dict) -> str:
        content = f"{description}|{json.dumps(schema, sort_keys=True)}"
        return hashlib.sha256(content.encode()).hexdigest()[:12]

    def register_tool(
        self,
        name: str,
        description: str,
        input_schema: dict,
    ) -> ToolVersion:
        version_hash = self._compute_hash(description, input_schema)

        # Check if this exact version already exists
        if name in self._tools:
            for existing in self._tools[name]:
                if existing.version_hash == version_hash:
                    return existing

        version = ToolVersion(
            name=name,
            description=description,
            input_schema=input_schema,
            version_hash=version_hash,
            registered_at=datetime.now(timezone.utc),
        )

        self._tools.setdefault(name, []).append(version)
        self._active[name] = version
        return version

    def detect_drift(self, server_tools: list[dict]) -> list[dict]:
        """Compare current server tools against registered versions."""
        changes = []
        for tool in server_tools:
            name = tool["name"]
            if name not in self._active:
                changes.append({
                    "type": "new_tool",
                    "tool": name,
                    "action": "register",
                })
                continue

            current_hash = self._compute_hash(
                tool["description"],
                tool.get("inputSchema", {}),
            )
            active = self._active[name]

            if current_hash != active.version_hash:
                changes.append({
                    "type": "tool_changed",
                    "tool": name,
                    "old_hash": active.version_hash,
                    "new_hash": current_hash,
                    "old_description": active.description,
                    "new_description": tool["description"],
                    "action": "review_and_update",
                })

        # Check for removed tools
        server_tool_names = {t["name"] for t in server_tools}
        for name in self._active:
            if name not in server_tool_names:
                changes.append({
                    "type": "tool_removed",
                    "tool": name,
                    "action": "deregister",
                })

        return changes

    def get_history(self, tool_name: str) -> list[ToolVersion]:
        return self._tools.get(tool_name, [])


# Usage: on every MCP server reconnection
async def on_server_reconnect(
    registry: ToolRegistry,
    server_tools: list[dict],
    alert_callback=None,
):
    drift = registry.detect_drift(server_tools)

    if drift:
        print(f"⚠️ Tool drift detected: {len(drift)} changes")
        for change in drift:
            print(f"  - {change['type']}: {change['tool']}")
            if change["type"] == "tool_changed":
                # Log the before/after for debugging
                print(f"    Old: {change['old_description'][:80]}...")
                print(f"    New: {change['new_description'][:80]}...")

        if alert_callback:
            await alert_callback(drift)

    # Register all current tools
    for tool in server_tools:
        registry.register_tool(
            name=tool["name"],
            description=tool["description"],
            input_schema=tool.get("inputSchema", {}),
        )

This catches those sneaky bugs where someone updates a tool's description or schema and suddenly your agent starts calling it differently.
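The version hash is doing the heavy lifting; the same recipe as _compute_hash, standalone:

```python
import hashlib
import json

def tool_hash(description: str, schema: dict) -> str:
    # Description + canonical JSON schema → stable 12-char version hash
    content = f"{description}|{json.dumps(schema, sort_keys=True)}"
    return hashlib.sha256(content.encode()).hexdigest()[:12]

v1 = tool_hash("Query the users table", {"type": "object"})
v2 = tool_hash("Query the users table by id", {"type": "object"})
print(v1 != v2)  # any wording change produces a new version hash → True
```

Because the schema is serialized with sort_keys=True, key order doesn't matter — only genuine changes to the description or schema produce a new hash.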


Pattern 4: The Fallback Chain

Not every tool call needs to succeed. Sometimes you have alternatives. A database query fails? Try the cache. Cache miss? Use a default. This pattern chains MCP tools with automatic fallback.

type ToolCall = {
  server: string;
  tool: string;
  arguments: Record<string, unknown>;
};

type FallbackResult = {
  result: unknown;
  source: string;
  fallbackUsed: boolean;
  attempts: Array<{ server: string; tool: string; error?: string }>;
};

class FallbackChain {
  private chains: Map<string, ToolCall[]> = new Map();

  register(intentName: string, tools: ToolCall[]): void {
    this.chains.set(intentName, tools);
  }

  async execute(
    intentName: string,
    mcpClients: Map<string, any>,
    overrideArgs?: Record<string, unknown>
  ): Promise<FallbackResult> {
    const chain = this.chains.get(intentName);
    if (!chain || chain.length === 0) {
      throw new Error(`No fallback chain registered for "${intentName}"`);
    }

    const attempts: FallbackResult["attempts"] = [];

    for (const toolCall of chain) {
      const client = mcpClients.get(toolCall.server);
      if (!client) {
        attempts.push({
          server: toolCall.server,
          tool: toolCall.tool,
          error: "Server not connected",
        });
        continue;
      }

      try {
        const args = { ...toolCall.arguments, ...overrideArgs };
        const result = await client.callTool(toolCall.tool, args);

        // Validate result isn't empty/null
        if (result === null || result === undefined) {
          attempts.push({
            server: toolCall.server,
            tool: toolCall.tool,
            error: "Empty result",
          });
          continue;
        }

        return {
          result,
          source: `${toolCall.server}/${toolCall.tool}`,
          fallbackUsed: attempts.length > 0,
          attempts,
        };
      } catch (error) {
        attempts.push({
          server: toolCall.server,
          tool: toolCall.tool,
          error: error instanceof Error ? error.message : String(error),
        });
      }
    }

    throw new FallbackExhaustedError(intentName, attempts);
  }
}

class FallbackExhaustedError extends Error {
  constructor(
    public intent: string,
    public attempts: FallbackResult["attempts"]
  ) {
    super(
      `All fallbacks exhausted for "${intent}". ` +
        `Tried: ${attempts.map((a) => `${a.server}/${a.tool}`).join(", ")}`
    );
  }
}

// Setup example
const fallbacks = new FallbackChain();

fallbacks.register("get_user_data", [
  {
    server: "postgres-mcp",
    tool: "query",
    arguments: { sql: "SELECT * FROM users WHERE id = $1" },
  },
  {
    server: "redis-mcp",
    tool: "get",
    arguments: { key: "user:{id}" },
  },
  {
    server: "api-mcp",
    tool: "http_get",
    arguments: { url: "/api/users/{id}" },
  },
]);

// Usage
const userData = await fallbacks.execute("get_user_data", mcpClients, {
  id: "user-123",
});

console.log(`Got data from: ${userData.source}`);
if (userData.fallbackUsed) {
  console.warn(`Primary source failed. Used fallback after ${userData.attempts.length} attempts.`);
}

The attempts log is gold for debugging. You'll always know why a particular source was chosen.
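The core loop is small enough to sketch in a few lines of Python, independent of the TypeScript class above (the function and source names here are illustrative):

```python
def execute_with_fallback(chain, arguments):
    """Try each (name, func) in order; return the first non-None result."""
    attempts = []
    for name, func in chain:
        try:
            result = func(arguments)
            if result is not None:
                return {"result": result, "source": name,
                        "fallback_used": len(attempts) > 0, "attempts": attempts}
            attempts.append({"source": name, "error": "Empty result"})
        except Exception as exc:
            attempts.append({"source": name, "error": str(exc)})
    raise RuntimeError(f"All fallbacks exhausted: {attempts}")

def primary(_):   # pretend the database is down
    raise ConnectionError("db unreachable")

def cache(args):  # the cache still answers
    return {"id": args["id"], "name": "Ada"}

out = execute_with_fallback([("postgres", primary), ("redis", cache)], {"id": 1})
print(out["source"], out["fallback_used"])  # → redis True
```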


Pattern 5: Request Deduplication Layer

Agents are chatty. They'll call the same tool with the same arguments multiple times in a single conversation. A dedup layer saves tokens, time, and API costs.

import hashlib
import json
import time
from dataclasses import dataclass
from typing import Any


@dataclass
class CachedResult:
    result: Any
    timestamp: float
    call_count: int = 1


class MCPDeduplicator:
    def __init__(self, ttl_seconds: float = 300.0):
        self.ttl = ttl_seconds
        self._cache: dict[str, CachedResult] = {}
        self._stats = {"hits": 0, "misses": 0, "evictions": 0}

    def _make_key(self, server: str, tool: str, arguments: dict) -> str:
        """Deterministic cache key from call parameters."""
        normalized = json.dumps(
            {"server": server, "tool": tool, "args": arguments},
            sort_keys=True,
            default=str,
        )
        return hashlib.sha256(normalized.encode()).hexdigest()

    def _is_cacheable(self, tool: str) -> bool:
        """Some tools should never be cached (side effects)."""
        non_cacheable = {
            "send_email",
            "create_record",
            "update_record",
            "delete_record",
            "execute_command",
            "write_file",
            "post_message",
        }
        return tool not in non_cacheable

    async def call_tool(
        self,
        server: str,
        tool: str,
        arguments: dict,
        mcp_session: Any,
        force_refresh: bool = False,
    ) -> tuple[Any, bool]:
        """Returns (result, was_cached)."""

        if not self._is_cacheable(tool) or force_refresh:
            result = await mcp_session.call_tool(tool, arguments)
            self._stats["misses"] += 1
            return result, False

        key = self._make_key(server, tool, arguments)

        # Check cache
        if key in self._cache:
            cached = self._cache[key]
            age = time.time() - cached.timestamp

            if age < self.ttl:
                cached.call_count += 1
                self._stats["hits"] += 1
                return cached.result, True
            else:
                del self._cache[key]
                self._stats["evictions"] += 1

        # Cache miss — make the actual call
        result = await mcp_session.call_tool(tool, arguments)
        self._cache[key] = CachedResult(
            result=result,
            timestamp=time.time(),
        )
        self._stats["misses"] += 1
        return result, False

    def get_stats(self) -> dict:
        total = self._stats["hits"] + self._stats["misses"]
        hit_rate = (self._stats["hits"] / total * 100) if total > 0 else 0
        return {
            **self._stats,
            "total_calls": total,
            "hit_rate_pct": round(hit_rate, 1),
            "cache_size": len(self._cache),
        }

    def invalidate(self, server: str, tool: str, arguments: dict) -> bool:
        """Manually invalidate a specific cached result."""
        key = self._make_key(server, tool, arguments)
        if key in self._cache:
            del self._cache[key]
            return True
        return False

    def clear(self) -> int:
        """Clear entire cache. Returns number of evicted entries."""
        count = len(self._cache)
        self._cache.clear()
        return count

The _is_cacheable check is essential. You don't want to cache write operations — "send that email" should actually send it every time.
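The deterministic cache key is what makes this safe; the same recipe as _make_key, as a standalone sketch:

```python
import hashlib
import json

def cache_key(server: str, tool: str, args: dict) -> str:
    # sort_keys makes {"a": 1, "b": 2} and {"b": 2, "a": 1} hash identically
    normalized = json.dumps(
        {"server": server, "tool": tool, "args": args},
        sort_keys=True,
        default=str,
    )
    return hashlib.sha256(normalized.encode()).hexdigest()

k1 = cache_key("db", "query", {"table": "users", "limit": 10})
k2 = cache_key("db", "query", {"limit": 10, "table": "users"})
print(k1 == k2)  # argument order doesn't matter → True
```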


Pattern 6: Structured Error Taxonomy

When an MCP tool fails, your agent needs to understand why — not just "error occurred". Different errors need different responses: retry, fallback, ask the user, or give up.

from enum import Enum
from dataclasses import dataclass
from typing import Optional


class ErrorCategory(Enum):
    TRANSIENT = "transient"         # Retry might work
    AUTH = "auth"                   # Credentials issue
    VALIDATION = "validation"       # Bad input from agent
    NOT_FOUND = "not_found"         # Resource doesn't exist
    RATE_LIMITED = "rate_limited"   # Too many calls
    PERMISSION = "permission"       # Authenticated but not authorized
    SERVER_ERROR = "server_error"   # MCP server broken
    TIMEOUT = "timeout"             # Took too long


@dataclass
class ClassifiedError:
    category: ErrorCategory
    message: str
    original_error: Optional[Exception] = None
    retry_after_seconds: Optional[float] = None
    suggestion: Optional[str] = None


class MCPErrorClassifier:
    """Classify raw MCP errors into actionable categories."""

    TRANSIENT_PATTERNS = [
        "connection reset",
        "temporary failure",
        "service unavailable",
        "ECONNRESET",
        "ETIMEDOUT",
    ]

    AUTH_PATTERNS = [
        "unauthorized",
        "invalid token",
        "expired token",
        "authentication failed",
        "401",
    ]

    RATE_LIMIT_PATTERNS = [
        "rate limit",
        "too many requests",
        "429",
        "quota exceeded",
    ]

    def classify(self, error: Exception, tool_name: str = "") -> ClassifiedError:
        msg = str(error).lower()

        # Check patterns in priority order
        if any(p in msg for p in self.RATE_LIMIT_PATTERNS):
            retry_after = self._extract_retry_after(str(error))
            return ClassifiedError(
                category=ErrorCategory.RATE_LIMITED,
                message=f"Rate limited on {tool_name}",
                original_error=error,
                retry_after_seconds=retry_after or 60.0,
                suggestion="Wait before retrying. Consider reducing call frequency.",
            )

        if any(p in msg for p in self.AUTH_PATTERNS):
            return ClassifiedError(
                category=ErrorCategory.AUTH,
                message=f"Authentication failed for {tool_name}",
                original_error=error,
                suggestion="Check API keys and token expiry. Do not retry.",
            )

        if any(p in msg for p in self.TRANSIENT_PATTERNS):
            return ClassifiedError(
                category=ErrorCategory.TRANSIENT,
                message=f"Transient error on {tool_name}",
                original_error=error,
                retry_after_seconds=5.0,
                suggestion="Retry with exponential backoff.",
            )

        if "timeout" in msg or isinstance(error, TimeoutError):
            return ClassifiedError(
                category=ErrorCategory.TIMEOUT,
                message=f"Timeout calling {tool_name}",
                original_error=error,
                retry_after_seconds=10.0,
                suggestion="Retry with longer timeout or simpler query.",
            )

        if "not found" in msg or "404" in msg:
            return ClassifiedError(
                category=ErrorCategory.NOT_FOUND,
                message=f"Resource not found for {tool_name}",
                original_error=error,
                suggestion="Verify resource ID/path. Do not retry with same args.",
            )

        if "validation" in msg or "invalid" in msg or "schema" in msg:
            return ClassifiedError(
                category=ErrorCategory.VALIDATION,
                message=f"Invalid input for {tool_name}",
                original_error=error,
                suggestion="Fix input parameters before retrying.",
            )

        # Default: server error
        return ClassifiedError(
            category=ErrorCategory.SERVER_ERROR,
            message=f"Server error on {tool_name}: {str(error)[:200]}",
            original_error=error,
            suggestion="Check MCP server logs. May need manual intervention.",
        )

    def _extract_retry_after(self, error_msg: str) -> Optional[float]:
        """Try to extract retry-after value from error message."""
        import re
        match = re.search(r"retry.after[:\s]+(\d+)", error_msg, re.IGNORECASE)
        if match:
            return float(match.group(1))
        return None

    def should_retry(self, classified: ClassifiedError) -> bool:
        retryable = {
            ErrorCategory.TRANSIENT,
            ErrorCategory.RATE_LIMITED,
            ErrorCategory.TIMEOUT,
        }
        return classified.category in retryable

    def format_for_agent(self, classified: ClassifiedError) -> str:
        """Format error for inclusion in agent context."""
        parts = [
            f"[{classified.category.value.upper()}] {classified.message}",
        ]
        if classified.suggestion:
            parts.append(f"Suggestion: {classified.suggestion}")
        if classified.retry_after_seconds and self.should_retry(classified):
            parts.append(f"Retry after: {classified.retry_after_seconds}s")
        return " | ".join(parts)

The format_for_agent method is key — it gives the LLM structured info to decide what to do next instead of raw stack traces.
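The classify-then-decide flow in miniature — a standalone sketch with a much smaller pattern table than the full classifier above:

```python
# Categories that are worth retrying (mirrors should_retry above)
RETRYABLE = {"transient", "rate_limited", "timeout"}

def classify(msg: str) -> str:
    """Map a raw error message to an actionable category."""
    msg = msg.lower()
    if "429" in msg or "rate limit" in msg:
        return "rate_limited"
    if "401" in msg or "unauthorized" in msg:
        return "auth"
    if "timeout" in msg:
        return "timeout"
    return "server_error"

for raw in ["HTTP 429 Too Many Requests", "401 Unauthorized", "read timeout"]:
    cat = classify(raw)
    print(f"[{cat.upper()}] retry={cat in RETRYABLE}")
```

The point is the decision, not the taxonomy: a rate limit and a timeout both get retried, an auth failure never does.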


Pattern 7: Multi-Server Tool Router

When you have 5+ MCP servers, your agent gets overwhelmed with tool choices. A router layer maps intents to the right server/tool combination.

interface ToolRoute {
  server: string;
  tool: string;
  priority: number;
  condition?: (args: Record<string, unknown>) => boolean;
}

interface RouteConfig {
  intent: string;
  description: string;
  routes: ToolRoute[];
  defaultArgs?: Record<string, unknown>;
}

class MCPToolRouter {
  private routes: Map<string, RouteConfig> = new Map();
  private serverHealth: Map<string, boolean> = new Map();

  register(config: RouteConfig): void {
    this.routes.set(config.intent, config);
  }

  updateServerHealth(server: string, healthy: boolean): void {
    this.serverHealth.set(server, healthy);
  }

  resolve(
    intent: string,
    args: Record<string, unknown> = {}
  ): { server: string; tool: string; args: Record<string, unknown> } | null {
    const config = this.routes.get(intent);
    if (!config) return null;

    // Sort by priority, filter by health and conditions
    const candidates = config.routes
      .filter((r) => this.serverHealth.get(r.server) !== false)
      .filter((r) => !r.condition || r.condition(args))
      .sort((a, b) => a.priority - b.priority);

    if (candidates.length === 0) return null;

    const best = candidates[0];
    return {
      server: best.server,
      tool: best.tool,
      args: { ...config.defaultArgs, ...args },
    };
  }

  // Generate a simplified tool list for the agent's system prompt
  generateToolManifest(): string {
    const lines: string[] = ["# Available Actions\n"];

    for (const [intent, config] of this.routes) {
      const healthyServers = config.routes.filter(
        (r) => this.serverHealth.get(r.server) !== false
      );

      const status = healthyServers.length > 0 ? "✅" : "⚠️";
      lines.push(`${status} **${intent}**: ${config.description}`);
    }

    return lines.join("\n");
  }
}

// Setup
const router = new MCPToolRouter();

router.register({
  intent: "search_documents",
  description: "Search across all document stores",
  routes: [
    {
      server: "elasticsearch-mcp",
      tool: "search",
      priority: 1,
      condition: (args) => typeof args.query === "string" && args.query.length > 0,
    },
    {
      server: "postgres-mcp",
      tool: "full_text_search",
      priority: 2,
    },
  ],
});

router.register({
  intent: "read_file",
  description: "Read a file from any connected filesystem",
  routes: [
    {
      server: "local-fs-mcp",
      tool: "read_file",
      priority: 1,
      condition: (args) =>
        typeof args.path === "string" && !args.path.startsWith("s3://"),
    },
    {
      server: "s3-mcp",
      tool: "get_object",
      priority: 1,
      condition: (args) =>
        typeof args.path === "string" && args.path.startsWith("s3://"),
    },
    {
      server: "github-mcp",
      tool: "get_file_contents",
      priority: 2,
    },
  ],
});

// In agent loop
const resolved = router.resolve("search_documents", {
  query: "quarterly revenue report",
});

if (resolved) {
  const result = await mcpClients
    .get(resolved.server)!
    .callTool(resolved.tool, resolved.args);
}

This keeps your agent's prompt clean. Instead of 40 tools from 8 servers, the agent sees 10 clear intents.
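The routing idea translates to a few lines of Python, independent of the TypeScript class above (server names are illustrative):

```python
# Intent → (server, tool) routing, standalone sketch of the idea above.
ROUTES = {
    "read_file": [
        # (server, tool, condition) — first healthy route whose condition passes wins
        ("s3-mcp", "get_object", lambda a: a.get("path", "").startswith("s3://")),
        ("local-fs-mcp", "read_file", lambda a: True),
    ],
}
HEALTHY = {"s3-mcp", "local-fs-mcp"}

def resolve(intent, args):
    for server, tool, cond in ROUTES.get(intent, []):
        if server in HEALTHY and cond(args):
            return server, tool
    return None

print(resolve("read_file", {"path": "s3://bucket/report.csv"}))
print(resolve("read_file", {"path": "/tmp/report.csv"}))
```

The agent only ever sees the intent name; which server actually answers is an infrastructure decision it never has to reason about.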


Pattern 8: Observability Pipeline for Tool Calls

You can't improve what you can't measure. Every MCP tool call should emit structured telemetry.

import time
import json
import logging
from dataclasses import dataclass, asdict
from typing import Any, Optional
from contextlib import asynccontextmanager
from datetime import datetime, timezone
import uuid


@dataclass
class ToolCallEvent:
    event_id: str
    timestamp: str
    server: str
    tool: str
    arguments_hash: str
    duration_ms: float
    success: bool
    error_category: Optional[str] = None
    result_size_bytes: Optional[int] = None
    cached: bool = False
    fallback_used: bool = False
    context_tokens_used: Optional[int] = None


class MCPObserver:
    def __init__(self, logger: Optional[logging.Logger] = None):
        self.logger = logger or logging.getLogger("mcp.observer")
        self._events: list[ToolCallEvent] = []

    @asynccontextmanager
    async def observe(
        self,
        server: str,
        tool: str,
        arguments: dict,
    ):
        event_id = str(uuid.uuid4())[:8]
        start = time.perf_counter()

        # Hash arguments (don't log sensitive data)
        import hashlib
        args_hash = hashlib.sha256(
            json.dumps(arguments, sort_keys=True, default=str).encode()
        ).hexdigest()[:12]

        event = ToolCallEvent(
            event_id=event_id,
            timestamp=datetime.now(timezone.utc).isoformat(),
            server=server,
            tool=tool,
            arguments_hash=args_hash,
            duration_ms=0,
            success=False,
        )

        try:
            yield event
            event.success = True
        except Exception as e:
            event.error_category = type(e).__name__
            raise
        finally:
            event.duration_ms = round((time.perf_counter() - start) * 1000, 2)
            self._events.append(event)

            # Structured log
            self.logger.info(
                "mcp_tool_call",
                extra={"event": asdict(event)},
            )

    def get_summary(self, last_n: int = 100) -> dict:
        recent = self._events[-last_n:]
        if not recent:
            return {"total_calls": 0}

        successful = [e for e in recent if e.success]
        failed = [e for e in recent if not e.success]

        durations = [e.duration_ms for e in successful]
        avg_duration = sum(durations) / len(durations) if durations else 0

        # Group by server
        by_server: dict[str, dict] = {}
        for e in recent:
            stats = by_server.setdefault(
                e.server, {"calls": 0, "errors": 0, "total_ms": 0.0}
            )
            stats["calls"] += 1
            stats["total_ms"] += e.duration_ms
            if not e.success:
                stats["errors"] += 1
        for stats in by_server.values():
            stats["avg_ms"] = round(stats.pop("total_ms") / stats["calls"], 1)

        return {
            "total_calls": len(recent),
            "success_rate": round(len(successful) / len(recent) * 100, 1),
            "avg_duration_ms": round(avg_duration, 1),
            "cache_hit_rate": round(
                sum(1 for e in recent if e.cached) / len(recent) * 100, 1
            ),
            "by_server": by_server,
            "slowest_calls": sorted(
                [asdict(e) for e in recent],
                key=lambda x: x["duration_ms"],
                reverse=True,
            )[:5],
        }


# Usage
observer = MCPObserver()

async def observed_tool_call(server, tool, args, session):
    async with observer.observe(server, tool, args) as event:
        result = await session.call_tool(tool, args)
        event.result_size_bytes = len(json.dumps(result, default=str).encode())
        return result

After a week of data, you'll know exactly which tools are slow, which fail, and which waste context tokens.
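The summary itself is just a few folds over the event log — a standalone sketch with illustrative sample events:

```python
# Each event mirrors the fields the observer records: server, success, duration.
events = [
    {"server": "db", "ok": True,  "ms": 42.0},
    {"server": "db", "ok": True,  "ms": 58.0},
    {"server": "fs", "ok": False, "ms": 3000.0},
]

ok = [e for e in events if e["ok"]]
success_rate = round(len(ok) / len(events) * 100, 1)
avg_ms = round(sum(e["ms"] for e in ok) / len(ok), 1)
slowest = max(events, key=lambda e: e["ms"])

print(success_rate, avg_ms, slowest["server"])  # → 66.7 50.0 fs
```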


Pattern 9: Graceful Degradation with Tool Capabilities

When an MCP server disconnects mid-conversation, your agent shouldn't crash. It should gracefully reduce its capabilities and inform the user.

interface ServerCapability {
  server: string;
  tools: string[];
  status: "connected" | "degraded" | "disconnected";
  lastSeen: number;
  reconnectAttempts: number;
}

class CapabilityManager {
  private capabilities: Map<string, ServerCapability> = new Map();
  private listeners: Array<(event: CapabilityEvent) => void> = [];

  registerServer(server: string, tools: string[]): void {
    this.capabilities.set(server, {
      server,
      tools,
      status: "connected",
      lastSeen: Date.now(),
      reconnectAttempts: 0,
    });
    this.emit({ type: "server_connected", server, tools });
  }

  markDisconnected(server: string): void {
    const cap = this.capabilities.get(server);
    if (!cap) return;

    cap.status = "disconnected";
    cap.reconnectAttempts++;
    this.emit({
      type: "server_disconnected",
      server,
      tools: cap.tools,
      lostCapabilities: this.describeLostCapabilities(cap.tools),
    });
  }

  markDegraded(server: string, availableTools: string[]): void {
    const cap = this.capabilities.get(server);
    if (!cap) return;

    const lostTools = cap.tools.filter((t) => !availableTools.includes(t));
    cap.status = "degraded";
    cap.tools = availableTools;

    if (lostTools.length > 0) {
      this.emit({
        type: "capabilities_reduced",
        server,
        tools: lostTools,
        lostCapabilities: this.describeLostCapabilities(lostTools),
      });
    }
  }

  getSystemPromptAddendum(): string {
    const disconnected = [...this.capabilities.values()].filter(
      (c) => c.status === "disconnected"
    );

    if (disconnected.length === 0) return "";

    const lines = [
      "\n⚠️ REDUCED CAPABILITIES:",
      "The following services are currently unavailable:",
    ];

    for (const cap of disconnected) {
      lines.push(
        `- ${cap.server}: ${cap.tools.join(", ")} (disconnected, ${cap.reconnectAttempts} reconnect attempts)`
      );
    }

    lines.push(
      "\nDo not attempt to use these tools. Inform the user if they request functionality that depends on them."
    );

    return lines.join("\n");
  }

  getAvailableTools(): string[] {
    return [...this.capabilities.values()]
      .filter((c) => c.status !== "disconnected")
      .flatMap((c) => c.tools);
  }

  private describeLostCapabilities(tools: string[]): string[] {
    // Map tool names to human-readable capabilities
    const descriptions: Record<string, string> = {
      query: "Database queries",
      search: "Full-text search",
      read_file: "File system access",
      send_email: "Email sending",
      create_issue: "Issue tracking",
      get_weather: "Weather data",
    };

    return tools.map((t) => descriptions[t] || t);
  }

  private emit(event: CapabilityEvent): void {
    this.listeners.forEach((fn) => fn(event));
  }

  onEvent(listener: (event: CapabilityEvent) => void): void {
    this.listeners.push(listener);
  }
}

type CapabilityEvent =
  | {
      type: "server_connected";
      server: string;
      tools: string[];
    }
  | {
      type: "server_disconnected";
      server: string;
      tools: string[];
      lostCapabilities: string[];
    }
  | {
      type: "capabilities_reduced";
      server: string;
      tools: string[];
      lostCapabilities: string[];
    };

// Integration: inject into system prompt dynamically
const capManager = new CapabilityManager();

capManager.registerServer("postgres-mcp", ["query", "execute"]);
capManager.registerServer("github-mcp", ["create_issue", "list_repos", "get_file_contents"]);

// When a server goes down:
capManager.markDisconnected("postgres-mcp");

// In your agent loop, append to system prompt:
const systemPrompt = baseSystemPrompt + capManager.getSystemPromptAddendum();
// Agent now knows: "Database queries are unavailable. Don't try to query."

The dynamic system prompt addendum is the magic. Your agent doesn't waste tokens trying tools that won't work.
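The same reality check works on the tool-list side: filter the tool schemas you send to the model so a disconnected server's tools never appear at all. A minimal Python sketch of the idea; the schema shape and the status map here are illustrative assumptions, not part of the MCP spec.

```python
def filter_tool_schemas(all_tool_schemas: list[dict],
                        server_status: dict[str, str]) -> list[dict]:
    """Keep only schemas whose owning server is not disconnected."""
    return [
        schema for schema in all_tool_schemas
        if server_status.get(schema["server"]) != "disconnected"
    ]

schemas = [
    {"server": "postgres-mcp", "name": "query"},
    {"server": "github-mcp", "name": "create_issue"},
]
status = {"postgres-mcp": "disconnected", "github-mcp": "connected"}

available = filter_tool_schemas(schemas, status)
# Only github-mcp tools remain in the model's tool list
```

Belt and suspenders: the prompt addendum tells the agent what's gone, and the filtered tool list makes the unavailable tools impossible to call.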


Putting It All Together

These patterns aren't independent — they compose. Here's how they fit in a production MCP pipeline:

Agent Request
    ↓
[Tool Router]        → Picks the right server/tool
    ↓
[Deduplicator]       → Returns cached if available
    ↓
[Circuit Breaker]    → Rejects if server is failing
    ↓
[Observer]           → Starts timing
    ↓
[MCP Tool Call]      → Actual call
    ↓
[Error Classifier]   → Categorizes any errors
    ↓
[Fallback Chain]     → Tries alternatives if needed
    ↓
[Budget Manager]     → Truncates result to fit context
    ↓
[Observer]           → Records metrics
    ↓
Agent Response

The order matters. Dedup before circuit breaker (no point checking health for cached results). Budget manager after the call (truncate what you got). Observer wraps everything (measure the full path).


Key Takeaways

  1. Circuit breakers prevent cascading failures when MCP servers go down
  2. Context budgets keep your agent thinking clearly by reserving reasoning space
  3. Tool versioning catches breaking changes before they break your agent
  4. Fallback chains ensure graceful degradation across data sources
  5. Deduplication saves tokens and money on repeated calls
  6. Error taxonomy gives your agent actionable recovery strategies
  7. Tool routing simplifies choices for the agent
  8. Observability reveals what's actually happening in production
  9. Capability management dynamically adjusts agent behavior to reality

MCP is powerful. But power without resilience is just a demo. Build for production.


Want production-ready implementations of all these patterns plus 20+ MCP server configs? Check out the AI Dev Toolkit — everything you need to ship MCP-based AI agents that actually survive production.
