
Zach

Posted on • Originally published at circumeo.io

Monitoring Celery in Production

How can you determine if your Celery queues are healthy or overwhelmed with tasks?

Anything from sudden surges of user traffic to abuse of your app can drag down your Celery cluster. Knowing when this is happening is half the battle.

Our goal in this series is to build automated monitoring tools for Celery so that you don't have to find out about problems from your users.

Monitoring Queue Wait Times

There are many valuable metrics to monitor, such as queue length, processing time, and failed task count.

But we're going to start with what I think is the most effective metric to monitor: queue wait times.

Queue wait time is simply how long a task spends waiting in the queue before a worker accepts it and begins processing.

Queue length is important, but harder to judge. What constitutes too many tasks in the queue? For one service, having 10 items in the queue might be too many, while 100,000 is perfectly normal for another service.

Queues can also grow in short bursts. As long as the workers drain the backlog quickly enough, is that really an issue? Alerting on queue length alone can lead to false alarms.

Queue wait times, on the other hand, are a true reflection of your users' experience.
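To make the contrast between queue length and queue wait time concrete, here is a minimal simulation sketch in plain Python, with no Celery involved. The simulate function, the worker counts, and the task durations are all illustrative assumptions rather than measurements from a real cluster. It compares a large burst handled by fast workers against a slow trickle handled by one overloaded worker.

```python
import heapq


def simulate(arrivals, workers, service_time):
    """Return (peak_backlog, max_wait) for a FIFO queue served by `workers`."""
    arrivals = sorted(arrivals)
    free_at = [0.0] * workers  # time at which each worker becomes free
    heapq.heapify(free_at)

    starts = []
    for arrival in arrivals:
        # The earliest-free worker takes the next task in line.
        start = max(arrival, heapq.heappop(free_at))
        starts.append(start)
        heapq.heappush(free_at, start + service_time)

    max_wait = max(start - arrival for arrival, start in zip(arrivals, starts))
    # Backlog at time t = tasks that have arrived but not yet started.
    peak_backlog = max(
        sum(1 for a, s in zip(arrivals, starts) if a <= t < s)
        for t in arrivals
    )
    return peak_backlog, max_wait


# A burst of 50 tasks at t=0, drained quickly by 10 fast workers.
burst = simulate([0.0] * 50, workers=10, service_time=0.1)

# A trickle of 20 tasks, one per second, hitting a single slow worker.
trickle = simulate([float(i) for i in range(20)], workers=1, service_time=3.0)

print(f"burst:   peak backlog={burst[0]}, max wait={burst[1]:.1f}s")
print(f"trickle: peak backlog={trickle[0]}, max wait={trickle[1]:.1f}s")
```

In this toy setup the burst peaks at a backlog of 40 tasks, yet no task waits more than about 0.4 seconds. The trickle never has more than 13 tasks queued, but the last task waits 38 seconds. An alert on queue length would fire for the wrong scenario.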

Let's say you have a Django app that auto-tunes videos of people singing. You also implemented a helpful progress bar. A user uploads a video of themselves singing a Beyonce song, but the progress bar doesn't budge for several minutes. That user gives up, and tells all their friends that the app doesn't work.

What happened? Tasks may have still been processing, but the queue wait time was too high.

Setting Up the Example Django App

Let's build a very simple Django app to demonstrate how to monitor Celery queue wait times.

We'll use Docker to make it simpler to set up Celery and Redis.

Create the Django project.

django-admin startproject djautotune
cd djautotune
python3 manage.py startapp core

Update the djautotune/settings.py file to add the core application. We'll also add the Celery related settings while we're editing the file.

INSTALLED_APPS = [
    "django.contrib.admin",
    "django.contrib.auth",
    "django.contrib.contenttypes",
    "django.contrib.sessions",
    "django.contrib.messages",
    "django.contrib.staticfiles",
    "core",
]

import os  # Django 4.2's generated settings.py does not import os; add this near the top

REDIS_HOST = os.environ["REDIS_HOST"]
REDIS_PORT = os.environ["REDIS_PORT"]

CELERY_METRICS_TTL = int(os.environ.get("CELERY_METRICS_TTL", 300))

CELERY_IMPORTS = ("core.tasks",)
CELERY_BROKER_URL = os.environ["CELERY_BROKER_URL"]
CELERY_RESULT_BACKEND = os.environ["CELERY_RESULT_BACKEND"]
CELERY_ACCEPT_CONTENT = ["json"]
CELERY_TASK_SERIALIZER = "json"
CELERY_RESULT_SERIALIZER = "json"

Adding the Celery Worker Entrypoint

Create a new file named celery.py in the same directory as the settings.py file. This file acts as the entrypoint for the Celery worker.

import os
from celery import Celery

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "djautotune.settings")

app = Celery("djautotune")
app.config_from_object("django.conf:settings", namespace="CELERY")
app.autodiscover_tasks()

Implementing the Background Task Logic

Now create the core/tasks.py file to contain our task logic.

Since actually auto-tuning videos is beyond the scope of this tutorial, we'll add some dummy code that simulates the workload.

import random
import time

from celery import shared_task

@shared_task
def autotune():
    short_duration = random.uniform(0.5, 2)  # Short duration for 90% of tasks
    medium_duration = random.uniform(2, 5)  # Medium duration for 9% of tasks
    long_duration = random.uniform(5, 10)  # Long duration for 1% of tasks (P99)

    probability = random.random()

    if probability < 0.90:
        time_to_sleep = short_duration
    elif probability < 0.99:
        time_to_sleep = medium_duration
    else:
        time_to_sleep = long_duration

    time.sleep(time_to_sleep)
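The two thresholds in the task (0.90 and 0.99) split the workload into roughly 90% short, 9% medium, and 1% long tasks. As a quick sanity check, the sketch below mirrors that branch logic in a hypothetical pick_bucket helper (not part of the project) and counts how often each bucket is chosen. The seed and sample size are arbitrary assumptions.

```python
import random
from collections import Counter


def pick_bucket(probability):
    """Mirror the branch logic of the autotune task for a value in [0, 1)."""
    if probability < 0.90:
        return "short"   # sleeps 0.5 to 2 seconds
    elif probability < 0.99:
        return "medium"  # sleeps 2 to 5 seconds
    return "long"        # sleeps 5 to 10 seconds


rng = random.Random(42)  # fixed seed so the run is repeatable
samples = 100_000
counts = Counter(pick_bucket(rng.random()) for _ in range(samples))

for bucket in ("short", "medium", "long"):
    print(f"{bucket}: {counts[bucket] / samples:.1%}")
```

The printed shares should land close to the 90 / 9 / 1 split, which is what gives the simulated workload its long tail.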

Creating a View to Invoke the Background Task

We'll need a way to invoke this new autotune Celery task. Open core/views.py to add the following HTTP endpoint.

from django.http import JsonResponse
from django.views.decorators.csrf import csrf_exempt
from .tasks import autotune

@csrf_exempt
def enqueue_autotune_task(request):
    if request.method == "POST":
        result = autotune.delay()
        return JsonResponse(
            {"task_id": result.task_id, "status": "Task submitted successfully!"},
            status=200,
        )
    else:
        return JsonResponse({"error": "Only POST method is allowed."}, status=405)

Now, as usual, you'll need to add the new route to your Django URL configuration.

To start, add the following in the core/urls.py file.

from django.urls import path
from . import views

urlpatterns = [
    path("autotune/", views.enqueue_autotune_task, name="enqueue_autotune_task"),
]

Finally, update the main djautotune/urls.py file to include the routes from the core app.

from django.contrib import admin
from django.urls import include, path

urlpatterns = [
    path("admin/", admin.site.urls),
    path("", include("core.urls")),
]

Setting Up the Docker Development Environment

Create the requirements.txt file.

Django==4.2.4
redis==4.5.5
celery==5.2.7

Create the Dockerfile for the Django app in the root project directory.

FROM python:3.11

ENV PYTHONDONTWRITEBYTECODE=1
ENV PYTHONUNBUFFERED=1

WORKDIR /usr/src/app

COPY requirements.txt /usr/src/app/

RUN pip install --no-cache-dir -r requirements.txt

COPY . /usr/src/app/

RUN mkdir -p /usr/src/app/

Now create the docker-compose.yml to manage the containers.

version: '3.8'

services:
  web:
    build:
      context: .
      dockerfile: Dockerfile
    command: python manage.py runserver 0.0.0.0:8000
    volumes:
      - ${PWD}:/usr/src/app
    environment:
      - CELERY_BROKER_URL=redis://redis:6379/0
      - CELERY_RESULT_BACKEND=redis://redis:6379/0
      - CELERY_METRICS_TTL=300
      - REDIS_HOST=redis
      - REDIS_PORT=6379
    ports:
      - "8000:8000"
    depends_on:
      - redis

  celery:
    build:
      context: .
      dockerfile: Dockerfile
    command: celery -A djautotune worker --loglevel=info
    volumes:
      - ${PWD}:/usr/src/app
    environment:
      - CELERY_BROKER_URL=redis://redis:6379/0
      - CELERY_RESULT_BACKEND=redis://redis:6379/0
      - CELERY_METRICS_TTL=300
      - REDIS_HOST=redis
      - REDIS_PORT=6379
    depends_on:
      - redis

  redis:
    image: redis:latest
    ports:
      - "6379:6379"

Starting Up and Testing the App

You should be ready to start the app now with the docker-compose up command.

If all goes well, you can enqueue a task with the following curl command.

curl -X POST http://localhost:8000/autotune/

Using the docker logs command on the Celery container, you should be able to see a success message like this for the task.

Task core.tasks.autotune[9cdfcb7d-1f65-49cc-aa33-20c694157e61] succeeded in 1.9952244879677892s: None

Adding the Celery Monitoring Logic

We have a functioning background task, but no monitoring to speak of.

To add monitoring, we'll create a Django view that returns the longest queue wait time in the current time window. The window length is set by CELERY_METRICS_TTL (300 seconds in our configuration) and can be easily adjusted, but it's important that the metric only covers a bounded, recent window rather than all of history.

We'll use a Redis sorted set with an expiration to create the window of queue wait times.

Marking New Tasks with a Start Timestamp

The first step is to mark each new task with a timestamp.

This timestamp represents the moment the task was created and published, but before any worker has picked up the task.

Open core/apps.py and add the following.

import time
from django.apps import AppConfig
from celery.signals import before_task_publish

def before_task_publish_handler(sender=None, headers=None, body=None, **kwargs):
    headers["__metadata__"] = {"publish_timestamp": time.time()}

class CoreConfig(AppConfig):
    default_auto_field = "django.db.models.BigAutoField"
    name = "core"

    def ready(self):
        before_task_publish.connect(before_task_publish_handler)

Recording Queue Wait Time in Redis

Once a worker has picked up the task, we need to calculate the elapsed time and store it in Redis.

Since we're using the task_prerun signal, this represents the moment the task was picked up, and does not include actual processing time.
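One detail worth keeping in mind: the publish timestamp comes from the clock of the process that enqueued the task, while the pickup time comes from the worker's clock. If those run on different hosts, a small clock skew could in principle produce a slightly negative difference. The sketch below shows the calculation with a clamp at zero. The queue_wait_seconds helper is a hypothetical name introduced here for illustration and is not part of the project code.

```python
import time


def queue_wait_seconds(publish_timestamp, now=None):
    """Seconds between publish and pickup, never negative."""
    if now is None:
        now = time.time()
    # Clamp at zero in case the worker's clock is slightly behind the publisher's.
    return max(0.0, now - publish_timestamp)


# Published at t=100, picked up at t=112.5: the task waited 12.5 seconds.
print(queue_wait_seconds(100.0, now=112.5))

# The worker's clock is 0.2 seconds behind: report zero instead of -0.2.
print(queue_wait_seconds(100.0, now=99.8))
```

Whether the clamp is worth adding depends on your deployment. On a single host, as in this tutorial's Docker setup, both timestamps come from the same clock.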

Open djautotune/celery.py and add the following.

import time
import os
import redis

from celery import Celery
from celery.signals import task_prerun
from django.conf import settings

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "djautotune.settings")

app = Celery("djautotune")
app.config_from_object("django.conf:settings", namespace="CELERY")
app.autodiscover_tasks()

redis_client = redis.Redis(host=settings.REDIS_HOST, port=settings.REDIS_PORT, db=0)

@task_prerun.connect
def task_prerun_handler(task_id, task, *args, **kwargs):
    metadata = getattr(task.request, "__metadata__", {})
    publish_timestamp = metadata.get("publish_timestamp")

    if publish_timestamp:
        elapsed_time = time.time() - publish_timestamp

        redis_client.zadd("task_elapsed_times", {task_id: elapsed_time})
        redis_client.expire("task_elapsed_times", settings.CELERY_METRICS_TTL, nx=True)

The zadd call adds the queue wait time to the sorted set. The set is sorted by the queue wait time, so it's efficient to request the greatest value, which will be important when we add a view to ask for this value.

Because the expire call is passed nx=True, only the initial call sets the expiration. In other words, even though expire is called for each task, the expiration is not reset. Eventually, the key task_elapsed_times will expire, and an entirely new sorted set is created with a new expiration. Note that the NX option of the EXPIRE command requires Redis 7.0 or later.
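The behavior described above can be modeled without Redis. The following is a rough in-memory sketch, assuming a hypothetical FixedWindowMax class and explicit now arguments in place of a real clock. It is a stand-in for the pattern, not how Redis implements it. It shows that the window is fixed rather than sliding: every entry disappears at the same moment, no matter when it was recorded.

```python
class FixedWindowMax:
    """In-memory stand-in for the sorted set plus EXPIRE ... NX pattern."""

    def __init__(self, ttl):
        self.ttl = ttl
        self.scores = {}        # task_id -> wait time, like the sorted set
        self.expires_at = None  # set once per window, like EXPIRE with NX

    def _drop_if_expired(self, now):
        if self.expires_at is not None and now >= self.expires_at:
            self.scores = {}
            self.expires_at = None

    def record(self, task_id, wait, now):
        self._drop_if_expired(now)
        self.scores[task_id] = wait
        if self.expires_at is None:  # NX: only set when no expiry exists
            self.expires_at = now + self.ttl

    def max_wait(self, now):
        self._drop_if_expired(now)
        return max(self.scores.values(), default=0)


window = FixedWindowMax(ttl=300)
window.record("a", 12.0, now=0)
window.record("b", 3.0, now=200)   # does not extend the expiry
print(window.max_wait(now=299))    # both entries are still visible
print(window.max_wait(now=300))    # the window expired, everything is gone
window.record("c", 1.5, now=310)   # starts a fresh window
print(window.max_wait(now=320))
```

One consequence of this design is that task "b", recorded at t=200, is only visible for 100 seconds, and the reported maximum drops back to zero right after each reset. That trade-off keeps the Redis logic simple, but it is worth remembering when choosing the TTL and the polling interval of your monitor.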

Requesting the Longest Queue Wait Time

For the final piece of the puzzle, let's create a view to return the longest queue wait in the current time window (as defined by the CELERY_METRICS_TTL setting).

Open core/views.py and add the following view code (and remember to register a route for it in core/urls.py, as we did for enqueue_autotune_task).

import redis

from django.conf import settings
from django.http import JsonResponse
from django.views.decorators.csrf import csrf_exempt
from .tasks import autotune

def max_queue_wait_time(request):
    redis_client = redis.Redis(host=settings.REDIS_HOST, port=settings.REDIS_PORT, db=0)

    max_time_data = redis_client.zrevrange("task_elapsed_times", 0, 0, withscores=True)
    max_elapsed_time = max_time_data[0][1] if max_time_data else 0

    return JsonResponse({"max_elapsed_time": max_elapsed_time})

Setting Up Alerting for Celery

We'll use Cronitor to set up alerting so that we receive a notification when queue wait times become too high.

To do this, Cronitor will make requests to our max_queue_wait_time endpoint and parse the JSON response.

This means that our endpoint must be reachable by Cronitor over the public internet. Normally, that isn't possible when developing on a personal machine. For this tutorial, however, we can use ngrok to establish a tunnel to our local Django application for testing purposes.

Using Ngrok to Establish a Tunnel

Head over to ngrok and sign up for a free account.

Download the CLI tool, enter your token, and open a tunnel with the following command.

ngrok http 8000

Copy the domain that is listed next to the Forwarding label in the ngrok output.

Adding the Celery Check in Cronitor

Sign up for a free account with the Cronitor service.

Add a new Check with details similar to the following. Remember to add your own Ngrok domain.

Take note of the ngrok-skip-browser-warning header and be sure to add that.

Under the assertions section, set up the following so that Cronitor understands when to trigger an alert.

This means that Cronitor will fire an alert when max_elapsed_time exceeds 10 seconds. You can of course adjust this threshold to whatever value suits your application.
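Conceptually, the assertion boils down to parsing the JSON body and comparing one field against a threshold. The sketch below illustrates that logic only. It is not Cronitor's implementation or API, and the should_alert helper and its default threshold are assumptions made for this example.

```python
import json


def should_alert(response_body, threshold_seconds=10.0):
    """Return True when the reported max wait exceeds the threshold."""
    payload = json.loads(response_body)
    return payload["max_elapsed_time"] > threshold_seconds


# A healthy queue: the longest wait in the window was 2.4 seconds.
print(should_alert('{"max_elapsed_time": 2.4}'))

# An overwhelmed queue: some task waited almost 38 seconds.
print(should_alert('{"max_elapsed_time": 37.9}'))
```

The same check could be run from any scheduler or monitoring tool that can make an HTTP request and inspect the response.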

Here's an example of what happened when I added a huge number of tasks so that my Celery queue became overwhelmed.

Further Celery Monitoring Enhancements

You've learned how to monitor queue wait time, a metric that directly reflects what your users are experiencing.

This is a great start, but there's a lot more to monitoring your service comprehensively.

As a sneak peek, in the next article we'll learn how to monitor more Celery stats, such as failed tasks and retries. We'll also dive into building a custom Celery monitoring dashboard using Bokeh.

You can see the entire djautotune sample project on GitHub if you'd like to copy some of the code as a template.
