9 MCP Production Patterns That Actually Scale Multi-Agent Systems (2026)
Model Context Protocol went from "interesting spec" to industry standard in under a year. 97 million monthly SDK downloads. Every major AI provider on board — Anthropic, OpenAI, Google, Microsoft, Amazon.
But most tutorials still show toy examples. A weather tool. A calculator. Cool for demos, useless for production.
Here are 9 patterns we've battle-tested in real multi-agent systems — with code you can ship today.
1. The Tool Registry Pattern
Don't hardcode tools. Register them dynamically so agents discover capabilities at runtime.
// mcp-registry/src/registry.ts
import { Server } from "@modelcontextprotocol/sdk/server/index.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import {
CallToolRequestSchema,
ListToolsRequestSchema,
} from "@modelcontextprotocol/sdk/types.js";
/**
 * A dynamically registrable tool exposed through the MCP server.
 * Registered at runtime so agents discover capabilities via tools/list.
 */
interface ToolDefinition {
  /** Unique tool name; used as the registry key. */
  name: string;
  /** Human-readable description surfaced to agents via tools/list.
   * (Fixed: was garbled as the literal type `"string;"`.) */
  description: string;
  /** JSON Schema describing the tool's input payload. */
  inputSchema: Record<string, unknown>;
  /** Executes the tool; returns any JSON-serializable result. */
  handler: (args: Record<string, unknown>) => Promise<unknown>;
  /** Version string, logged on registration. */
  version: string;
  /** Optional liveness probe; resolves false (or throws) when the backing service is down. */
  healthCheck?: () => Promise<boolean>;
}
/**
 * In-memory registry of MCP tools with per-tool health tracking.
 * All logging goes to stderr so stdout stays free for the MCP protocol.
 */
class ToolRegistry {
  private tools = new Map<string, ToolDefinition>();
  private healthStatus = new Map<string, boolean>();

  /** Adds (or replaces) a tool and marks it healthy. */
  register(tool: ToolDefinition): void {
    this.tools.set(tool.name, tool);
    this.healthStatus.set(tool.name, true);
    console.error(`[registry] Registered tool: ${tool.name} v${tool.version}`);
  }

  /** Removes a tool and forgets its health state. */
  unregister(name: string): void {
    this.tools.delete(name);
    this.healthStatus.delete(name);
    console.error(`[registry] Unregistered tool: ${name}`);
  }

  /** Tools whose most recent health check passed. */
  getHealthy(): ToolDefinition[] {
    const healthy: ToolDefinition[] = [];
    for (const tool of this.tools.values()) {
      if (this.healthStatus.get(tool.name) === true) {
        healthy.push(tool);
      }
    }
    return healthy;
  }

