Sommaire
Préambule TL;DR
Lorsque l'on développe des applications Web, il peut arriver que l'on doive exécuter des tâches qui dépassent le timeout du serveur Web : génération de gros fichiers, envoi de mails en masse, exécution d'une machine learning, traitement de fichiers importés, génération de rapports, ...tout un tas de cas d'utilisation qui arrivent régulièrement.
L'exécution de ce type de tâches est difficilement réalisable directement par le serveur d'applications Web, qui a un timeout de configuré et qui doit être réduit (quelques secondes) sous peine d'effondrement (j'exagère...un peu 🥵😁).
Nous allons voir, grâce à Django, Celery et Redis comment exécuter des tâches en différé, elles ne seront alors pas exécutées par le serveur d'application mais par un processus à part : un worker Celery.
Architecture
Dans ce contexte, les différentes briques utilisées seront :
- Django pour le serveur d'applications Web avec le module Celery : il enverra une tâche à exécuter en asynchrone
- Celery worker : les processus applicatifs qui exécuteront les tâches qui leur seront envoyées
- Redis : servira à être un message broker. En plus d'être un serveur de cache de données et/ou NoSql, Redis implémente le pattern de messages Pub/Sub : il y a des producteurs / clients (serveur d'applications) qui envoient des messages et des consommateurs de messages qui vont les lire (celery workers) : Celery django va envoyer l'ordre d'exécuter une tâche et un des workers va lire la tâche à exécuter et le faire. Le lancement des tâches est atomique, c'est à dire que la tâche ne va être prise que par un seul worker.
On peut représenter cette communication entre applications et processus avec ces différentes briques de la façon suivante :
(source : https://excalidraw.com/#json=MpX5kp_uYfL17ugUS6a-S,CgyUY0Aqanul0DCksVDzuA)
Un peu de code !
On va s'amuser à créer une tâche qui va attendre 5 secondes avant de nous dire qu'elle démarre, cette tâche ne sera pas exécuter dans le serveur d'applications mais dans le worker celery, en différé : le serveur d'applications rend la main immédiatement, la tâche s'exécute en temps voulu
Modules
Il nous faudra a minima comme modules, pour le côté client :
- django & djangorestframwork [pour une API de lancement d'une tâche]
- celery
- redis
Pour le côté serveurs :
- un serveur d'applications Web avec une API qui lance une tâche
- un worker qui réceptionnera notre tâche à exécuter
- un redis pour l'échange de messages entre le client Web et le worker Celery
Docker
4 images ont été préparées pour lancer 4 conteneurs : nginx pour le reverse proxy host -> conteneur, serveur d'application django, un worker celery, le message broker redis
au lancement de ces services via docker-compose up
on aura
$ docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
6caf76fca92e api-and-worker "celery -A core.asyn…" 11 minutes ago Up 11 minutes 8100/tcp celery-worker
9cadb4c717a5 nginx "nginx -g 'daemon of…" 11 minutes ago Up 11 minutes 0.0.0.0:80->80/tcp nginx-api-server
b83db3cf05eb redis:7.0.8 "docker-entrypoint.s…" 11 minutes ago Up 11 minutes 0.0.0.0:6379->6379/tcp redis-broker
654b6d1222de api-and-worker "python3 manage.py r…" 11 minutes ago Up 11 minutes 8100/tcp api-server
avec le no-trunc
pour voir les paramètres des services lancés que l'on peut retrouver dans le docker-compose :
$ docker ps --no-trunc
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS
NAMES
6caf76fca92e5b4e83663019f34f57b5780fd3a9244d07daeb431f5f0e57b255 api-and-worker "celery -A core.async_tasks worker -l info" 11 minutes ago Up 11 minutes 8100/tcp
celery-worker
9cadb4c717a535d609dbd43ee2bb65667efd1b74f873e6e5b93efd7be365ab3a nginx "nginx -g 'daemon off;'" 11 minutes ago Up 11 minutes 0.0.0.0:80->80/tcp nginx-api-server
b83db3cf05eb7c9d9aaa4fb3ce120e98f2820f1040434d336ca024e0d1f0a5b6 redis:7.0.8 "docker-entrypoint.sh redis-server" 11 minutes ago Up 11 minutes 0.0.0.0:6379->6379/tcp redis-broker
654b6d1222de36b56f3e3d7bc9d13376ec7e54b527c316bbac7649576a82263f api-and-worker "python3 manage.py runserver 0.0.0.0:8110" 11 minutes ago Up 11 minutes 8100/tcp
api-server
Code
On aura :
- un module configuration celery : broker de message à utiliser, paramétrage Celery (tâches à prendre en compte, serialisation, timeout, etc)
- un module "tasks", commun à l'application et au worker sera nécessaire pour définir la tâche et l'appeler ensuite, une API qui déclenchera cette tâche
Déclaration de la tâche, on met une temporisation pour démontrer l'intérêt :
from time import sleep
import logging
logger = logging.getLogger('django')
from core.async_tasks.celery import app
@app.task
def execute_long_task(title: str, delay: int, wordings: list):
logger.info(f"** On s'endort {delay} secondes avant de lancer la tâche ZZzzzZZzzzzzzzzz **")
sleep(delay)
logger.info(f'** On se réveille ! Démarrage de la tâche {title} avec {wordings} **')
logger.info(f'** Terminée !')
API d'envoi et d'ordre d'exécution de la tâche:
class ExecuteTaskApi(APIView):
permission_classes = [AllowAny]
renderer_classes = (JSONRenderer,)
def get(self, request, *args, **kwargs):
execute_long_task.delay('Exécute ma longue tâche !', 5,
['Vais attendre', 'le super pouvoir de Celery'])
return Response({'data': 'je réponds immédiatement sans attendre la fin de la tâche !'})
Configuration de Celery
import os
from celery import Celery
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "backend.settings")
app = Celery("tasks")
app.config_from_object("core.async_tasks.celery_config")
# par défaut, il recherche les tâches dans le module "tasks.py" https://docs.celeryq.dev/en/stable/reference/celery.html#celery.Celery.autodiscover_tasks
app.autodiscover_tasks(['api'])
Logs de l'exécution de http://plateform/api/execute_task/ sur le navigateur ou par curl
$ curl http://plateform/api/execute_task/
{"data":"je réponds immédiatement sans attendre la fin de la tâche !"}
Le serveur d'application envoie l'ordre d'exécuter la tâche et rend immédiatement la main au navigateur, le worker réceptionne la tâche à exécuter, il y a une temporisation de 5 secondes pour démontrer l'aspect différé / asynchrone :
api-server | INFO basehttp "GET /api/execute_task/ HTTP/1.0" 200 73
celery-worker | [2023-03-12 11:19:00,851: INFO/MainProcess] Received task: api.tasks.execute_long_task[3f1b8339-4192-45da-9607-23246b532af7]
celery-worker | INFO tasks ** On s'endort 5 secondes avant de lancer la tâche ZZzzzZZzzzzzzzzz **
celery-worker | [2023-03-12 11:19:00,854: INFO/ForkPoolWorker-8] ** On s'endort 5 secondes avant de lancer la tâche ZZzzzZZzzzzzzzzz **
celery-worker | INFO tasks ** On se réveille ! Démarrage de la tâche Exécute ma longue tâche ! avec ['Vais attendre', 'le super pouvoir de Celery'] **
celery-worker | [2023-03-12 11:19:05,860: INFO/ForkPoolWorker-8] ** On se réveille ! Démarrage de la tâche Exécute ma longue tâche ! avec ['Vais attendre', 'le super pouvoir de Celery'] **
celery-worker | INFO tasks ** Terminée !
celery-worker | [2023-03-12 11:19:05,861: INFO/ForkPoolWorker-8] ** Terminée !
La tâche a duré 5 secondes, entre 11:19:00 et 11:19:05, indépendamment du serveur Web et de son timout !
On peut dès lors imaginer des tâches qui pourraient durer quelques minutes pour effectuer des traitements particuliers.
Sources
Celery permet d'autres paramétrages comme la reprise de tâche en cas d'échec ou de s'assurer qu'une tâche ne sera pas exécutée 2 fois, etc
Il existe aussi un outil de monitoring pour visualiser l'état des tâches : Flower
Ou encore, la possibilité d'avoir des tâches récurrentes et programmées, "à la crontab", grâce à Celery Beat.
Retrouvez les sources sur le Github : https://github.com/zorky/celery-redis-django-tasks
Top comments (0)