DEV Community

Cover image for Celery & Redis : exécution de tâches en différé / asynchrones
DUVAL Olivier
DUVAL Olivier

Posted on • Updated on

Celery & Redis : exécution de tâches en différé / asynchrones

Sommaire

Préambule TL;DR

Lorsque l'on développe des applications Web, il peut arriver que l'on doive exécuter des tâches qui dépassent le timeout du serveur Web : génération de gros fichiers, envoi de mails en masse, exécution d'une machine learning, traitement de fichiers importés, génération de rapports, ...tout un tas de cas d'utilisation qui arrivent régulièrement.

L'exécution de ce type de tâches est difficilement réalisable directement par le serveur d'applications Web, qui a un timeout de configuré et qui doit être réduit (quelques secondes) sous peine d'effondrement (j'exagère...un peu 🥵😁).

Nous allons voir, grâce à Django, Celery et Redis comment exécuter des tâches en différé, elles ne seront alors pas exécutées par le serveur d'application mais par un processus à part : un worker Celery.

Architecture

Dans ce contexte, les différentes briques utilisées seront :

  • Django pour le serveur d'applications Web avec le module Celery : il enverra une tâche à exécuter en asynchrone
  • Celery worker : les processus applicatifs qui exécuteront les tâches qui leur seront envoyées
  • Redis : servira à être un message broker. En plus d'être un serveur de cache de données et/ou NoSql, Redis implémente le pattern de messages Pub/Sub : il y a des producteurs / clients (serveur d'applications) qui envoient des messages et des consommateurs de messages qui vont les lire (celery workers) : Celery django va envoyer l'ordre d'exécuter une tâche et un des workers va lire la tâche à exécuter et le faire. Le lancement des tâches est atomique, c'est à dire que la tâche ne va être prise que par un seul worker.

On peut représenter cette communication entre applications et processus avec ces différentes briques de la façon suivante :

Image description

