Young Gao
Python Application Performance Monitoring That Actually Works in 2026

Most Python APM setups fall into two traps: either they instrument everything (drowning you in data), or they add a SaaS SDK and call it done (missing the metrics that matter). Production observability requires intentional instrumentation — measuring what helps you debug production issues, not just what's easy to measure.

This guide shows how to build production-grade observability for Python applications using OpenTelemetry, structured logging, and custom metrics.

The Three Pillars, Actually Used

Logs    → "What happened?" — structured events with context
Traces  → "Where did it happen?" — request flow across services
Metrics → "How often and how fast?" — aggregated measurements

Most teams implement all three but use none of them effectively. Here's how to make each pillar actually useful.

1. Structured Logging That Enables Debugging

Stop using print() and logging.info("something happened"). Structured logs with context are searchable and correlatable.

import structlog
import logging
from opentelemetry import trace

# Configure structlog with JSON output
structlog.configure(
    processors=[
        structlog.contextvars.merge_contextvars,
        structlog.processors.add_log_level,
        structlog.processors.StackInfoRenderer(),
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.JSONRenderer(),
    ],
    wrapper_class=structlog.make_filtering_bound_logger(logging.INFO),
)

logger = structlog.get_logger()

# Middleware that adds request context to ALL logs
class RequestContextMiddleware:
    def __init__(self, app):
        self.app = app

    async def __call__(self, scope, receive, send):
        if scope["type"] == "http":
            # Bind context that appears in every log line
            structlog.contextvars.clear_contextvars()
            structlog.contextvars.bind_contextvars(
                # request_id is assumed to be set on the scope by an upstream middleware
                request_id=scope.get("request_id", "unknown"),
                method=scope["method"],
                path=scope["path"],
                client_ip=scope.get("client", ("unknown",))[0],
            )

            # Add trace context for log-trace correlation
            span = trace.get_current_span()
            if span.is_recording():
                ctx = span.get_span_context()
                structlog.contextvars.bind_contextvars(
                    trace_id=format(ctx.trace_id, "032x"),
                    span_id=format(ctx.span_id, "016x"),
                )

        await self.app(scope, receive, send)

Every log line now has request context:

{
  "event": "payment_processed",
  "level": "info",
  "timestamp": "2026-03-22T10:15:30.123Z",
  "request_id": "req_abc123",
  "trace_id": "4bf92f3577b34da6a3ce929d0e0e4736",
  "method": "POST",
  "path": "/api/payments",
  "amount": 99.99,
  "currency": "USD",
  "customer_id": "cust_789"
}
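If pulling in structlog isn't an option, the same pattern — request-scoped context merged into every JSON log line — can be sketched with the standard library alone, using contextvars plus a custom formatter. This is an illustrative fallback, not the recommended setup above; the field names simply mirror the example output:

```python
import contextvars
import json
import logging

# Request-scoped context, set once per request by middleware
request_context: contextvars.ContextVar[dict] = contextvars.ContextVar(
    "request_context", default={}
)

class JSONContextFormatter(logging.Formatter):
    """Render each record as JSON, merged with the current request context."""

    def format(self, record: logging.LogRecord) -> str:
        event = {"event": record.getMessage(), "level": record.levelname.lower()}
        event.update(request_context.get())                 # request_id, trace_id, ...
        event.update(getattr(record, "extra_fields", {}))   # per-call fields
        return json.dumps(event)

logger = logging.getLogger("app")
handler = logging.StreamHandler()
handler.setFormatter(JSONContextFormatter())
logger.addHandler(handler)
logger.setLevel(logging.INFO)

# Middleware would do this at the start of each request:
request_context.set({"request_id": "req_abc123", "path": "/api/payments"})
logger.info("payment_processed", extra={"extra_fields": {"amount": 99.99}})
```

The `extra={"extra_fields": ...}` indirection is a stdlib quirk: `logging` attaches `extra` keys as record attributes, so the formatter can pick them up per call.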

2. Distributed Tracing with OpenTelemetry

Auto-instrumentation is a start, but custom spans on business logic are where tracing becomes powerful.

import os

from opentelemetry import trace
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor
from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor
from functools import wraps

def setup_tracing(service_name: str):
    provider = TracerProvider(
        resource=Resource.create({
            "service.name": service_name,
            "service.version": os.getenv("GIT_SHA", "unknown"),
            "deployment.environment": os.getenv("ENV", "development"),
        })
    )
    provider.add_span_processor(
        BatchSpanProcessor(
            OTLPSpanExporter(endpoint="http://otel-collector:4317"),
            max_queue_size=2048,
            max_export_batch_size=512,
        )
    )
    trace.set_tracer_provider(provider)

    # Auto-instrument frameworks. FastAPIInstrumentor().instrument() patches the
    # FastAPI class itself, so call setup_tracing() before constructing the app
    # (or use FastAPIInstrumentor.instrument_app(app) on an existing instance).
    FastAPIInstrumentor().instrument()
    HTTPXClientInstrumentor().instrument()
    SQLAlchemyInstrumentor().instrument()

tracer = trace.get_tracer(__name__)

# Custom span decorator for business logic (handles async functions only;
# branch on inspect.iscoroutinefunction if sync support is also needed)
def traced(name: str = None, attributes: dict = None):
    def decorator(func):
        span_name = name or f"{func.__module__}.{func.__qualname__}"
        @wraps(func)
        async def wrapper(*args, **kwargs):
            with tracer.start_as_current_span(span_name) as span:
                if attributes:
                    for k, v in attributes.items():
                        span.set_attribute(k, v)
                try:
                    result = await func(*args, **kwargs)
                    span.set_status(trace.StatusCode.OK)
                    return result
                except Exception as e:
                    span.set_status(trace.StatusCode.ERROR, str(e))
                    span.record_exception(e)
                    raise
        return wrapper
    return decorator

# Usage:
@traced("payment.process", {"payment.type": "credit_card"})
async def process_payment(amount: float, customer_id: str):
    with tracer.start_as_current_span("payment.validate") as span:
        span.set_attribute("payment.amount", amount)
        validate_amount(amount)

    with tracer.start_as_current_span("payment.charge") as span:
        span.set_attribute("customer.id", customer_id)
        result = await charge_card(customer_id, amount)
        span.set_attribute("payment.status", result.status)

    return result
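A quick aside on the `format(ctx.trace_id, "032x")` calls used for log correlation in section 1: OpenTelemetry stores trace and span IDs as integers, while W3C Trace Context requires exactly 32 and 16 lowercase hex digits. The IDs below are the example values from the traceparent spec:

```python
# OTel stores trace/span IDs as ints; W3C traceparent needs fixed-width hex
trace_id = 0x4BF92F3577B34DA6A3CE929D0E0E4736
span_id = 0x00F067AA0BA902B7

trace_id_hex = format(trace_id, "032x")  # zero-padded to 32 lowercase hex chars
span_id_hex = format(span_id, "016x")    # zero-padded to 16

print(trace_id_hex)  # 4bf92f3577b34da6a3ce929d0e0e4736
print(span_id_hex)   # 00f067aa0ba902b7
```

The zero padding matters: without it, IDs with leading zero bytes would produce short strings that your log backend can't join against exported spans.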

3. Custom Metrics That Drive Alerts

Don't just measure HTTP latency — measure the things that affect your users.

from opentelemetry import metrics
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter

def setup_metrics():
    reader = PeriodicExportingMetricReader(
        OTLPMetricExporter(endpoint="http://otel-collector:4317"),
        export_interval_millis=10000,
    )
    provider = MeterProvider(metric_readers=[reader])
    metrics.set_meter_provider(provider)

meter = metrics.get_meter(__name__)

# Business metrics — these tell you if your app is working
payment_counter = meter.create_counter(
    "payments.total",
    description="Total payment attempts",
    unit="1",
)

payment_amount = meter.create_histogram(
    "payments.amount",
    description="Payment amounts processed",
    unit="USD",
)

queue_depth = meter.create_up_down_counter(
    "queue.depth",
    description="Current items in processing queue",
    unit="1",
)

# Infrastructure metrics — assumes `engine` is an existing SQLAlchemy engine
db_pool_size = meter.create_observable_gauge(
    "db.pool.active_connections",
    callbacks=[lambda options: [
        metrics.Observation(engine.pool.checkedout(), {"db": "primary"})
    ]],
)

async def process_payment(amount: float, method: str):
    payment_counter.add(1, {"method": method, "status": "attempted"})
    try:
        result = await charge(amount)
        payment_counter.add(1, {"method": method, "status": "success"})
        payment_amount.record(amount, {"method": method})
        return result
    except PaymentDeclined:
        payment_counter.add(1, {"method": method, "status": "declined"})
        raise
    except Exception:
        payment_counter.add(1, {"method": method, "status": "error"})
        raise

4. The Middleware That Ties It Together

One middleware that handles logging, tracing, and metrics for every request:

import time
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request

class ObservabilityMiddleware(BaseHTTPMiddleware):
    def __init__(self, app):
        super().__init__(app)
        self.request_duration = meter.create_histogram(
            "http.server.duration",
            description="HTTP request duration",
            unit="s",
        )
        self.request_count = meter.create_counter(
            "http.server.requests",
            description="HTTP request count",
        )

    async def dispatch(self, request: Request, call_next):
        start = time.perf_counter()
        method = request.method
        path = request.url.path

        # Normalize path to avoid cardinality explosion
        # /users/123/orders -> /users/{id}/orders
        normalized_path = self._normalize_path(path)

        try:
            response = await call_next(request)
            status = response.status_code
            level = "warning" if status >= 400 else "info"
        except Exception as e:
            status = 500
            level = "error"
            logger.exception("unhandled_error", error=str(e))
            raise
        finally:
            duration = time.perf_counter() - start
            attrs = {
                "http.method": method,
                "http.route": normalized_path,
                "http.status_code": status,
            }
            self.request_duration.record(duration, attrs)
            self.request_count.add(1, attrs)

            # structlog's bound logger exposes level methods (info/warning/error)
            # rather than a string-level log(); dispatch via getattr
            getattr(logger, level)(
                "http_request",
                duration_ms=round(duration * 1000, 2),
                status=status,
                path=path,
            )

        return response

    @staticmethod
    def _normalize_path(path: str) -> str:
        """Replace dynamic segments to prevent metric cardinality explosion."""
        import re
        # Replace UUIDs
        path = re.sub(
            r'[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}',
            '{id}', path
        )
        # Replace numeric IDs
        path = re.sub(r'/\d+', '/{id}', path)
        return path
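The normalization rules are easy to check in isolation. Here is a standalone sketch of the same two regexes — note the ordering: UUIDs must be collapsed before numeric IDs, or the digit rule would chew up the leading digits of a UUID segment:

```python
import re

UUID_RE = re.compile(
    r"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}"
)
NUMERIC_ID_RE = re.compile(r"/\d+")

def normalize_path(path: str) -> str:
    """Collapse dynamic URL segments so each route maps to one metric series."""
    path = UUID_RE.sub("{id}", path)         # UUID segments first
    return NUMERIC_ID_RE.sub("/{id}", path)  # then purely numeric segments

print(normalize_path("/users/123/orders"))
# /users/{id}/orders
print(normalize_path("/items/0f8fad5b-d9cb-469f-a165-70867728950e"))
# /items/{id}
```

If your framework exposes the matched route template (FastAPI does, via `request.scope["route"]`), prefer that over regex normalization — it's exact and free.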

5. Alerting Patterns That Reduce Noise

# alerts.yaml — Alert on symptoms, not causes
groups:
  - name: business-health
    rules:
      # Alert when payment success rate drops
      - alert: PaymentSuccessRateLow
        expr: |
          (
            sum(rate(payments_total{status="success"}[5m]))
            /
            sum(rate(payments_total{status="attempted"}[5m]))
          ) < 0.95
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "Payment success rate below 95%"

      # Alert on latency — p99, not average
      - alert: HighLatency
        expr: |
          histogram_quantile(0.99,
            sum(rate(http_server_duration_bucket[5m])) by (le, http_route)
          ) > 2.0
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "p99 latency > 2s for {{ $labels.http_route }}"

      # Alert on error budget burn rate (SLO-based)
      - alert: ErrorBudgetBurn
        expr: |
          (
            1 - (
              sum(rate(http_server_requests{http_status_code!~"5.."}[1h]))
              /
              sum(rate(http_server_requests[1h]))
            )
          ) > (1 - 0.999) * 14.4
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "Error budget burning 14.4x faster than allowed"
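The 14.4 multiplier in the burn-rate alert comes from SRE-style error-budget math: for a 99.9% SLO over a 30-day window, burning budget 14.4x faster than the allowed steady rate exhausts the whole month's budget in roughly two days. The arithmetic, as a quick sketch:

```python
# Error-budget burn-rate arithmetic for a 99.9% SLO over a 30-day window
slo = 0.999
error_budget = 1 - slo            # fraction of requests allowed to fail (0.1%)
window_days = 30

burn_rate = 14.4                  # observed error rate / budgeted error rate
days_to_exhaust = window_days / burn_rate
print(round(days_to_exhaust, 2))  # 2.08 — days until the monthly budget is gone

# The alert threshold is the budgeted rate scaled by the burn rate,
# i.e. the (1 - 0.999) * 14.4 term in the PromQL above:
threshold = error_budget * burn_rate
print(round(threshold, 4))        # 0.0144 — fire when the 1h error rate tops 1.44%
```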

6. Profiling in Production

For CPU-bound Python code, continuous profiling catches performance regressions:

# Lightweight continuous profiling with py-spy (assumes py-spy is installed and
# the process has ptrace permission, e.g. SYS_PTRACE in containers)
import os
import subprocess
import threading
import time

def start_continuous_profiler(output_dir: str = "/tmp/profiles"):
    """Start py-spy as a background profiler."""
    os.makedirs(output_dir, exist_ok=True)
    pid = os.getpid()

    def profile_loop():
        while True:
            timestamp = int(time.time())
            output = f"{output_dir}/profile_{timestamp}.speedscope"
            subprocess.run(
                ["py-spy", "record",
                 "--pid", str(pid),
                 "--duration", "60",
                 "--format", "speedscope",
                 "--output", output],
                capture_output=True,
            )

    thread = threading.Thread(target=profile_loop, daemon=True)
    thread.start()

The Observability Stack

| Component       | Tool                       | Why                           |
|-----------------|----------------------------|-------------------------------|
| Instrumentation | OpenTelemetry SDK          | Vendor-neutral, standard API  |
| Log format      | structlog (JSON)           | Searchable, parseable         |
| Trace export    | OTLP/gRPC                  | Efficient, supports batching  |
| Metrics         | Prometheus-compatible      | Industry standard             |
| Collection      | OpenTelemetry Collector    | Fan-out to multiple backends  |
| Alerting        | Prometheus + Alertmanager  | SLO-based alerts              |

Key Takeaways

  1. Instrument business logic, not just infrastructure — payment success rate matters more than CPU utilization.
  2. Correlate logs and traces — every log line should include trace_id for drill-down.
  3. Normalize metric labels — /users/123 becomes /users/{id} to prevent cardinality explosion.
  4. Alert on symptoms — "payment success rate < 95%" is actionable; "CPU > 80%" is not.
  5. Use histograms for latency — p99 catches problems that averages hide.
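On point 5, a tiny illustration of why percentiles matter (synthetic numbers, purely for illustration): a handful of pathological requests barely move the mean but completely dominate the tail.

```python
import statistics

# 97 requests at ~50 ms plus 3 pathological ones at 3 s
latencies_ms = [50] * 97 + [3000] * 3

mean = statistics.fmean(latencies_ms)
p99 = statistics.quantiles(latencies_ms, n=100)[98]  # 99th percentile

print(mean)  # 138.5 — the average says everything is fine
print(p99)   # 3000.0 — the tail says 1 in 100 users waits 3 seconds
```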

The goal of observability isn't to collect data — it's to answer "why is this broken?" in under 5 minutes.


Based on production observability implementations for Python services handling 10K+ requests/second.
