Python Application Performance Monitoring That Actually Works in 2026
Most Python APM setups fall into two traps: either they instrument everything (drowning you in data), or they add a SaaS SDK and call it done (missing the metrics that matter). Production observability requires intentional instrumentation — measuring what helps you debug production issues, not just what's easy to measure.
This guide shows how to build production-grade observability for Python applications using OpenTelemetry, structured logging, and custom metrics.
The Three Pillars, Actually Used
Logs → "What happened?" — structured events with context
Traces → "Where did it happen?" — request flow across services
Metrics → "How often and how fast?" — aggregated measurements
Most teams implement all three but use none of them effectively. Here's how to make each pillar actually useful.
1. Structured Logging That Enables Debugging
Stop using print() and logging.info("something happened"). Structured logs with context are searchable and correlatable.
import structlog
import logging
from opentelemetry import trace
# Configure structlog with JSON output
# JSON-rendered structured logging: every event gets the bound contextvars,
# a log level, an ISO-8601 timestamp, and is emitted as one JSON object per line.
structlog.configure(
    processors=[
        structlog.contextvars.merge_contextvars,      # pull in per-request context
        structlog.processors.add_log_level,
        structlog.processors.StackInfoRenderer(),     # renders stack_info=True captures
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.JSONRenderer(),          # must be last: final renderer
    ],
    # Filter at INFO without going through stdlib logging's machinery.
    wrapper_class=structlog.make_filtering_bound_logger(logging.INFO),
)
# Module-level logger; request context travels via contextvars, not this object.
logger = structlog.get_logger()
# Middleware that adds request context to ALL logs
class RequestContextMiddleware:
    """ASGI middleware that binds request metadata (and, when a span is
    recording, trace/span IDs) into structlog's contextvars so every log
    line emitted while handling the request carries the same context.

    Non-HTTP scopes (lifespan, websocket) are passed through untouched.
    """

    def __init__(self, app):
        self.app = app

    async def __call__(self, scope, receive, send):
        if scope["type"] == "http":
            # Reset per-request so context never leaks between requests
            # served on the same task.
            structlog.contextvars.clear_contextvars()
            # Bug fix: scope["client"] can be present but None (e.g. unix
            # sockets / some test clients), so a .get() default is not
            # enough — `or` covers both missing and None.
            client = scope.get("client") or ("unknown", None)
            structlog.contextvars.bind_contextvars(
                request_id=scope.get("request_id", "unknown"),
                method=scope["method"],
                path=scope["path"],
                client_ip=client[0],
            )
            # Log-trace correlation. NOTE(review): this reads the *current*
            # span, so the tracing middleware must wrap outside this one —
            # confirm middleware ordering in the app factory.
            span = trace.get_current_span()
            if span.is_recording():
                ctx = span.get_span_context()
                structlog.contextvars.bind_contextvars(
                    trace_id=format(ctx.trace_id, "032x"),
                    span_id=format(ctx.span_id, "016x"),
                )
        await self.app(scope, receive, send)