(source : https://excalidraw.com/#json=MpX5kp_uYfL17ugUS6a-S,CgyUY0Aqanul0DCksVDzuA)

Un peu de code !

On va s'amuser à créer une tâche qui va attendre 5 secondes avant de nous dire qu'elle démarre, cette tâche ne sera pas exécuter dans le serveur d'applications mais dans le worker celery, en différé : le serveur d'applications rend la main immédiatement, la tâche s'exécute en temps voulu

Modules

Il nous faudra a minima comme modules, pour le côté client :

  • django & djangorestframwork [pour une API de lancement d'une tâche]
  • celery
  • redis

Pour le côté serveurs :

  • un serveur d'applications Web avec une API qui lance une tâche
  • un worker qui réceptionnera notre tâche à exécuter
  • un redis pour l'échange de messages entre le client Web et le worker Celery

Docker

4 images ont été préparées pour lancer 4 conteneurs : nginx pour le reverse proxy host -> conteneur, serveur d'application django, un worker celery, le message broker redis

au lancement de ces services via docker-compose up on aura



$ docker ps
CONTAINER ID   IMAGE                    COMMAND                  CREATED          STATUS          PORTS                                            NAMES
6caf76fca92e   api-and-worker           "celery -A core.asyn…"   11 minutes ago   Up 11 minutes   8100/tcp                                         celery-worker
9cadb4c717a5   nginx                    "nginx -g 'daemon of…"   11 minutes ago   Up 11 minutes   0.0.0.0:80->80/tcp                               nginx-api-server
b83db3cf05eb   redis:7.0.8              "docker-entrypoint.s…"   11 minutes ago   Up 11 minutes   0.0.0.0:6379->6379/tcp                           redis-broker
654b6d1222de   api-and-worker           "python3 manage.py r…"   11 minutes ago   Up 11 minutes   8100/tcp                                         api-server


Enter fullscreen mode Exit fullscreen mode

avec le no-trunc pour voir les paramètres des services lancés que l'on peut retrouver dans le docker-compose :



$ docker ps --no-trunc
CONTAINER ID                                                       IMAGE                    COMMAND                                       CREATED          STATUS          PORTS
                               NAMES
6caf76fca92e5b4e83663019f34f57b5780fd3a9244d07daeb431f5f0e57b255   api-and-worker           "celery -A core.async_tasks worker -l info"   11 minutes ago   Up 11 minutes   8100/tcp
                               celery-worker
9cadb4c717a535d609dbd43ee2bb65667efd1b74f873e6e5b93efd7be365ab3a   nginx                    "nginx -g 'daemon off;'"                      11 minutes ago   Up 11 minutes   0.0.0.0:80->80/tcp                               nginx-api-server
b83db3cf05eb7c9d9aaa4fb3ce120e98f2820f1040434d336ca024e0d1f0a5b6   redis:7.0.8              "docker-entrypoint.sh redis-server"           11 minutes ago   Up 11 minutes   0.0.0.0:6379->6379/tcp                           redis-broker
654b6d1222de36b56f3e3d7bc9d13376ec7e54b527c316bbac7649576a82263f   api-and-worker           "python3 manage.py runserver 0.0.0.0:8110"    11 minutes ago   Up 11 minutes   8100/tcp
                               api-server


Enter fullscreen mode Exit fullscreen mode

Code

On aura :

  • un module configuration celery : broker de message à utiliser, paramétrage Celery (tâches à prendre en compte, serialisation, timeout, etc)
  • un module "tasks", commun à l'application et au worker sera nécessaire pour définir la tâche et l'appeler ensuite, une API qui déclenchera cette tâche

Déclaration de la tâche, on met une temporisation pour démontrer l'intérêt :



from time import sleep
import logging
logger = logging.getLogger('django')
from core.async_tasks.celery import app

@app.task
def execute_long_task(title: str, delay: int, wordings: list):
    logger.info(f"** On s'endort {delay} secondes avant de lancer la tâche ZZzzzZZzzzzzzzzz **")

    sleep(delay)

    logger.info(f'** On se réveille ! Démarrage de la tâche {title} avec {wordings} **')
    logger.info(f'** Terminée !')


Enter fullscreen mode Exit fullscreen mode

API d'envoi et d'ordre d'exécution de la tâche:



class ExecuteTaskApi(APIView):
    permission_classes = [AllowAny]
    renderer_classes = (JSONRenderer,)

    def get(self, request, *args, **kwargs):
        execute_long_task.delay('Exécute ma longue tâche !', 5,
                                ['Vais attendre', 'le super pouvoir de Celery'])
        return Response({'data': 'je réponds immédiatement sans attendre la fin de la tâche !'})


Enter fullscreen mode Exit fullscreen mode

Configuration de Celery



import os
from celery import Celery

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "backend.settings")

app = Celery("tasks")
app.config_from_object("core.async_tasks.celery_config")

# par défaut, il recherche les tâches dans le module "tasks.py" https://docs.celeryq.dev/en/stable/reference/celery.html#celery.Celery.autodiscover_tasks
app.autodiscover_tasks(['api'])


Enter fullscreen mode Exit fullscreen mode

Logs de l'exécution de http://plateform/api/execute_task/ sur le navigateur ou par curl



$ curl http://plateform/api/execute_task/
{"data":"je réponds immédiatement sans attendre la fin de la tâche !"}


Enter fullscreen mode Exit fullscreen mode

Le serveur d'application envoie l'ordre d'exécuter la tâche et rend immédiatement la main au navigateur, le worker réceptionne la tâche à exécuter, il y a une temporisation de 5 secondes pour démontrer l'aspect différé / asynchrone :



api-server        | INFO basehttp "GET /api/execute_task/ HTTP/1.0" 200 73
celery-worker     | [2023-03-12 11:19:00,851: INFO/MainProcess] Received task: api.tasks.execute_long_task[3f1b8339-4192-45da-9607-23246b532af7]
celery-worker     | INFO tasks ** On s'endort 5 secondes avant de lancer la tâche ZZzzzZZzzzzzzzzz **
celery-worker     | [2023-03-12 11:19:00,854: INFO/ForkPoolWorker-8] ** On s'endort 5 secondes avant de lancer la tâche ZZzzzZZzzzzzzzzz **
celery-worker     | INFO tasks ** On se réveille ! Démarrage de la tâche Exécute ma longue tâche ! avec ['Vais attendre', 'le super pouvoir de Celery'] **
celery-worker     | [2023-03-12 11:19:05,860: INFO/ForkPoolWorker-8] ** On se réveille ! Démarrage de la tâche Exécute ma longue tâche ! avec ['Vais attendre', 'le super pouvoir de Celery'] **
celery-worker     | INFO tasks ** Terminée !
celery-worker     | [2023-03-12 11:19:05,861: INFO/ForkPoolWorker-8] ** Terminée !



Enter fullscreen mode Exit fullscreen mode

La tâche a duré 5 secondes, entre 11:19:00 et 11:19:05, indépendamment du serveur Web et de son timout !

On peut dès lors imaginer des tâches qui pourraient durer quelques minutes pour effectuer des traitements particuliers.

Sources

Celery permet d'autres paramétrages comme la reprise de tâche en cas d'échec ou de s'assurer qu'une tâche ne sera pas exécutée 2 fois, etc

Il existe aussi un outil de monitoring pour visualiser l'état des tâches : Flower

Ou encore, la possibilité d'avoir des tâches récurrentes et programmées, "à la crontab", grâce à Celery Beat.

Retrouvez les sources sur le Github : https://github.com/zorky/celery-redis-django-tasks

Top comments (0)