Asynchronous tasks in Python with Celery + RabbitMQ + Redis
In this article, we are going to use Celery, RabbitMQ, and Redis to build a distributed task queue.
But what is a distributed task queue, and why would you build one?
A distributed task queue allows you to offload work to another process, to be handled asynchronously (once you push the work onto the queue, you don’t wait) and in parallel (you can use other cores to process the work).
In short, it gives you the ability to execute tasks in the background while the application continues to handle other work.
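To make the idea concrete, here is a minimal in-process sketch that uses only the Python standard library. It is an analogy, not Celery: a thread stands in for the worker process and queue.Queue stands in for the broker. The names worker, send_welcome_email, and run_demo are made up for this illustration.

```python
import queue
import threading


def worker(tasks, results):
    """Pull jobs off the queue until a None sentinel arrives."""
    while True:
        job = tasks.get()
        if job is None:  # sentinel: no more work
            break
        func, args = job
        results.append(func(*args))


def send_welcome_email(address):
    # Stand-in for a slow call to an external email provider.
    return f"email sent to {address}"


def run_demo():
    tasks = queue.Queue()
    results = []
    background = threading.Thread(target=worker, args=(tasks, results))
    background.start()

    # put() returns immediately; the caller does not wait for the email.
    tasks.put((send_welcome_email, ("user@example.com",)))

    tasks.put(None)    # tell the worker to stop
    background.join()  # only here do we wait, to collect the results
    return results


if __name__ == "__main__":
    print(run_demo())
```

A distributed task queue follows the same pattern, except that the queue lives in a separate broker service and the workers can run in other processes or on other machines.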
Use Cases of Task Queues
The most basic and understandable example is sending an email after a user registers. In this case, you don’t know how long sending the email will take: it can be 1 ms, it can be much longer, and sometimes the email is not sent at all. You are not in control of whether the task succeeds, because another provider does the sending for you.
Now that you have a basic idea of how you can benefit from task queues, identifying suitable tasks is as simple as checking whether they belong to one of the following categories:
Third-party tasks — The web app must serve users quickly without waiting for other actions to complete while the page loads, e.g., sending an email or notification or propagating updates to internal tools (such as gathering data for A/B testing or system logging).
Long-running jobs — Jobs that are expensive in resources, where users need to wait while they compute their results, e.g., complex workflow execution (DAG workflows), graph generation, Map-Reduce like tasks, and serving of media content (video, audio).
Periodic tasks — Jobs that you will schedule to run at a specific time or after an interval, e.g., monthly report generation or a web scraper that runs twice a day.
Setting up the dependencies for Celery
Celery requires a message transport to send and receive messages. Some candidates that you can use as a message broker are RabbitMQ, Redis, and Amazon SQS.
For this tutorial we are going to use RabbitMQ, but you can use any other supported message broker (e.g. Redis).
It’s also worth mentioning what we are going to use Redis for, since RabbitMQ is already our message transport.
When tasks are sent to the broker and then executed by the Celery worker, we want to save their state and be able to see which tasks have been executed before. For that, you need some kind of data store, and here we are going to use Redis.
For the result stores we also have many candidates:
AMQP, Redis
Memcached
SQLAlchemy, Django ORM
Apache Cassandra, Elasticsearch, Riak, etc.
To set up these services we are going to use Docker: it’s easy to set up, it provides an isolated environment, and you can easily reproduce the same environment from a configuration file (Dockerfile or docker-compose).
Project setup
Let’s start a new Python project from scratch. First let’s create a new directory, create all the files necessary for the project, and then initialize the virtual environment.
$ mkdir celery-python && cd $_
$ touch __init__.py
$ touch tasks.py
$ touch docker-compose.yaml
$ touch requirements.txt
# create & activate the virtualenv
$ python -m venv env
# Windows (Git Bash) path; on Linux/macOS use: source env/bin/activate
$ source env/Scripts/activate
Now let’s install the project requirements. For this project, we are just going to need celery and redis.
$ pip install celery redis
Now it’s time to configure docker-compose to run RabbitMQ and Redis. In the docker-compose.yaml paste the following YAML configuration.
Here we simply start up two services by defining the image key to point to the image on Docker Hub, mapping the ports as host:container, and adding environment variables. To see which environment variables an image supports, go to that image’s page on Docker Hub and read its documentation; the RabbitMQ image’s page, for example, explains how to use that image.
Now, let’s initialize the celery app to use RabbitMQ as a message transporter and Redis as a result store.
In the tasks.py, let’s go ahead and paste the following code:
I tried to keep the code as minimal as possible, so you can understand the purpose of this tutorial.
As you can see, we have defined the URLs for RabbitMQ and Redis, and then we simply initialize the celery app using those configurations. The first parameter tasks is the name of the current module.
Then we have decorated the function say_hello with @app.task, which marks the function as a task so that it can later be called using .delay(), as we will see in a bit.
Normally we would have a module celery_app.py that only initializes the Celery application instance, and a separate module tasks.py in which we would define the tasks that we want Celery to run.
Build and run services with docker
Now we only need to run the services (RabbitMQ and Redis) with docker. To run the images inside a container we simply run:
$ docker-compose up -d
This will take a while if you don’t have these images pulled locally. Then to verify that the containers are up and running we write:
$ docker ps
You should see two services running, along with additional information for each one. If not, check the logs for any possible error.
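If you prefer to check from Python that both services accept connections, a small standard-library sketch like the following can help. It assumes the containers publish the default ports on localhost (5672 for RabbitMQ, 6379 for Redis); port_open is a helper name made up for this example.

```python
import socket


def port_open(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port can be opened."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and unreachable hosts.
        return False


if __name__ == "__main__":
    # Assumed default ports; change them if your compose file maps others.
    for name, port in (("RabbitMQ", 5672), ("Redis", 6379)):
        state = "reachable" if port_open("localhost", port) else "NOT reachable"
        print(f"{name} on port {port}: {state}")
```

An open port only shows that something is listening; the container logs remain the place to look for startup errors.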
Now let’s start the Celery worker, and then let’s try to run some tasks from the Python interactive shell.
# Starting the Celery worker
$ celery -A tasks worker -l info --pool=solo
This will run the Celery worker, and if you look at the logs, they should tell you that it has successfully connected to the broker.
Now let’s run a task.
# Running celery tasks
$ python
---------------------------------
Type "help", "copyright", "credits" or "license" for more information.
>>> from tasks import say_hello
>>> say_hello.delay("Valon")
<AsyncResult: 55ad96a9-f7ea-44f4-9a47-e15b90d6d8a2>
We called the function using .delay(), passing the name argument. This method is a star-argument shortcut to another method called apply_async(). We then get an <AsyncResult> back, which is a handle to the task that was passed to the broker; the task will then be consumed and finished in the background by Celery.
If you look at your worker now, you will see in the logs that the worker received a task, and about 5 seconds later it will report that the task finished successfully.
Now let’s run the same task, but this time let’s bring the result store into play. In the Python shell, let’s store the result in a variable and then inspect its properties.
If we didn’t have a result backend (Redis) configured for Celery, we couldn’t access these properties or functions, because by default Celery doesn’t store any state. Since we do have one, we can retrieve information about our tasks. If you want to dig deeper, you can browse your Redis database with a tool like TablePlus, or you can set up Flower to monitor your Celery tasks and workers.
As you can see in the image above, the results of all the tasks are stored in Redis.
Wrapping up
In this article we have set up a Python application with Celery, RabbitMQ and Redis from scratch. The purpose of the article was to show you what a task queue is, how you can benefit from it, and how to implement one.
The example tasks are just for demonstration, but you can use the same configuration as I did here, adding tasks in the tasks module and the configuration in celery_app.py. See the Celery docs for more details.
I highly encourage you to use Celery in your application, as it is quite useful when you have things that take a long time, need to schedule tasks, etc.
If you read the article and found it useful don’t forget to clap.
If you have any question, feel free to reach out to me.