Every log line now has request context:
{
"event": "payment_processed",
"level": "info",
"timestamp": "2026-03-22T10:15:30.123Z",
"request_id": "req_abc123",
"trace_id": "4bf92f3577b34da6a3ce929d0e0e4736",
"method": "POST",
"path": "/api/payments",
"amount": 99.99,
"currency": "USD",
"customer_id": "cust_789"
}
2. Distributed Tracing with OpenTelemetry
Auto-instrumentation is a start, but custom spans on business logic are where tracing becomes powerful.
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor
from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor
from functools import wraps
def setup_tracing(service_name: str):
    """Install a global TracerProvider with OTLP/gRPC export and enable
    auto-instrumentation for FastAPI, httpx and SQLAlchemy.

    Args:
        service_name: logical name reported as the `service.name` resource
            attribute; version/environment come from GIT_SHA / ENV env vars.
    """
    # Bug fix: these names were used but never imported in the original
    # snippet (Resource lives in the SDK resources module; os is stdlib).
    import os
    from opentelemetry.sdk.resources import Resource

    provider = TracerProvider(
        resource=Resource.create({
            "service.name": service_name,
            "service.version": os.getenv("GIT_SHA", "unknown"),
            "deployment.environment": os.getenv("ENV", "development"),
        })
    )
    provider.add_span_processor(
        BatchSpanProcessor(
            OTLPSpanExporter(endpoint="http://otel-collector:4317"),
            max_queue_size=2048,
            max_export_batch_size=512,
        )
    )
    trace.set_tracer_provider(provider)

    # Auto-instrument frameworks. Bug fix: BaseInstrumentor.instrument() is
    # an instance method, so FastAPIInstrumentor.instrument() (on the class)
    # would raise; instantiate like the other two.
    FastAPIInstrumentor().instrument()
    HTTPXClientInstrumentor().instrument()
    SQLAlchemyInstrumentor().instrument()
tracer = trace.get_tracer(__name__)
# Custom span decorator for business logic
def traced(name: str = None, attributes: dict = None):
    """Decorator that wraps a function in a custom span.

    Generalized: the original always returned an async wrapper, so decorating
    a plain (sync) function silently returned a coroutine instead of calling
    it. Both sync and async callables are now supported.

    Args:
        name: span name; defaults to "<module>.<qualname>" of the function.
        attributes: static attributes set on every span this decorator opens.
    """
    import inspect

    def decorator(func):
        span_name = name or f"{func.__module__}.{func.__qualname__}"

        def _apply_attributes(span):
            # Static attributes declared at decoration time.
            if attributes:
                for k, v in attributes.items():
                    span.set_attribute(k, v)

        if inspect.iscoroutinefunction(func):
            @wraps(func)
            async def wrapper(*args, **kwargs):
                with tracer.start_as_current_span(span_name) as span:
                    _apply_attributes(span)
                    try:
                        result = await func(*args, **kwargs)
                        span.set_status(trace.StatusCode.OK)
                        return result
                    except Exception as e:
                        # Mark the span failed and attach the exception event,
                        # then let the error propagate to the caller.
                        span.set_status(trace.StatusCode.ERROR, str(e))
                        span.record_exception(e)
                        raise
        else:
            @wraps(func)
            def wrapper(*args, **kwargs):
                with tracer.start_as_current_span(span_name) as span:
                    _apply_attributes(span)
                    try:
                        result = func(*args, **kwargs)
                        span.set_status(trace.StatusCode.OK)
                        return result
                    except Exception as e:
                        span.set_status(trace.StatusCode.ERROR, str(e))
                        span.record_exception(e)
                        raise

        return wrapper

    return decorator
# Usage:
# Usage: child spans on the business-logic steps inside a traced operation.
@traced("payment.process", {"payment.type": "credit_card"})
async def process_payment(amount: float, customer_id: str):
    """Validate the requested amount, then charge the customer's card."""
    with tracer.start_as_current_span("payment.validate") as validate_span:
        validate_span.set_attribute("payment.amount", amount)
        validate_amount(amount)

    with tracer.start_as_current_span("payment.charge") as charge_span:
        charge_span.set_attribute("customer.id", customer_id)
        result = await charge_card(customer_id, amount)
        charge_span.set_attribute("payment.status", result.status)

    return result
3. Custom Metrics That Drive Alerts
Don't just measure HTTP latency — measure the things that affect your users.
from opentelemetry import metrics
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter
def setup_metrics():
    """Install the global MeterProvider, exporting over OTLP/gRPC every 10s."""
    exporter = OTLPMetricExporter(endpoint="http://otel-collector:4317")
    periodic_reader = PeriodicExportingMetricReader(
        exporter,
        export_interval_millis=10000,
    )
    metrics.set_meter_provider(MeterProvider(metric_readers=[periodic_reader]))