  /** Probes every tool that defines a healthCheck; a throw counts as unhealthy. */
  async runHealthChecks(): Promise<void> {
    for (const [toolName, definition] of this.tools.entries()) {
      if (!definition.healthCheck) continue;
      try {
        this.healthStatus.set(toolName, await definition.healthCheck());
      } catch {
        this.healthStatus.set(toolName, false);
        console.error(`[registry] Health check failed: ${toolName}`);
      }
    }
  }
}
const registry = new ToolRegistry();
// Example: register a database query tool
registry.register({
name: "query_database",
description: "\"Execute read-only SQL queries against the analytics database\","
version: "2.1.0",
inputSchema: {
type: "object",
properties: {
query: { type: "string", description: "\"SQL SELECT query\" },"
timeout_ms: { type: "number", description: "\"Query timeout\", default: 5000 },"
},
required: ["query"],
},
handler: async (args) => {
const query = args.query as string;
if (!query.trim().toUpperCase().startsWith("SELECT")) {
throw new Error("Only SELECT queries allowed");
}
// Execute against your DB pool
return await executeQuery(query, args.timeout_ms as number);
},
healthCheck: async () => {
try {
await executeQuery("SELECT 1", 1000);
return true;
} catch {
return false;
}
},
});
// Wire up the MCP server
const server = new Server({ name: "tool-registry", version: "1.0.0" }, {
capabilities: { tools: {} },
});
server.setRequestHandler(ListToolsRequestSchema, async () => ({
tools: registry.getHealthy().map((t) => ({
name: t.name,
description: "t.description,"
inputSchema: t.inputSchema,
})),
}));
server.setRequestHandler(CallToolRequestSchema, async (request) => {
const tool = registry.getHealthy().find((t) => t.name === request.params.name);
if (!tool) {
return { content: [{ type: "text", text: `Tool not found or unhealthy: ${request.params.name}` }], isError: true };
}
try {
const result = await tool.handler(request.params.arguments ?? {});
return { content: [{ type: "text", text: JSON.stringify(result, null, 2) }] };
} catch (err) {
return { content: [{ type: "text", text: `Error: ${(err as Error).message}` }], isError: true };
}
});
// Health check loop
setInterval(() => registry.runHealthChecks(), 30_000);
const transport = new StdioServerTransport();
await server.connect(transport);
Why it matters: In production you have dozens of tools. Some go down. The registry pattern means agents only see what's actually working.
2. Context Window Budget Manager
MCP servers can return massive payloads. Without budget management, you blow your context window on one tool call.
# budget_manager.py
import tiktoken
from dataclasses import dataclass
from typing import Any
@dataclass
class ContextBudget:
    """Tracks token spend for a single conversation's context window."""

    total_tokens: int                    # full model context size
    reserved_for_response: int = 4096    # head-room kept for the model's reply
    reserved_for_system: int = 2000      # head-room kept for the system prompt
    used_tokens: int = 0                 # running total consumed so far

    @property
    def available(self) -> int:
        """Tokens still spendable after reservations and prior use."""
        reserved = self.reserved_for_response + self.reserved_for_system
        return self.total_tokens - reserved - self.used_tokens

    def consume(self, tokens: int) -> None:
        """Record that `tokens` of context have been spent."""
        self.used_tokens += tokens

    def can_afford(self, tokens: int) -> bool:
        """True when `tokens` fits within the remaining budget."""
        return tokens <= self.available
class MCPBudgetProxy:
    """Wraps MCP tool results to enforce context budgets."""

    def __init__(self, budget: ContextBudget, model: str = "gpt-4"):
        self.budget = budget
        self.encoder = tiktoken.encoding_for_model(model)

    def count_tokens(self, text: str) -> int:
        """Token count of `text` under the configured model's encoding."""
        return len(self.encoder.encode(text))

    def truncate_to_budget(self, text: str, max_fraction: float = 0.3) -> str:
        """Truncate result to fit within budget fraction."""
        allowance = int(self.budget.available * max_fraction)
        encoded = self.encoder.encode(text)
        if len(encoded) <= allowance:
            # Fits as-is: charge the budget and pass the text through.
            self.budget.consume(len(encoded))
            return text
        # Over budget: keep a prefix (leaving ~20 tokens for the marker) and
        # flag the cut so the agent can request specific sections instead.
        head = self.encoder.decode(encoded[: allowance - 20])
        suffix = f"\n\n[TRUNCATED: {len(encoded)} tokens → {allowance} tokens. Request specific sections for full data.]"
        self.budget.consume(allowance)
        return head + suffix

    async def call_tool_with_budget(
        self,
        mcp_client,
        tool_name: str,
        arguments: dict[str, Any],
        max_fraction: float = 0.3,
    ) -> str:
        """Call MCP tool and enforce budget on response."""
        if self.budget.available < 500:
            # Refuse outright rather than blow past the window.
            return "[BUDGET EXHAUSTED: Cannot make more tool calls. Respond with available context.]"
        raw_result = await mcp_client.call_tool(tool_name, arguments)
        text = raw_result.content[0].text if raw_result.content else ""
        return self.truncate_to_budget(text, max_fraction)
# Usage in an agent loop
# Module-level singletons: one budget per conversation, one proxy wrapping it.
budget = ContextBudget(total_tokens=128_000)
proxy = MCPBudgetProxy(budget)

async def agent_step(mcp_client, tool_name: str, args: dict) -> str:
    # One budget-enforced tool call; prints remaining budget for visibility.
    result = await proxy.call_tool_with_budget(mcp_client, tool_name, args)
    print(f"Budget remaining: {budget.available} tokens")
    return result
The lesson: Production agents need resource management. Treat context tokens like memory — allocate, track, and refuse when depleted.
3. MCP Server Composition (The Gateway Pattern)
One agent shouldn't connect to 15 MCP servers. Build a gateway that composes multiple servers behind a single interface.
# mcp_gateway.py
import asyncio
import json
from dataclasses import dataclass, field
from typing import Any
@dataclass
class MCPServerConfig:
    # Launch spec for one downstream MCP server (spawned over stdio).
    name: str                                          # namespace prefix for this server's tools
    command: str                                       # executable to spawn
    args: list[str] = field(default_factory=list)      # CLI arguments for the command
    env: dict[str, str] = field(default_factory=dict)  # extra environment variables
    # NOTE(review): priority is not consulted anywhere in this file yet.
    priority: int = 0  # Higher = preferred when tools overlap
@dataclass
class GatewayTool:
    # A downstream tool as exposed through the gateway's unified namespace.
    name: str            # namespaced name, "<server>__<tool>"
    server: str          # owning server's config name
    original_name: str   # the tool's name on the downstream server
    description: str     # downstream description, prefixed with "[server]"
    input_schema: dict   # JSON Schema reported by the downstream server
class MCPGateway:
    """Composes multiple MCP servers into a single tool namespace.

    Tools are exposed as "<server>__<tool>" so names from different servers
    never collide. Transports are kept open for the gateway's lifetime; call
    disconnect_all() to tear them down cleanly.
    """

    def __init__(self, configs: list[MCPServerConfig]):
        self.configs = {c.name: c for c in configs}
        self.connections: dict[str, Any] = {}       # server name → ClientSession
        self.tool_map: dict[str, GatewayTool] = {}  # namespaced name → tool
        # Transport context managers, kept so they can be exited on shutdown.
        # (The original entered them and dropped the handle — a leak with no
        # way to close the spawned subprocess transports.)
        self._transports: dict[str, Any] = {}

    async def connect_all(self) -> None:
        """Connect every configured server and index its tools.

        Failures are logged and skipped so one bad server doesn't take
        down the rest.
        """
        for name, config in self.configs.items():
            try:
                conn = await self._connect_server(config)
                self.connections[name] = conn
                tools = await conn.list_tools()
                for tool in tools:
                    namespaced = f"{name}__{tool.name}"
                    self.tool_map[namespaced] = GatewayTool(
                        name=namespaced,
                        server=name,
                        original_name=tool.name,
                        description=f"[{name}] {tool.description}",
                        input_schema=tool.inputSchema,
                    )
                print(f"[gateway] Connected: {name} ({len(tools)} tools)")
            except Exception as e:
                print(f"[gateway] Failed to connect {name}: {e}")

    async def call_tool(self, namespaced_name: str, arguments: dict) -> Any:
        """Route a namespaced tool call to the server that owns it."""
        gateway_tool = self.tool_map.get(namespaced_name)
        if not gateway_tool:
            raise ValueError(f"Unknown tool: {namespaced_name}")
        conn = self.connections.get(gateway_tool.server)
        if not conn:
            raise ConnectionError(f"Server disconnected: {gateway_tool.server}")
        return await conn.call_tool(gateway_tool.original_name, arguments)

    def list_tools(self) -> list[dict]:
        """All namespaced tools in MCP tools/list shape."""
        return [
            {
                "name": t.name,
                "description": t.description,
                "inputSchema": t.input_schema,
            }
            for t in self.tool_map.values()
        ]

    async def disconnect_all(self) -> None:
        """Exit every transport context manager opened by _connect_server."""
        for name, ctx in list(self._transports.items()):
            try:
                await ctx.__aexit__(None, None, None)
            except Exception as e:
                print(f"[gateway] Error disconnecting {name}: {e}")
        self._transports.clear()
        self.connections.clear()

    async def _connect_server(self, config: MCPServerConfig):
        # Uses MCP SDK client to connect via stdio
        from mcp import ClientSession, StdioServerParameters
        from mcp.client.stdio import stdio_client

        params = StdioServerParameters(
            command=config.command,
            args=config.args,
            env=config.env,
        )
        # Keep the context manager so disconnect_all() can __aexit__ it.
        ctx = stdio_client(params)
        transport = await ctx.__aenter__()
        self._transports[config.name] = ctx
        # NOTE(review): in the MCP Python SDK, ClientSession is itself an
        # async context manager — confirm whether it also needs explicit
        # closing in your SDK version.
        session = ClientSession(*transport)
        await session.initialize()
        return session
# Configuration
# One gateway composing three stdio servers; call `await gateway.connect_all()`
# before routing any tool calls.
gateway = MCPGateway([
    MCPServerConfig(name="db", command="node", args=["./mcp-db-server/dist/index.js"]),
    MCPServerConfig(name="github", command="npx", args=["-y", "@modelcontextprotocol/server-github"]),
    MCPServerConfig(name="search", command="python", args=["./mcp-search-server.py"]),
])
Why: A gateway gives you one connection, namespaced tools, and centralized error handling. Your agent doesn't need to know the topology.
4. Authentication Proxy for MCP
MCP itself doesn't enforce authentication for stdio servers (the spec's OAuth-based authorization applies to HTTP transports, and many deployments skip it). Wrap your servers with an auth layer before exposing them.
// mcp-auth-proxy.ts
import { createServer } from "net";
import { randomUUID } from "crypto";
import jwt from "jsonwebtoken";
/** Static configuration for the auth proxy. */
interface AuthConfig {
  /** Secret/key used by jwt.verify to validate incoming tokens. */
  jwtSecret: string;
  allowedScopes: Map<string, string[]>; // tool_name → required scopes
}

/** Identity extracted from a verified JWT, plus a per-request trace id. */
interface AuthenticatedRequest {
  userId: string;     // JWT `sub` claim
  scopes: string[];   // JWT `scopes` claim
  requestId: string;  // fresh UUID for audit correlation
}
/**
 * Auth layer for MCP tool calls: JWT verification, per-tool scope checks,
 * and an in-memory audit trail of allow/deny decisions.
 */
class MCPAuthProxy {
  /** Cap on retained audit entries; oldest entries are dropped beyond this.
   * (Fix: the original array grew without bound in a long-lived process.) */
  private static readonly MAX_AUDIT_ENTRIES = 10_000;

  private config: AuthConfig;
  private auditLog: Array<{
    timestamp: string;
    requestId: string;
    userId: string;
    tool: string;
    allowed: boolean;
  }> = [];

  constructor(config: AuthConfig) {
    this.config = config;
  }

  /** Verifies the JWT; returns identity plus a fresh requestId, or null when
   * the token is invalid or expired. */
  authenticate(token: string): AuthenticatedRequest | null {
    try {
      const decoded = jwt.verify(token, this.config.jwtSecret) as {
        sub: string;
        scopes: string[];
      };
      return {
        userId: decoded.sub,
        scopes: decoded.scopes,
        requestId: randomUUID(),
      };
    } catch {
      return null;
    }
  }

  /** Scope check plus audit entry. Tools absent from the scope map fall back
   * to requiring "admin" — i.e. unknown tools fail closed. */
  authorize(auth: AuthenticatedRequest, toolName: string): boolean {
    const requiredScopes = this.config.allowedScopes.get(toolName) ?? ["admin"];
    const allowed = requiredScopes.some((s) => auth.scopes.includes(s));
    this.auditLog.push({
      timestamp: new Date().toISOString(),
      requestId: auth.requestId,
      userId: auth.userId,
      tool: toolName,
      allowed,
    });
    // Bound memory: keep only the newest MAX_AUDIT_ENTRIES records.
    if (this.auditLog.length > MCPAuthProxy.MAX_AUDIT_ENTRIES) {
      this.auditLog.splice(0, this.auditLog.length - MCPAuthProxy.MAX_AUDIT_ENTRIES);
    }
    if (!allowed) {
      console.error(
        `[auth] DENIED: user=${auth.userId} tool=${toolName} ` +
        `has=[${auth.scopes}] needs=[${requiredScopes}]`
      );
    }
    return allowed;
  }

  /** Most recent audit entries (newest last). */
  getAuditLog(limit = 100) {
    return this.auditLog.slice(-limit);
  }
}
// Setup
// Scope map: each tool lists the scopes that grant access; "admin" acts as a
// superuser scope accepted everywhere. Tools absent from the map fall back to
// requiring "admin" (see authorize()).
const proxy = new MCPAuthProxy({
  jwtSecret: process.env.MCP_JWT_SECRET!,
  allowedScopes: new Map([
    ["query_database", ["db:read", "admin"]],
    ["write_database", ["db:write", "admin"]],
    ["deploy", ["deploy:prod", "admin"]],
    ["search_code", ["code:read", "admin"]],
  ]),
});
// In your MCP server's tool handler:
/**
 * Auth gate in front of tool execution: authenticate the bearer token,
 * authorize it for the requested tool, then dispatch to the real handler.
 */
async function handleToolCall(
  token: string,
  toolName: string,
  args: Record<string, unknown>
) {
  const auth = proxy.authenticate(token);
  if (auth === null) {
    return { error: "Authentication failed", isError: true };
  }
  const permitted = proxy.authorize(auth, toolName);
  if (!permitted) {
    return { error: "Insufficient permissions", isError: true };
  }
  // Proceed with actual tool execution
  return await executeToolHandler(toolName, args);
}
Critical for production. Without auth, any agent can call any tool. That's fine in dev. In production, it's a security hole.
5. Streaming Results with Progress Reporting
Long-running MCP tools should stream progress, not block for 30 seconds and then dump results.
# streaming_mcp_tool.py
import asyncio
import json
from datetime import datetime
class StreamingToolHandler:
    """MCP tool handler that reports progress via notifications."""

    def __init__(self, server):
        # server: MCP server object; only send_notification() is used here.
        self.server = server

    async def handle_long_analysis(self, arguments: dict) -> dict:
        """Analyze a large dataset with progress updates.

        Emits a notifications/progress message before each step so clients
        can show "step i of N" instead of watching a hanging request.
        """
        dataset_url = arguments["dataset_url"]
        analysis_type = arguments.get("type", "summary")
        # (label, coroutine function, optional single argument)
        steps = [
            ("Fetching dataset", self._fetch_data, dataset_url),
            ("Validating schema", self._validate, None),
            ("Running analysis", self._analyze, analysis_type),
            ("Generating report", self._report, None),
        ]
        results = {}
        for i, (description, func, arg) in enumerate(steps):
            # Send progress notification
            await self.server.send_notification(
                "notifications/progress",
                {
                    "progressToken": arguments.get("_progressToken"),
                    "progress": i,
                    "total": len(steps),
                    "message": description,
                },
            )
            start = datetime.now()
            # BUG FIX: test `arg is not None`, not truthiness — a falsy but
            # meaningful argument (e.g. dataset_url == "") must still be
            # passed through instead of calling func() with no argument.
            result = await func(arg) if arg is not None else await func()
            elapsed = (datetime.now() - start).total_seconds()
            results[description] = {
                "status": "complete",
                "elapsed_seconds": elapsed,
                "output": result,
            }
        return {
            "content": [
                {
                    "type": "text",
                    "text": json.dumps(results, indent=2, default=str),
                }
            ]
        }

    async def _fetch_data(self, url: str) -> dict:
        # httpx is imported lazily so the module loads without it installed.
        import httpx
        async with httpx.AsyncClient() as client:
            resp = await client.get(url)
            data = resp.json()
            return {"rows": len(data), "size_kb": len(resp.content) / 1024}

    async def _validate(self) -> dict:
        await asyncio.sleep(0.5)  # Schema validation
        return {"valid": True, "warnings": 0}

    async def _analyze(self, analysis_type: str) -> dict:
        await asyncio.sleep(2)  # Heavy computation
        return {"type": analysis_type, "insights": 42}

    async def _report(self) -> dict:
        await asyncio.sleep(0.3)
        return {"format": "markdown", "sections": 5}
User experience matters. Even for AI agents, knowing "step 2 of 4" is better than a hanging request.
6. Error Recovery with Retry Policies
MCP tool calls fail. Network issues, rate limits, timeouts. Build retry logic into your client.
# mcp_retry.py
import asyncio
import random
from dataclasses import dataclass
from enum import Enum
from typing import Callable, Any
class RetryStrategy(Enum):
    """How retry delays grow between attempts."""
    EXPONENTIAL = "exponential"
    LINEAR = "linear"
    IMMEDIATE = "immediate"


@dataclass
class RetryPolicy:
    """Declarative retry configuration for MCP tool calls."""
    max_retries: int = 3
    strategy: RetryStrategy = RetryStrategy.EXPONENTIAL
    base_delay_ms: int = 1000
    max_delay_ms: int = 30000
    jitter: bool = True  # randomize delay to avoid thundering herds
    retryable_errors: tuple = ("ConnectionError", "TimeoutError", "RateLimitError")

    def get_delay(self, attempt: int) -> float:
        """Delay in seconds before retry number `attempt` (0-based)."""
        if self.strategy == RetryStrategy.IMMEDIATE:
            raw_ms = 0.0
        elif self.strategy == RetryStrategy.LINEAR:
            raw_ms = self.base_delay_ms * (attempt + 1)
        else:  # EXPONENTIAL
            raw_ms = self.base_delay_ms * (2 ** attempt)
        capped_ms = min(raw_ms, self.max_delay_ms)
        if self.jitter:
            # Scale into [0.5x, 1.5x) so concurrent clients spread out.
            capped_ms = capped_ms * (0.5 + random.random())
        return capped_ms / 1000  # Convert to seconds
class ResilientMCPClient:
    """MCP client wrapper with automatic retry and circuit breaking."""

    def __init__(self, client, default_policy: RetryPolicy | None = None):
        self.client = client
        self.default_policy = default_policy or RetryPolicy()
        # Consecutive failure count per tool; reset to 0 on any success.
        self.failure_counts: dict[str, int] = {}
        # Tool name → epoch seconds when its circuit was opened.
        self.circuit_open: dict[str, float] = {}

    async def call_tool(
        self,
        name: str,
        arguments: dict,
        policy: RetryPolicy | None = None,
    ) -> Any:
        # Calls `name` with retries per `policy`; trips a per-tool circuit
        # breaker after 5 consecutive failures and holds it open for 60s.
        policy = policy or self.default_policy
        # Circuit breaker check
        if name in self.circuit_open:
            import time
            if time.time() - self.circuit_open[name] < 60:
                raise Exception(f"Circuit open for {name}. Try again later.")
            # 60s elapsed: close the circuit and start counting fresh.
            del self.circuit_open[name]
            self.failure_counts[name] = 0
        last_error = None
        for attempt in range(policy.max_retries + 1):
            try:
                result = await self.client.call_tool(name, arguments)
                self.failure_counts[name] = 0  # Reset on success
                return result
            except Exception as e:
                last_error = e
                error_type = type(e).__name__
                # NOTE(review): matching is by exception class NAME only, so
                # subclasses of a retryable error are NOT matched unless
                # their own name is listed explicitly.
                if error_type not in policy.retryable_errors:
                    raise  # Non-retryable, fail immediately
                self.failure_counts[name] = self.failure_counts.get(name, 0) + 1
                # Trip circuit breaker after 5 consecutive failures
                if self.failure_counts[name] >= 5:
                    import time
                    self.circuit_open[name] = time.time()
                    raise Exception(f"Circuit breaker tripped for {name}") from e
                if attempt < policy.max_retries:
                    delay = policy.get_delay(attempt)
                    print(f"[retry] {name} attempt {attempt + 1} failed: {e}. "
                          f"Retrying in {delay:.1f}s")
                    await asyncio.sleep(delay)
        # All attempts exhausted on retryable errors: surface the last one.
        raise last_error
Production reality: Things fail. The question is whether your system recovers automatically or pages you at 3 AM.
7. Tool Result Caching
Identical tool calls within the same conversation shouldn't hit the backend twice.
# mcp_cache.py
import hashlib
import json
import time
from typing import Any
class ToolResultCache:
"""LRU cache for MCP tool results with TTL support."""
def __init__(self, max_size: int = 1000, default_ttl: int = 300):
self.max_size = max_size
self.default_ttl = default_ttl
self.cache: dict[str, dict] = {}
self.access_order: list[str] = []
self.tool_ttls: dict[str, int] = {}
self.stats = {"hits": 0, "misses": 0}
def set_ttl(self, tool_name: str, ttl_seconds: int) -> None:
"""Set custom TTL for a specific tool."""
self.tool_ttls[tool_name] = ttl_seconds
def _cache_key(self, tool_name: str, arguments: dict) -> str:
arg_str = json.dumps(arguments, sort_keys=True, default=str)
return hashlib.sha256(f"{tool_name}:{arg_str}".encode()).hexdigest()
def get(self, tool_name: str, arguments: dict) -> Any | None:
key = self._cache_key(tool_name, arguments)
entry = self.cache.get(key)
if entry is None:
self.stats["misses"] += 1
return None
ttl = self.tool_ttls.get(tool_name, self.default_ttl)
if time.time() - entry["timestamp"] > ttl:
del self.cache[key]
self.stats["misses"] += 1
return None
self.stats["hits"] += 1
# Move to end (most recently used)
if key in self.access_order:
self.access_order.remove(key)
self.access_order.append(key)
return entry["result"]
def put(self, tool_name: str, arguments: dict, result: Any) -> None:
key = self._cache_key(tool_name, arguments)
# Evict LRU if at capacity
while len(self.cache) >= self.max_size and self.access_order:
evict_key = self.access_order.pop(0)
self.cache.pop(evict_key, None)
self.cache[key] = {"result": result, "timestamp": time.time()}
self.access_order.append(key)
class CachedMCPClient:
    """MCP client with transparent caching."""

    # Tools that should never be cached (side effects)
    NEVER_CACHE = {"write_file", "send_email", "deploy", "delete"}

    def __init__(self, client, cache: ToolResultCache | None = None):
        self.client = client
        self.cache = cache or ToolResultCache()

    async def call_tool(self, name: str, arguments: dict) -> Any:
        """Serve from cache when possible; side-effecting tools always pass through."""
        if name in self.NEVER_CACHE:
            return await self.client.call_tool(name, arguments)
        hit = self.cache.get(name, arguments)
        if hit is not None:
            return hit
        fresh = await self.client.call_tool(name, arguments)
        self.cache.put(name, arguments, fresh)
        return fresh
Real impact: In a multi-step agent workflow, the same search or database query can fire 3-4 times. Caching saves tokens, time, and API costs.
8. Observability: Structured Logging for MCP
You can't debug what you can't see. Instrument every MCP call.
# mcp_observability.py
import json
import time
import logging
from contextvars import ContextVar
from dataclasses import dataclass, field, asdict
from typing import Any
# Ambient correlation id for the current request, safe across async tasks.
request_id_var: ContextVar[str] = ContextVar("request_id", default="unknown")

@dataclass
class ToolCallMetric:
    # One structured record per MCP tool invocation.
    tool_name: str
    arguments_hash: str      # hash of the call arguments (not the raw args)
    start_time: float        # epoch seconds at call start
    end_time: float = 0      # set by finalize()
    duration_ms: float = 0   # set by finalize()
    success: bool = True
    error: str | None = None
    result_tokens: int = 0   # token count of the result, if measured
    cached: bool = False     # True when served from a result cache
    request_id: str = ""     # filled from request_id_var by finalize()

    def finalize(self) -> "ToolCallMetric":
        # Stamp end time, duration, and the ambient request id; returns self
        # so calls can be chained, e.g. observer.record(metric.finalize()).
        self.end_time = time.time()
        self.duration_ms = (self.end_time - self.start_time) * 1000
        self.request_id = request_id_var.get()
        return self
class MCPObserver:
    """Structured observability for MCP tool calls."""

    def __init__(self):
        self.logger = logging.getLogger("mcp.observer")
        self.metrics: list[ToolCallMetric] = []

    def record(self, metric: ToolCallMetric) -> None:
        """Retain the metric and emit one structured log line for it."""
        self.metrics.append(metric)
        log_data = asdict(metric)
        if metric.success:
            self.logger.info("mcp.tool.call", extra={"data": log_data})
        else:
            self.logger.error("mcp.tool.error", extra={"data": log_data})

    def get_summary(self) -> dict:
        """Aggregate call counts, latency stats, and per-tool breakdown."""
        if not self.metrics:
            return {"total_calls": 0}
        total = len(self.metrics)
        latencies = sorted(m.duration_ms for m in self.metrics)
        error_count = sum(1 for m in self.metrics if not m.success)
        cache_hits = sum(1 for m in self.metrics if m.cached)
        return {
            "total_calls": total,
            "error_count": error_count,
            "cache_hit_rate": cache_hits / total,
            "avg_duration_ms": sum(latencies) / total,
            "p95_duration_ms": latencies[int(total * 0.95)],
            "by_tool": self._by_tool(),
        }

    def _by_tool(self) -> dict:
        """Per-tool call count and mean latency."""
        buckets: dict[str, list[float]] = {}
        for metric in self.metrics:
            buckets.setdefault(metric.tool_name, []).append(metric.duration_ms)
        return {
            tool: {"calls": len(ds), "avg_ms": sum(ds) / len(ds)}
            for tool, ds in buckets.items()
        }
Ship this on day one. When your agent makes 47 tool calls in a conversation and something goes wrong, you need to know which call, when, and why.
9. Multi-Agent Task Delegation via MCP
The real power: agents that delegate tasks to other agents through MCP.
# agent_delegation.py
import asyncio
import json
from dataclasses import dataclass
from enum import Enum
class TaskStatus(Enum):
    # Lifecycle states for a delegated task.
    PENDING = "pending"    # created, not yet dispatched
    RUNNING = "running"    # handed to an agent
    COMPLETE = "complete"  # agent finished successfully
    FAILED = "failed"      # errored, or no agent was available
@dataclass
class AgentTask:
    # A unit of work delegated to one agent, with optional dependencies.
    task_id: str
    description: str                        # natural-language task spec sent to the agent
    assigned_to: str                        # agent name; falsy → orchestrator picks one
    status: TaskStatus = TaskStatus.PENDING
    result: str | None = None               # agent output, or error text on failure
    dependencies: list[str] | None = None   # task_ids that must finish first
class AgentOrchestrator:
    """Coordinates multiple AI agents via MCP tool delegation."""

    def __init__(self):
        self.agents: dict[str, dict] = {}      # agent name → capability/load info
        self.tasks: dict[str, AgentTask] = {}  # task_id → task record

    def register_agent(self, name: str, capabilities: list[str], mcp_client) -> None:
        # Each agent entry tracks its MCP client plus a simple load counter.
        self.agents[name] = {
            "capabilities": capabilities,
            "client": mcp_client,
            "active_tasks": 0,
            "max_concurrent": 3,
        }

    def find_agent(self, required_capability: str) -> str | None:
        """Find the least-loaded agent with the required capability."""
        candidates = [
            (name, info)
            for name, info in self.agents.items()
            if required_capability in info["capabilities"]
            and info["active_tasks"] < info["max_concurrent"]
        ]
        if not candidates:
            return None
        return min(candidates, key=lambda x: x[1]["active_tasks"])[0]

    async def delegate(self, task: AgentTask) -> str:
        """Delegate a task to an appropriate agent."""
        self.tasks[task.task_id] = task
        # Wait for dependencies
        if task.dependencies:
            await self._wait_for_dependencies(task.dependencies)
        # Explicit assignment wins; otherwise fall back to any "general" agent.
        agent_name = task.assigned_to or self.find_agent("general")
        if not agent_name:
            task.status = TaskStatus.FAILED
            task.result = "No available agent"
            return task.result
        agent = self.agents[agent_name]
        agent["active_tasks"] += 1
        task.status = TaskStatus.RUNNING
        try:
            # Delegation is itself an MCP tool call on the target agent;
            # finished dependency results ride along as context.
            result = await agent["client"].call_tool(
                "execute_task",
                {
                    "task_id": task.task_id,
                    "description": task.description,
                    "context": self._get_dependency_results(task.dependencies or []),
                },
            )
            task.status = TaskStatus.COMPLETE
            task.result = result.content[0].text
        except Exception as e:
            task.status = TaskStatus.FAILED
            task.result = str(e)
        finally:
            # Always release the load slot, success or failure.
            agent["active_tasks"] -= 1
        return task.result

    async def run_parallel(self, tasks: list[AgentTask]) -> dict[str, str]:
        """Run independent tasks in parallel."""
        # return_exceptions=True keeps one failure from cancelling the rest;
        # exceptions are stringified into the result map like normal results.
        results = await asyncio.gather(
            *[self.delegate(t) for t in tasks],
            return_exceptions=True,
        )
        return {
            t.task_id: str(r) for t, r in zip(tasks, results)
        }

    async def _wait_for_dependencies(self, dep_ids: list[str]) -> None:
        # Polls every 0.5s until every dependency is COMPLETE or FAILED.
        # NOTE(review): a dep_id never registered via delegate() falls back to
        # a PENDING placeholder here, so this loop would poll forever —
        # confirm callers always delegate dependencies first.
        while True:
            all_done = all(
                self.tasks.get(d, AgentTask("", "", "")).status
                in (TaskStatus.COMPLETE, TaskStatus.FAILED)
                for d in dep_ids
            )
            if all_done:
                return
            await asyncio.sleep(0.5)

    def _get_dependency_results(self, dep_ids: list[str]) -> dict:
        # Forward only known dependencies that produced a non-empty result.
        return {
            d: self.tasks[d].result
            for d in dep_ids
            if d in self.tasks and self.tasks[d].result
        }
This is where MCP shines. Not as a tool protocol for one agent, but as the communication layer for agent swarms.
Putting It All Together
These 9 patterns form a production MCP stack:
| Layer | Pattern | Purpose |
|---|---|---|
| Discovery | Tool Registry | Dynamic tool availability |
| Resource | Budget Manager | Context window protection |
| Topology | Gateway | Server composition |
| Security | Auth Proxy | Access control + audit |
| UX | Streaming | Progress feedback |
| Resilience | Retry + Circuit Breaker | Failure recovery |
| Performance | Caching | Reduce redundant calls |
| Ops | Observability | Debugging + metrics |
| Scale | Delegation | Multi-agent orchestration |
If you're building agents that need to interact with real systems — databases, APIs, codebases — these patterns are the difference between a demo and a product.
Resources
Building production MCP systems means managing tools, context budgets, auth, and observability all at once. If you want pre-built utilities for common patterns like token budgeting, multi-model routing, and agent orchestration, check out the AI Dev Toolkit — it includes production-ready components for exactly these kinds of workflows.
The MCP ecosystem moves fast. What patterns are you using in production? Drop them in the comments.
This is part of the "AI Engineering in Practice" series — real patterns from real systems, not toy demos.
Top comments (0)