GitHub: fastmcp-production-template — fork, rename the tools, ship.
Over the past year I built and deployed 20+ MCP servers in production inside a regulated life sciences environment — powering AI agents that search 57M+ research records, automate scientific workflows, and surface real-time data to LLMs via 150+ registered tools.
Every new server started the same way: copy the FastMCP quickstart, then spend days re-solving the same four problems:
- Async database connections that deadlock under concurrent agent calls
- No guardrails against prompt injection via unauthorized tool invocation
- Zero observability — metrics defined but never actually recorded
- Kubernetes deployments cobbled together from unrelated examples
After the third server, I extracted the patterns that actually held up in production and built this template. This is not a toy demo. Fork it, rename the tools to match your domain, and ship.
What Is MCP and Why Does It Need a Production Template?
The Model Context Protocol (MCP) is an open standard that lets AI agents call external tools — databases, APIs, internal services — via a structured JSON-RPC interface. FastMCP is the Python framework that implements it.
Most MCP examples stop at "it works on localhost." The moment you move to production you hit the same set of problems. This template solves all of them.
Problem 1: Async PostgreSQL Connection Pooling
Under concurrent agent load, naive database connections either exhaust the pool or serialize requests through a single connection. The fix is a bounded asyncpg pool initialized once at server startup and shared across all tool calls.
```python
@asynccontextmanager
async def lifespan(server: FastMCP):
    await db_pool.initialize()  # creates the asyncpg pool
    set_pool(db_pool)           # exposes it to tools via module-level singleton
    yield
    await db_pool.close()       # graceful drain on shutdown
```
The pool is exposed via a module-level singleton (mirroring the pattern in db/pool.py) so tool functions never need to thread connection state through function arguments.
```python
class DatabasePool:
    async def fetch(self, query: str, *args) -> list[dict]:
        async with self.acquire() as conn:
            rows = await conn.fetch(query, *args)
            return [dict(row) for row in rows]
```
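The singleton module itself is small. A minimal sketch of the pattern (the `get_pool`/`set_pool` names come from the text above; everything else is an assumption, not the template's actual `db/pool.py`):

```python
from typing import Optional

_pool: Optional["DatabasePool"] = None  # set exactly once by the lifespan hook

def set_pool(pool) -> None:
    """Store the shared pool at startup."""
    global _pool
    _pool = pool

def get_pool():
    """Return the shared pool; fail loudly if the lifespan hook never ran."""
    if _pool is None:
        raise RuntimeError("Database pool is not initialized")
    return _pool
```

Failing with `RuntimeError` rather than returning `None` means a misordered startup shows up immediately instead of as a confusing `AttributeError` deep inside a tool call.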
For paginated search tools, asyncio.gather runs the data fetch and count query in parallel — cutting round-trip time roughly in half:
```python
results, total = await asyncio.gather(
    db.fetch(sql, *params),
    db.fetchval(count_sql, *count_params),
)
```
Pool configuration (DB_POOL_MIN=5, DB_POOL_MAX=20) comes from environment variables via Pydantic Settings. The min-5 floor means warm connections are always available; the max-20 ceiling prevents runaway connection exhaustion under burst load.
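The template reads these bounds through Pydantic Settings; the equivalent validation logic, sketched here with the stdlib only (the env var names are from the text above, the class shape is an assumption):

```python
import os
from dataclasses import dataclass, field

def _env_int(name: str, default: int) -> int:
    """Read an integer env var, falling back to a default."""
    return int(os.environ.get(name, default))

@dataclass(frozen=True)
class PoolSettings:
    # Defaults match the DB_POOL_MIN=5 / DB_POOL_MAX=20 bounds described above.
    min_size: int = field(default_factory=lambda: _env_int("DB_POOL_MIN", 5))
    max_size: int = field(default_factory=lambda: _env_int("DB_POOL_MAX", 20))

    def __post_init__(self):
        # Catch an inverted range at startup, not at first connection.
        if self.min_size > self.max_size:
            raise ValueError("DB_POOL_MIN must not exceed DB_POOL_MAX")
```

Validating at construction time keeps the failure mode consistent with the startup validators described later in this article.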
Problem 2: Prompt Injection via Tool Invocation
A malicious prompt can ask an LLM to call internal debug tools, admin endpoints, or anything the MCP server exposes. The fix is an explicit allowlist — only tools on the list can be invoked.
```yaml
# config/allowlist.yaml
allowed_tools:
  - search_records
  - get_record_detail
  - get_statistics
  # get_pool_status intentionally omitted — health is always reachable
```
The list is loaded once at startup and enforced via a decorator:
```python
def require_allowlist(tool_name: str):
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            if not is_allowed(tool_name):
                logger.warning(f"Blocked access to tool '{tool_name}' — not in allowlist")
                raise PermissionError(f"Tool '{tool_name}' is not permitted.")
            return await func(*args, **kwargs)
        return wrapper
    return decorator
```
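Loading the list itself takes only a few lines. A stdlib-only sketch that handles the flat `allowed_tools:` format shown above (the template's actual loader in `config/security.py` presumably uses a real YAML parser; this shape is an assumption):

```python
from pathlib import Path

_allowed: frozenset = frozenset()

def load_allowlist(path: str) -> frozenset:
    """Parse the flat allowed_tools list; called once at startup."""
    global _allowed
    names = []
    for line in Path(path).read_text().splitlines():
        line = line.split("#", 1)[0].strip()  # drop comments and whitespace
        if line.startswith("- "):
            names.append(line[2:].strip())
    _allowed = frozenset(names)
    return _allowed

def is_allowed(tool_name: str) -> bool:
    """O(1) membership check used by @require_allowlist."""
    return tool_name in _allowed
```

Because the list is loaded once and frozen, the per-call cost of the check is a single set lookup.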
Decorator order matters. The @instrument_tool decorator (introduced in the observability section below) must be the outermost layer so that blocked calls are counted in metrics — giving you visibility into prompt injection attempts, not just successful calls:
@instrument_tool("search_records") # outermost — records ALL attempts including blocked
@require_allowlist("search_records") # inner — raises PermissionError if not on list
async def search_records(query: str, limit: int = 20) -> dict:
...
SQL injection is prevented at two levels. All user-supplied values use asyncpg parameterized queries ($1, $2). Column names used in dynamic queries are validated against a hardcoded allowlist before the query is constructed:
```python
allowed_filter_cols = {"status", "type", "category"}
if filters:
    invalid = set(filters) - allowed_filter_cols
    if invalid:
        raise ValueError(f"Invalid filter column(s): {invalid}")
```
Problem 3: Observability
This is the section most MCP templates get wrong — and where I spent the most time getting it right.
The naive approach is to define OpenTelemetry metrics in a setup function and assume they get populated. They don't. Defining a counter and incrementing a counter are two separate things, and many templates (including early versions of this one) only do the first half.
The fix: @instrument_tool
Every tool call needs to be wrapped in a decorator that records the metrics. This decorator is placed as the outermost layer on each tool function:
```python
# src/server/observability/instrument.py
def instrument_tool(tool_name: str):
    def decorator(func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            tel = get_telemetry()
            if tel is None:  # unit tests — zero-overhead pass-through
                return await func(*args, **kwargs)
            attrs = {"tool": tool_name}
            tel.tool_calls.add(1, attrs)
            start = time.monotonic()
            with tel.tracer.start_as_current_span(tool_name):
                try:
                    result = await func(*args, **kwargs)
                    tel.tool_duration.record((time.monotonic() - start) * 1000, attrs)
                    return result
                except Exception:
                    tel.tool_errors.add(1, attrs)
                    raise
        return wrapper
    return decorator
```
get_telemetry() reads from a module-level singleton in observability/context.py — the same pattern used by db/pool.py for the database pool. It returns None when telemetry is not initialized (unit tests), making the decorator a zero-overhead pass-through in that case.
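A sketch of that context module (the exact contents of `observability/context.py` are an assumption; the key behaviour is that `get_telemetry()` returns `None` instead of raising, unlike `get_pool()`):

```python
from typing import Any, Optional

_telemetry: Optional[Any] = None

def set_telemetry(telemetry: Any) -> None:
    """Store the telemetry bundle once at startup."""
    global _telemetry
    _telemetry = telemetry

def get_telemetry() -> Optional[Any]:
    """Return the bundle, or None when telemetry was never initialized.

    Returning None (rather than raising, as get_pool does) is what lets
    @instrument_tool degrade to a zero-overhead pass-through in unit tests.
    """
    return _telemetry
```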
Four metrics recorded on every call
| Metric | Type | What it tells you |
|---|---|---|
| `mcp.tool.calls` | Counter | Total invocations per tool (including blocked) |
| `mcp.tool.errors` | Counter | Failures per tool — allowlist blocks, DB errors, validation failures |
| `mcp.tool.duration` | Histogram (ms) | Latency distribution — p50/p95/p99 per tool |
| `mcp.db.pool_size` | Gauge | Live pool size at each export interval |
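To make the histogram row concrete: the backend derives per-tool percentiles from the stream of recorded durations. A stdlib-only illustration with synthetic numbers (nothing here is template code):

```python
import statistics

# Synthetic per-call durations in ms, standing in for values recorded
# by the mcp.tool.duration histogram for one tool.
durations_ms = [12, 14, 15, 16, 18, 21, 25, 40, 95, 240]

# quantiles(n=100) returns the 1st through 99th percentiles;
# index is percentile minus one.
pcts = statistics.quantiles(durations_ms, n=100, method="inclusive")
p50, p95, p99 = pcts[49], pcts[94], pcts[98]
```

The long tail (here, the 95 ms and 240 ms outliers) is exactly what a mean would hide and what p95/p99 expose.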
The pool size gauge — a subtle fix
Observable gauges require a callback registered at creation time. Without it the gauge exists but reports nothing. The callback uses late binding via get_pool() so it works even though the pool is initialized after setup_telemetry() runs:
```python
def _pool_size_callback(options):
    """Called by the metrics SDK at each 30s export interval."""
    try:
        from ..db.pool import get_pool  # lazy import — avoids circular dependency
        asyncpg_pool = get_pool()._pool
        if asyncpg_pool is not None:
            return [Observation(asyncpg_pool.get_size())]
    except RuntimeError:
        pass  # pool not yet initialized — skip this cycle
    return []

db_pool_size = meter.create_observable_gauge(
    "mcp.db.pool_size",
    callbacks=[_pool_size_callback],  # ← was missing in early versions
    description="Current DB connection pool size",
)
```
OTLP or console — controlled by env var
The server emits standard OTLP and is completely backend-agnostic. Whether it exports to a real collector or falls back to stdout is controlled by a single env var:
```python
telemetry = setup_telemetry(
    settings.service_name,
    otel_enabled=settings.otel_enabled,  # True → OTLP gRPC, False → console
    otlp_endpoint=settings.otel_exporter_otlp_endpoint,
)
set_telemetry(telemetry)  # makes it accessible to @instrument_tool via context.py
```

```bash
# .env
OTEL_ENABLED=true
OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4317
```
Multi-backend routing via collector configs
The collector/ directory provides three pre-built configs for routing OTLP to different backends — swap the one your infrastructure uses without touching application code:
| File | Routes to |
|---|---|
| `collector/adot-aws.yaml` | AWS X-Ray (traces) + CloudWatch EMF (metrics) via ADOT sidecar |
| `collector/otelcol-grafana.yaml` | Grafana Tempo (traces) + Prometheus (metrics) |
| `collector/otelcol-datadog.yaml` | Datadog APM + Datadog Metrics |
```text
FastMCP server → OTLP :4317 → Collector (adot / otelcol) → X-Ray / Grafana / Datadog
```
Local observability in one command
docker/docker-compose.observe.yml adds the full Grafana LGTM stack alongside the MCP server:
```bash
make observe-up
# Open http://localhost:3000 — traces and metrics appear immediately, no login
make observe-down
```
This starts: OpenTelemetry Collector → Grafana Tempo (traces) + Prometheus (metrics) → Grafana (dashboards). OTEL_ENABLED is set to true automatically. Datasources are auto-provisioned — no manual Grafana setup.
Problem 4: Kubernetes Deployment
The Helm chart handles everything a production Kubernetes deployment needs.
Horizontal Pod Autoscaler — scales 2–8 replicas at 70% CPU:
```yaml
autoscaling:
  enabled: true
  minReplicas: 2
  maxReplicas: 8
  targetCPUUtilizationPercentage: 70
```
External Secrets Operator — pulls DATABASE_URL and API_KEY from HashiCorp Vault (or any ESO-compatible store) without ever putting secrets in your Helm values:
```yaml
externalSecrets:
  enabled: true
  secretStoreName: vault-backend
  secrets:
    - remoteKey: mcp-server/database
      data:
        - secretKey: DATABASE_URL
          remoteRef:
            key: database_url
```
Kubernetes health probes point to /health, which checks the database pool before returning 200:
```python
@mcp.custom_route("/health", methods=["GET"])
async def http_health(request: Request) -> JSONResponse:
    try:
        pool = get_pool()
        pool_status = await pool.health_check()
        return JSONResponse({"status": "ok", "pool": pool_status})
    except RuntimeError:
        return JSONResponse(
            {"status": "degraded", "detail": "database pool not initialized"},
            status_code=503,
        )
```
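`pool.health_check()` itself is not shown in the article; one plausible shape, assuming asyncpg's real `Pool.get_size()` / `Pool.get_idle_size()` statistics API (everything else here is an assumption, not template code):

```python
class DatabasePool:
    def __init__(self, pool):
        self._pool = pool  # an asyncpg.Pool in the real template

    async def health_check(self) -> dict:
        """Round-trip a trivial query and report pool stats.

        Any exception raised here propagates up and turns the /health
        probe response into a 503.
        """
        async with self._pool.acquire() as conn:
            await conn.fetchval("SELECT 1")
        return {
            "size": self._pool.get_size(),
            "idle": self._pool.get_idle_size(),
        }
```

Running `SELECT 1` matters: reporting pool statistics alone would keep the probe green even when every connection to the database is dead.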
For AWS deployments, the optional ADOT sidecar annotation in values.yaml routes traces directly to X-Ray without any application code changes — just enable adot.enabled: true and point OTEL_EXPORTER_OTLP_ENDPOINT at localhost:4317.
Production Safeguards in Settings
Two validators in Settings catch the most common production misconfigurations at startup — not at 2am when the first request fails:
```python
@field_validator("database_url")
@classmethod
def validate_database_url(cls, v: str) -> str:
    if not v.startswith(("postgresql://", "postgres://")):
        raise ValueError("DATABASE_URL must be a valid PostgreSQL connection string")
    return v

@model_validator(mode="after")
def validate_api_key_in_production(self) -> "Settings":
    if (
        self.api_key_enabled
        and self.log_level.upper() != "DEBUG"
        and self.api_key == "change-me-in-production"
    ):
        raise ValueError(
            "API_KEY must be changed from the default value before running in production. "
            "Set LOG_LEVEL=DEBUG to bypass this check in development."
        )
    return self
```
The second validator is deliberately bypassed with LOG_LEVEL=DEBUG — so local dev still works with the default key, but a misconfigured production deploy fails loudly at boot.
Testing
A production-ready template should have production-grade tests. The suite has two tiers:
Unit tests — no database, no network, runs in ~0.5s:
```bash
make test-unit  # 28 tests
```
Covers: allowlist loading and enforcement, @instrument_tool decorator (call counting, error counting, duration recording, span wrapping, functools.wraps preservation), setup_telemetry console and OTLP modes, pool gauge callback with uninitialized pool, all four tool functions with mocked database, and SQL injection prevention.
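A representative test from that first tier might look like the sketch below, which inlines a trimmed copy of the decorator so it stays self-contained (in the real suite, `instrument_tool` and `get_telemetry` are imported from the template's modules):

```python
import asyncio
import functools

_telemetry = None  # unit tests leave the singleton unset

def get_telemetry():
    return _telemetry

def instrument_tool(tool_name):
    # Trimmed copy of the decorator: only the pass-through branch matters here.
    def decorator(func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            if get_telemetry() is None:
                return await func(*args, **kwargs)
            raise NotImplementedError  # full metrics path elided in this sketch
        return wrapper
    return decorator

@instrument_tool("echo")
async def echo(x):
    return x

def test_passthrough_preserves_result_and_name():
    assert asyncio.run(echo(42)) == 42
    assert echo.__name__ == "echo"  # functools.wraps kept the original name
```

The second assertion is why `functools.wraps` preservation is on the coverage list: FastMCP registers tools by function metadata, so a decorator that loses the name breaks registration.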
Integration tests — runs against real PostgreSQL:
```bash
# Start postgres first:
docker compose -f docker/docker-compose.yml up -d postgres

# Run against it:
DATABASE_URL=postgresql://mcpuser:mcppassword@localhost:5432/mcpdb \
  make test-integration  # 21 tests
```
Covers: real DatabasePool lifecycle (initialize, fetch, fetchrow, fetchval, execute, health_check), search_records pagination and filter behaviour, get_record_detail found and not-found paths, get_statistics grouping and totals, and SQL injection prevention against a live query engine.
CI runs both phases automatically — unit tests first, integration tests in a separate job with a postgres:16 service container. Unit tests alone reach 82% coverage; main.py and settings.py are excluded (covered by integration tests) and documented as such.
Getting Started
Two minutes with Docker Compose
```bash
git clone https://github.com/ManjunathGovindaraju/fastmcp-production-template.git
cd fastmcp-production-template
cp .env.example .env
docker compose -f docker/docker-compose.yml up
```
MCP server at http://localhost:8000/mcp. PostgreSQL starts with sample data from docker/init.sql.
With full observability
```bash
cp .env.example .env
make observe-up
# MCP server: http://localhost:8000/mcp
# Grafana:    http://localhost:3000 (traces + metrics, no login)
make observe-down
```
Deploy to Kubernetes
```bash
helm install fastmcp k8s/helm/ \
  --set image.tag=v1.0.0 \
  --set externalSecrets.secretStoreName=your-vault-store \
  --namespace mcp-system \
  --create-namespace
```
Adding a New Tool
Three steps:
1. Create the tool in src/server/tools/your_tool.py:
```python
from ..config.security import require_allowlist
from ..db.pool import get_pool
from ..observability.instrument import instrument_tool

@instrument_tool("your_tool_name")    # outermost — metrics + trace span
@require_allowlist("your_tool_name")  # inner — blocks if not in allowlist
async def your_tool_name(param: str) -> dict:
    row = await get_pool().fetchrow(
        "SELECT * FROM your_table WHERE id = $1", param
    )
    return dict(row) if row else {}
```
2. Register it in src/server/main.py:
```python
from .tools import your_tool

mcp.add_tool(your_tool.your_tool_name)
```
3. Add it to the allowlist in config/allowlist.yaml:
```yaml
allowed_tools:
  - your_tool_name
```
What This Is Not
This template does not include:
- Application-specific business logic — the tools (`search_records`, `get_record_detail`, `get_statistics`) are placeholders. Replace them with your domain.
- A specific LLM or agent framework — it is the tool-serving layer. Pair it with LangGraph, LangChain, or any MCP-compatible agent.
- A database migration tool — schema versioning (Alembic, Flyway) is intentionally out of scope. Add the migration tooling your team already uses.
Project Structure
```text
fastmcp-production-template/
├── src/server/
│   ├── main.py                      # FastMCP entry point, lifespan hooks
│   ├── config/
│   │   ├── settings.py              # Pydantic Settings (env / .env)
│   │   └── security.py              # Allowlist loader + @require_allowlist
│   ├── db/
│   │   ├── connection.py            # asyncpg DatabasePool
│   │   └── pool.py                  # Module-level singleton
│   ├── observability/
│   │   ├── telemetry.py             # OTel setup — OTLP (prod) or console (dev)
│   │   ├── context.py               # Telemetry singleton — get/set
│   │   └── instrument.py            # @instrument_tool decorator
│   └── tools/
│       ├── search.py                # search_records
│       ├── detail.py                # get_record_detail
│       ├── stats.py                 # get_statistics
│       └── health.py                # get_pool_status (no allowlist)
├── collector/
│   ├── adot-aws.yaml                # → AWS X-Ray + CloudWatch
│   ├── otelcol-grafana.yaml         # → Grafana Tempo + Prometheus
│   └── otelcol-datadog.yaml         # → Datadog
├── config/
│   └── allowlist.yaml
├── docker/
│   ├── Dockerfile
│   ├── docker-compose.yml           # server + postgres
│   ├── docker-compose.observe.yml   # adds Grafana LGTM stack
│   └── init.sql
├── k8s/helm/
│   ├── values.yaml
│   └── templates/
├── tests/
│   ├── conftest.py                  # mock_telemetry fixture
│   ├── test_security.py
│   ├── test_telemetry.py            # context + @instrument_tool + setup_telemetry
│   ├── test_tools.py                # tool behaviour, mocked DB
│   └── test_integration.py          # real DB — skipped unless DATABASE_URL is set
└── .github/workflows/
    ├── ci.yml                       # lint → unit tests → integration tests → docker
    └── release.yml
```
Summary
| Problem | Solution |
|---|---|
| DB connections deadlock under concurrent load | `asyncpg` pool, min/max bounded, module-level singleton |
| Prompt injection via unauthorized tool calls | YAML allowlist + `@require_allowlist` decorator |
| Metrics defined but never recorded | `@instrument_tool` decorator + `context.py` singleton |
| Pool size gauge silent | Late-bound callback registered at gauge creation |
| Locked to one observability backend | OTLP app + `collector/` configs for ADOT, Grafana, Datadog |
| No local observability stack | `make observe-up` — full Grafana LGTM in one command |
| No real test coverage | 28 unit tests + 21 integration tests, 82% coverage |
| Kubernetes complexity | Helm chart — HPA + ESO + optional ADOT sidecar |
GitHub: ManjunathGovindaraju/fastmcp-production-template — MIT license, fork freely.
About the author: Manjunath Govindaraju — Principal Software Engineer with 23 years building production systems. Currently focused on AI platform engineering: multi-agent orchestration (LangGraph), MCP servers, async data pipelines, and enterprise Kubernetes deployments.