# Module-level meter and instruments, created once at import time.
meter = metrics.get_meter(__name__)
# Business metrics — these tell you if your app is working
payment_counter = meter.create_counter(
    "payments.total",
    description="Total payment attempts",
    unit="1",
)
# Histogram so backends can compute percentiles of payment sizes.
payment_amount = meter.create_histogram(
    "payments.amount",
    description="Payment amounts processed",
    unit="USD",
)
# UpDownCounter: value can decrease, matching a queue that drains.
queue_depth = meter.create_up_down_counter(
    "queue.depth",
    description="Current items in processing queue",
    unit="1",
)
# Infrastructure metrics
# NOTE(review): `engine` is not defined in this snippet — presumably the
# app's SQLAlchemy engine created elsewhere; confirm it exists before the
# first collection interval invokes this callback.
db_pool_size = meter.create_observable_gauge(
    "db.pool.active_connections",
    callbacks=[lambda options: [
        metrics.Observation(engine.pool.checkedout(), {"db": "primary"})
    ]],
)
async def process_payment(amount: float, method: str):
    """Charge `amount` via `method`, counting every outcome.

    The "attempted" count is recorded unconditionally so the success-rate
    ratio (success / attempted) can be computed at query time.
    """
    by_method = {"method": method}
    payment_counter.add(1, {**by_method, "status": "attempted"})
    try:
        result = await charge(amount)
        payment_counter.add(1, {**by_method, "status": "success"})
        payment_amount.record(amount, by_method)
        return result
    except PaymentDeclined:
        # Declines are an expected business outcome, tracked separately
        # from infrastructure errors.
        payment_counter.add(1, {**by_method, "status": "declined"})
        raise
    except Exception:
        payment_counter.add(1, {**by_method, "status": "error"})
        raise
4. The Middleware That Ties It Together
One middleware that handles logging, tracing, and metrics for every request:
import re
import time

from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request
class ObservabilityMiddleware(BaseHTTPMiddleware):
    """One middleware that emits duration/count metrics and a structured
    log line for every HTTP request, with routes normalized to keep metric
    label cardinality bounded.
    """

    # Pre-compiled patterns, hoisted out of the per-request path.
    _UUID_RE = re.compile(
        r'[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}'
    )
    _NUMERIC_ID_RE = re.compile(r'/\d+')

    def __init__(self, app):
        super().__init__(app)
        # NOTE(review): relies on the module-level `meter` being configured
        # (setup_metrics) before this middleware is constructed.
        self.request_duration = meter.create_histogram(
            "http.server.duration",
            description="HTTP request duration",
            unit="s",
        )
        self.request_count = meter.create_counter(
            "http.server.requests",
            description="HTTP request count",
        )

    async def dispatch(self, request: Request, call_next):
        start = time.perf_counter()
        method = request.method
        path = request.url.path
        # Normalize path to avoid cardinality explosion:
        # /users/123/orders -> /users/{id}/orders
        normalized_path = self._normalize_path(path)
        # Bug fix: pre-bind so the `finally` block never hits an unbound
        # name if a BaseException (e.g. CancelledError) escapes call_next.
        status = 500
        level = "error"
        try:
            response = await call_next(request)
            status = response.status_code
            level = "warning" if status >= 400 else "info"
        except Exception as e:
            logger.exception("unhandled_error", error=str(e))
            raise
        finally:
            duration = time.perf_counter() - start
            attrs = {
                "http.method": method,
                "http.route": normalized_path,
                "http.status_code": status,
            }
            self.request_duration.record(duration, attrs)
            self.request_count.add(1, attrs)
            # Bug fix: structlog's FilteringBoundLogger.log() takes an *int*
            # level, so logger.log("warning", ...) raised; dispatch to the
            # named level method instead.
            getattr(logger, level)(
                "http_request",
                duration_ms=round(duration * 1000, 2),
                status=status,
                path=path,
            )
        return response

    @staticmethod
    def _normalize_path(path: str) -> str:
        """Replace dynamic segments to prevent metric cardinality explosion."""
        # Replace UUIDs first, then any remaining purely numeric segments.
        path = ObservabilityMiddleware._UUID_RE.sub('{id}', path)
        return ObservabilityMiddleware._NUMERIC_ID_RE.sub('/{id}', path)
