
9 MCP Production Patterns That Actually Scale Multi-Agent Systems (2026)

Model Context Protocol went from "interesting spec" to industry standard in under a year. 97 million monthly SDK downloads. Every major AI provider on board — Anthropic, OpenAI, Google, Microsoft, Amazon.

But most tutorials still show toy examples. A weather tool. A calculator. Cool for demos, useless for production.

Here are 9 patterns we've battle-tested in real multi-agent systems — with code you can ship today.


1. The Tool Registry Pattern

Don't hardcode tools. Register them dynamically so agents discover capabilities at runtime.

// mcp-registry/src/registry.ts
import { Server } from "@modelcontextprotocol/sdk/server/index.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import {
  CallToolRequestSchema,
  ListToolsRequestSchema,
} from "@modelcontextprotocol/sdk/types.js";

interface ToolDefinition {
  name: string;
  description: string;
  inputSchema: Record<string, unknown>;
  handler: (args: Record<string, unknown>) => Promise<unknown>;
  version: string;
  healthCheck?: () => Promise<boolean>;
}

class ToolRegistry {
  private tools = new Map<string, ToolDefinition>();
  private healthStatus = new Map<string, boolean>();

  register(tool: ToolDefinition): void {
    this.tools.set(tool.name, tool);
    this.healthStatus.set(tool.name, true);
    // Log to stderr: stdout carries the protocol on the stdio transport
    console.error(`[registry] Registered tool: ${tool.name} v${tool.version}`);
  }

  unregister(name: string): void {
    this.tools.delete(name);
    this.healthStatus.delete(name);
    console.error(`[registry] Unregistered tool: ${name}`);
  }

  getHealthy(): ToolDefinition[] {
    return Array.from(this.tools.values()).filter(
      (t) => this.healthStatus.get(t.name) === true
    );
  }

  async runHealthChecks(): Promise<void> {
    for (const [name, tool] of this.tools) {
      if (tool.healthCheck) {
        try {
          const healthy = await tool.healthCheck();
          this.healthStatus.set(name, healthy);
        } catch {
          this.healthStatus.set(name, false);
          console.error(`[registry] Health check failed: ${name}`);
        }
      }
    }
  }
}

const registry = new ToolRegistry();

// Example: register a database query tool
registry.register({
  name: "query_database",
  description: "Execute read-only SQL queries against the analytics database",
  version: "2.1.0",
  inputSchema: {
    type: "object",
    properties: {
      query: { type: "string", description: "SQL SELECT query" },
      timeout_ms: { type: "number", description: "Query timeout", default: 5000 },
    },
    required: ["query"],
  },
  handler: async (args) => {
    const query = args.query as string;
    if (!query.trim().toUpperCase().startsWith("SELECT")) {
      throw new Error("Only SELECT queries allowed");
    }
    // executeQuery is your own DB-pool helper (not shown here)
    return await executeQuery(query, (args.timeout_ms as number) ?? 5000);
  },
  healthCheck: async () => {
    try {
      await executeQuery("SELECT 1", 1000);
      return true;
    } catch {
      return false;
    }
  },
});

// Wire up the MCP server
const server = new Server({ name: "tool-registry", version: "1.0.0" }, {
  capabilities: { tools: {} },
});

server.setRequestHandler(ListToolsRequestSchema, async () => ({
  tools: registry.getHealthy().map((t) => ({
    name: t.name,
    description: t.description,
    inputSchema: t.inputSchema,
  })),
}));

server.setRequestHandler(CallToolRequestSchema, async (request) => {
  const tool = registry.getHealthy().find((t) => t.name === request.params.name);
  if (!tool) {
    return { content: [{ type: "text", text: `Tool not found or unhealthy: ${request.params.name}` }], isError: true };
  }
  try {
    const result = await tool.handler(request.params.arguments ?? {});
    return { content: [{ type: "text", text: JSON.stringify(result, null, 2) }] };
  } catch (err) {
    return { content: [{ type: "text", text: `Error: ${(err as Error).message}` }], isError: true };
  }
});

// Health check loop
setInterval(() => registry.runHealthChecks(), 30_000);

const transport = new StdioServerTransport();
await server.connect(transport);

Why it matters: In production you have dozens of tools. Some go down. The registry pattern means agents only see what's actually working.
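The payoff in miniature: because listing filters on health, an unhealthy tool simply vanishes from what agents see. A toy Python sketch of the same idea (`MiniRegistry` is illustrative, not the TS class above):

```python
class MiniRegistry:
    """Tiny stand-in for the registry: tools plus a health flag each."""

    def __init__(self):
        self.tools: dict[str, str] = {}
        self.health: dict[str, bool] = {}

    def register(self, name: str) -> None:
        self.tools[name] = name
        self.health[name] = True

    def mark(self, name: str, healthy: bool) -> None:
        self.health[name] = healthy

    def get_healthy(self) -> list[str]:
        # Agents only ever see this filtered view
        return [t for t in self.tools if self.health.get(t)]


r = MiniRegistry()
r.register("query_database")
r.register("send_email")
r.mark("send_email", False)          # health check failed
assert r.get_healthy() == ["query_database"]
```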


2. Context Window Budget Manager

MCP servers can return massive payloads. Without budget management, you blow your context window on one tool call.

# budget_manager.py
import tiktoken
from dataclasses import dataclass
from typing import Any

@dataclass
class ContextBudget:
    total_tokens: int
    reserved_for_response: int = 4096
    reserved_for_system: int = 2000
    used_tokens: int = 0

    @property
    def available(self) -> int:
        return self.total_tokens - self.reserved_for_response - self.reserved_for_system - self.used_tokens

    def consume(self, tokens: int) -> None:
        self.used_tokens += tokens

    def can_afford(self, tokens: int) -> bool:
        return tokens <= self.available


class MCPBudgetProxy:
    """Wraps MCP tool results to enforce context budgets."""

    def __init__(self, budget: ContextBudget, model: str = "gpt-4"):
        self.budget = budget
        self.encoder = tiktoken.encoding_for_model(model)

    def count_tokens(self, text: str) -> int:
        return len(self.encoder.encode(text))

    def truncate_to_budget(self, text: str, max_fraction: float = 0.3) -> str:
        """Truncate result to fit within budget fraction."""
        max_tokens = int(self.budget.available * max_fraction)
        tokens = self.encoder.encode(text)

        if len(tokens) <= max_tokens:
            self.budget.consume(len(tokens))
            return text

        # Truncate and add indicator
        truncated = self.encoder.decode(tokens[:max_tokens - 20])
        suffix = f"\n\n[TRUNCATED: {len(tokens)} tokens → {max_tokens} tokens. Request specific sections for full data.]"
        result = truncated + suffix
        self.budget.consume(max_tokens)
        return result

    async def call_tool_with_budget(
        self,
        mcp_client,
        tool_name: str,
        arguments: dict[str, Any],
        max_fraction: float = 0.3,
    ) -> str:
        """Call MCP tool and enforce budget on response."""
        if self.budget.available < 500:
            return "[BUDGET EXHAUSTED: Cannot make more tool calls. Respond with available context.]"

        raw_result = await mcp_client.call_tool(tool_name, arguments)
        text = raw_result.content[0].text if raw_result.content else ""

        return self.truncate_to_budget(text, max_fraction)


# Usage in an agent loop
budget = ContextBudget(total_tokens=128_000)
proxy = MCPBudgetProxy(budget)

async def agent_step(mcp_client, tool_name: str, args: dict) -> str:
    result = await proxy.call_tool_with_budget(mcp_client, tool_name, args)
    print(f"Budget remaining: {budget.available} tokens")
    return result

The lesson: Production agents need resource management. Treat context tokens like memory — allocate, track, and refuse when depleted.
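One practical caveat: tiktoken typically downloads encoding files on first use, which is a problem in offline environments. A rough characters-per-token fallback keeps the budget enforceable without it; a hedged sketch (the 4-chars-per-token ratio is a common heuristic for English text, not exact):

```python
from dataclasses import dataclass


@dataclass
class ContextBudget:
    total_tokens: int
    reserved_for_response: int = 4096
    reserved_for_system: int = 2000
    used_tokens: int = 0

    @property
    def available(self) -> int:
        return (self.total_tokens - self.reserved_for_response
                - self.reserved_for_system - self.used_tokens)


def rough_token_count(text: str) -> int:
    # Heuristic: ~4 characters per token. Overestimating slightly is the
    # safe direction when budgeting.
    return max(1, len(text) // 4 + 1)


def truncate_rough(budget: ContextBudget, text: str, max_fraction: float = 0.3) -> str:
    max_tokens = int(budget.available * max_fraction)
    if rough_token_count(text) <= max_tokens:
        budget.used_tokens += rough_token_count(text)
        return text
    # 4 chars/token gives a character cap; reserve ~20 tokens for the notice
    keep_chars = max(0, (max_tokens - 20) * 4)
    budget.used_tokens += max_tokens
    return text[:keep_chars] + "\n\n[TRUNCATED: request specific sections for full data.]"
```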


3. MCP Server Composition (The Gateway Pattern)

One agent shouldn't connect to 15 MCP servers. Build a gateway that composes multiple servers behind a single interface.

# mcp_gateway.py
import asyncio
from dataclasses import dataclass, field
from typing import Any

@dataclass
class MCPServerConfig:
    name: str
    command: str
    args: list[str] = field(default_factory=list)
    env: dict[str, str] = field(default_factory=dict)
    priority: int = 0  # Higher = preferred when tools overlap

@dataclass
class GatewayTool:
    name: str
    server: str
    original_name: str
    description: str
    input_schema: dict

class MCPGateway:
    """Composes multiple MCP servers into a single tool namespace."""

    def __init__(self, configs: list[MCPServerConfig]):
        self.configs = {c.name: c for c in configs}
        self.connections: dict[str, Any] = {}
        self.tool_map: dict[str, GatewayTool] = {}

    async def connect_all(self) -> None:
        for name, config in self.configs.items():
            try:
                conn = await self._connect_server(config)
                self.connections[name] = conn
                tools = await conn.list_tools()
                for tool in tools:
                    namespaced = f"{name}__{tool.name}"
                    self.tool_map[namespaced] = GatewayTool(
                        name=namespaced,
                        server=name,
                        original_name=tool.name,
                        description=f"[{name}] {tool.description}",
                        input_schema=tool.inputSchema,
                    )
                print(f"[gateway] Connected: {name} ({len(tools)} tools)")
            except Exception as e:
                print(f"[gateway] Failed to connect {name}: {e}")

    async def call_tool(self, namespaced_name: str, arguments: dict) -> Any:
        gateway_tool = self.tool_map.get(namespaced_name)
        if not gateway_tool:
            raise ValueError(f"Unknown tool: {namespaced_name}")

        conn = self.connections.get(gateway_tool.server)
        if not conn:
            raise ConnectionError(f"Server disconnected: {gateway_tool.server}")

        return await conn.call_tool(gateway_tool.original_name, arguments)

    def list_tools(self) -> list[dict]:
        return [
            {
                "name": t.name,
                "description": t.description,
                "inputSchema": t.input_schema,
            }
            for t in self.tool_map.values()
        ]

    async def _connect_server(self, config: MCPServerConfig):
        # Uses MCP SDK client to connect via stdio
        from mcp import ClientSession, StdioServerParameters
        from mcp.client.stdio import stdio_client

        params = StdioServerParameters(
            command=config.command,
            args=config.args,
            env=config.env,
        )
        # Simplification: the context managers are entered manually and never
        # exited. In production, hold them in an AsyncExitStack so the child
        # process and session can be shut down cleanly.
        stdio_cm = stdio_client(params)
        read, write = await stdio_cm.__aenter__()
        session = await ClientSession(read, write).__aenter__()
        await session.initialize()
        return session


# Configuration
gateway = MCPGateway([
    MCPServerConfig(name="db", command="node", args=["./mcp-db-server/dist/index.js"]),
    MCPServerConfig(name="github", command="npx", args=["-y", "@modelcontextprotocol/server-github"]),
    MCPServerConfig(name="search", command="python", args=["./mcp-search-server.py"]),
])

Why: A gateway gives you one connection, namespaced tools, and centralized error handling. Your agent doesn't need to know the topology.
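The double-underscore namespacing is a small but deliberate choice: tool names routinely contain single underscores, so `__` keeps the server prefix unambiguously separable. A minimal sketch:

```python
def namespace_tool(server: str, tool: str) -> str:
    # "db" + "query_database" -> "db__query_database"
    return f"{server}__{tool}"


def split_namespaced(name: str) -> tuple[str, str]:
    # Split at the FIRST double underscore; single underscores in the
    # tool name survive intact.
    server, _, tool = name.partition("__")
    return server, tool


assert namespace_tool("github", "create_issue") == "github__create_issue"
assert split_namespaced("db__query_database") == ("db", "query_database")
```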


4. Authentication Proxy for MCP

MCP has no built-in auth. Wrap your servers with an auth layer before exposing them.

// mcp-auth-proxy.ts
import { createServer } from "net";
import { randomUUID } from "crypto";
import jwt from "jsonwebtoken";

interface AuthConfig {
  jwtSecret: string;
  allowedScopes: Map<string, string[]>; // tool_name → required scopes
}

interface AuthenticatedRequest {
  userId: string;
  scopes: string[];
  requestId: string;
}

class MCPAuthProxy {
  private config: AuthConfig;
  private auditLog: Array<{
    timestamp: string;
    requestId: string;
    userId: string;
    tool: string;
    allowed: boolean;
  }> = [];

  constructor(config: AuthConfig) {
    this.config = config;
  }

  authenticate(token: string): AuthenticatedRequest | null {
    try {
      const decoded = jwt.verify(token, this.config.jwtSecret) as {
        sub: string;
        scopes: string[];
      };
      return {
        userId: decoded.sub,
        scopes: decoded.scopes,
        requestId: randomUUID(),
      };
    } catch {
      return null;
    }
  }

  authorize(auth: AuthenticatedRequest, toolName: string): boolean {
    const requiredScopes = this.config.allowedScopes.get(toolName) ?? ["admin"];
    const allowed = requiredScopes.some((s) => auth.scopes.includes(s));

    this.auditLog.push({
      timestamp: new Date().toISOString(),
      requestId: auth.requestId,
      userId: auth.userId,
      tool: toolName,
      allowed,
    });

    if (!allowed) {
      console.error(
        `[auth] DENIED: user=${auth.userId} tool=${toolName} ` +
          `has=[${auth.scopes}] needs=[${requiredScopes}]`
      );
    }

    return allowed;
  }

  getAuditLog(limit = 100) {
    return this.auditLog.slice(-limit);
  }
}

// Setup
const proxy = new MCPAuthProxy({
  jwtSecret: process.env.MCP_JWT_SECRET!,
  allowedScopes: new Map([
    ["query_database", ["db:read", "admin"]],
    ["write_database", ["db:write", "admin"]],
    ["deploy", ["deploy:prod", "admin"]],
    ["search_code", ["code:read", "admin"]],
  ]),
});

// In your MCP server's tool handler:
async function handleToolCall(
  token: string,
  toolName: string,
  args: Record<string, unknown>
) {
  const auth = proxy.authenticate(token);
  if (!auth) {
    return { error: "Authentication failed", isError: true };
  }

  if (!proxy.authorize(auth, toolName)) {
    return { error: "Insufficient permissions", isError: true };
  }

  // Proceed with actual tool execution
  return await executeToolHandler(toolName, args);
}

Critical for production. Without auth, any agent can call any tool. That's fine in dev. In production, it's a security hole.
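The authorization check itself is plain set logic, easy to test without minting JWTs. A Python sketch of the same deny-by-default rule (names are illustrative, not from the TS proxy above):

```python
def is_authorized(user_scopes: list[str], tool_name: str,
                  allowed_scopes: dict[str, list[str]]) -> bool:
    # Unknown tools fall back to requiring "admin": deny by default.
    required = allowed_scopes.get(tool_name, ["admin"])
    return any(s in user_scopes for s in required)


allowed = {
    "query_database": ["db:read", "admin"],
    "deploy": ["deploy:prod", "admin"],
}

assert is_authorized(["db:read"], "query_database", allowed)
assert not is_authorized(["db:read"], "deploy", allowed)
assert not is_authorized(["db:read"], "drop_tables", allowed)  # unmapped tool
assert is_authorized(["admin"], "drop_tables", allowed)
```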


5. Streaming Results with Progress Reporting

Long-running MCP tools should stream progress, not block for 30 seconds and then dump results.

# streaming_mcp_tool.py
import asyncio
import json
from datetime import datetime

class StreamingToolHandler:
    """MCP tool handler that reports progress via notifications."""

    def __init__(self, server):
        self.server = server

    async def handle_long_analysis(self, arguments: dict) -> dict:
        """Analyze a large dataset with progress updates."""
        dataset_url = arguments["dataset_url"]
        analysis_type = arguments.get("type", "summary")

        steps = [
            ("Fetching dataset", self._fetch_data, dataset_url),
            ("Validating schema", self._validate, None),
            ("Running analysis", self._analyze, analysis_type),
            ("Generating report", self._report, None),
        ]

        results = {}
        for i, (description, func, arg) in enumerate(steps):
            # Send progress notification
            await self.server.send_notification(
                "notifications/progress",
                {
                    "progressToken": arguments.get("_progressToken"),
                    "progress": i,
                    "total": len(steps),
                    "message": description,
                },
            )

            start = datetime.now()
            result = await func(arg) if arg else await func()
            elapsed = (datetime.now() - start).total_seconds()

            results[description] = {
                "status": "complete",
                "elapsed_seconds": elapsed,
                "output": result,
            }

        return {
            "content": [
                {
                    "type": "text",
                    "text": json.dumps(results, indent=2, default=str),
                }
            ]
        }

    async def _fetch_data(self, url: str) -> dict:
        import httpx
        async with httpx.AsyncClient() as client:
            resp = await client.get(url)
            data = resp.json()
        return {"rows": len(data), "size_kb": len(resp.content) / 1024}

    async def _validate(self) -> dict:
        await asyncio.sleep(0.5)  # Schema validation
        return {"valid": True, "warnings": 0}

    async def _analyze(self, analysis_type: str) -> dict:
        await asyncio.sleep(2)  # Heavy computation
        return {"type": analysis_type, "insights": 42}

    async def _report(self) -> dict:
        await asyncio.sleep(0.3)
        return {"format": "markdown", "sections": 5}

User experience matters. Even for AI agents, knowing "step 2 of 4" is better than a hanging request.


6. Error Recovery with Retry Policies

MCP tool calls fail. Network issues, rate limits, timeouts. Build retry logic into your client.

# mcp_retry.py
import asyncio
import random
from dataclasses import dataclass
from enum import Enum
from typing import Callable, Any

class RetryStrategy(Enum):
    EXPONENTIAL = "exponential"
    LINEAR = "linear"
    IMMEDIATE = "immediate"

@dataclass
class RetryPolicy:
    max_retries: int = 3
    strategy: RetryStrategy = RetryStrategy.EXPONENTIAL
    base_delay_ms: int = 1000
    max_delay_ms: int = 30000
    jitter: bool = True
    retryable_errors: tuple = ("ConnectionError", "TimeoutError", "RateLimitError")

    def get_delay(self, attempt: int) -> float:
        if self.strategy == RetryStrategy.EXPONENTIAL:
            delay = self.base_delay_ms * (2 ** attempt)
        elif self.strategy == RetryStrategy.LINEAR:
            delay = self.base_delay_ms * (attempt + 1)
        else:
            delay = 0

        delay = min(delay, self.max_delay_ms)

        if self.jitter:
            delay = delay * (0.5 + random.random())

        return delay / 1000  # Convert to seconds


class ResilientMCPClient:
    """MCP client wrapper with automatic retry and circuit breaking."""

    def __init__(self, client, default_policy: RetryPolicy | None = None):
        self.client = client
        self.default_policy = default_policy or RetryPolicy()
        self.failure_counts: dict[str, int] = {}
        self.circuit_open: dict[str, float] = {}

    async def call_tool(
        self,
        name: str,
        arguments: dict,
        policy: RetryPolicy | None = None,
    ) -> Any:
        policy = policy or self.default_policy

        # Circuit breaker check
        if name in self.circuit_open:
            import time
            if time.time() - self.circuit_open[name] < 60:
                raise Exception(f"Circuit open for {name}. Try again later.")
            del self.circuit_open[name]
            self.failure_counts[name] = 0

        last_error = None
        for attempt in range(policy.max_retries + 1):
            try:
                result = await self.client.call_tool(name, arguments)
                self.failure_counts[name] = 0  # Reset on success
                return result
            except Exception as e:
                last_error = e
                error_type = type(e).__name__

                if error_type not in policy.retryable_errors:
                    raise  # Non-retryable, fail immediately

                self.failure_counts[name] = self.failure_counts.get(name, 0) + 1

                # Trip circuit breaker after 5 consecutive failures
                if self.failure_counts[name] >= 5:
                    import time
                    self.circuit_open[name] = time.time()
                    raise Exception(f"Circuit breaker tripped for {name}") from e

                if attempt < policy.max_retries:
                    delay = policy.get_delay(attempt)
                    print(f"[retry] {name} attempt {attempt + 1} failed: {e}. "
                          f"Retrying in {delay:.1f}s")
                    await asyncio.sleep(delay)

        raise last_error

Production reality: Things fail. The question is whether your system recovers automatically or pages you at 3 AM.
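It helps to see the delay schedule the policy actually produces. A jitter-free sketch of the same arithmetic:

```python
def backoff_delays(base_ms: int = 1000, max_ms: int = 30000,
                   retries: int = 5, strategy: str = "exponential") -> list[int]:
    """Delay (ms) before each retry attempt, capped at max_ms."""
    delays = []
    for attempt in range(retries):
        if strategy == "exponential":
            d = base_ms * (2 ** attempt)
        elif strategy == "linear":
            d = base_ms * (attempt + 1)
        else:  # immediate
            d = 0
        delays.append(min(d, max_ms))
    return delays


# Exponential: 1s, 2s, 4s, 8s, 16s, then capped at 30s
assert backoff_delays() == [1000, 2000, 4000, 8000, 16000]
assert backoff_delays(retries=7)[-2:] == [30000, 30000]
assert backoff_delays(strategy="linear", retries=3) == [1000, 2000, 3000]
```

In the `RetryPolicy` above, jitter then multiplies each delay by a random factor in [0.5, 1.5), which prevents a fleet of agents from retrying in lockstep.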


7. Tool Result Caching

Identical tool calls within the same conversation shouldn't hit the backend twice.

# mcp_cache.py
import hashlib
import json
import time
from typing import Any

class ToolResultCache:
    """LRU cache for MCP tool results with TTL support."""

    def __init__(self, max_size: int = 1000, default_ttl: int = 300):
        self.max_size = max_size
        self.default_ttl = default_ttl
        self.cache: dict[str, dict] = {}
        self.access_order: list[str] = []
        self.tool_ttls: dict[str, int] = {}
        self.stats = {"hits": 0, "misses": 0}

    def set_ttl(self, tool_name: str, ttl_seconds: int) -> None:
        """Set custom TTL for a specific tool."""
        self.tool_ttls[tool_name] = ttl_seconds

    def _cache_key(self, tool_name: str, arguments: dict) -> str:
        arg_str = json.dumps(arguments, sort_keys=True, default=str)
        return hashlib.sha256(f"{tool_name}:{arg_str}".encode()).hexdigest()

    def get(self, tool_name: str, arguments: dict) -> Any | None:
        key = self._cache_key(tool_name, arguments)
        entry = self.cache.get(key)

        if entry is None:
            self.stats["misses"] += 1
            return None

        ttl = self.tool_ttls.get(tool_name, self.default_ttl)
        if time.time() - entry["timestamp"] > ttl:
            del self.cache[key]
            self.stats["misses"] += 1
            return None

        self.stats["hits"] += 1
        # Move to end (most recently used)
        if key in self.access_order:
            self.access_order.remove(key)
        self.access_order.append(key)

        return entry["result"]

    def put(self, tool_name: str, arguments: dict, result: Any) -> None:
        key = self._cache_key(tool_name, arguments)

        # Evict LRU if at capacity
        while len(self.cache) >= self.max_size and self.access_order:
            evict_key = self.access_order.pop(0)
            self.cache.pop(evict_key, None)

        self.cache[key] = {"result": result, "timestamp": time.time()}
        self.access_order.append(key)


class CachedMCPClient:
    """MCP client with transparent caching."""

    # Tools that should never be cached (side effects)
    NEVER_CACHE = {"write_file", "send_email", "deploy", "delete"}

    def __init__(self, client, cache: ToolResultCache | None = None):
        self.client = client
        self.cache = cache or ToolResultCache()

    async def call_tool(self, name: str, arguments: dict) -> Any:
        if name in self.NEVER_CACHE:
            return await self.client.call_tool(name, arguments)

        cached = self.cache.get(name, arguments)
        if cached is not None:
            return cached

        result = await self.client.call_tool(name, arguments)
        self.cache.put(name, arguments, result)
        return result

Real impact: In a multi-step agent workflow, the same search or database query can fire 3-4 times. Caching saves tokens, time, and API costs.
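The cache key is worth a second look: `json.dumps(..., sort_keys=True)` makes it insensitive to argument order, so semantically identical calls collide on purpose. Restating the key function from above:

```python
import hashlib
import json


def cache_key(tool_name: str, arguments: dict) -> str:
    # sort_keys=True canonicalizes the arguments, so key order doesn't matter
    arg_str = json.dumps(arguments, sort_keys=True, default=str)
    return hashlib.sha256(f"{tool_name}:{arg_str}".encode()).hexdigest()


k1 = cache_key("search", {"q": "mcp", "limit": 10})
k2 = cache_key("search", {"limit": 10, "q": "mcp"})
assert k1 == k2                                # same call, same entry
assert cache_key("search", {"q": "mcp"}) != k1  # different args, different entry
```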


8. Observability: Structured Logging for MCP

You can't debug what you can't see. Instrument every MCP call.

# mcp_observability.py
import json
import time
import logging
from contextvars import ContextVar
from dataclasses import dataclass, field, asdict
from typing import Any

request_id_var: ContextVar[str] = ContextVar("request_id", default="unknown")

@dataclass
class ToolCallMetric:
    tool_name: str
    arguments_hash: str
    start_time: float
    end_time: float = 0
    duration_ms: float = 0
    success: bool = True
    error: str | None = None
    result_tokens: int = 0
    cached: bool = False
    request_id: str = ""

    def finalize(self) -> "ToolCallMetric":
        self.end_time = time.time()
        self.duration_ms = (self.end_time - self.start_time) * 1000
        self.request_id = request_id_var.get()
        return self


class MCPObserver:
    """Structured observability for MCP tool calls."""

    def __init__(self):
        self.logger = logging.getLogger("mcp.observer")
        self.metrics: list[ToolCallMetric] = []

    def record(self, metric: ToolCallMetric) -> None:
        self.metrics.append(metric)
        log_data = asdict(metric)

        if metric.success:
            self.logger.info("mcp.tool.call", extra={"data": log_data})
        else:
            self.logger.error("mcp.tool.error", extra={"data": log_data})

    def get_summary(self) -> dict:
        if not self.metrics:
            return {"total_calls": 0}

        durations = [m.duration_ms for m in self.metrics]
        errors = [m for m in self.metrics if not m.success]
        cached = [m for m in self.metrics if m.cached]

        return {
            "total_calls": len(self.metrics),
            "error_count": len(errors),
            "cache_hit_rate": len(cached) / len(self.metrics),
            "avg_duration_ms": sum(durations) / len(durations),
            "p95_duration_ms": sorted(durations)[int(len(durations) * 0.95)],
            "by_tool": self._by_tool(),
        }

    def _by_tool(self) -> dict:
        tools: dict[str, list[float]] = {}
        for m in self.metrics:
            tools.setdefault(m.tool_name, []).append(m.duration_ms)
        return {
            name: {"calls": len(ds), "avg_ms": sum(ds) / len(ds)}
            for name, ds in tools.items()
        }

Ship this on day one. When your agent makes 47 tool calls in a conversation and something goes wrong, you need to know which call, when, and why.
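The p95 in `get_summary` is a plain index into the sorted durations, which is fine for in-memory batches (a streaming system would want a sketch like t-digest instead). The math, checked on a toy sample:

```python
def summarize(durations_ms: list[float]) -> dict:
    s = sorted(durations_ms)
    return {
        "total_calls": len(s),
        "avg_ms": sum(s) / len(s),
        # Index-based percentile, same approach as get_summary above
        "p95_ms": s[int(len(s) * 0.95)],
    }


# 19 fast calls and one slow outlier: the average hides it, the p95 doesn't
assert summarize([10] * 19 + [500]) == {
    "total_calls": 20,
    "avg_ms": 34.5,
    "p95_ms": 500,
}
```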


9. Multi-Agent Task Delegation via MCP

The real power: agents that delegate tasks to other agents through MCP.

# agent_delegation.py
import asyncio
import json
from dataclasses import dataclass
from enum import Enum

class TaskStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"
    COMPLETE = "complete"
    FAILED = "failed"

@dataclass
class AgentTask:
    task_id: str
    description: str
    assigned_to: str
    status: TaskStatus = TaskStatus.PENDING
    result: str | None = None
    dependencies: list[str] | None = None

class AgentOrchestrator:
    """Coordinates multiple AI agents via MCP tool delegation."""

    def __init__(self):
        self.agents: dict[str, dict] = {}
        self.tasks: dict[str, AgentTask] = {}

    def register_agent(self, name: str, capabilities: list[str], mcp_client) -> None:
        self.agents[name] = {
            "capabilities": capabilities,
            "client": mcp_client,
            "active_tasks": 0,
            "max_concurrent": 3,
        }

    def find_agent(self, required_capability: str) -> str | None:
        """Find the least-loaded agent with the required capability."""
        candidates = [
            (name, info)
            for name, info in self.agents.items()
            if required_capability in info["capabilities"]
            and info["active_tasks"] < info["max_concurrent"]
        ]
        if not candidates:
            return None
        return min(candidates, key=lambda x: x[1]["active_tasks"])[0]

    async def delegate(self, task: AgentTask) -> str:
        """Delegate a task to an appropriate agent."""
        self.tasks[task.task_id] = task

        # Wait for dependencies
        if task.dependencies:
            await self._wait_for_dependencies(task.dependencies)

        agent_name = task.assigned_to or self.find_agent("general")
        if not agent_name:
            task.status = TaskStatus.FAILED
            task.result = "No available agent"
            return task.result

        agent = self.agents[agent_name]
        agent["active_tasks"] += 1
        task.status = TaskStatus.RUNNING

        try:
            result = await agent["client"].call_tool(
                "execute_task",
                {
                    "task_id": task.task_id,
                    "description": task.description,
                    "context": self._get_dependency_results(task.dependencies or []),
                },
            )
            task.status = TaskStatus.COMPLETE
            task.result = result.content[0].text
        except Exception as e:
            task.status = TaskStatus.FAILED
            task.result = str(e)
        finally:
            agent["active_tasks"] -= 1

        return task.result

    async def run_parallel(self, tasks: list[AgentTask]) -> dict[str, str]:
        """Run independent tasks in parallel."""
        results = await asyncio.gather(
            *[self.delegate(t) for t in tasks],
            return_exceptions=True,
        )
        return {
            t.task_id: str(r) for t, r in zip(tasks, results)
        }

    async def _wait_for_dependencies(self, dep_ids: list[str]) -> None:
        while True:
            all_done = all(
                self.tasks.get(d, AgentTask("", "", "")).status
                in (TaskStatus.COMPLETE, TaskStatus.FAILED)
                for d in dep_ids
            )
            if all_done:
                return
            await asyncio.sleep(0.5)

    def _get_dependency_results(self, dep_ids: list[str]) -> dict:
        return {
            d: self.tasks[d].result
            for d in dep_ids
            if d in self.tasks and self.tasks[d].result
        }

This is where MCP shines. Not as a tool protocol for one agent, but as the communication layer for agent swarms.


Putting It All Together

These 9 patterns form a production MCP stack:

| Layer       | Pattern                 | Purpose                    |
| ----------- | ----------------------- | -------------------------- |
| Discovery   | Tool Registry           | Dynamic tool availability  |
| Resource    | Budget Manager          | Context window protection  |
| Topology    | Gateway                 | Server composition         |
| Security    | Auth Proxy              | Access control + audit     |
| UX          | Streaming               | Progress feedback          |
| Resilience  | Retry + Circuit Breaker | Failure recovery           |
| Performance | Caching                 | Reduce redundant calls     |
| Ops         | Observability           | Debugging + metrics        |
| Scale       | Delegation              | Multi-agent orchestration  |

If you're building agents that need to interact with real systems — databases, APIs, codebases — these patterns are the difference between a demo and a product.


Resources

Building production MCP systems means managing tools, context budgets, auth, and observability all at once. If you want pre-built utilities for common patterns like token budgeting, multi-model routing, and agent orchestration, check out the AI Dev Toolkit — it includes production-ready components for exactly these kinds of workflows.

The MCP ecosystem moves fast. What patterns are you using in production? Drop them in the comments.


This is part of the "AI Engineering in Practice" series — real patterns from real systems, not toy demos.
