Introduction
A few days ago, I was working on a cool personal project of mine (no spoilers until I release it). I needed to build a microservice that reads image data from Kafka and performs operations on it. These operations involved heavy image processing and machine learning tasks that could take about 1 second per image. Sounds pretty small, right? Here's the problem: these messages from Kafka were arriving in real time. Imagine getting ready to spend 1 second on one image when the next has already arrived.
One solution I considered (which I will still implement) was to horizontally scale this microservice, since I'm already using a Publisher/Subscriber architecture with Kafka. That means I can have multiple instances of this microservice subscribed to Kafka, so while one is busy with an image, another has already picked up the next one and is working on it. But this wasn't enough. I needed this microservice to be as scalable as possible. That's when it hit me: why don't I use Celery?
What is Celery?
I needed a way to receive these messages as quickly as possible and perform these computations on each image. That's where Celery came in. Celery is a Python-based task queue that lets us hand tasks off to separate worker processes, allowing the main process of our app to keep running smoothly. In other words, it's an asynchronous task queue. These tasks get executed in the order in which they arrive.
Why Celery?
Now, I know many of you are probably wondering, why Celery? Why didn't Samuel use multithreading? After all, he has written an article on how to use multithreading and multiprocessing in Python.
The answer is the queue in the task queue. Celery lets us use robust message brokers such as RabbitMQ and Redis, which makes our lives easier. Celery then lets us create workers. You can think of these workers as separate processes that keep checking the message broker for jobs to execute.
Even if our Celery workers aren't running, the jobs stay queued until a worker comes back up. Also, since we can spawn multiple Celery workers connected to the same RabbitMQ or Redis queue, we achieve horizontal scalability within each instance of my awesome microservice.
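As a quick preview of what that looks like (the full setup comes later, and the tasks module referenced here is the one we'll write below), you can start several workers against the same broker; the -n flag just gives each worker a unique name:

celery -A tasks worker --concurrency=4 -n worker1@%h
celery -A tasks worker --concurrency=4 -n worker2@%h

Each worker pulls jobs off the same queue, so they naturally share the load.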
Using Celery in Python
If you're still reading this, you'd probably like to use Celery in your own project. To use Celery, you only need a few things set up on your computer:
- A message broker (RabbitMQ, Redis, Amazon SQS, or, experimentally, Zookeeper)
- Python3 installed
- The Celery Framework
- A Celery result backend (optional) for storing the results returned by our Celery workers. The best choice here is Redis.
The first thing you'd want to do is get a message broker installed on your computer or server. I recommend RabbitMQ because it's the default message broker for Celery and should work best, but you should use the message broker you're most comfortable with. Each broker's official documentation covers installation.
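If you have Docker installed, a quick way to spin up either broker locally (a convenience, not a requirement) is:

docker run -d -p 5672:5672 rabbitmq
docker run -d -p 6379:6379 redis

Both commands use the official images and expose the brokers on their default ports.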
I'll assume you already have Python3 installed on your computer, so I won't say much about it. I'd also recommend you create a virtual environment rather than using your global Python environment.
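On macOS and Linux, creating and activating one looks like this (on Windows, the activation script is venv\Scripts\activate instead):

python3 -m venv venv
source venv/bin/activate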
Lastly, you can install Celery using your favorite Python package manager (pip, Poetry, and lots more).
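With pip, that's simply:

pip install celery

If you plan to use Redis as the result backend (as in the example below), installing the redis extra pulls in the client library as well:

pip install "celery[redis]"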
The first step to using Celery is to create a Celery object in a file called tasks.py (that's just a convention; you can name it anything you like). This object tells Celery how we'd like our worker to behave, and it's where we specify which message broker Celery should use.
from celery import Celery

app = Celery(
    'tasker',                        # the name of our app
    backend='redis://localhost',     # where task results get stored
    broker='amqp://127.0.0.1:5672',  # the RabbitMQ broker we installed earlier
)
The next step is to move our computationally expensive code into a function in our tasks.py file. You'll then decorate this function with the @app.task decorator from our Celery object. This decorator tells Celery that the function is a task it can execute.
import logging
import time

@app.task
def awesome_task(seconds: int):
    # Pretend the task takes some time to finish.
    time.sleep(seconds)
    logging.info(f"Time taken: {seconds}")
Then, in a separate script (our main app), we import the task and fire jobs at Celery:

import random

from tasks import awesome_task

while True:
    # Create a new task and hand it off to Celery via the broker.
    awesome_task.delay(random.randint(1, 3))
Each call to .delay() adds a task to our Celery task queue, where it can be picked up by any available Celery worker and executed.
To create a Celery worker, run this command in the terminal:
celery -A tasks worker --loglevel=INFO
When we run our app, you'll notice that the output doesn't appear in the app's shell but rather in the shell that's running the Celery worker.
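Since we configured a Redis result backend earlier, we can also wait for and read a task's return value from the calling side. Here's a minimal sketch (our awesome_task doesn't return anything yet, so this would print None; a real task would return its result):

from tasks import awesome_task

result = awesome_task.delay(2)  # returns an AsyncResult immediately
print(result.get(timeout=10))   # blocks until the worker finishes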
Conclusion
I hope I've been able to show you how awesome Celery is at helping scale your apps.
Of course, this isn't the only way to use Celery. You can use Celery to make your Flask or Django app asynchronous, so requests to your app don't have to wait for it to be less busy to get a response. A lot of people also use Celery to run background tasks like sending confirmation emails or storing application logs.
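As a rough sketch of that idea (the route and payload here are hypothetical, and you'd still need a Celery worker running alongside the web server):

from flask import Flask
from tasks import awesome_task

flask_app = Flask(__name__)

@flask_app.route('/process')
def process():
    # Hand the slow work to Celery and respond immediately.
    awesome_task.delay(2)
    return 'Task queued!'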
If you have any questions about using Celery in Python, feel free to reach out to me via email or the comment section.