Part 3 of 3 in the MCP Server Series
In Part 1, we explored MCP concepts. In Part 2, we built a working server. Now it's time to make it production-ready.
Moving from a prototype to production means addressing security, performance, reliability, and maintainability. This post covers everything you need to deploy MCP servers safely at scale.
Series Navigation:
- Part 1: Understanding MCP conceptually
- Part 2: Building your first MCP server
- Part 3 (This post): Security, performance, and production deployment
Security: The Most Critical Consideration
MCP servers can read your data, execute commands, and modify systems. Security isn't optional—it's existential.
The Core Security Model
Remember: MCP servers run locally by default. They communicate via stdio (standard input/output), not network ports. This provides baseline security:
✅ Process isolation - Only the parent application can communicate with the server
✅ No network exposure - Unlike REST APIs, stdio servers don't open ports
✅ Explicit configuration - Users must manually add servers to their config
But this isn't enough for production.
1. Authentication & Authorization
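Even though MCP servers run locally, you should still validate requests. Keep in mind that a token passed as a tool argument is visible to the model and can end up in transcripts, so treat this pattern as a baseline check rather than a replacement for transport-level authentication: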
import hmac
import os
from functools import wraps

from mcp.types import TextContent

# Load from environment variable, not hardcoded
VALID_API_KEY = os.getenv("MCP_API_KEY")

def require_auth(func):
    """Decorator to enforce authentication on tool calls"""
    @wraps(func)
    async def wrapper(name: str, arguments: dict):
        auth_token = arguments.get("auth_token")
        # Fail closed if no key is configured; compare in constant time
        if (
            not VALID_API_KEY
            or not isinstance(auth_token, str)
            or not hmac.compare_digest(auth_token.encode(), VALID_API_KEY.encode())
        ):
            return [TextContent(
                type="text",
                text="Authentication failed. Invalid or missing auth_token."
            )]
        # Remove auth_token from arguments before processing
        clean_args = {k: v for k, v in arguments.items() if k != "auth_token"}
        return await func(name, clean_args)
    return wrapper

@app.call_tool()
@require_auth
async def call_tool(name: str, arguments: dict):
    # Your tool implementation
    pass
Update your tool schemas to require auth:
Tool(
    name="get_customer",
    description="Retrieve customer information by ID",
    inputSchema={
        "type": "object",
        "properties": {
            "auth_token": {
                "type": "string",
                "description": "Authentication token (required)"
            },
            "customer_id": {
                "type": "string",
                "description": "Customer's unique ID"
            }
        },
        "required": ["auth_token", "customer_id"]
    }
)
2. Principle of Least Privilege
Never expose more than absolutely necessary:
# BAD: Exposing raw SQL execution
Tool(
    name="execute_query",
    description="Run any SQL query",
    inputSchema={"type": "object", "properties": {"sql": {"type": "string"}}}
)

# GOOD: Specific, limited operations
Tool(
    name="get_customer",
    description="Retrieve customer by ID (read-only)",
    inputSchema={"type": "object", "properties": {"customer_id": {"type": "string"}}}
)
Tool(
    name="update_customer_email",
    description="Update customer email only (requires admin)",
    inputSchema={
        "type": "object",
        "properties": {
            "customer_id": {"type": "string"},
            "new_email": {"type": "string"}
        }
    }
)
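The "requires admin" note in the description above is only documentation unless the server enforces it. A minimal sketch of one way to do that, assuming a hypothetical role-to-tool mapping (the role names and the `ROLE_PERMISSIONS` table are illustrative, not part of the MCP SDK):

```python
from typing import Dict, Set

# Hypothetical mapping from role to the tools it may call.
# Tool names mirror the examples above; role names are assumptions.
ROLE_PERMISSIONS: Dict[str, Set[str]] = {
    "viewer": {"get_customer"},
    "admin": {"get_customer", "update_customer_email"},
}


def is_tool_allowed(role: str, tool_name: str) -> bool:
    """Return True only if the role is known and explicitly grants the tool."""
    return tool_name in ROLE_PERMISSIONS.get(role, set())


def authorize(role: str, tool_name: str) -> None:
    """Raise PermissionError when the role may not call the tool."""
    if not is_tool_allowed(role, tool_name):
        raise PermissionError(f"Role '{role}' may not call '{tool_name}'")
```

Unknown roles get an empty permission set, so the check denies by default. How a role is derived from a request is deployment-specific and left out here.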
3. Input Validation & Sanitization
Always validate and sanitize inputs:
import re
from typing import Optional

def validate_customer_id(customer_id) -> Optional[str]:
    """Validate customer ID format"""
    # Reject missing or non-string values before running the regex
    if not isinstance(customer_id, str):
        return "Customer ID is required"
    # Only allow alphanumeric characters and hyphens
    if not re.fullmatch(r'[a-zA-Z0-9-]+', customer_id):
        return "Invalid customer ID format"
    # Check length
    if len(customer_id) > 50:
        return "Customer ID too long"
    return None

def validate_email(email) -> Optional[str]:
    """Basic email validation"""
    email_regex = r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}'
    if not isinstance(email, str) or not re.fullmatch(email_regex, email):
        return "Invalid email format"
    return None

@app.call_tool()
async def call_tool(name: str, arguments: dict):
    if name == "get_customer":
        customer_id = arguments.get("customer_id")
        # Validate input
        error = validate_customer_id(customer_id)
        if error:
            return [TextContent(type="text", text=f"Error: {error}")]
        # Safe to proceed
        customer = await get_customer_from_db(customer_id)
        # ...
4. SQL Injection Prevention
NEVER construct SQL queries with string concatenation:
# DANGEROUS - SQL Injection vulnerability
async def get_customer_bad(customer_id: str):
    query = f"SELECT * FROM customers WHERE id = '{customer_id}'"
    # If customer_id = "1' OR '1'='1", you've exposed everything!
    result = await conn.fetchrow(query)

# SAFE - Parameterized queries
async def get_customer_safe(customer_id: str):
    query = "SELECT * FROM customers WHERE id = $1"
    result = await conn.fetchrow(query, customer_id)
    return result
5. Secrets Management
Never hardcode secrets:
# BAD
DATABASE_URL = "postgresql://user:password123@localhost/db"
API_KEY = "sk-1234567890"

# GOOD - Use environment variables
import os

DATABASE_URL = os.getenv("DATABASE_URL")
API_KEY = os.getenv("API_KEY")
if not DATABASE_URL or not API_KEY:
    raise ValueError("Missing required environment variables")
For production, use proper secrets management:
- AWS Secrets Manager
- HashiCorp Vault
- Azure Key Vault
- Google Secret Manager
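Secrets managers like these are often wired into a container as mounted files or injected environment variables rather than called directly from application code. A minimal sketch of a loader that prefers a mounted file and falls back to the environment, assuming secrets are mounted under `/run/secrets` (the path Docker uses for its secrets; the helper names `load_secret` and `require_secret` are hypothetical):

```python
import os
from pathlib import Path
from typing import Optional


def load_secret(name: str, secrets_dir: str = "/run/secrets") -> Optional[str]:
    """Read a secret from a mounted file, falling back to an environment variable.

    The file is looked up by the lowercased name, e.g. DATABASE_URL ->
    <secrets_dir>/database_url. This naming rule is an assumption.
    """
    secret_file = Path(secrets_dir) / name.lower()
    if secret_file.is_file():
        return secret_file.read_text(encoding="utf-8").strip()
    return os.getenv(name)


def require_secret(name: str, secrets_dir: str = "/run/secrets") -> str:
    """Like load_secret, but fail fast at startup when the secret is missing."""
    value = load_secret(name, secrets_dir)
    if not value:
        raise RuntimeError(f"Missing required secret: {name}")
    return value
```

Calling `require_secret("DATABASE_URL")` at startup surfaces a missing secret immediately instead of at the first database call.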
6. Rate Limiting
Prevent abuse with rate limiting:
from collections import defaultdict
from datetime import datetime, timedelta
import asyncio

class RateLimiter:
    def __init__(self, max_requests: int, time_window: int):
        self.max_requests = max_requests
        self.time_window = time_window  # seconds
        self.requests = defaultdict(list)

    def is_allowed(self, user_id: str) -> bool:
        now = datetime.now()
        cutoff = now - timedelta(seconds=self.time_window)
        # Remove old requests
        self.requests[user_id] = [
            req_time for req_time in self.requests[user_id]
            if req_time > cutoff
        ]
        # Check if under limit
        if len(self.requests[user_id]) >= self.max_requests:
            return False
        # Record this request
        self.requests[user_id].append(now)
        return True

# Create rate limiter: 100 requests per hour
rate_limiter = RateLimiter(max_requests=100, time_window=3600)

@app.call_tool()
async def call_tool(name: str, arguments: dict):
    user_id = arguments.get("user_id", "default")
    if not rate_limiter.is_allowed(user_id):
        return [TextContent(
            type="text",
            text="Rate limit exceeded. Please try again later."
        )]
    # Process request
    # ...
7. Audit Logging
Log every operation for security monitoring:
import logging
import json
from datetime import datetime, timezone

# Configure structured logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

@app.call_tool()
async def call_tool(name: str, arguments: dict):
    # Never write credentials to the log
    safe_args = {k: v for k, v in arguments.items() if k != "auth_token"}
    user_id = arguments.get("user_id", "unknown")
    # Log the request. Fields passed via `extra` only show up in the output
    # if the formatter references them (for example, a JSON formatter).
    logger.info(
        "MCP tool called",
        extra={
            "tool_name": name,
            "arguments": json.dumps(safe_args),
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "user_id": user_id
        }
    )
    try:
        result = await execute_tool(name, arguments)
        # Log success
        logger.info(
            "MCP tool completed successfully",
            extra={"tool_name": name, "user_id": user_id}
        )
        return result
    except Exception as e:
        # Log failure
        logger.error(
            "MCP tool failed",
            extra={
                "tool_name": name,
                "error": str(e),
                "user_id": user_id
            }
        )
        raise
Performance Optimization
Performance matters, especially when serving multiple AI agents or handling high request volumes.
1. Connection Pooling
Don't create new database connections for every request:
import json

import asyncpg
from contextlib import asynccontextmanager

class DatabasePool:
    def __init__(self, database_url: str):
        self.database_url = database_url
        self.pool = None

    async def initialize(self):
        # Call once at startup, before the server handles requests
        self.pool = await asyncpg.create_pool(
            self.database_url,
            min_size=5,
            max_size=20,
            command_timeout=60
        )

    async def close(self):
        if self.pool:
            await self.pool.close()

    @asynccontextmanager
    async def acquire(self):
        async with self.pool.acquire() as connection:
            yield connection

# Global pool instance
db_pool = DatabasePool(DATABASE_URL)

@app.call_tool()
async def call_tool(name: str, arguments: dict):
    if name == "get_customer":
        async with db_pool.acquire() as conn:
            result = await conn.fetchrow(
                "SELECT * FROM customers WHERE id = $1",
                arguments["customer_id"]
            )
        # fetchrow returns None when no row matches
        if result is None:
            return [TextContent(type="text", text="Customer not found")]
        # default=str covers values json can't encode natively (dates, UUIDs)
        return [TextContent(type="text", text=json.dumps(dict(result), default=str))]
2. Caching
Cache frequently accessed data:
from datetime import datetime, timedelta

class AsyncCache:
    def __init__(self, ttl_seconds: int = 300):
        self.cache = {}
        self.ttl = ttl_seconds

    def get(self, key: str):
        if key in self.cache:
            data, timestamp = self.cache[key]
            if datetime.now() - timestamp < timedelta(seconds=self.ttl):
                return data
            else:
                # Entry expired: evict it
                del self.cache[key]
        return None

    def set(self, key: str, value):
        self.cache[key] = (value, datetime.now())

cache = AsyncCache(ttl_seconds=300)  # 5 minute cache

@app.call_tool()
async def call_tool(name: str, arguments: dict):
    if name == "get_customer":
        customer_id = arguments["customer_id"]
        cache_key = f"customer:{customer_id}"
        # Check cache (compare against None so empty values still count as hits)
        cached_data = cache.get(cache_key)
        if cached_data is not None:
            logger.info(f"Cache hit for {cache_key}")
            return [TextContent(type="text", text=cached_data)]
        # Fetch from database
        async with db_pool.acquire() as conn:
            result = await conn.fetchrow(
                "SELECT * FROM customers WHERE id = $1",
                customer_id
            )
        if result is None:
            return [TextContent(type="text", text="Customer not found")]
        data = json.dumps(dict(result), default=str)
        # Store in cache
        cache.set(cache_key, data)
        return [TextContent(type="text", text=data)]
3. Async Operations
Use async for I/O operations to handle concurrent requests efficiently:
import httpx
import asyncio

async def fetch_multiple_apis(customer_id: str):
    """Fetch data from multiple sources concurrently"""
    # Explicit timeout so one slow upstream cannot stall the whole call
    async with httpx.AsyncClient(timeout=10.0) as client:
        # Run these concurrently instead of sequentially
        results = await asyncio.gather(
            client.get(f"https://api1.com/customer/{customer_id}"),
            client.get(f"https://api2.com/orders/{customer_id}"),
            client.get(f"https://api3.com/preferences/{customer_id}"),
            return_exceptions=True  # Don't fail if one API is down
        )
    # Process results
    customer_data = {}
    for i, result in enumerate(results):
        if isinstance(result, Exception):
            logger.warning(f"API {i+1} failed: {result}")
        else:
            customer_data[f"source_{i+1}"] = result.json()
    return customer_data
4. Timeouts
Always set timeouts to prevent hanging operations:
import asyncio

@app.call_tool()
async def call_tool(name: str, arguments: dict):
    try:
        # Set 30 second timeout
        result = await asyncio.wait_for(
            execute_tool_logic(name, arguments),
            timeout=30.0
        )
        return result
    except asyncio.TimeoutError:
        logger.error(f"Tool {name} timed out after 30 seconds")
        return [TextContent(
            type="text",
            text="Operation timed out. Please try again or contact support."
        )]
Error Handling & Resilience
Production systems fail. Handle it gracefully.
1. Comprehensive Error Handling
from typing import List
import traceback

import asyncpg
import httpx

@app.call_tool()
async def call_tool(name: str, arguments: dict) -> List[TextContent]:
    try:
        # Validate tool exists
        if name not in AVAILABLE_TOOLS:
            return [TextContent(
                type="text",
                text=f"Unknown tool: {name}. Available tools: {', '.join(AVAILABLE_TOOLS)}"
            )]
        # Execute tool
        result = await execute_tool_logic(name, arguments)
        return result
    except ValueError as e:
        # User input errors
        logger.warning(f"Validation error in {name}: {e}")
        return [TextContent(type="text", text=f"Invalid input: {str(e)}")]
    except PermissionError as e:
        # Authorization errors
        logger.warning(f"Permission denied for {name}: {e}")
        return [TextContent(type="text", text="Permission denied. Please check your credentials.")]
    except asyncpg.PostgresError as e:
        # Database errors
        logger.error(f"Database error in {name}: {e}")
        return [TextContent(type="text", text="Database error. Please try again later.")]
    except httpx.HTTPError as e:
        # External API errors
        logger.error(f"External API error in {name}: {e}")
        return [TextContent(type="text", text="External service unavailable. Please try again later.")]
    except Exception as e:
        # Catch-all for unexpected errors
        logger.error(f"Unexpected error in {name}: {e}\n{traceback.format_exc()}")
        return [TextContent(
            type="text",
            text="An unexpected error occurred. The issue has been logged and will be investigated."
        )]
2. Retry Logic
Implement retries for transient failures:
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar('T')

async def retry_with_backoff(
    func: Callable[[], Awaitable[T]],
    max_attempts: int = 3,
    initial_delay: float = 1.0,
    backoff_factor: float = 2.0
) -> T:
    """Retry function with exponential backoff"""
    delay = initial_delay
    last_exception = None
    for attempt in range(max_attempts):
        try:
            return await func()
        except Exception as e:
            last_exception = e
            logger.warning(
                f"Attempt {attempt + 1}/{max_attempts} failed: {e}"
            )
            # Sleep only if another attempt remains
            if attempt < max_attempts - 1:
                await asyncio.sleep(delay)
                delay *= backoff_factor
    # All retries failed
    raise last_exception

# Usage
async def fetch_customer_with_retry(customer_id: str):
    return await retry_with_backoff(
        lambda: fetch_customer_from_api(customer_id),
        max_attempts=3
    )
3. Circuit Breaker
Prevent cascading failures:
from enum import Enum
from datetime import datetime, timedelta

class CircuitState(Enum):
    CLOSED = "closed"        # Normal operation
    OPEN = "open"            # Failing, reject requests
    HALF_OPEN = "half_open"  # Testing if service recovered

class CircuitBreaker:
    def __init__(
        self,
        failure_threshold: int = 5,
        timeout_seconds: int = 60,
        expected_exception: type = Exception
    ):
        self.failure_threshold = failure_threshold
        self.timeout = timedelta(seconds=timeout_seconds)
        self.expected_exception = expected_exception
        self.failure_count = 0
        self.last_failure_time = None
        self.state = CircuitState.CLOSED

    async def call(self, func):
        if self.state == CircuitState.OPEN:
            if datetime.now() - self.last_failure_time > self.timeout:
                self.state = CircuitState.HALF_OPEN
                logger.info("Circuit breaker entering HALF_OPEN state")
            else:
                raise Exception("Circuit breaker is OPEN")
        try:
            result = await func()
            self._on_success()
            return result
        except self.expected_exception:
            self._on_failure()
            raise  # Re-raise with the original traceback

    def _on_success(self):
        self.failure_count = 0
        self.state = CircuitState.CLOSED

    def _on_failure(self):
        # The count is not reset on HALF_OPEN, so a failed probe re-opens the circuit
        self.failure_count += 1
        self.last_failure_time = datetime.now()
        if self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN
            logger.error("Circuit breaker opened due to repeated failures")

# Usage
external_api_breaker = CircuitBreaker(failure_threshold=5, timeout_seconds=60)

async def call_external_api(customer_id: str):
    return await external_api_breaker.call(
        lambda: fetch_from_external_api(customer_id)
    )
Deployment Strategies
1. Environment Configuration
Use different configs for dev/staging/prod:
import os
from enum import Enum

class Environment(Enum):
    DEVELOPMENT = "development"
    STAGING = "staging"
    PRODUCTION = "production"

class Config:
    def __init__(self):
        self.env = Environment(os.getenv("ENVIRONMENT", "development"))
        self.database_url = os.getenv("DATABASE_URL")
        self.log_level = os.getenv("LOG_LEVEL", "INFO")
        self.enable_caching = os.getenv("ENABLE_CACHING", "true").lower() == "true"
        self.rate_limit = int(os.getenv("RATE_LIMIT", "100"))

    @property
    def is_production(self):
        return self.env == Environment.PRODUCTION

config = Config()

# Use config throughout your code
if config.enable_caching:
    # Use cache
    pass
2. Docker Deployment
Create a Dockerfile:
FROM python:3.11-slim
WORKDIR /app
# Install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy server code
COPY customer_server.py .
# Non-root user for security
RUN useradd -m mcpuser
USER mcpuser
# Run the server
CMD ["python", "customer_server.py"]
docker-compose.yml:
version: '3.8'
services:
  mcp-server:
    build: .
    environment:
      - DATABASE_URL=postgresql://user:pass@db:5432/customers
      - ENVIRONMENT=production
      - LOG_LEVEL=INFO
    depends_on:
      - db
    restart: unless-stopped
  db:
    image: postgres:15
    environment:
      - POSTGRES_USER=user
      - POSTGRES_PASSWORD=pass
      - POSTGRES_DB=customers
    volumes:
      - postgres_data:/var/lib/postgresql/data
volumes:
  postgres_data:
3. Monitoring & Observability
Add health checks and metrics:
from prometheus_client import Counter, Histogram, start_http_server
import time

# Metrics
tool_calls_total = Counter('mcp_tool_calls_total', 'Total tool calls', ['tool_name', 'status'])
tool_duration = Histogram('mcp_tool_duration_seconds', 'Tool execution time', ['tool_name'])

@app.call_tool()
async def call_tool(name: str, arguments: dict):
    start_time = time.time()
    try:
        result = await execute_tool_logic(name, arguments)
        tool_calls_total.labels(tool_name=name, status='success').inc()
        return result
    except Exception as e:
        tool_calls_total.labels(tool_name=name, status='error').inc()
        raise
    finally:
        duration = time.time() - start_time
        tool_duration.labels(tool_name=name).observe(duration)

# Start Prometheus metrics server (on a different port)
start_http_server(8000)
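The snippet above covers metrics; the health-check half can be sketched separately. This is a minimal sketch, assuming each dependency is represented by a hypothetical async check function that raises on failure (`run_health_checks` and the check names are illustrative, not part of any library):

```python
import asyncio
from typing import Awaitable, Callable, Dict

# A health check is any zero-argument coroutine function that raises on failure.
HealthCheck = Callable[[], Awaitable[None]]


async def run_health_checks(checks: Dict[str, HealthCheck], timeout: float = 2.0) -> dict:
    """Run each named check with a timeout and summarize the results."""
    results = {}
    for name, check in checks.items():
        try:
            await asyncio.wait_for(check(), timeout=timeout)
            results[name] = "ok"
        except Exception as exc:  # includes timeouts raised by wait_for
            results[name] = f"failed: {type(exc).__name__}"
    status = "healthy" if all(v == "ok" for v in results.values()) else "unhealthy"
    return {"status": status, "checks": results}
```

A database check could, for instance, run `SELECT 1` through the connection pool. The resulting report can be exposed as a tool or logged periodically, depending on how the server is deployed.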
Real-World Considerations
Scaling Horizontally
For high-traffic scenarios:
- Deploy multiple MCP server instances
- Use a load balancer (though most MCP servers are stdio-based and tied to a single client)
- Consider transitioning to HTTP-based MCP servers for true horizontal scaling
Data Privacy
- PII Handling: Minimize exposure of personally identifiable information
- Data Retention: Implement policies for log data
- GDPR/CCPA Compliance: Ensure audit logs can be purged on request
- Encryption: Use TLS for remote MCP servers
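The PII and log-retention points are easier to meet when sensitive values never reach the logs at all. A minimal sketch of a redaction helper that could be applied to tool arguments before logging; the `SENSITIVE_KEYS` set is an assumption and would need to match the fields your tools actually handle:

```python
from typing import Any

# Hypothetical field names to mask; adjust to the data your tools handle.
SENSITIVE_KEYS = {"auth_token", "email", "new_email", "password", "api_key"}


def redact(value: Any) -> Any:
    """Return a copy with sensitive values masked, recursing into dicts and lists."""
    if isinstance(value, dict):
        return {
            k: "[REDACTED]" if str(k).lower() in SENSITIVE_KEYS else redact(v)
            for k, v in value.items()
        }
    if isinstance(value, list):
        return [redact(item) for item in value]
    return value
```

The function builds new containers rather than mutating its input, so the original arguments remain usable for the actual tool call.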
Maintenance
- Versioning: Use semantic versioning for your MCP servers
- Deprecation Strategy: Give users notice before removing tools
- Backward Compatibility: Don't break existing tool contracts
- Documentation: Maintain up-to-date API documentation
Checklist: Production Readiness
Before deploying to production, verify:
- [ ] Authentication implemented
- [ ] Input validation on all tools
- [ ] SQL injection prevention (parameterized queries)
- [ ] Secrets stored in environment variables or secrets manager
- [ ] Rate limiting configured
- [ ] Comprehensive audit logging
- [ ] Error handling for all failure scenarios
- [ ] Timeouts set on all external calls
- [ ] Connection pooling for databases
- [ ] Caching strategy implemented
- [ ] Monitoring and metrics collection
- [ ] Health checks configured
- [ ] Environment-specific configurations
- [ ] Docker/containerization setup
- [ ] Documentation updated
- [ ] Load testing completed
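For the load-testing item, a dedicated tool is usually the better choice, but a rough first measurement can be taken with plain asyncio. A minimal sketch, assuming `call` is any zero-argument coroutine function that exercises one tool request (the names `measure_latencies` and `percentile` are hypothetical):

```python
import asyncio
import math
import time
from typing import Awaitable, Callable, List


async def measure_latencies(
    call: Callable[[], Awaitable[object]],
    total_calls: int = 100,
    concurrency: int = 10,
) -> List[float]:
    """Issue total_calls requests, at most `concurrency` at a time, and time each one."""
    semaphore = asyncio.Semaphore(concurrency)
    latencies: List[float] = []

    async def one_call() -> None:
        async with semaphore:
            start = time.perf_counter()
            await call()
            latencies.append(time.perf_counter() - start)

    await asyncio.gather(*(one_call() for _ in range(total_calls)))
    return latencies


def percentile(values: List[float], pct: float) -> float:
    """Nearest-rank percentile of a non-empty list (pct between 0 and 100)."""
    if not values:
        raise ValueError("percentile() requires at least one value")
    ordered = sorted(values)
    rank = max(1, math.ceil(pct / 100 * len(ordered)))
    return ordered[rank - 1]
```

Comparing `percentile(latencies, 50)` with `percentile(latencies, 95)` at increasing concurrency gives a first impression of where the connection pool or an upstream API starts to saturate.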
Conclusion: From Prototype to Production
The journey from "it works on my machine" to production-ready is significant, but following these practices will give you a robust, secure, and performant MCP server.
Key takeaways:
- Security first - Never compromise on authentication, validation, and logging
- Performance matters - Use pooling, caching, and async operations
- Fail gracefully - Comprehensive error handling and resilience patterns
- Monitor everything - You can't fix what you can't see
- Plan for scale - Even if you don't need it today
MCP servers bridge AI capabilities with real-world systems. Building them responsibly means respecting the power they wield.
Series Wrap-Up
What we covered:
- Part 1: MCP concepts and why they matter
- Part 2: Building your first MCP server
- Part 3: Making it production-ready
You now have everything you need to build secure, performant MCP servers that can power real-world AI applications.
What will you build? Share your MCP projects in the comments—I'd love to see what the community creates!
Thanks for following this series! If you found it valuable, consider following for more posts on AI engineering, backend development, and modern software practices. Have questions? Drop them in the comments or reach out directly.