Jayanth

Distributed tracing across FastAPI and Celery with OpenTelemetry: the part nobody shows you

Every OpenTelemetry tutorial ends at "add the middleware to FastAPI." That gets
you traces for HTTP requests. It tells you nothing about what happens inside
your Celery workers.

The hard part is propagating trace context from your FastAPI handler into the
Celery task, so a single request trace shows the full chain: HTTP → queue →
worker → database.

Here's the exact setup I use in production.

Why context propagation breaks by default

When FastAPI handles a request, the OpenTelemetry instrumentation creates a
span and stores the trace context in Python's contextvars. When you call
.delay() to queue a Celery task, that context is not passed along
automatically: Celery serialises the task arguments (JSON by default) and
ships them to the broker, Redis in this setup. The trace context lives in the
web process's memory, not in the message, so the worker starts with no idea
which trace it belongs to.

The fix: manually inject the trace context into the task headers, then
extract it in the worker before creating child spans.
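
To make the failure mode concrete, here is a toy model of it using only the standard library. It is a sketch, not how OpenTelemetry is implemented: `current_trace`, `inject`, `extract`, and `simulate_worker` are hypothetical names I made up for illustration, and the fresh `contextvars.Context()` stands in for a separate worker process. The only real-world detail borrowed is the shape of the W3C `traceparent` string (version, trace ID, span ID, flags).

```python
import contextvars
import json
import secrets

# Toy stand-in for the tracing context held in contextvars.
current_trace = contextvars.ContextVar("current_trace", default=None)


def inject(carrier: dict) -> None:
    """Copy the current (trace_id, span_id) into the carrier as a traceparent string."""
    ctx = current_trace.get()
    if ctx is not None:
        trace_id, span_id = ctx
        carrier["traceparent"] = f"00-{trace_id}-{span_id}-01"


def extract(carrier: dict):
    """Parse a traceparent string back into (trace_id, span_id), or None if absent."""
    value = carrier.get("traceparent")
    if value is None:
        return None
    _version, trace_id, span_id, _flags = value.split("-")
    return trace_id, span_id


def simulate_worker(message: str):
    """Decode a message inside an empty context, as a separate process would."""
    body = json.loads(message)
    return contextvars.Context().run(
        lambda: (current_trace.get(), extract(body.get("headers", {})))
    )


# "Web" side: a request is in flight, so a trace context exists in memory.
current_trace.set((secrets.token_hex(16), secrets.token_hex(8)))
sent = current_trace.get()

# Without injection, only the task arguments are serialised.
plain = json.dumps({"kwargs": {"order_id": 42}})

# With injection, the trace context rides along in the message headers.
carrier = {}
inject(carrier)
traced = json.dumps({"kwargs": {"order_id": 42}, "headers": carrier})

in_memory, from_plain = simulate_worker(plain)
_, from_traced = simulate_worker(traced)
```

On the "worker" side, `in_memory` and `from_plain` are both empty because nothing in memory crosses the boundary, while `from_traced` matches what the web side sent. That is the whole trick the next two steps apply with the real OpenTelemetry propagator.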

Step 1: Inject context when queuing

from opentelemetry import propagate

# process_order is your Celery task, imported from wherever you define it

def queue_task_with_context(payload: dict):
    carrier = {}
    propagate.inject(carrier)  # writes the current trace context (traceparent) into the dict

    process_order.apply_async(
        kwargs={"payload": payload},
        headers={"otel_context": carrier},
    )

Step 2: Extract context in the worker

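from celery.signals import task_prerun
from opentelemetry import context, propagate, trace

tracer = trace.get_tracer(__name__)

@task_prerun.connect
def on_task_prerun(task_id, task, args, kwargs, **kw):
    # Depending on the Celery version, custom headers show up directly on
    # task.request or nested under task.request.headers, so check both.
    carrier = task.request.get("otel_context") or (
        task.request.get("headers") or {}
    ).get("otel_context", {})
    ctx = propagate.extract(carrier)  # restore parent trace context

    span = tracer.start_span(
        name=f"celery.{task.name}",
        context=ctx,
        kind=trace.SpanKind.CONSUMER,
    )
    task.request.otel_span = span
    # Make the span current for the task body and keep the token so the
    # context can be detached when the task finishes.
    task.request.otel_token = context.attach(trace.set_span_in_context(span, ctx))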
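
Close the span when the task finishes, whether it succeeded or failed:

from celery.signals import task_postrun, task_failure
from opentelemetry import context, trace

@task_failure.connect
def on_task_failure(sender=None, exception=None, **kw):
    # task_failure passes the task as `sender`; there is no `task` argument.
    span = getattr(sender.request, "otel_span", None)
    if span:
        span.record_exception(exception)
        span.set_status(trace.Status(trace.StatusCode.ERROR, str(exception)))

@task_postrun.connect
def on_task_postrun(task, state=None, **kw):
    # task_postrun also fires after a failed task, so the span is ended
    # here only, and OK is set only when the task actually succeeded.
    span = getattr(task.request, "otel_span", None)
    if span:
        if state == "SUCCESS":
            span.set_status(trace.Status(trace.StatusCode.OK))
        span.end()
    token = getattr(task.request, "otel_token", None)
    if token is not None:
        context.detach(token)  # restore the worker's previous context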
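
Finally, register a tracer provider and exporter:

# SDK setup: run once at startup in your FastAPI app, and in the Celery
# worker too, otherwise the worker's spans are never exported
from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor

provider = TracerProvider()
exporter = OTLPSpanExporter(endpoint="http://otel-collector:4317", insecure=True)
provider.add_span_processor(BatchSpanProcessor(exporter))
trace.set_tracer_provider(provider)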

What you get

One trace ID that follows a request from the HTTP layer through the queue
into the Celery worker and down into your database query. When something
breaks at 2 AM, you open Grafana, search by trace ID, and see exactly
where it failed and how long each step took.

Without this, you have logs from three different services with no way to
connect them. With this, you have one timeline.

The full runnable repo with docker-compose, FastAPI app, Celery worker,
and Grafana Tempo is at github.com/itsjayanth/observability-starter-kit.