Alex

Posted on • Originally published at aleksul.space

I spent 2 years building the async Python task queue I wished existed

The Python web ecosystem is, genuinely, a great place to be. You want to build an API? You have FastAPI, which has become a de facto standard and will generate an OpenAPI spec from your type annotations before you finish your coffee. There are plenty of other options as well: Litestar, Robyn, Quart - pick whichever you prefer. All of them feel modern and are a pleasure to use.

Then the business logic gets complicated, and you need to start running some of that work in the background. And at that point, it's no more unicorns and rainbows.

For many years, the common choice has been something like Celery or Dramatiq. But with them you get a whole new set of problems. You do not get automatic documentation: your message schemas live nowhere in particular. And since your application is async-first, you are now required to maintain a separate synchronous context for your background tasks. You end up writing two styles of Python in the same codebase, which does not feel right and is probably going to come back to bite you. The workers feel like a separate application that happens to share a repo.

It's crucial to understand where the performance bottlenecks of your program are before you start to optimize. Most performance discussions in Python get stuck on the question of language speed, which is often not the culprit. In many real-world use cases, the slowness is external - API calls, database queries, disk I/O - and those wait times are orders of magnitude larger than any benefit you would get from switching to another "more performant" language. This is exactly why async is a required paradigm shift. And yet, despite all the obvious benefits, we are left with little to no choice when it comes to async task queues.
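
To make the I/O argument concrete, here is a small standalone sketch that uses only the standard library. `fake_io_call` is a made-up stand-in for an API call or a database query, and the delays are arbitrary. It compares waiting on several such calls one after another with waiting on them concurrently.

```python
import asyncio
import time


async def fake_io_call(delay: float) -> float:
    # Hypothetical stand-in for an API call or database query: it only waits.
    await asyncio.sleep(delay)
    return delay


async def run_sequentially(delays: list[float]) -> float:
    # Each wait starts only after the previous one has finished.
    started = time.perf_counter()
    for delay in delays:
        await fake_io_call(delay)
    return time.perf_counter() - started


async def run_concurrently(delays: list[float]) -> float:
    # All waits overlap on a single thread, so the total is close to the longest one.
    started = time.perf_counter()
    await asyncio.gather(*(fake_io_call(delay) for delay in delays))
    return time.perf_counter() - started


async def compare(delays: list[float]) -> tuple[float, float]:
    return await run_sequentially(delays), await run_concurrently(delays)


if __name__ == "__main__":
    sequential, concurrent = asyncio.run(compare([0.1] * 5))
    print(f"sequential: {sequential:.2f}s, concurrent: {concurrent:.2f}s")
```

The sequential run takes roughly the sum of the delays, while the concurrent run takes roughly the longest single delay. No change of language would close that gap.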

Mermaid diagram of async worker

It bothered me for a long time, in the specific and increasingly personal way that happens when you keep bumping into the same problem throughout your career. The async Python web ecosystem had made a decade of progress, yet background task tooling had been largely left behind.

So I decided to do something about it. This is a story of ups and downs: how I kept coming back to this project, the frustrations, and the discoveries I made along the way.

Async all the way

Repid started in 2021, when I needed a simple library that would let me perform large numbers of HTTP requests efficiently. I had Redis as my infrastructure backbone and needed something rapid. Combine the two, and that is how Repid got its name. The idea was simple: decorate an async function, send it a message, have a worker pick it up. Async top-to-bottom, of course.
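
The "decorate, send, pick up" idea can be sketched in a few lines with an in-process `asyncio.Queue` standing in for Redis. This is an illustration of the concept only, not Repid's API: `actor`, `send`, `worker`, and `HANDLERS` are hypothetical names invented for this example.

```python
import asyncio
import json
from collections.abc import Awaitable, Callable

Handler = Callable[..., Awaitable[None]]

# Hypothetical in-process registry; a real setup would talk to a broker instead.
HANDLERS: dict[str, Handler] = {}


def actor(func: Handler) -> Handler:
    """Register an async function under its own name."""
    HANDLERS[func.__name__] = func
    return func


async def send(queue: asyncio.Queue, name: str, **kwargs: object) -> None:
    # Messages travel as JSON text, as they would over a broker.
    await queue.put(json.dumps({"actor": name, "kwargs": kwargs}))


async def worker(queue: asyncio.Queue, limit: int) -> None:
    # Pick up `limit` messages and dispatch each to its registered handler.
    for _ in range(limit):
        message = json.loads(await queue.get())
        await HANDLERS[message["actor"]](**message["kwargs"])
        queue.task_done()


FETCHED: list[str] = []


@actor
async def fetch(url: str) -> None:
    await asyncio.sleep(0)  # stand-in for an HTTP request
    FETCHED.append(url)


async def demo() -> list[str]:
    FETCHED.clear()
    queue: asyncio.Queue = asyncio.Queue()
    await send(queue, "fetch", url="https://example.com/a")
    await send(queue, "fetch", url="https://example.com/b")
    await worker(queue, limit=2)
    return list(FETCHED)
```

Calling `asyncio.run(demo())` sends two messages and lets the worker process both. Everything a real library adds (acknowledgements, retries, serialization, broker connections) sits on top of this basic loop.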

It quickly became clear I wanted something more mature than that first iteration.

Then I shipped v1, with RabbitMQ support, a middleware system, Pydantic-based argument serialization, and a concept called Jobs that handled not only basic messaging but also retries, delayed execution, and scheduled execution. I deployed it in several of my projects and at my day job, where it processed millions of messages daily on a single worker.

Graph showing millions of messages being processed in a day

Measure twice, cut once

There was, however, a fundamental design flaw in Repid's v1 philosophy. In my attempt to provide utilities out-of-the-box, I made it opinionated about broker capabilities and ways of working.

The broker integrations were responsible for queue declaration, purging, deletion - concepts that make sense in RabbitMQ but that other brokers either do not have or handle completely differently. Every time I tried to add support for a new broker, I would hit this abstraction mismatch and stall.

The Jobs concept had a similar issue. Because retries were treated as a first-class citizen, and backoff is obviously an important part of that equation, v1 assumed that delayed messaging was a baseline capability that every broker integration had to provide. In practice, many brokers have a conceptually different design in which delaying a message makes no sense. That assumption made it problematic to add support for new brokers.
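
A short sketch shows why retries pull delayed delivery in with them. The formula below is a generic capped exponential backoff, not necessarily the one Repid v1 used. `backoff_delay` and `redeliver_not_before` are hypothetical helpers written for this example.

```python
def backoff_delay(attempt: int, base: float = 1.0, cap: float = 60.0) -> float:
    """Seconds to wait before retry number `attempt` (1-based), capped at `cap`."""
    if attempt < 1:
        raise ValueError("attempt must be >= 1")
    return min(cap, base * 2 ** (attempt - 1))


def redeliver_not_before(now: float, attempt: int) -> float:
    # The retry must not be delivered before this timestamp. Something has to
    # hold the message until then, and v1 assumed the broker could do it.
    return now + backoff_delay(attempt)
```

The second function is where the hidden requirement shows up: once a retry has a "not before" time, either the broker supports delayed delivery or the library has to emulate it.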

A common problem amongst task processing libraries is that they also define some sort of message structure. Repid v1 was no exception, although there was a mechanism to provide an override. Nevertheless, it was still something that I wanted to get rid of. In an ideal world I wanted to be able to consume messages from any other system, or to publish them in the format that they would understand.

The final straw was documentation. Admittedly, it is not the first thing one thinks of, yet it is extremely important in the long run. I wanted it to be as simple and intuitive as OpenAPI. And right around that time AsyncAPI started to gain popularity. It is a broker-agnostic spec that describes message schemas, channels, and operations without caring about the underlying transport. Now all I had to do was map it neatly to the Python definitions.

Example of AsyncAPI schema

So I scrapped v1's design and started over.

Understanding AsyncAPI

AsyncAPI comes with its own set of abstractions to describe a generic messaging system. At a high level, there are send and receive operations, servers, and channels. As you can imagine, the abstractions are necessarily loose, so a single term like 'channel' can refer to quite different things depending on the context. For example, in RabbitMQ a send operation's channel might refer to an exchange or a queue, while a receive operation's channel is always a queue. It took me a while to wrap my head around it, but the model is simple once it clicks.
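
For orientation, here is a minimal AsyncAPI 3.0 document written as a Python dict, with one server, one channel, and a send and a receive operation that both point at that channel. The service itself (names, address, payload) is invented for this example, and `resolve` is a small hypothetical helper that follows the local `$ref` pointers.

```python
from typing import Any

# Hypothetical service: one channel carrying one message type.
document: dict[str, Any] = {
    "asyncapi": "3.0.0",
    "info": {"title": "Example tasks service", "version": "1.0.0"},
    "servers": {
        "default": {"host": "localhost:5672", "protocol": "amqp"},
    },
    "channels": {
        "tasks": {
            "address": "tasks",
            "messages": {
                "processUser": {
                    "payload": {
                        "type": "object",
                        "properties": {"user_id": {"type": "integer"}},
                        "required": ["user_id"],
                    },
                },
            },
        },
    },
    "operations": {
        "publishTask": {
            "action": "send",
            "channel": {"$ref": "#/channels/tasks"},
            "messages": [{"$ref": "#/channels/tasks/messages/processUser"}],
        },
        "consumeTask": {
            "action": "receive",
            "channel": {"$ref": "#/channels/tasks"},
            "messages": [{"$ref": "#/channels/tasks/messages/processUser"}],
        },
    },
}


def resolve(ref: str) -> Any:
    """Follow a local reference such as '#/channels/tasks'."""
    node: Any = document
    for key in ref.removeprefix("#/").split("/"):
        node = node[key]
    return node
```

Both operations reference the same channel object. Whether that channel maps to an exchange, a queue, a topic, or a stream is left to the broker-specific layer, which is exactly the looseness described above.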

Another challenge was that for AsyncAPI 3 specifically, at least at the time, there were no Python bindings. So I got myself into a chicken-and-egg problem: to gain experience with AsyncAPI I needed bindings, and to get bindings I needed experience to understand what I was actually mapping. Thankfully, AsyncAPI also publishes a JSON Schema, so eventually I landed on using a modified version of datamodel-code-generator to convert it into Python TypedDicts. Fixing the code generation and cleaning up the resulting bindings afterwards took a significant chunk of time.
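
To give a feel for what TypedDict bindings look like, here is a hand-written, heavily simplified subset of the AsyncAPI 3 operation object. It is not the output of datamodel-code-generator and not Repid's actual bindings; the class names are assumptions made for this sketch.

```python
from typing import Literal, TypedDict

# "$ref" is not a valid Python identifier, so the functional syntax is required.
Reference = TypedDict("Reference", {"$ref": str})


class _OperationRequired(TypedDict):
    # Keys that must always be present on an operation.
    action: Literal["send", "receive"]
    channel: Reference


class Operation(_OperationRequired, total=False):
    # Optional keys; only a small illustrative subset is listed here.
    title: str
    summary: str
    messages: list[Reference]


consume_task: Operation = {
    "action": "receive",
    "channel": {"$ref": "#/channels/tasks"},
}
```

Details like the `$ref` key are typical of the cleanup that generated bindings need, since a JSON Schema is free to use names that Python syntax cannot express directly.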

I started from scratch at least three times. I wasn't going to settle for anything but the perfect result. And yet there was one more thing that didn't let me sleep at night - sometimes literally.

AMQP improvements

There are some things you only learn once you've seen production for a while. You get paged at 2 AM, the queue size is growing, and the workers, while responding to healthchecks, aren't processing anything. In a cold sweat you restart them, and everything immediately comes back to life. Not understanding what just happened, you shrug your shoulders and go back to sleep, to investigate the next morning.

No mystery here - there was no bug. Just the fact that every broker that adopted AMQP 0.9.1 treated it more like a set of suggestions than a real specification. And to be honest, I don't blame them: almost two decades have passed since its introduction, and some features are simply required. RabbitMQ in particular has made several extensions, including server-side cancel. The only problem is that Python's async AMQP library, aiormq, doesn't expose any hooks for when a connection gets abruptly terminated. But even if it did, AMQP 0.9.1's approach to recovery is simply "recreate everything," which isn't much help.
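
As a rough picture of what "recreate everything" means in practice, the sketch below supervises a consumer and rebuilds the whole session from scratch whenever the link drops. It does not use aiormq or any real client: `ConnectionLost`, `make_flaky_source`, and `run_with_recovery` are hypothetical names, and the flaky source only simulates a single dropped connection.

```python
import asyncio
from collections.abc import AsyncIterator, Awaitable, Callable


class ConnectionLost(Exception):
    """Raised by the hypothetical client when the broker drops the link."""


def make_flaky_source(messages: list[str], fail_at: int) -> Callable[[], AsyncIterator[str]]:
    cursor = 0
    already_failed = False

    async def open_session() -> AsyncIterator[str]:
        # A real session would reconnect, reopen the channel, redeclare queues
        # and re-register consumers here: nothing survives from the old one.
        nonlocal cursor, already_failed
        while cursor < len(messages):
            if cursor == fail_at and not already_failed:
                already_failed = True
                raise ConnectionLost
            message = messages[cursor]
            cursor += 1
            yield message

    return open_session


async def run_with_recovery(
    open_session: Callable[[], AsyncIterator[str]],
    handle: Callable[[str], Awaitable[None]],
    max_restarts: int,
) -> int:
    restarts = 0
    while True:
        try:
            async for message in open_session():
                await handle(message)
            return restarts
        except ConnectionLost:
            restarts += 1
            if restarts > max_restarts:
                raise
            await asyncio.sleep(0)  # real code would back off before retrying
```

The loop only works if the client actually reports the failure. If the disconnect is never surfaced, as described above, the `except` branch is never reached and the worker sits idle while still answering health checks.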

By comparison, AMQP 1.0 is far better thought out, strict, and standardized by OASIS. Most importantly, it describes flow control mechanisms and a recovery process, decoupling the physical and protocol layers.
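
AMQP 1.0 flow control is built around link credit: the receiver tells the sender how many messages it is willing to accept, and the sender may not exceed that. The toy model below illustrates only that idea. It is not a protocol implementation, and the `Link` class and its methods are invented for this sketch (a real receiver sets the credit through flow frames rather than calling a method).

```python
from collections import deque


class Link:
    """Toy model of credit-based flow control; not an AMQP implementation."""

    def __init__(self) -> None:
        self.credit = 0
        self.pending: deque[str] = deque()
        self.delivered: list[str] = []

    def grant(self, credit: int) -> None:
        # Receiver side: allow the sender `credit` more transfers.
        self.credit += credit
        self._drain()

    def send(self, message: str) -> None:
        # Sender side: queue the message and transfer it only if credit allows.
        self.pending.append(message)
        self._drain()

    def _drain(self) -> None:
        # Transfer pending messages while the receiver still has credit left.
        while self.credit > 0 and self.pending:
            self.delivered.append(self.pending.popleft())
            self.credit -= 1
```

With no credit granted, `send` only queues messages. A slow consumer is therefore never flooded, because it controls the pace by how much credit it hands out.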

With RabbitMQ adding official AMQP 1.0 support in 4.0, I decided that now was the best time to make the switch. The only problem is that there are almost no async Python implementations of AMQP 1.0. So I decided to write my own.

I spent a long time reading the spec. I made at least five incomplete or broken attempts before getting to an implementation I was satisfied with. I was lucky enough to find several people's work on AMQP 1.0 frames, mapping them to Python, and that gave me a head start. Yet obviously the harder part was designing the connection and state management system.

What the design became

The framework should get out of the way. A user defines their message as a Python type. The framework inspects it at startup, generates an AsyncAPI schema, and serves it. The broker integration handles transport. That is it.

The Jobs concept is gone. Message structure is entirely defined by the user - no framework-injected parameters, no assumptions about what your broker can do. If your broker supports delayed delivery, you can use it. If it does not, you do not pretend it does.

import asyncio
from repid import Repid, Router, InMemoryServer, AsyncAPIServerSettings

# 1. Initialize the application and register a server
app = Repid()
app.servers.register_server("default", InMemoryServer(), is_default=True)

# 2. Create a router and an actor
router = Router()

@router.actor(channel="tasks")
async def my_awesome_actor(user_id: int) -> None:
    print(f"Processing for {user_id=}")
    await asyncio.sleep(1.0)
    print(f"Done processing for {user_id=}")

app.include_router(router)

async def main() -> None:
    # 3. Connect and run
    async with app.servers.default.connection():
        # Producer: Send a message
        await app.send_message_json(
            channel="tasks",
            payload={"user_id": 123},
            headers={"topic": "my_awesome_actor"}
        )

        # Consumer: Run the worker loop
        print("Starting worker loop...")
        await app.run_worker(
            messages_limit=1,
            # Serve AsyncAPI as a webpage while the worker is running
            asyncapi_server=AsyncAPIServerSettings(
                address="0.0.0.0",
                port=8081,
                endpoint_name="/"
            ),
        )
        print("Worker loop finished!")

if __name__ == "__main__":
    asyncio.run(main())
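
The "inspect the types at startup and generate a schema" step can be approximated with the standard library alone. The sketch below is not how Repid does it internally: `payload_schema` and `JSON_TYPES` are hypothetical, and the type mapping is deliberately tiny. It turns an actor's annotated parameters into the kind of JSON Schema payload that an AsyncAPI message carries.

```python
import inspect
from typing import Any, get_type_hints

# Hypothetical, deliberately small mapping; real schema generation covers far more types.
JSON_TYPES: dict[type, str] = {
    int: "integer",
    float: "number",
    str: "string",
    bool: "boolean",
}


def payload_schema(func: Any) -> dict[str, Any]:
    """Build a JSON Schema object from a function's annotated parameters."""
    hints = get_type_hints(func)
    properties: dict[str, Any] = {}
    required: list[str] = []
    for name, parameter in inspect.signature(func).parameters.items():
        properties[name] = {"type": JSON_TYPES[hints[name]]}
        # Parameters without a default value must be present in the message.
        if parameter.default is inspect.Parameter.empty:
            required.append(name)
    return {"type": "object", "properties": properties, "required": required}


async def example_actor(user_id: int, note: str = "") -> None:
    ...
```

Calling `payload_schema(example_actor)` yields an object schema with an integer `user_id` (required) and a string `note` (optional), derived from nothing but the function signature.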

Repid v2

Repid v2 is out now. Here is what is in it:

  • AsyncAPI native - get your code documented automatically.
  • Broad broker support. v2 ships with in-house implementations for AMQP 1.0, GCP Pub/Sub, Amazon SQS, Redis Streams, Kafka, and NATS.
  • The fastest Python task queue I'm aware of. The benchmarks are in a separate repo if you want to verify.
  • And much more that you can find in the documentation.

Was it worth it?

v1 had modest adoption, which is just the reality of niche open source projects. The production use at my job mattered more to its development than anything else - it gave me the time to see how every design decision plays out.

Whether two years of evenings and weekends were worth it depends on whether anyone finds v2 useful. What I can say is that it is the library I actually wanted when I started: one where you write a function, sprinkle in your types, and get a self-documenting async message consumer.

If that sounds useful, give it a try.

pip install repid

Or with uv

uv add repid

Docs are at repid.aleksul.space and the repo is at github.com/aleksul/repid. Give it a star!
