Distributed task queues and message brokers are integral components of modern, scalable applications, helping to manage workloads and facilitate communication between different parts of a system. They help us decouple task submission from execution, i.e., they allow applications to submit tasks without worrying about when or where they will be executed. This separation enhances scalability and reliability, as the system can distribute tasks across various workers based on current load and availability, and execute them at the most opportune time.
In addition to being a super fast data store, Redis can also be used as a message broker for distributed task queues such as Celery.
Here’s how Redis and Celery interact in a distributed setup:
- Task Submission: Services submit tasks to Celery with details and parameters.
- Redis as Message Broker: Once a task is submitted, Celery uses Redis as the message broker to store this task. Redis holds these tasks in a queue-like structure, waiting for a worker to pick them up.
- Task Queuing: Tasks wait in Redis until workers are ready, with queues organized by priority or type.
- Worker Nodes: Celery workers continuously monitor the Redis queues for new tasks and pick them up from Redis; the workers themselves can be distributed across multiple machines for scalability.
- Task Execution: With Redis handling the task messages, Celery workers can focus on executing them. We can scale by running more workers or even distributing these workers across multiple machines, with Redis efficiently managing the task queue for all these workers. Workers process tasks, performing computations or interactions with databases or other components as needed, then return results.
- Fault Tolerance and Retry Logic: Celery and Redis ensure no task is lost, retrying failed tasks and maintaining persistence.
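To make the retry behaviour just described concrete, here is a minimal, hedged sketch of a Celery task that retries on transient failures. The task name, broker URL, and the flaky helper are illustrative only and are not part of the article's project:

import random

from celery import Celery

app = Celery("sketch", broker="redis://localhost:6379/0")

def unreliable_send(payload):
    # Stand-in for a call that sometimes fails (e.g., a flaky SMTP server).
    if random.random() < 0.3:
        raise ConnectionError("transient failure")

@app.task(bind=True, max_retries=3, default_retry_delay=5)
def send_with_retries(self, payload):
    try:
        unreliable_send(payload)
    except ConnectionError as exc:
        # On failure, the task message goes back to Redis and a worker retries it later.
        raise self.retry(exc=exc)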
This article aims to provide a comprehensive understanding of Redis as a message broker for Celery. We will develop a straightforward backend service using Django, designed to enqueue email sending tasks for Celery workers.
Deep Dive into Redis and Celery Integration
The code used in this article is hosted on GitHub (nileshprasad137/django-celery-redis-setup), but to understand this setup in detail, let's walk through the crucial steps to set up our Django backend to simulate sending emails asynchronously.
Setting up Django backend with Redis and Celery
- Dockerise setup
We will dockerise our setup to install all the dependencies of our project, including Redis and Celery. Ensure your docker-compose.yml file has all the services defined.
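A minimal sketch of what that file might look like is shown below; this is an illustration, not the repository's actual file. The redis service name and the django_celery_redis project name match the Celery configuration shown later in this article, while the web and worker service names are placeholders:

docker-compose.yml
version: "3.8"
services:
  redis:
    image: redis:7
    ports:
      - "6379:6379"
  web:
    build: .
    command: python manage.py runserver 0.0.0.0:8000
    ports:
      - "8000:8000"
    depends_on:
      - redis
  worker:
    build: .
    command: celery -A django_celery_redis worker -l info -Q high_priority,default,low_priority
    depends_on:
      - redis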
- Configure Django for Redis and Celery
Ensure your Django settings are configured to use Redis as the broker and backend for Celery.
settings.py
CELERY_BROKER_URL = 'redis://redis:6379/0'
CELERY_RESULT_BACKEND = 'redis://redis:6379/0'
CELERY_TASK_DEFAULT_QUEUE = 'default'
CELERY_TASK_QUEUES = {
'low_priority': {
'exchange': 'low_priority', # unused
'routing_key': 'low_priority',
},
'high_priority': {
'exchange': 'high_priority', # unused
'routing_key': 'high_priority',
},
'default': {
'exchange': 'default',
'routing_key': 'default'
},
}
Also, create a celery.py file next to settings.py. This file will contain the Celery instance and configuration:
celery.py
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
from django.conf import settings
# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'django_celery_redis.settings')
app = Celery("django_celery_redis", broker=settings.CELERY_BROKER_URL) # App for all consumer facing tasks
# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
# should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')
# Load task modules from all registered Django app configs.
app.autodiscover_tasks()
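The standard Django/Celery integration also imports this app in the project's __init__.py so that it is loaded when Django starts; the repository most likely already includes the equivalent of:

__init__.py
from .celery import app as celery_app

__all__ = ("celery_app",)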
- Define tasks
Django is where tasks are defined. Here is a simple example of a task in Django (e.g., in an app's tasks.py) that simulates sending an email.
import time

from django_celery_redis.celery import app  # the Celery app created above

@app.task
def send_email_simulation(email_address, subject, message):
    print(f"Sending email to {email_address} | Subject: {subject}")
    # Here, you would implement your email sending logic.
    time.sleep(2)  # simulate the time taken to send an email
    return f"Email sent to {email_address} with subject {subject}"
We can then use Docker Compose to build and run our containers:
docker-compose up --build
This will start the Django development server, a Redis instance, and a Celery worker, all in separate containers.
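To confirm that the worker has started and connected to Redis, you can tail its logs; the worker service name here is an assumption carried over from the compose sketch above, so adjust it to match your file:

docker-compose logs -f worker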
By following these steps, we've successfully set up a Dockerized Django backend that integrates Redis as a message broker and Celery for task queue management, with Poetry for dependency handling. Our Django application is now ready to handle background tasks.
Note that this is a development setup; a production setup will need some changes in order to make it secure.
Understanding Redis Internals in the Context of Celery
Let's delve deeper into Redis data structures to understand how Redis acts as a message broker.
When we spin up our containers, Celery tasks are autodiscovered.
Let's see which keys are present in Redis right after the containers start.
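Before any task has been submitted, the listing looks roughly like this (these are the same keys explained below; ordering may differ):

127.0.0.1:6379> KEYS *
1) "_kombu.binding.celeryev"
2) "_kombu.binding.celery.pidbox"
3) "_kombu.binding.default"
4) "_kombu.binding.high_priority"
5) "_kombu.binding.low_priority"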
Here's a brief explanation of each of the keys we see in the Redis instance when used with Celery:
- _kombu.binding.celeryev: This key is related to Celery Events (hence the "ev" in the name), which are special messages used for monitoring tasks within Celery.
- _kombu.binding.high_priority / _kombu.binding.default / _kombu.binding.low_priority: These keys represent the routing information for the "high_priority", "default", and "low_priority" queues respectively. They define how messages (or tasks) are routed to these queues based on their bindings. Essentially, they are part of the configuration that determines where messages should be sent within the broker (Redis), helping Celery to organize and prioritize tasks by directing them to the appropriate queue based on their intended priority or category.
- _kombu.binding.celery.pidbox: The pidbox is a special mailbox used by Celery for control and monitoring purposes (like inspecting worker stats or controlling worker processes).
In all cases, these _kombu.binding.* keys are part of how Kombu (the messaging library used by Celery) manages message routing and queues with Redis. They are not the queues themselves but the definitions of how messages should be routed to the appropriate queues based on their bindings. In Celery, the actual task messages are stored in Redis lists, and these bindings help ensure the messages are delivered to the correct list representing a queue.
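We can check this directly: each binding key is a Redis set whose members encode a routing-key/queue pair. For example (the member encoding is internal to Kombu and may vary between versions):

127.0.0.1:6379> TYPE _kombu.binding.low_priority
set
127.0.0.1:6379> SMEMBERS _kombu.binding.low_priority
1) "low_priority\x06\x16\x06\x16low_priority"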
Now, let's add a task through Django and see which keys get added to Redis.
>>> # Trigger a low priority email
>>> send_email_simulation.apply_async(args=["user@example.com", "Hello", "This is a test email"], queue='low_priority')
<AsyncResult: 5ddc21b8-6832-4e58-83bb-c71da6a60916>
This task 5ddc21b8-6832-4e58-83bb-c71da6a60916 will first get added to the low_priority Redis list, from which it will be picked up by a Celery worker. Once the worker has finished executing the task, another meta key will be added in Redis.
127.0.0.1:6379> KEYS *
1) "_kombu.binding.celeryev"
2) "_kombu.binding.high_priority"
3) "_kombu.binding.celery.pidbox"
4) "celery-task-meta-5ddc21b8-6832-4e58-83bb-c71da6a60916"
5) "_kombu.binding.default"
6) "_kombu.binding.low_priority"
127.0.0.1:6379> TYPE celery-task-meta-5ddc21b8-6832-4e58-83bb-c71da6a60916
string
127.0.0.1:6379> GET celery-task-meta-5ddc21b8-6832-4e58-83bb-c71da6a60916
"{\"status\": \"SUCCESS\", \"result\": \"Email sent to user@example.com with subject Hello\", \"traceback\": null, \"children\": [], \"date_done\": \"2024-01-06T07:50:04.800480\", \"task_id\": \"5ddc21b8-6832-4e58-83bb-c71da6a60916\"}"
127.0.0.1:6379>
The keys in Redis starting with celery-task-meta- are the result of storing task results. These are not the tasks themselves, but metadata about tasks that have already run, assuming the task result backend is configured to use Redis (as it is in our settings). These metadata keys store information about the task's status, result, and traceback if there was an error.
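On the application side, this is the same data that Celery's result API reads back from the backend. A quick sketch, assuming app is the Celery instance defined earlier:

from celery.result import AsyncResult

from django_celery_redis.celery import app

result = AsyncResult("5ddc21b8-6832-4e58-83bb-c71da6a60916", app=app)
print(result.status)  # "SUCCESS"
print(result.result)  # "Email sent to user@example.com with subject Hello"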
Let's also check whether our tasks get added to the queues as Redis lists. If workers are running and consuming tasks faster than we enqueue them, we might not see a build-up of tasks in Redis, since they are taken off the queue and processed immediately. To get around this, we will queue up 1000 tasks at once so that we can see our tasks sitting in the list.
>>> for i in range(1000):
... send_email_simulation.apply_async(args=["user@example.com", "Hello", "This is a test email"], queue='low_priority')
...
<AsyncResult: 7efab9a0-efa2-4898-9043-17965fd3d26b>
<AsyncResult: 57fa1346-6240-4bfb-9e90-a98a59552f5b>
.....
Let's see if the low_priority key is present in Redis after the tasks have been sent to the queue.
127.0.0.1:6379> KEYS *priority
1) "_kombu.binding.high_priority"
2) "low_priority"
3) "_kombu.binding.low_priority"
Here we go! We can peek at the first 3 entries of this list with LRANGE low_priority 0 2; each entry is a long serialized task message.
Here's a brief rundown of how the task got added to low_priority:
- Task Dispatch: A task is dispatched in our Django application with an indication that it should be routed to the low_priority queue. This can be done through the apply_async method or by setting the default queue for the task.
- Serialization: The task's data, including arguments, execution options, and metadata, is serialized into a message. This is what we see as a long string in the Redis list; it's typically a base64-encoded representation of the task's information (decoded in the sketch after this list).
- Push to Queue: Celery pushes this serialized message to the Redis list representing the low_priority queue. In Redis, this queue is simply a list, and new tasks are appended to the end.
- Task Message Structure: The message contains detailed information, such as the task's unique ID, arguments, callbacks, retries, and more. When a Celery worker is ready to process tasks from the low_priority queue, it pops a message from this list, deserializes it, and executes the task.
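Here is a hedged sketch of that decoding step using redis-py, run from the host. It assumes the Redis port is published on localhost:6379 (as in the compose sketch earlier), and the exact envelope layout depends on the Celery task protocol version:

import base64
import json

import redis

r = redis.Redis(host="localhost", port=6379, db=0)

raw = r.lindex("low_priority", 0)       # first waiting message, if any
if raw:
    envelope = json.loads(raw)          # Kombu's JSON envelope around the task
    print(envelope["headers"]["task"])  # fully qualified task name
    print(envelope["headers"]["id"])    # the task's UUID
    # The body is a base64-encoded JSON payload: (args, kwargs, embedded options).
    args, kwargs, _options = json.loads(base64.b64decode(envelope["body"]))
    print(args, kwargs)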
At any point in time, we can also look at tasks that workers have already picked up but not yet acknowledged as finished; Kombu tracks these in the unacked hash:
127.0.0.1:6379> type unacked
hash
127.0.0.1:6379> hlen unacked
(integer) 4
We could see its contents using HGETALL unacked.
Queue priority handling in Redis
As a broker, Redis simplifies the queueing mechanism: it doesn't use exchanges and routing keys the way RabbitMQ does, and it does not inherently support priority queueing. With careful application design, however, especially in conjunction with Celery, you can effectively manage task priorities. Here's how you can handle queue priorities using Redis as a broker for Celery:
- Multiple Queues for Different Priorities: The most straightforward way to implement priority handling in Redis with Celery is by defining multiple queues for different priority levels, such as high_priority, medium_priority, and low_priority. You dispatch tasks to these queues based on how urgent they are.
- Worker Queue Subscriptions: Workers can subscribe to multiple queues and prioritize them. A worker can listen to both high_priority and low_priority queues but check for tasks in high_priority first. This is typically managed by the order of queues provided when starting the worker:
celery -A proj worker -l info -Q high_priority,low_priority
This tells the worker to consume tasks from high_priority first and fall back to low_priority when the higher-priority queue is empty.
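Besides passing queue= at call time, as we did with apply_async earlier, routing can also be configured centrally. A hedged sketch of what this might look like in settings.py (the module path and the second task are hypothetical; the article's repository may route tasks differently):

CELERY_TASK_ROUTES = {
    "myapp.tasks.send_email_simulation": {"queue": "low_priority"},
    "myapp.tasks.send_password_reset": {"queue": "high_priority"},  # hypothetical task
}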
Understanding Exchanges in Celery with Redis
In messaging systems, an exchange is a concept that routes messages from producers to the appropriate queues based on various criteria. While this concept is central to more complex brokers like RabbitMQ, Redis, as used with Celery, does not inherently support the notion of exchanges due to its simpler, key-value store nature. However, understanding how Celery simulates or bypasses this functionality when paired with Redis is useful.
- Simplified Routing: When using Redis as a broker with Celery, the exchange concept is simplified. Instead of a formal exchange routing messages to different queues based on bindings and rules, Celery directly pushes task messages to the specific Redis list representing a queue.
- Default Exchange: Celery with Redis uses a default direct exchange type. With a direct exchange, messages are routed to the queue whose name exactly matches the routing key of the message. Since Redis doesn't have a built-in exchange mechanism, the routing is handled internally by Celery.
Alternate Brokers and Comparison
Redis, with its straightforward setup and performance efficiency, serves well as a broker for many standard use cases, particularly when it's already in use for caching and pub/sub scenarios. Unlike advanced AMQP brokers like RabbitMQ, Redis does not offer complex features such as exchanges and routing keys, yet its simplicity often translates to cost-effectiveness and ease of management. The choice between Redis and more sophisticated brokers like RabbitMQ hinges on the specific demands of your application - considering task complexity, reliability needs, and scalability. For high-throughput, complex routing, or extensive scalability requirements, RabbitMQ or similar brokers might be more suitable, while Redis excels in scenarios valuing speed and simplicity.
Monitoring and Scalability
Using tools like Flower for real-time monitoring of Celery workers and tasks, and integrating Prometheus or Grafana for in-depth metrics and alerts, provides the visibility needed for robust operation. Scalability can be achieved by horizontally adding more Celery workers; for example, an e-commerce platform might increase from 10 to 50 workers during peak sales periods or a social media app might use auto-scaling to adjust worker count based on user activity.
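For example, assuming Flower is installed (pip install flower), it can be pointed at the same Celery app and served on a local port:

celery -A django_celery_redis flower --port=5555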
Conclusion
In this article, we explored how Redis works as a message broker with Celery, looking into how it handles tasks and organizes them. We shed light on the journey of a task from start to finish, emphasizing Redis's essential role in making task management smooth and effective.
The supporting repository for this article is hosted at nileshprasad137/django-celery-redis-setup which can be used for further tinkering.
Happy coding!