5. Alerting Patterns That Reduce Noise
# alerts.yaml — Alert on symptoms, not causes
groups:
- name: business-health
rules:
# Alert when payment success rate drops
- alert: PaymentSuccessRateLow
expr: |
(
sum(rate(payments_total{status="success"}[5m]))
/
sum(rate(payments_total{status="attempted"}[5m]))
) < 0.95
for: 5m
labels:
severity: critical
annotations:
summary: "Payment success rate below 95%"
# Alert on latency — p99, not average
- alert: HighLatency
expr: |
histogram_quantile(0.99,
sum(rate(http_server_duration_bucket[5m])) by (le, http_route)
) > 2.0
for: 10m
labels:
severity: warning
annotations:
summary: "p99 latency > 2s for {{ $labels.http_route }}"
# Alert on error budget burn rate (SLO-based)
- alert: ErrorBudgetBurn
expr: |
(
1 - (
sum(rate(http_server_requests{http_status_code!~"5.."}[1h]))
/
sum(rate(http_server_requests[1h]))
)
) > (1 - 0.999) * 14.4
for: 5m
labels:
severity: critical
annotations:
summary: "Error budget burning 14.4x faster than allowed"
6. Profiling in Production
For CPU-bound Python code, continuous profiling catches performance regressions:
# Lightweight continuous profiling with py-spy export
import subprocess
import threading
import os
def start_continuous_profiler(output_dir: str = "/tmp/profiles",
                              duration_s: int = 60):
    """Start py-spy as a background profiler on the current process.

    Records successive speedscope profiles of `duration_s` seconds each into
    `output_dir`, named by unix timestamp, from a daemon thread.

    Args:
        output_dir: directory for profile files; created if missing.
        duration_s: length of each recording window (was hard-coded to 60).

    Returns:
        The started daemon `threading.Thread` (callers may ignore it).
    """
    os.makedirs(output_dir, exist_ok=True)
    pid = os.getpid()

    def profile_loop():
        while True:
            timestamp = int(time.time())
            output = f"{output_dir}/profile_{timestamp}.speedscope"
            try:
                subprocess.run(
                    ["py-spy", "record",
                     "--pid", str(pid),
                     "--duration", str(duration_s),
                     "--format", "speedscope",
                     "--output", output],
                    capture_output=True,
                )
            except FileNotFoundError:
                # Bug fix: if the py-spy binary is not installed, the
                # original loop re-raised every iteration and spun forever;
                # stop the profiler thread instead.
                return

    thread = threading.Thread(target=profile_loop, daemon=True)
    thread.start()
    return thread
The Observability Stack
| Component | Tool | Why |
|---|---|---|
| Instrumentation | OpenTelemetry SDK | Vendor-neutral, standard API |
| Log format | structlog (JSON) | Searchable, parseable |
| Trace export | OTLP/gRPC | Efficient, supports batching |
| Metrics | Prometheus-compatible | Industry standard |
| Collection | OpenTelemetry Collector | Fan-out to multiple backends |
| Alerting | Prometheus + Alertmanager | SLO-based alerts |
Key Takeaways
- Instrument business logic, not just infrastructure — payment success rate matters more than CPU utilization.
- Correlate logs and traces — every log line should include `trace_id` for drill-down.
- Normalize metric labels — `/users/123` becomes `/users/{id}` to prevent cardinality explosion.
- Alert on symptoms — "payment success rate < 95%" is actionable; "CPU > 80%" is not.
- Use histograms for latency — p99 catches problems that averages hide.
The goal of observability isn't to collect data — it's to answer "why is this broken?" in under 5 minutes.
Based on production observability implementations for Python services handling 10K+ requests/second.
Top comments (0)