Pedro Kauati

Managing Asynchronous Work with Celery and Redis

This is the sequel to my previous post, where I implemented a bug tracking web service with FastAPI. This time I'm gonna talk about the latest addition to the project: an async background job system and what it's being used for.

If you just want to see the completed project, feel free to give it a try; it's live! The source code can be found here.

Why Async?🧠

So after implementing the basic functionality of the bug tracker, I wanted to get familiar with another important topic in backend development. What caught my attention was the idea of introducing concurrency. To put it simply, concurrency is the ability of a system to switch focus between different tasks. An operating system running on a single-core CPU can achieve this, which is how you're able to run multiple programs even if they're not running in parallel.

Synchronous vs Asynchronous vs Parallel

And why is this useful for a web service? Trivial tasks can simply run one by one without any hiccups. But once you add tasks that need more processing or depend on external services, on top of a growing number of client requests, your API will start to slow down, even for the trivial operations.

So concurrency allows our service to run non-trivial tasks in the background while it keeps serving other requests.
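
To make the idea concrete, here is a minimal sketch using Python's built-in asyncio. It is only an illustration of interleaving on a single thread: the function names are made up for this example, and the project itself hands the slow work to separate Celery worker processes rather than to an event loop.

```python
import asyncio


async def slow_report(log: list[str]) -> None:
    # Hypothetical slow job; the sleep stands in for waiting on I/O
    log.append("report: started")
    await asyncio.sleep(0.05)
    log.append("report: finished")


async def quick_request(log: list[str], n: int) -> None:
    # Hypothetical trivial request that finishes immediately
    log.append(f"request {n}: served")


async def main() -> list[str]:
    log: list[str] = []
    report = asyncio.create_task(slow_report(log))  # schedule the slow job
    await asyncio.sleep(0)  # yield once so the slow job can start
    for n in (1, 2):
        await quick_request(log, n)  # served while the report is still waiting
    await report
    return log


log = asyncio.run(main())
```

The quick requests land in the log between the report's start and finish, even though everything runs on one thread.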

And what kind of task would that be? In my case, I decided to add a feature that lets project owners request a PDF report of their current projects.

Development💻

Stack update

  • Task Queue: Celery🥕
  • Message Broker and Result Backend: Redis🟥
  • PDF Generator Library: FPDF📝
  • Blob Storage to host PDFs: MinIO🗃️

Gotchas🌀

Celery was the beating heart of this operation. It took me a few tries to understand the system, especially since it involved running some code outside the FastAPI environment.

For starters, I got too used to FastAPI's hot reload, something the worker container doesn't benefit from. So to make sure I was working with the latest worker code, I had to start the containers with the --build flag.

I also struggled a bit with properly connecting the task functions to the Celery app. What worked for me was calling the autodiscover_tasks method on the Celery app object. This method takes a list of packages and lets the app register tasks from modules named tasks.py inside them.

# celery_app.py
import os
from celery import Celery

# Assumption: the Redis URL is read from an environment variable
REDIS_URL = os.environ["REDIS_URL"]

app = Celery("celery_app", broker=REDIS_URL, backend=REDIS_URL)
app.autodiscover_tasks(["app.worker"])  # registers tasks from app/worker/tasks.py

# tasks.py
from .celery_app import app

@app.task(bind=True, retry_backoff=True)  # other options omitted
def generate_report(self, job_id: str, user_id: str):
    ...

Another challenging point was testing. I would say you currently have three ways to test a Celery worker:

  • Use the pytest-celery library for asynchronous tests
  • Use celery.contrib.pytest also for asynchronous tests
  • Run synchronous tests by setting task_always_eager to True

I found the documentation a bit confusing to follow because I was often unsure which solution I was reading about. In the end I opted for the synchronous tests, so the actual work done by the tasks could be asserted. This post goes into more detail on the trade-offs involved in each method.

When working with MinIO for data storage, I had to create two client connections: a private one for communication between the worker and the storage, and a public one so the PDF files could be retrieved through a public-facing URL. Without that, the files could only be accessed from inside the Docker network. An extra host had to be added to the MinIO container to make this work in dev.
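
The two-endpoint idea can be sketched without the MinIO client itself. Everything below is an assumption made for illustration: the host names, the bucket and file names, and the StorageEndpoints helper are hypothetical, and it builds plain path-style URLs instead of the signed URLs a real client would hand out.

```python
from dataclasses import dataclass
from urllib.parse import quote


@dataclass(frozen=True)
class StorageEndpoints:
    """Hypothetical pair of base URLs that point at the same storage service."""

    internal: str  # reachable only inside the Docker network (worker to storage)
    public: str    # reachable from the user's browser

    def object_url(self, bucket: str, object_name: str, *, public: bool) -> str:
        base = self.public if public else self.internal
        # Path-style addressing: <base>/<bucket>/<object>
        return f"{base.rstrip('/')}/{quote(bucket)}/{quote(object_name)}"


# Assumed values for a local dev setup
endpoints = StorageEndpoints(
    internal="http://minio:9000",
    public="http://localhost:9000",
)

upload_target = endpoints.object_url("reports", "job-123.pdf", public=False)
download_link = endpoints.object_url("reports", "job-123.pdf", public=True)
```

The worker would talk to the internal address, while only the public one should ever end up in a link sent back to the user.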

Async Job System🕒

Job system workflow
Here's the workflow for this system:

  • When a request for a non-trivial task comes in, the API creates a new job, saves its state in the database as QUEUED, and dispatches a new task to Celery with the proper info.
  • Celery stores the task in the Redis container.
  • If a worker is available, it picks up the task from the queue and changes the job state to RUNNING.
  • The worker performs the task, which in this case is to generate a PDF and upload it to MinIO. The PDF URL is saved in the database as an artifact.
  • If successful, the job is marked as SUCCEEDED and it now points to the artifact.
  • If it fails due to a recoverable error, such as a network connection problem, the task retries after a few seconds. With exponential backoff, the wait grows with each attempt.
  • If it fails due to an unrecoverable error, the job is marked as FAILED.
  • The API can check the status of jobs via the endpoint /jobs/result. If a job is complete, its result can be accessed.
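
The workflow above can be sketched as a small state machine plus a retry schedule. This is an illustration, not the project's code: the state names come from the list, but the TRANSITIONS table, advance, and backoff_delay are hypothetical helpers. The backoff defaults (start at 1 second, cap at 600) are meant to mirror Celery's documented defaults, with jitter left out, and what happens to the stored job state during a retry is not modeled.

```python
from enum import Enum


class JobState(str, Enum):
    QUEUED = "QUEUED"
    RUNNING = "RUNNING"
    SUCCEEDED = "SUCCEEDED"
    FAILED = "FAILED"


# Allowed moves between states, following the workflow list
TRANSITIONS = {
    JobState.QUEUED: {JobState.RUNNING},
    JobState.RUNNING: {JobState.SUCCEEDED, JobState.FAILED},
    JobState.SUCCEEDED: set(),
    JobState.FAILED: set(),
}


def advance(current: JobState, target: JobState) -> JobState:
    """Return the new state, or raise if the workflow does not allow the move."""
    if target not in TRANSITIONS[current]:
        raise ValueError(f"cannot go from {current.value} to {target.value}")
    return target


def backoff_delay(retries: int, factor: int = 1, maximum: int = 600) -> int:
    """Seconds to wait before retry number `retries` (0-based), doubling each time."""
    return min(maximum, factor * 2 ** retries)


state = advance(JobState.QUEUED, JobState.RUNNING)
state = advance(state, JobState.SUCCEEDED)
delays = [backoff_delay(n) for n in range(5)]  # [1, 2, 4, 8, 16]
```

Keeping the allowed transitions in one table makes it harder for a finished job to be flipped back to RUNNING by accident.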

With this system, more workers can easily be added, and future features can offload resource-intensive work to the task queue without much extra setup.

Conclusion💡

Coming from game development, I was used to concurrency in another context, involving coroutines and game objects, and applying this concept to a backend application deepened my understanding of it.

As demand grows, web services require smart solutions to remain responsive and reliable. That only made me more curious to see how other systems and architectures shape performance and development efficiency.

As for Flyswatter, I’ll keep polishing the codebase and improving test coverage, but when it comes to new features, I’m ready to move on to another project and dive deeper into system design. This one taught me a lot, and I can’t wait to see what comes next!
