DEV Community

Alexandr Bandurchin for Uptrace

Posted on • Originally published at uptrace.dev

OpenTelemetry Celery Instrumentation Guide

OpenTelemetry enables comprehensive monitoring of Celery applications by automatically collecting telemetry data including task execution times, worker performance, queue depths, and error rates. By integrating OpenTelemetry, you can capture distributed traces across your task pipeline and export this data to observability backends for analysis and visualization.

What is Celery?

Celery is a distributed task queue for Python that allows you to run asynchronous and scheduled tasks. It's built on message passing and can operate with multiple brokers like Redis, RabbitMQ, and Amazon SQS. Celery is commonly used for background processing, periodic tasks, and distributed computing.

Celery consists of several components: producers (clients that send tasks), brokers (message transport), workers (processes that execute tasks), and result backends (stores for task results). This architecture makes it ideal for scaling applications horizontally and handling time-consuming operations without blocking user requests.
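
The four roles can be modeled in a few lines of standard-library Python. This is only an in-process sketch for intuition, not how Celery is implemented: a `queue.Queue` stands in for the broker, a dict for the result backend, and all names here are hypothetical.

```python
import queue
import threading

broker = queue.Queue()   # stands in for Redis/RabbitMQ
results = {}             # stands in for a result backend


def worker():
    """Pull messages until a None sentinel arrives, storing each result."""
    while True:
        message = broker.get()
        if message is None:
            break
        task_id, func, args = message
        results[task_id] = func(*args)


def add(x, y):
    return x + y


thread = threading.Thread(target=worker)
thread.start()
broker.put(("task-1", add, (2, 3)))  # the producer sends a task
broker.put(None)                     # tell the worker to stop
thread.join()
```

The producer never calls `add` directly; it only enqueues a message, which is why trace context has to travel with that message to link both sides.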

What is OpenTelemetry?

OpenTelemetry is an open-source observability framework that aims to standardize and simplify the collection, processing, and export of telemetry data from applications and systems.

OpenTelemetry supports multiple programming languages and platforms, making it suitable for a wide range of applications and environments. For detailed Python instrumentation, see the OpenTelemetry Python guide.

OpenTelemetry enables developers to instrument their code and collect telemetry data, which can then be exported to various OpenTelemetry backends or observability platforms for analysis and visualization. Using the OpenTelemetry Collector, you can centralize telemetry data collection, perform data transformations, and route data to multiple observability backends simultaneously.

Installation

To instrument a Celery application with OpenTelemetry, install the required packages:

pip install opentelemetry-api opentelemetry-sdk opentelemetry-instrumentation-celery

Additional Instrumentation

Depending on your broker and result backend, you may also want to install additional instrumentation:

# For Redis broker/backend
pip install opentelemetry-instrumentation-redis

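# For RabbitMQ: Celery talks to the broker through kombu, and
# opentelemetry-python-contrib does not appear to publish a kombu-specific
# package; the Celery instrumentation covers task publish and execution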

# For database result backends
pip install opentelemetry-instrumentation-psycopg2  # PostgreSQL
pip install opentelemetry-instrumentation-sqlite3   # SQLite

Exporter Installation

To export telemetry data to observability backends, install an appropriate exporter:

# For OTLP (recommended)
pip install opentelemetry-exporter-otlp

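# For OTLP over HTTP only (avoids the grpcio dependency)
pip install opentelemetry-exporter-otlp-proto-http

# Console output (development/testing) needs no extra package:
# ConsoleSpanExporter ships with opentelemetry-sdk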
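
Before starting a worker, it can help to confirm that the packages above are importable in the active environment. The helper below is a minimal sketch using only the standard library; `missing_modules` is a hypothetical name, and the module paths listed are the ones imported elsewhere in this guide.

```python
import importlib.util


def missing_modules(names):
    """Return the dotted module names that cannot be found in this environment."""
    missing = []
    for name in names:
        try:
            found = importlib.util.find_spec(name) is not None
        except (ImportError, ValueError):
            # Raised when a parent package is absent or the name is malformed
            found = False
        if not found:
            missing.append(name)
    return missing


required = [
    "opentelemetry.sdk.trace",
    "opentelemetry.instrumentation.celery",
    "opentelemetry.exporter.otlp.proto.grpc.trace_exporter",
]
print("missing:", missing_modules(required) or "none")
```

An empty result only shows that the modules resolve; it says nothing about version compatibility between them.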

Basic Instrumentation

Automatic Instrumentation

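The simplest way to instrument Celery is to initialize OpenTelemetry in the worker_process_init hook. Celery's default prefork pool forks a child process for each worker, and the background export thread started by BatchSpanProcessor does not survive a fork, so tracing has to be set up inside each child process: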

from celery import Celery
from celery.signals import worker_process_init
from opentelemetry.instrumentation.celery import CeleryInstrumentor
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import (
    BatchSpanProcessor,
    ConsoleSpanExporter,
)
from opentelemetry.sdk.resources import SERVICE_NAME, Resource

# Create Celery app first
app = Celery('tasks', broker='redis://localhost:6379/0')

@worker_process_init.connect(weak=False)
def init_celery_tracing(*args, **kwargs):
    """Initialize OpenTelemetry in each worker process"""

    # Configure OpenTelemetry
    resource = Resource(attributes={
        SERVICE_NAME: "celery-worker"
    })

    provider = TracerProvider(resource=resource)
    processor = BatchSpanProcessor(ConsoleSpanExporter())
    provider.add_span_processor(processor)
    trace.set_tracer_provider(provider)

    # Instrument Celery
    CeleryInstrumentor().instrument()

@app.task
def add(x, y):
    return x + y

Manual Instrumentation

For more control over tracing, you can manually instrument specific tasks:

from celery import Celery
from celery.signals import worker_process_init
from opentelemetry.instrumentation.celery import CeleryInstrumentor
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter
from opentelemetry.sdk.resources import SERVICE_NAME, Resource

# Create Celery app first
app = Celery('tasks', broker='redis://localhost:6379/0')

@worker_process_init.connect(weak=False)
def init_celery_tracing(*args, **kwargs):
    """Initialize OpenTelemetry in each worker process"""
    resource = Resource(attributes={SERVICE_NAME: "celery-worker"})
    provider = TracerProvider(resource=resource)
    processor = BatchSpanProcessor(ConsoleSpanExporter())
    provider.add_span_processor(processor)
    trace.set_tracer_provider(provider)

    # Instrument Celery
    CeleryInstrumentor().instrument()

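# Module-level tracer: the API returns a proxy that starts using the real
# provider once worker_process_init has called set_tracer_provider()
tracer = trace.get_tracer(__name__)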

@app.task
def process_data(data_id):
    with tracer.start_as_current_span("process_data") as span:
        # Add custom attributes
        span.set_attribute("data.id", data_id)
        span.set_attribute("worker.name", "data_processor")

        # Your task logic here (expensive_computation is a placeholder)
        result = expensive_computation(data_id)

        # Record result information
        span.set_attribute("result.size", len(result))
        span.set_attribute("task.status", "completed")

        return result

Worker Configuration

Worker Startup with Instrumentation

Create a worker startup script that initializes OpenTelemetry properly:

# worker.py
import os
from celery import Celery
from celery.signals import worker_process_init
from opentelemetry.instrumentation.celery import CeleryInstrumentor
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import SERVICE_NAME, Resource

# Create Celery app
app = Celery('tasks', broker='redis://localhost:6379/0')

@worker_process_init.connect(weak=False)
def initialize_tracing(*args, **kwargs):
    """Initialize OpenTelemetry tracing for Celery worker"""

    # Create resource with worker information
    resource = Resource(attributes={
        SERVICE_NAME: "celery-worker",
        "service.version": "1.0.0",
        "deployment.environment": os.environ.get("ENVIRONMENT", "development"),
        "worker.hostname": os.environ.get("HOSTNAME", "unknown"),
    })

    # Configure tracer provider
    provider = TracerProvider(resource=resource)

    # Configure OTLP exporter
    otlp_exporter = OTLPSpanExporter(
        endpoint="http://localhost:4317",
        insecure=True,
    )

    processor = BatchSpanProcessor(otlp_exporter)
    provider.add_span_processor(processor)
    trace.set_tracer_provider(provider)

    # Instrument Celery
    CeleryInstrumentor().instrument()

@app.task
def example_task(data):
    # Your task logic here
    return f"Processed: {data}"

Worker Execution

Start the worker with the instrumented configuration:

# Start worker with instrumentation
celery -A worker worker --loglevel=info

# For production with multiple workers
celery -A worker worker --loglevel=info --concurrency=4

Producer (Client) Instrumentation

Instrument the client code that sends tasks to Celery:

# client.py
from celery import Celery
from opentelemetry.instrumentation.celery import CeleryInstrumentor
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import SERVICE_NAME, Resource

# Initialize OpenTelemetry for producer
resource = Resource(attributes={
    SERVICE_NAME: "celery-producer"
})

provider = TracerProvider(resource=resource)
otlp_exporter = OTLPSpanExporter(endpoint="http://localhost:4317", insecure=True)
processor = BatchSpanProcessor(otlp_exporter)
provider.add_span_processor(processor)
trace.set_tracer_provider(provider)

# Instrument Celery
CeleryInstrumentor().instrument()

# Create Celery app
app = Celery('tasks', broker='redis://localhost:6379/0')

def send_task():
    # This will be traced automatically
    result = app.send_task('tasks.process_data', args=[123])
    return result
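
For the producer span and the worker span to end up in one trace, the trace context has to travel with the task message. OpenTelemetry's default propagator uses the W3C Trace Context `traceparent` header. The sketch below only illustrates that header's layout with the standard library; the function names and the `headers` dict are hypothetical, and in practice the instrumentation handles injection and extraction for you.

```python
import re

# W3C Trace Context "traceparent" layout: version-traceid-spanid-flags
_TRACEPARENT = re.compile(r"([0-9a-f]{2})-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})")


def build_traceparent(trace_id: int, span_id: int, sampled: bool = True) -> str:
    """Format IDs the way the producer side would inject them into headers."""
    flags = 0x01 if sampled else 0x00
    return f"00-{trace_id:032x}-{span_id:016x}-{flags:02x}"


def parse_traceparent(value: str) -> dict:
    """Recover the IDs on the worker side; raises ValueError on bad input."""
    match = _TRACEPARENT.fullmatch(value)
    if match is None:
        raise ValueError(f"malformed traceparent: {value!r}")
    _version, trace_id, span_id, flags = match.groups()
    return {
        "trace_id": int(trace_id, 16),
        "span_id": int(span_id, 16),
        "sampled": bool(int(flags, 16) & 0x01),
    }


# Hypothetical task headers, modeled as a plain dict
headers = {
    "traceparent": build_traceparent(
        0x4BF92F3577B34DA6A3CE929D0E0E4736, 0x00F067AA0BA902B7
    )
}
parent = parse_traceparent(headers["traceparent"])
```

If producer and worker spans show up as separate traces in your backend, checking whether a header of this shape reaches the worker is a reasonable first step.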

Advanced Configuration

Custom Span Attributes

Add custom attributes to Celery spans for better observability:

from celery import Celery
from celery.signals import worker_process_init
from opentelemetry.instrumentation.celery import CeleryInstrumentor
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter
from opentelemetry.sdk.resources import SERVICE_NAME, Resource

# Create Celery app
app = Celery('tasks', broker='redis://localhost:6379/0')

@worker_process_init.connect(weak=False)
def init_celery_tracing(*args, **kwargs):
    """Initialize OpenTelemetry in each worker process"""
    resource = Resource(attributes={SERVICE_NAME: "celery-worker"})
    provider = TracerProvider(resource=resource)
    processor = BatchSpanProcessor(ConsoleSpanExporter())
    provider.add_span_processor(processor)
    trace.set_tracer_provider(provider)

    # Instrument Celery. instrument() does not document span-name or
    # span-processor callbacks, so custom attributes are set from task code.
    CeleryInstrumentor().instrument()

def add_celery_attributes(span, task):
    """Add task-specific attributes to the current Celery span"""
    request = task.request
    delivery_info = request.delivery_info or {}

    # Add task-specific attributes
    span.set_attribute("celery.task.queue", delivery_info.get('routing_key', 'default'))
    span.set_attribute("celery.task.retries", request.retries)
    span.set_attribute("celery.task.eta", str(request.eta) if request.eta else "immediate")

    # Add worker information
    span.set_attribute("celery.worker.hostname", request.hostname or "unknown")

    # Add custom business logic attributes
    correlation_id = getattr(request, 'correlation_id', None)
    if correlation_id:
        span.set_attribute("business.correlation_id", correlation_id)

# process_order is a hypothetical task used for illustration
@app.task(bind=True)
def process_order(self, order_id):
    # The instrumentation makes the task span current while the task runs
    add_celery_attributes(trace.get_current_span(), self)
    return order_id

Error Handling and Exception Tracking

Enhance error tracking in Celery tasks:

from celery import Celery
from celery.signals import worker_process_init
from celery.exceptions import Retry
from opentelemetry.instrumentation.celery import CeleryInstrumentor
from opentelemetry import trace
from opentelemetry.trace import Status, StatusCode
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter
from opentelemetry.sdk.resources import SERVICE_NAME, Resource

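# Hypothetical application error that marks a transient, retryable failure
class RetryableError(Exception):
    pass

# Create Celery app
app = Celery('tasks', broker='redis://localhost:6379/0')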

@worker_process_init.connect(weak=False)
def init_celery_tracing(*args, **kwargs):
    """Initialize OpenTelemetry in each worker process"""
    resource = Resource(attributes={SERVICE_NAME: "celery-worker"})
    provider = TracerProvider(resource=resource)
    processor = BatchSpanProcessor(ConsoleSpanExporter())
    provider.add_span_processor(processor)
    trace.set_tracer_provider(provider)
    CeleryInstrumentor().instrument()

@app.task(bind=True)
def risky_task(self, data):
    span = trace.get_current_span()

    try:
        # Your task logic here (process_risky_operation is a placeholder)
        result = process_risky_operation(data)

        # Set success attributes
        span.set_attribute("task.result.success", True)
        span.set_attribute("task.result.items_processed", len(result))

        return result

    except RetryableError as e:
        # Handle retryable errors
        span.record_exception(e)
        span.set_attribute("task.retry.attempt", self.request.retries + 1)
        span.set_attribute("task.retry.reason", str(e))

        # Retry with exponential backoff: 60s, 120s, 240s
        raise self.retry(exc=e, countdown=60 * 2 ** self.request.retries, max_retries=3)

    except Exception as e:
        # Handle permanent failures
        span.record_exception(e)
        span.set_status(Status(StatusCode.ERROR, str(e)))
        span.set_attribute("task.error.type", type(e).__name__)
        span.set_attribute("task.error.fatal", True)

        raise
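
The retry branch above mentions exponential backoff. A delay that doubles with each attempt can be computed by a small helper like the one below. This is a sketch: `retry_countdown`, the 60-second base, and the one-hour cap are illustrative assumptions, not values taken from Celery.

```python
def retry_countdown(retries: int, base: float = 60.0, cap: float = 3600.0) -> float:
    """Seconds to wait before the next attempt: base * 2**retries, capped."""
    if retries < 0:
        raise ValueError("retries must be non-negative")
    return min(cap, base * (2 ** retries))


# Delays for the first four attempts with the defaults above
delays = [retry_countdown(n) for n in range(4)]
```

Inside a bound task, the result could be passed as `countdown=retry_countdown(self.request.retries)`. Celery also offers task options such as `autoretry_for` and `retry_backoff`, which may remove the need for a hand-written helper.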

Task Result Tracking

Track task results and completion status:

from celery.signals import task_success, task_failure, task_retry
from opentelemetry import trace

@task_success.connect
def task_success_handler(sender=None, result=None, **kwargs):
    """Handle successful task completion"""
    span = trace.get_current_span()
    span.set_attribute("celery.task.status", "success")
    span.set_attribute("celery.task.result.type", type(result).__name__)

@task_failure.connect
def task_failure_handler(sender=None, task_id=None, exception=None, traceback=None, einfo=None, **kwargs):
    """Handle task failures"""
    span = trace.get_current_span()
    span.set_attribute("celery.task.status", "failure")
    span.set_attribute("celery.task.exception", str(exception))
    span.record_exception(exception)

@task_retry.connect
def task_retry_handler(sender=None, task_id=None, reason=None, einfo=None, **kwargs):
    """Handle task retries"""
    span = trace.get_current_span()
    span.set_attribute("celery.task.status", "retry")
    span.set_attribute("celery.task.retry.reason", str(reason))

Broker-Specific Configuration

Redis Configuration

For Redis broker, add Redis instrumentation:

from celery import Celery
from celery.signals import worker_process_init
from opentelemetry.instrumentation.redis import RedisInstrumentor
from opentelemetry.instrumentation.celery import CeleryInstrumentor
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter
from opentelemetry.sdk.resources import SERVICE_NAME, Resource

# Create Celery app with Redis configuration
app = Celery('tasks')
app.conf.update(
    broker_url='redis://localhost:6379/0',
    result_backend='redis://localhost:6379/0',
    task_serializer='json',
    accept_content=['json'],
    result_serializer='json',
    timezone='UTC',
    enable_utc=True,
)

@worker_process_init.connect(weak=False)
def init_celery_tracing(*args, **kwargs):
    """Initialize OpenTelemetry in each worker process"""
    resource = Resource(attributes={SERVICE_NAME: "celery-worker"})
    provider = TracerProvider(resource=resource)
    processor = BatchSpanProcessor(ConsoleSpanExporter())
    provider.add_span_processor(processor)
    trace.set_tracer_provider(provider)

    # Instrument Redis
    RedisInstrumentor().instrument()

    # Instrument Celery
    CeleryInstrumentor().instrument()

RabbitMQ Configuration

For RabbitMQ broker, configure with appropriate instrumentation:

from celery import Celery
from celery.signals import worker_process_init
from opentelemetry.instrumentation.celery import CeleryInstrumentor
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter
from opentelemetry.sdk.resources import SERVICE_NAME, Resource

# Create Celery app with RabbitMQ configuration
app = Celery('tasks')
app.conf.update(
    broker_url='pyamqp://guest@localhost//',
    result_backend='rpc://',
    task_serializer='json',
    accept_content=['json'],
    result_serializer='json',
)

@worker_process_init.connect(weak=False)
def init_celery_tracing(*args, **kwargs):
    """Initialize OpenTelemetry in each worker process"""
    resource = Resource(attributes={SERVICE_NAME: "celery-worker"})
    provider = TracerProvider(resource=resource)
    processor = BatchSpanProcessor(ConsoleSpanExporter())
    provider.add_span_processor(processor)
    trace.set_tracer_provider(provider)

    # Instrument Celery (trace context travels in the task message headers)
    CeleryInstrumentor().instrument()

Monitoring and Metrics

Custom Metrics Collection

Collect custom metrics alongside traces:

import time

from opentelemetry import metrics
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter

# Configure metrics
metric_reader = PeriodicExportingMetricReader(
    OTLPMetricExporter(endpoint="http://localhost:4317", insecure=True),
    export_interval_millis=30000,
)
metrics.set_meter_provider(MeterProvider(metric_readers=[metric_reader]))

# Create meter and instruments
meter = metrics.get_meter("celery_metrics")
task_duration_histogram = meter.create_histogram(
    "celery.task.duration",
    description="Duration of Celery task execution",
    unit="ms"
)
task_counter = meter.create_counter(
    "celery.tasks.total",
    description="Total number of Celery tasks"
)

# `app` is the Celery app created earlier; process_data is a placeholder
@app.task
def monitored_task(data):
    start_time = time.time()

    try:
        # Your task logic
        result = process_data(data)

        # Record metrics
        duration = (time.time() - start_time) * 1000
        task_duration_histogram.record(duration, {"task_name": "monitored_task", "status": "success"})
        task_counter.add(1, {"task_name": "monitored_task", "status": "success"})

        return result

    except Exception as e:
        # Record failure metrics
        duration = (time.time() - start_time) * 1000
        task_duration_histogram.record(duration, {"task_name": "monitored_task", "status": "error"})
        task_counter.add(1, {"task_name": "monitored_task", "status": "error"})
        raise
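
The success and error branches above repeat the same timing code. One way to remove the duplication is a decorator that measures the call and reports once in a `finally` block. The sketch below uses only the standard library; `timed` and the `record` callback are hypothetical names, and a list stands in for the histogram so the example runs without an SDK.

```python
import functools
import time


def timed(record):
    """Decorator that reports duration in ms and a status label to record()."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            start = time.perf_counter()
            status = "success"
            try:
                return func(*args, **kwargs)
            except Exception:
                status = "error"
                raise
            finally:
                duration_ms = (time.perf_counter() - start) * 1000
                record(duration_ms, {"task_name": func.__name__, "status": status})
        return wrapper
    return decorator


# Stand-in for histogram.record so the sketch runs without an SDK
samples = []


@timed(lambda value, attrs: samples.append((value, attrs)))
def double(x):
    return x * 2


result = double(21)
```

With a real histogram, `record` could be `task_duration_histogram.record`, since it takes an amount followed by attributes. Place `@app.task` above `@timed(...)` so that Celery registers the wrapped function.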

Production Deployment

Environment Configuration

Use environment variables for production configuration:

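import os
from opentelemetry import trace
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter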

def configure_tracing():
    """Configure OpenTelemetry based on environment variables"""

    # OTLP endpoint configuration
    otlp_endpoint = os.environ.get("OTEL_EXPORTER_OTLP_ENDPOINT", "http://localhost:4317")

    # Service configuration
    service_name = os.environ.get("OTEL_SERVICE_NAME", "celery-worker")
    service_version = os.environ.get("OTEL_SERVICE_VERSION", "1.0.0")
    environment = os.environ.get("DEPLOYMENT_ENVIRONMENT", "production")

    # Resource attributes
    resource = Resource(attributes={
        SERVICE_NAME: service_name,
        "service.version": service_version,
        "deployment.environment": environment,
    })

    # Configure exporter
    exporter = OTLPSpanExporter(endpoint=otlp_endpoint)
    processor = BatchSpanProcessor(exporter)

    provider = TracerProvider(resource=resource)
    provider.add_span_processor(processor)
    trace.set_tracer_provider(provider)
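
Keeping the environment lookup separate from the SDK calls makes the configuration easy to test. The sketch below reads the same endpoint and service-name variables and also parses `OTEL_RESOURCE_ATTRIBUTES`, the standard comma-separated `key=value` variable (an addition not used in the snippet above). `load_tracing_settings` is a hypothetical helper, and the SDK can read these variables on its own, so treat this as an illustration of the format only.

```python
import os


def load_tracing_settings(env=None) -> dict:
    """Collect tracing settings from a mapping (defaults to os.environ)."""
    env = os.environ if env is None else env

    # OTEL_RESOURCE_ATTRIBUTES looks like "key1=value1,key2=value2"
    attributes = {}
    for pair in env.get("OTEL_RESOURCE_ATTRIBUTES", "").split(","):
        key, sep, value = pair.partition("=")
        if sep and key.strip():
            attributes[key.strip()] = value.strip()

    return {
        "endpoint": env.get("OTEL_EXPORTER_OTLP_ENDPOINT", "http://localhost:4317"),
        "service_name": env.get("OTEL_SERVICE_NAME", "celery-worker"),
        "resource_attributes": attributes,
    }


# Example values; a real worker would call load_tracing_settings() with no argument
settings = load_tracing_settings({
    "OTEL_SERVICE_NAME": "billing-worker",
    "OTEL_RESOURCE_ATTRIBUTES": "service.version=1.2.3,deployment.environment=staging",
})
```

Passing a plain dict instead of `os.environ` lets you exercise the defaults and the parsing in a unit test without touching the process environment.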

Docker Configuration

Example Dockerfile for containerized Celery workers:

FROM python:3.11-slim

WORKDIR /app

COPY requirements.txt .
RUN pip install -r requirements.txt

COPY . .

# Set OpenTelemetry environment variables
ENV OTEL_SERVICE_NAME=celery-worker
ENV OTEL_EXPORTER_OTLP_ENDPOINT=http://jaeger:4317

# Start worker with instrumentation
CMD ["celery", "-A", "worker", "worker", "--loglevel=info"]

What is Uptrace?

Uptrace is an OpenTelemetry APM that supports distributed tracing, metrics, and logs. You can use it to monitor applications and troubleshoot issues. Compare with other top APM tools for task queue monitoring.


Uptrace comes with an intuitive query builder, rich dashboards, alerting rules with notifications, and integrations for most languages and frameworks.

Uptrace can process billions of spans and metrics on a single server and allows you to monitor your applications at 10x lower cost.

In just a few minutes, you can try Uptrace by visiting the cloud demo (no login required) or running it locally with Docker. The source code is available on GitHub.

uptrace-python with Celery

For simplified configuration, you can use the uptrace-python wrapper:

Installation

pip install uptrace

Configuration

import uptrace
from celery import Celery
from celery.signals import worker_process_init
from opentelemetry.instrumentation.celery import CeleryInstrumentor

# Your Celery app
app = Celery('tasks', broker='redis://localhost:6379/0')

@worker_process_init.connect(weak=False)
def init_celery_tracing(*args, **kwargs):
    """Configure Uptrace in each worker process rather than at module level"""
    uptrace.configure_opentelemetry(
        # Set DSN or use UPTRACE_DSN environment variable
        dsn="<your-uptrace-dsn>",
        service_name="celery-worker",
        service_version="1.0.0",
        deployment_environment="production",
    )

    # Instrument Celery
    CeleryInstrumentor().instrument()

Environment Variables

# Uptrace configuration
export UPTRACE_DSN="https://<token>@uptrace.dev/<project_id>"

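# OpenTelemetry configuration (service.version goes through the standard
# OTEL_RESOURCE_ATTRIBUTES variable; OTEL_SERVICE_VERSION is not a standard one)
export OTEL_SERVICE_NAME="celery-worker"
export OTEL_RESOURCE_ATTRIBUTES="service.version=1.0.0"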

# Start worker
celery -A tasks worker --loglevel=info

Troubleshooting

Common Issues

  1. Double instrumentation: Ensure you call CeleryInstrumentor().instrument() only once per worker process
  2. Missing broker traces: Install the appropriate broker instrumentation (for example, Redis)
  3. Worker startup issues: Initialize OpenTelemetry in the worker_process_init hook, not at module level
  4. Spans not appearing: Check that exporters are configured correctly and that the tracer provider is set
  5. High overhead: Adjust sampling rates and batch processor settings
  6. Fork-safety issues: Use the worker process initialization hook to avoid sharing connections between processes
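
Items 1 and 6 pull in opposite directions: setup must run once per process, yet a forked child inherits any "already initialized" flag from its parent. Keying the flag on the process ID is one way to satisfy both. The sketch below is a hypothetical guard using only the standard library; `init_once` is not part of Celery or OpenTelemetry.

```python
import os

_state = {"pid": None}  # PID of the process that last ran the setup


def init_once(setup) -> bool:
    """Run setup() at most once per process; return True if it ran."""
    pid = os.getpid()
    if _state["pid"] == pid:
        return False  # this process is already initialized
    setup()
    _state["pid"] = pid
    return True


calls = []
first = init_once(lambda: calls.append("setup"))
second = init_once(lambda: calls.append("setup"))
```

Because a forked child has a different PID than its parent, the guard lets the child run the setup again even though it inherited the parent's `_state`.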

Debug Configuration

Enable debug logging to troubleshoot instrumentation:

import logging

# Enable OpenTelemetry debug logging
logging.getLogger("opentelemetry").setLevel(logging.DEBUG)

# Enable Celery debug logging
logging.getLogger("celery").setLevel(logging.DEBUG)

# Configure root logger
logging.basicConfig(level=logging.DEBUG)

Performance Optimization

Configure appropriate settings for production:

from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.sdk.trace.sampling import TraceIdRatioBased

# Configure sampling (sample 10% of traces)
sampler = TraceIdRatioBased(0.1)
provider = TracerProvider(sampler=sampler)

# Optimize batch processor (`exporter` is the span exporter configured earlier)
processor = BatchSpanProcessor(
    exporter,
    max_queue_size=2048,
    schedule_delay_millis=5000,
    max_export_batch_size=512,
)
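
Ratio-based sampling decides from the trace ID alone, which is what keeps the decision consistent across services. The sketch below illustrates the idea with the standard library; it follows my understanding of how such samplers compare the low 64 bits of the ID against a threshold, and it is not the SDK's code. `should_sample` is a hypothetical name.

```python
import random

TRACE_ID_MASK = (1 << 64) - 1  # only the low 64 bits take part in the decision


def should_sample(trace_id: int, ratio: float) -> bool:
    """Return True when the trace falls inside the sampled fraction."""
    if not 0.0 <= ratio <= 1.0:
        raise ValueError("ratio must be between 0 and 1")
    bound = round(ratio * (1 << 64))
    return (trace_id & TRACE_ID_MASK) < bound


rng = random.Random(42)  # fixed seed so the run is repeatable
trace_ids = [rng.getrandbits(128) for _ in range(10_000)]
kept = sum(should_sample(t, 0.1) for t in trace_ids)
print(f"kept {kept} of {len(trace_ids)} traces")
```

Since the same trace ID always yields the same answer, a producer and a worker configured with the same ratio keep or drop the same traces. The SDK also provides a `ParentBased` sampler, which may be a better fit when the worker should simply follow the producer's decision.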

What's next?

By integrating OpenTelemetry with Celery, you gain valuable insights into your distributed task processing pipeline. You can monitor task performance, track queue depths, identify bottlenecks, and troubleshoot issues across your asynchronous workflow.

The telemetry data collected helps you:

  • Monitor task execution times and success rates
  • Track worker performance and resource utilization
  • Identify bottlenecks in task processing pipelines
  • Debug failed tasks and retry patterns
  • Optimize queue management and worker scaling
  • Understand task dependencies and data flow
  • Improve overall system reliability and performance
