Raghava Joijode

Production-Ready MCP Servers: Security, Performance & Deployment

Part 3 of 3 in the MCP Server Series

In Part 1, we explored MCP concepts. In Part 2, we built a working server. Now it's time to make it production-ready.

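Moving from a prototype to production means addressing security, performance, reliability, and maintainability. This post covers the practices you need to deploy MCP servers safely: authentication, input validation, pooling and caching, error handling, containerized deployment, and monitoring.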

Series Navigation:

  • Part 1: Understanding MCP conceptually
  • Part 2: Building your first MCP server
  • Part 3 (This post): Security, performance, and production deployment

Security: The Most Critical Consideration

MCP servers can read your data, execute commands, and modify systems. Security isn't optional; it's existential.

The Core Security Model

Remember: MCP servers run locally by default. They communicate via stdio (standard input/output), not network ports. This provides baseline security:

  • Process isolation - Only the parent application can communicate with the server
  • No network exposure - Unlike REST APIs, stdio servers don't open ports
  • Explicit configuration - Users must manually add servers to their config

But this isn't enough for production.

1. Authentication & Authorization

Even though MCP servers run locally, you should still validate requests:

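import hmac
import os
from functools import wraps

from mcp.types import TextContent

# Load from environment variable, not hardcoded
VALID_API_KEY = os.getenv("MCP_API_KEY")

def require_auth(func):
    """Decorator to enforce authentication on tool calls"""
    @wraps(func)
    async def wrapper(name: str, arguments: dict):
        auth_token = arguments.get("auth_token")

        # Fail closed if the key is unset, and compare in constant time
        # so response timing does not leak how much of the token matched
        if (
            not VALID_API_KEY
            or not isinstance(auth_token, str)
            or not hmac.compare_digest(auth_token.encode(), VALID_API_KEY.encode())
        ):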
            return [TextContent(
                type="text",
                text="Authentication failed. Invalid or missing auth_token."
            )]

        # Remove auth_token from arguments before processing
        clean_args = {k: v for k, v in arguments.items() if k != "auth_token"}
        return await func(name, clean_args)

    return wrapper

@app.call_tool()
@require_auth
async def call_tool(name: str, arguments: dict):
    # Your tool implementation
    pass

Update your tool schemas to require auth:

Tool(
    name="get_customer",
    description="Retrieve customer information by ID",
    inputSchema={
        "type": "object",
        "properties": {
            "auth_token": {
                "type": "string",
                "description": "Authentication token (required)"
            },
            "customer_id": {
                "type": "string",
                "description": "Customer's unique ID"
            }
        },
        "required": ["auth_token", "customer_id"]
    }
)

2. Principle of Least Privilege

Never expose more than absolutely necessary:

# BAD: Exposing raw SQL execution
Tool(
    name="execute_query",
    description="Run any SQL query",
    inputSchema={"type": "object", "properties": {"sql": {"type": "string"}}}
)

# GOOD: Specific, limited operations
Tool(
    name="get_customer",
    description="Retrieve customer by ID (read-only)",
    inputSchema={"type": "object", "properties": {"customer_id": {"type": "string"}}}
)

Tool(
    name="update_customer_email",
    description="Update customer email only (requires admin)",
    inputSchema={
        "type": "object",
        "properties": {
            "customer_id": {"type": "string"},
            "new_email": {"type": "string"}
        }
    }
)
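
The description above says "requires admin", but a description alone enforces nothing; the check has to happen on the server. A minimal sketch of a deny-by-default permission check, assuming a hypothetical `Role` enum and `TOOL_REQUIRED_ROLE` map (neither appears in the original server), and assuming the caller's role is resolved by the server rather than supplied by the model:

```python
from enum import IntEnum


class Role(IntEnum):
    """Hypothetical roles, ordered from least to most privileged."""
    READER = 1
    ADMIN = 2


# Hypothetical mapping: every exposed tool must be listed explicitly.
TOOL_REQUIRED_ROLE = {
    "get_customer": Role.READER,
    "update_customer_email": Role.ADMIN,
}


def authorize(tool_name: str, caller_role: Role) -> None:
    """Raise PermissionError unless the caller may invoke the tool."""
    required = TOOL_REQUIRED_ROLE.get(tool_name)
    if required is None:
        # Deny by default: tools missing from the map are never callable.
        raise PermissionError(f"Tool not allowed: {tool_name}")
    if caller_role < required:
        raise PermissionError(f"{tool_name} requires role {required.name}")
```

Calling `authorize(name, role)` at the top of `call_tool` keeps the policy in one place, and the `PermissionError` it raises can be translated into a user-facing message by the error handler.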

3. Input Validation & Sanitization

Always validate and sanitize inputs:

import re
from typing import Optional

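def validate_customer_id(customer_id: Optional[str]) -> Optional[str]:
    """Validate customer ID format; return an error message or None"""
    # arguments.get() returns None when the field is missing
    if not isinstance(customer_id, str):
        return "Customer ID is required"

    # Only allow alphanumeric characters and hyphens
    # (fullmatch also rejects a trailing newline, which '$' would accept)
    if not re.fullmatch(r'[a-zA-Z0-9-]+', customer_id):
        return "Invalid customer ID format"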

    # Check length
    if len(customer_id) > 50:
        return "Customer ID too long"

    return None

def validate_email(email: str) -> Optional[str]:
    """Basic email validation"""
    email_regex = r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}'
    if not isinstance(email, str) or not re.fullmatch(email_regex, email):
        return "Invalid email format"
    return None

@app.call_tool()
async def call_tool(name: str, arguments: dict):
    if name == "get_customer":
        customer_id = arguments.get("customer_id")

        # Validate input
        error = validate_customer_id(customer_id)
        if error:
            return [TextContent(type="text", text=f"Error: {error}")]

        # Safe to proceed
        customer = await get_customer_from_db(customer_id)
        # ...

4. SQL Injection Prevention

NEVER construct SQL queries with string concatenation:

# DANGEROUS - SQL Injection vulnerability
async def get_customer_bad(customer_id: str):
    query = f"SELECT * FROM customers WHERE id = '{customer_id}'"
    # If customer_id = "1' OR '1'='1", you've exposed everything!
    result = await conn.fetchrow(query)

# SAFE - Parameterized queries
async def get_customer_safe(customer_id: str):
    query = "SELECT * FROM customers WHERE id = $1"
    result = await conn.fetchrow(query, customer_id)
    return result
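
Query parameters can only stand in for values, not for identifiers such as column names or sort direction. When a tool needs a caller-chosen sort order, one common approach is to check the identifier against an allowlist before building the query. A minimal sketch, assuming hypothetical column names and a hypothetical `build_customer_list_query` helper:

```python
# Hypothetical set of columns the tool is allowed to sort by.
ALLOWED_SORT_COLUMNS = {"name", "email", "created_at"}


def build_customer_list_query(sort_by: str, descending: bool = False) -> str:
    """Build a list query whose ORDER BY column comes from an allowlist.

    The row limit remains a bound parameter ($1); only the vetted
    identifier and a fixed keyword are interpolated into the SQL text.
    """
    if sort_by not in ALLOWED_SORT_COLUMNS:
        raise ValueError(f"Cannot sort by: {sort_by!r}")
    direction = "DESC" if descending else "ASC"
    return (
        "SELECT id, name, email FROM customers "
        f"ORDER BY {sort_by} {direction} LIMIT $1"
    )
```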

5. Secrets Management

Never hardcode secrets:

# BAD
DATABASE_URL = "postgresql://user:password123@localhost/db"
API_KEY = "sk-1234567890"

# GOOD - Use environment variables
import os

DATABASE_URL = os.getenv("DATABASE_URL")
API_KEY = os.getenv("API_KEY")

if not DATABASE_URL or not API_KEY:
    raise ValueError("Missing required environment variables")

For production, use proper secrets management:

  • AWS Secrets Manager
  • HashiCorp Vault
  • Azure Key Vault
  • Google Secret Manager
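
Whichever manager you use, the secret usually reaches the process as an environment variable or as a mounted file. A minimal sketch that supports both, assuming a hypothetical `load_secret` helper and the `<NAME>_FILE` naming convention for file-mounted secrets (neither is from the original post):

```python
import os
from pathlib import Path


def load_secret(name: str) -> str:
    """Return a secret from the file named by <NAME>_FILE, or from <NAME>.

    The _FILE variant is checked first so that a mounted secret
    takes precedence over a plain environment variable.
    """
    file_path = os.getenv(f"{name}_FILE")
    if file_path:
        return Path(file_path).read_text(encoding="utf-8").strip()

    value = os.getenv(name)
    if not value:
        raise RuntimeError(f"Missing required secret: {name}")
    return value
```

With this helper, `DATABASE_URL = load_secret("DATABASE_URL")` fails at startup with a clear message instead of surfacing later as a connection error.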

6. Rate Limiting

Prevent abuse with rate limiting:

from collections import defaultdict
from datetime import datetime, timedelta
import asyncio

class RateLimiter:
    def __init__(self, max_requests: int, time_window: int):
        self.max_requests = max_requests
        self.time_window = time_window  # seconds
        self.requests = defaultdict(list)

    def is_allowed(self, user_id: str) -> bool:
        now = datetime.now()
        cutoff = now - timedelta(seconds=self.time_window)

        # Remove old requests
        self.requests[user_id] = [
            req_time for req_time in self.requests[user_id]
            if req_time > cutoff
        ]

        # Check if under limit
        if len(self.requests[user_id]) >= self.max_requests:
            return False

        # Record this request
        self.requests[user_id].append(now)
        return True

# Create rate limiter: 100 requests per hour
rate_limiter = RateLimiter(max_requests=100, time_window=3600)

@app.call_tool()
async def call_tool(name: str, arguments: dict):
    user_id = arguments.get("user_id", "default")

    if not rate_limiter.is_allowed(user_id):
        return [TextContent(
            type="text",
            text="Rate limit exceeded. Please try again later."
        )]

    # Process request
    # ...

7. Audit Logging

Log every operation for security monitoring:

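import logging
import json
from datetime import datetime, timezone

# Configure logging (basicConfig writes to stderr, which keeps stdout
# free for the stdio transport)
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Never write credentials to the audit log
SENSITIVE_KEYS = {"auth_token"}

@app.call_tool()
async def call_tool(name: str, arguments: dict):
    # Log the request, with sensitive arguments stripped
    safe_args = {k: v for k, v in arguments.items() if k not in SENSITIVE_KEYS}
    logger.info(
        "MCP tool called",
        extra={
            "tool_name": name,
            "arguments": json.dumps(safe_args, default=str),
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "user_id": arguments.get("user_id", "unknown")
        }
    )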

    try:
        result = await execute_tool(name, arguments)

        # Log success
        logger.info(
            "MCP tool completed successfully",
            extra={"tool_name": name, "user_id": arguments.get("user_id")}
        )

        return result
    except Exception as e:
        # Log failure
        logger.error(
            "MCP tool failed",
            extra={
                "tool_name": name,
                "error": str(e),
                "user_id": arguments.get("user_id")
            }
        )
        raise
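
Note that fields passed through `extra=` are attached to the log record but do not appear in the output unless the formatter renders them, and the format string above does not. A minimal sketch of a formatter that emits each record as JSON with those fields included; `JsonFormatter` and `configure_json_logging` are hypothetical names, not part of the original server:

```python
import json
import logging

# Attributes every LogRecord carries; anything else arrived via `extra=`.
_STANDARD_ATTRS = set(logging.makeLogRecord({}).__dict__) | {"message", "asctime"}


class JsonFormatter(logging.Formatter):
    """Render each record as one JSON object, including `extra` fields."""

    def format(self, record: logging.LogRecord) -> str:
        payload = {
            "time": self.formatTime(record),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        }
        for key, value in record.__dict__.items():
            if key not in _STANDARD_ATTRS:
                payload[key] = value
        if record.exc_info:
            payload["exception"] = self.formatException(record.exc_info)
        # default=str keeps unusual values from breaking the log line.
        return json.dumps(payload, default=str)


def configure_json_logging(level: int = logging.INFO) -> None:
    """Send JSON logs to stderr, leaving stdout free for stdio transport."""
    handler = logging.StreamHandler()  # StreamHandler defaults to sys.stderr
    handler.setFormatter(JsonFormatter())
    root = logging.getLogger()
    root.handlers[:] = [handler]
    root.setLevel(level)
```

One JSON object per line is easy for log aggregators to index, so fields such as `tool_name` and `user_id` become searchable without parsing free text.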

Performance Optimization

Performance matters, especially when serving multiple AI agents or handling high request volumes.

1. Connection Pooling

Don't create new database connections for every request:

import json

import asyncpg
from contextlib import asynccontextmanager

class DatabasePool:
    def __init__(self, database_url: str):
        self.database_url = database_url
        self.pool = None

    async def initialize(self):
        self.pool = await asyncpg.create_pool(
            self.database_url,
            min_size=5,
            max_size=20,
            command_timeout=60
        )

    async def close(self):
        if self.pool:
            await self.pool.close()

    @asynccontextmanager
    async def acquire(self):
        async with self.pool.acquire() as connection:
            yield connection

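# Global pool instance; call `await db_pool.initialize()` once at startup
db_pool = DatabasePool(DATABASE_URL)

@app.call_tool()
async def call_tool(name: str, arguments: dict):
    if name == "get_customer":
        async with db_pool.acquire() as conn:
            result = await conn.fetchrow(
                "SELECT * FROM customers WHERE id = $1",
                arguments["customer_id"]
            )
        # fetchrow returns None when no row matches
        if result is None:
            return [TextContent(type="text", text="Customer not found")]
        # default=str serializes dates, decimals, and UUIDs
        return [TextContent(type="text", text=json.dumps(dict(result), default=str))]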

2. Caching

Cache frequently accessed data:

from datetime import datetime, timedelta

class AsyncCache:
    def __init__(self, ttl_seconds: int = 300):
        self.cache = {}
        self.ttl = ttl_seconds

    def get(self, key: str):
        if key in self.cache:
            data, timestamp = self.cache[key]
            if datetime.now() - timestamp < timedelta(seconds=self.ttl):
                return data
            else:
                del self.cache[key]
        return None

    def set(self, key: str, value):
        self.cache[key] = (value, datetime.now())

cache = AsyncCache(ttl_seconds=300)  # 5 minute cache

@app.call_tool()
async def call_tool(name: str, arguments: dict):
    if name == "get_customer":
        customer_id = arguments["customer_id"]
        cache_key = f"customer:{customer_id}"

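        # Check cache ("is not None" so that falsy values still count as hits)
        cached_data = cache.get(cache_key)
        if cached_data is not None:
            logger.info(f"Cache hit for {cache_key}")
            return [TextContent(type="text", text=cached_data)]

        # Fetch from database
        async with db_pool.acquire() as conn:
            result = await conn.fetchrow(
                "SELECT * FROM customers WHERE id = $1",
                customer_id
            )

        # fetchrow returns None when no row matches; don't cache misses
        if result is None:
            return [TextContent(type="text", text="Customer not found")]

        data = json.dumps(dict(result), default=str)

        # Store in cache
        cache.set(cache_key, data)

        return [TextContent(type="text", text=data)]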
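
A cache like this has two gaps in production: it grows without bound, and a cached customer goes stale as soon as a write tool such as `update_customer_email` changes the row. A minimal sketch that addresses both, assuming a hypothetical `BoundedTTLCache` class (not part of the original server):

```python
import time
from collections import OrderedDict


class BoundedTTLCache:
    """TTL cache with a size cap and explicit invalidation (illustrative)."""

    def __init__(self, ttl_seconds: float = 300, max_entries: int = 1000):
        self.ttl = ttl_seconds
        self.max_entries = max_entries
        self._entries = OrderedDict()  # key -> (value, expiry on monotonic clock)

    def get(self, key: str):
        entry = self._entries.get(key)
        if entry is None:
            return None
        value, expires_at = entry
        if time.monotonic() >= expires_at:
            del self._entries[key]
            return None
        return value

    def set(self, key: str, value) -> None:
        self._entries[key] = (value, time.monotonic() + self.ttl)
        self._entries.move_to_end(key)
        # Evict the least recently written entries once over the cap.
        while len(self._entries) > self.max_entries:
            self._entries.popitem(last=False)

    def invalidate(self, key: str) -> None:
        """Drop a key, for example right after a successful write."""
        self._entries.pop(key, None)
```

A write path would call `cache.invalidate(f"customer:{customer_id}")` after the update commits, so the next read goes back to the database.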

3. Async Operations

Use async for I/O operations to handle concurrent requests efficiently:

import httpx
import asyncio

async def fetch_multiple_apis(customer_id: str):
    """Fetch data from multiple sources concurrently"""
    async with httpx.AsyncClient() as client:
        # Run these concurrently instead of sequentially
        results = await asyncio.gather(
            client.get(f"https://api1.com/customer/{customer_id}"),
            client.get(f"https://api2.com/orders/{customer_id}"),
            client.get(f"https://api3.com/preferences/{customer_id}"),
            return_exceptions=True  # Don't fail if one API is down
        )

        # Process results
        customer_data = {}
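        for i, result in enumerate(results):
            if isinstance(result, Exception):
                logger.warning(f"API {i+1} failed: {result}")
            elif result.is_error:
                # 4xx/5xx responses are not exceptions; skip them explicitly
                logger.warning(f"API {i+1} returned HTTP {result.status_code}")
            else:
                customer_data[f"source_{i+1}"] = result.json()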

        return customer_data

4. Timeouts

Always set timeouts to prevent hanging operations:

import asyncio

@app.call_tool()
async def call_tool(name: str, arguments: dict):
    try:
        # Set 30 second timeout
        result = await asyncio.wait_for(
            execute_tool_logic(name, arguments),
            timeout=30.0
        )
        return result
    except asyncio.TimeoutError:
        logger.error(f"Tool {name} timed out after 30 seconds")
        return [TextContent(
            type="text",
            text="Operation timed out. Please try again or contact support."
        )]

Error Handling & Resilience

Production systems fail. Handle it gracefully.

1. Comprehensive Error Handling

from typing import List
import traceback

import asyncpg
import httpx

@app.call_tool()
async def call_tool(name: str, arguments: dict) -> List[TextContent]:
    try:
        # Validate tool exists
        if name not in AVAILABLE_TOOLS:
            return [TextContent(
                type="text",
                text=f"Unknown tool: {name}. Available tools: {', '.join(AVAILABLE_TOOLS)}"
            )]

        # Execute tool
        result = await execute_tool_logic(name, arguments)
        return result

    except ValueError as e:
        # User input errors
        logger.warning(f"Validation error in {name}: {e}")
        return [TextContent(type="text", text=f"Invalid input: {str(e)}")]

    except PermissionError as e:
        # Authorization errors
        logger.warning(f"Permission denied for {name}: {e}")
        return [TextContent(type="text", text="Permission denied. Please check your credentials.")]

    except asyncpg.PostgresError as e:
        # Database errors
        logger.error(f"Database error in {name}: {e}")
        return [TextContent(type="text", text="Database error. Please try again later.")]

    except httpx.HTTPError as e:
        # External API errors
        logger.error(f"External API error in {name}: {e}")
        return [TextContent(type="text", text="External service unavailable. Please try again later.")]

    except Exception as e:
        # Catch-all for unexpected errors
        logger.error(f"Unexpected error in {name}: {e}\n{traceback.format_exc()}")
        return [TextContent(
            type="text",
            text="An unexpected error occurred. The issue has been logged and will be investigated."
        )]

2. Retry Logic

Implement retries for transient failures:

import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar('T')

async def retry_with_backoff(
    func: Callable[[], Awaitable[T]],
    max_attempts: int = 3,
    initial_delay: float = 1.0,
    backoff_factor: float = 2.0
) -> T:
    """Retry function with exponential backoff"""

    delay = initial_delay
    last_exception = None

    for attempt in range(max_attempts):
        try:
            return await func()
        except Exception as e:
            last_exception = e
            logger.warning(
                f"Attempt {attempt + 1}/{max_attempts} failed: {e}"
            )

            if attempt < max_attempts - 1:
                await asyncio.sleep(delay)
                delay *= backoff_factor

    # All retries failed
    raise last_exception

# Usage
async def fetch_customer_with_retry(customer_id: str):
    return await retry_with_backoff(
        lambda: fetch_customer_from_api(customer_id),
        max_attempts=3
    )
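
The helper above retries on every exception, including ones that will never succeed on a second attempt, such as a validation error. A minimal sketch of a variant that retries only the exception types you list and adds jitter so that many clients do not retry in lockstep; `retry_transient` and its default `retry_on` tuple are assumptions for illustration:

```python
import asyncio
import random
from typing import Awaitable, Callable, Tuple, Type, TypeVar

T = TypeVar("T")


async def retry_transient(
    func: Callable[[], Awaitable[T]],
    retry_on: Tuple[Type[Exception], ...] = (ConnectionError, TimeoutError),
    max_attempts: int = 3,
    initial_delay: float = 1.0,
    backoff_factor: float = 2.0,
) -> T:
    """Retry only the listed exception types; everything else propagates."""
    delay = initial_delay
    for attempt in range(1, max_attempts + 1):
        try:
            return await func()
        except retry_on:
            if attempt == max_attempts:
                raise
            # Full jitter: sleep a random amount up to the current delay.
            await asyncio.sleep(random.uniform(0, delay))
            delay *= backoff_factor
    raise ValueError("max_attempts must be at least 1")
```

Which exceptions count as transient depends on the client library in use, so the `retry_on` tuple should be set per call site rather than left at the default.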

3. Circuit Breaker

Prevent cascading failures:

from enum import Enum
from datetime import datetime, timedelta

class CircuitState(Enum):
    CLOSED = "closed"  # Normal operation
    OPEN = "open"      # Failing, reject requests
    HALF_OPEN = "half_open"  # Testing if service recovered

class CircuitBreaker:
    def __init__(
        self,
        failure_threshold: int = 5,
        timeout_seconds: int = 60,
        expected_exception: type = Exception
    ):
        self.failure_threshold = failure_threshold
        self.timeout = timedelta(seconds=timeout_seconds)
        self.expected_exception = expected_exception

        self.failure_count = 0
        self.last_failure_time = None
        self.state = CircuitState.CLOSED

    async def call(self, func):
        if self.state == CircuitState.OPEN:
            if datetime.now() - self.last_failure_time > self.timeout:
                self.state = CircuitState.HALF_OPEN
                logger.info("Circuit breaker entering HALF_OPEN state")
            else:
                raise Exception("Circuit breaker is OPEN")

        try:
            result = await func()
            self._on_success()
            return result
        except self.expected_exception:
            self._on_failure()
            raise

    def _on_success(self):
        self.failure_count = 0
        self.state = CircuitState.CLOSED

    def _on_failure(self):
        self.failure_count += 1
        self.last_failure_time = datetime.now()

        if self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN
            logger.error("Circuit breaker opened due to repeated failures")

# Usage
external_api_breaker = CircuitBreaker(failure_threshold=5, timeout_seconds=60)

async def call_external_api(customer_id: str):
    return await external_api_breaker.call(
        lambda: fetch_from_external_api(customer_id)
    )

Deployment Strategies

1. Environment Configuration

Use different configs for dev/staging/prod:

import os
from enum import Enum

class Environment(Enum):
    DEVELOPMENT = "development"
    STAGING = "staging"
    PRODUCTION = "production"

class Config:
    def __init__(self):
        self.env = Environment(os.getenv("ENVIRONMENT", "development"))
        self.database_url = os.getenv("DATABASE_URL")
        self.log_level = os.getenv("LOG_LEVEL", "INFO")
        self.enable_caching = os.getenv("ENABLE_CACHING", "true").lower() == "true"
        self.rate_limit = int(os.getenv("RATE_LIMIT", "100"))

    @property
    def is_production(self):
        return self.env == Environment.PRODUCTION

config = Config()

# Use config throughout your code
if config.enable_caching:
    # Use cache
    pass
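
A config object like this is most useful when bad values are caught at startup rather than on the first request. A minimal sketch of a validation pass over the same variable names, assuming a hypothetical `find_config_problems` helper that reports every problem at once:

```python
from typing import List, Mapping

ALLOWED_ENVIRONMENTS = {"development", "staging", "production"}


def find_config_problems(env: Mapping[str, str]) -> List[str]:
    """Return human-readable problems; an empty list means the config is usable."""
    problems = []

    environment = env.get("ENVIRONMENT", "development")
    if environment not in ALLOWED_ENVIRONMENTS:
        problems.append(
            f"ENVIRONMENT must be one of {sorted(ALLOWED_ENVIRONMENTS)}"
        )

    if not env.get("DATABASE_URL"):
        problems.append("DATABASE_URL is not set")

    try:
        if int(env.get("RATE_LIMIT", "100")) <= 0:
            problems.append("RATE_LIMIT must be a positive integer")
    except ValueError:
        problems.append("RATE_LIMIT must be a positive integer")

    return problems
```

At startup, the server can call `find_config_problems(os.environ)` and exit with the joined messages if the list is non-empty, which surfaces all misconfigurations in a single run.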

2. Docker Deployment

Create a Dockerfile:

FROM python:3.11-slim

WORKDIR /app

# Install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy server code
COPY customer_server.py .

# Non-root user for security
RUN useradd -m mcpuser
USER mcpuser

# Run the server
CMD ["python", "customer_server.py"]

docker-compose.yml:

version: '3.8'

services:
  mcp-server:
    build: .
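    environment:
      # Values are interpolated from the shell or from a .env file
      # that is kept out of version control
      - DATABASE_URL=postgresql://${POSTGRES_USER}:${POSTGRES_PASSWORD}@db:5432/customers
      - ENVIRONMENT=production
      - LOG_LEVEL=INFO
    depends_on:
      - db
    restart: unless-stopped

  db:
    image: postgres:15
    environment:
      - POSTGRES_USER=${POSTGRES_USER}
      - POSTGRES_PASSWORD=${POSTGRES_PASSWORD}
      - POSTGRES_DB=customers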
    volumes:
      - postgres_data:/var/lib/postgresql/data

volumes:
  postgres_data:

3. Monitoring & Observability

Add health checks and metrics:

from prometheus_client import Counter, Histogram, start_http_server
import time

# Metrics
tool_calls_total = Counter('mcp_tool_calls_total', 'Total tool calls', ['tool_name', 'status'])
tool_duration = Histogram('mcp_tool_duration_seconds', 'Tool execution time', ['tool_name'])

@app.call_tool()
async def call_tool(name: str, arguments: dict):
    start_time = time.perf_counter()  # monotonic clock, suited to measuring durations

    try:
        result = await execute_tool_logic(name, arguments)
        tool_calls_total.labels(tool_name=name, status='success').inc()
        return result
    except Exception:
        tool_calls_total.labels(tool_name=name, status='error').inc()
        raise
    finally:
        duration = time.perf_counter() - start_time
        tool_duration.labels(tool_name=name).observe(duration)

# Start Prometheus metrics server (on a different port)
start_http_server(8000)
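
The Prometheus example covers metrics; a health check is a separate concern that answers "can this server reach its dependencies right now?". A minimal sketch, assuming a hypothetical `run_health_checks` helper that takes named async probes (for instance one that runs `SELECT 1` through the pool):

```python
import asyncio
from typing import Awaitable, Callable, Dict

Probe = Callable[[], Awaitable[None]]


async def run_health_checks(probes: Dict[str, Probe], timeout: float = 2.0) -> dict:
    """Run every probe concurrently; a probe passes if it returns in time."""

    async def run_one(probe: Probe) -> str:
        try:
            await asyncio.wait_for(probe(), timeout=timeout)
            return "ok"
        except asyncio.TimeoutError:
            return "timeout"
        except Exception as exc:
            # Report only the exception type so details don't leak.
            return f"error: {type(exc).__name__}"

    names = list(probes)
    results = await asyncio.gather(*(run_one(probes[n]) for n in names))
    checks = dict(zip(names, results))
    healthy = all(status == "ok" for status in checks.values())
    return {"status": "healthy" if healthy else "unhealthy", "checks": checks}
```

The resulting dictionary can be returned from a dedicated tool or served over HTTP next to the metrics endpoint, depending on how the server is deployed.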

Real-World Considerations

Scaling Horizontally

For high-traffic scenarios:

  • Deploy multiple MCP server instances
  • Put a load balancer in front of them, which only works for HTTP-based servers (a stdio server is tied to the single client that launched it)
  • Consider transitioning to HTTP-based MCP servers for true horizontal scaling

Data Privacy

  • PII Handling: Minimize exposure of personally identifiable information
  • Data Retention: Implement policies for log data
  • GDPR/CCPA Compliance: Ensure audit logs can be purged on request
  • Encryption: Use TLS for remote MCP servers
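
One way to minimize PII exposure is to mask it before it reaches logs or error messages. A minimal sketch for email addresses, assuming hypothetical `mask_email` and `mask_pii` helpers; real deployments usually need to cover more field types than this:

```python
import re

# First character of the local part is kept; the rest is replaced.
EMAIL_RE = re.compile(
    r"([A-Za-z0-9._%+-])[A-Za-z0-9._%+-]*@([A-Za-z0-9.-]+\.[A-Za-z]{2,})"
)


def mask_email(text: str) -> str:
    """Mask any email addresses found in text, keeping the domain."""
    return EMAIL_RE.sub(r"\1***@\2", text)


def mask_pii(arguments: dict) -> dict:
    """Return a copy of the arguments with emails masked in string values."""
    return {
        key: mask_email(value) if isinstance(value, str) else value
        for key, value in arguments.items()
    }
```

Keeping the domain preserves some debugging value (for example spotting a misbehaving mail provider) while the individual address stays out of the logs.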

Maintenance

  • Versioning: Use semantic versioning for your MCP servers
  • Deprecation Strategy: Give users notice before removing tools
  • Backward Compatibility: Don't break existing tool contracts
  • Documentation: Maintain up-to-date API documentation
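
Deprecation notices can be delivered through the tool output itself, so both the model and the user see them before the tool disappears. A minimal sketch, assuming a hypothetical registry; the tool name `lookup_customer` and the version number are examples only:

```python
from dataclasses import dataclass
from typing import Dict, Optional


@dataclass(frozen=True)
class Deprecation:
    replacement: Optional[str]  # tool to use instead, if any
    removed_in: str             # semantic version that drops the tool


# Hypothetical registry; names and versions are illustrative.
DEPRECATED_TOOLS: Dict[str, Deprecation] = {
    "lookup_customer": Deprecation(replacement="get_customer", removed_in="3.0.0"),
}


def with_deprecation_notice(tool_name: str, text: str) -> str:
    """Append a notice to the tool's text output if the tool is deprecated."""
    info = DEPRECATED_TOOLS.get(tool_name)
    if info is None:
        return text
    notice = (
        f"Note: '{tool_name}' is deprecated and will be removed "
        f"in {info.removed_in}."
    )
    if info.replacement:
        notice += f" Use '{info.replacement}' instead."
    return f"{text}\n\n{notice}"
```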

Checklist: Production Readiness

Before deploying to production, verify:

  • [ ] Authentication implemented
  • [ ] Input validation on all tools
  • [ ] SQL injection prevention (parameterized queries)
  • [ ] Secrets stored in environment variables or secrets manager
  • [ ] Rate limiting configured
  • [ ] Comprehensive audit logging
  • [ ] Error handling for all failure scenarios
  • [ ] Timeouts set on all external calls
  • [ ] Connection pooling for databases
  • [ ] Caching strategy implemented
  • [ ] Monitoring and metrics collection
  • [ ] Health checks configured
  • [ ] Environment-specific configurations
  • [ ] Docker/containerization setup
  • [ ] Documentation updated
  • [ ] Load testing completed

Conclusion: From Prototype to Production

The journey from "it works on my machine" to production-ready is significant, but following these practices will give you a robust, secure, and performant MCP server.

Key takeaways:

  1. Security first - Never compromise on authentication, validation, and logging
  2. Performance matters - Use pooling, caching, and async operations
  3. Fail gracefully - Comprehensive error handling and resilience patterns
  4. Monitor everything - You can't fix what you can't see
  5. Plan for scale - Even if you don't need it today

MCP servers bridge AI capabilities with real-world systems. Building them responsibly means respecting the power they wield.


Series Wrap-Up

What we covered:

  • Part 1: MCP concepts and why they matter
  • Part 2: Building your first MCP server
  • Part 3: Making it production-ready

You now have everything you need to build secure, performant MCP servers that can power real-world AI applications.

What will you build? Share your MCP projects in the comments. I'd love to see what the community creates!


Thanks for following this series! If you found it valuable, consider following for more posts on AI engineering, backend development, and modern software practices. Have questions? Drop them in the comments or reach out directly.
