Vadim Kozyrevskii
Python CQRS: Building distributed systems without the pain (Sagas, Outbox, Event-Driven)

Building distributed systems in Python? Here is how python-cqrs tackles consistency with orchestrated sagas, the mediator pattern, and a transactional outbox—without preaching theory for ten pages first.

TL;DR

  • Commands and queries stay in plain handlers: nothing in the handler depends on HTTP, Kafka, or CLI.
  • Sagas: persisted state, automatic compensation, recovery after crashes; see the docs for fallback and circuit-style options.
  • Transactional outbox: integration events in the same DB transaction as writes (at-least-once to the broker). In-process domain events are a different, at-most-once contract.

Links: GitHub · Docs · PyPI python-cqrs


A story in three crashes

  1. A user places an order. Payment succeeds.
  2. A downstream inventory service is supposed to reserve stock—but the process dies (OOM, redeploy, node loss).
  3. The warehouse never heard about the order, yet money moved.

That is the boring side of distributed systems: not “microservices are trendy,” but consistency when a commit in one place does not magically align the rest of the world.
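The failure mode above is easy to reproduce without any infrastructure. A minimal sketch of the dual-write problem (the in-memory "databases" and `place_order` function are made up for illustration): the payment commit and the inventory reservation are two separate effects, so a crash between them leaves the system inconsistent:

```python
payments: dict[str, float] = {}          # pretend payment DB
reservations: dict[str, list[str]] = {}  # pretend warehouse DB

class ProcessDied(RuntimeError):
    """Simulates OOM / redeploy / node loss between the two writes."""

def place_order(order_id: str, amount: float, items: list[str], crash: bool) -> None:
    payments[order_id] = amount    # effect 1: money moved (committed)
    if crash:
        raise ProcessDied          # process dies before effect 2
    reservations[order_id] = items # effect 2: stock reserved

try:
    place_order("o1", 99.0, ["sku-1"], crash=True)
except ProcessDied:
    pass

# Money moved, the warehouse never heard about the order:
assert "o1" in payments and "o1" not in reservations
```

Everything below (outbox, sagas) exists to close exactly this gap.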

Hi, I am Vadim, ex-tech lead of the financial infrastructure team at Timeweb Cloud. This post is a short intro to the library we use under our Python microservices—no epic “how we got here”. If you are tired of reinventing the transactional outbox, sagas, and mediator wiring, this may save you time.

python-cqrs is a framework for CQRS and event-driven architecture. It grew out of a fork of diator and has since become its own stack for reliable service design.


See it in code (before the feature list)

Idea: one command → one handler → mediator.send. Everything else (saga, outbox, Kafka) reuses the same wiring via bootstrap.

import di
import cqrs
from cqrs.requests import bootstrap

# 1) Contract: what goes in / what comes out (CQRS: this is a “write” path).
class CreateUserCommand(cqrs.Request):
    email: str
    name: str

class CreateUserResponse(cqrs.Response):
    user_id: str

# 2) Pure handler: no FastAPI, no Kafka—only business logic.
class CreateUserHandler(cqrs.RequestHandler[CreateUserCommand, CreateUserResponse]):
    async def handle(self, request: CreateUserCommand) -> CreateUserResponse:
        user_id = f"user_{request.email}"
        return CreateUserResponse(user_id=user_id)

# 3) Mediator: map command type → handler, resolve via DI.
mediator = bootstrap.bootstrap(
    di_container=di.Container(),
    commands_mapper=lambda m: m.bind(CreateUserCommand, CreateUserHandler),
)

async def main() -> None:
    result = await mediator.send(
        CreateUserCommand(email="user@example.com", name="John"),
    )

More wiring options: Bootstrap. Below: one minimal snippet per feature; comments carry the point.


Feature cheat sheet (code)

Query (read model)

Idea: same RequestMediator and send—only the handler returns a read DTO instead of mutating state.

import cqrs
import di
from cqrs.requests import bootstrap

class GetUserByEmail(cqrs.Request):
    email: str

class UserDto(cqrs.Response):
    user_id: str
    name: str

class GetUserHandler(cqrs.RequestHandler[GetUserByEmail, UserDto]):
    async def handle(self, request: GetUserByEmail) -> UserDto:
        return UserDto(user_id="u1", name="Ada")  # e.g. DB / cache read

mediator = bootstrap.bootstrap(
    di_container=di.Container(),
    commands_mapper=lambda m: m.bind(GetUserByEmail, GetUserHandler),
)
async def main() -> None:
    user = await mediator.send(GetUserByEmail(email="a@b.c"))

Domain events (in-process)

Idea: the command emits DomainEvents; the mediator dispatches them to EventHandlers in the same turn—fast and in-memory, not a public integration contract.

import cqrs
import di
from cqrs.requests import bootstrap

class CreateOrder(cqrs.Request):
    order_id: str

class OrderLineAdded(cqrs.DomainEvent):
    order_id: str
    sku: str

class OrderCreated(cqrs.Response):
    pass

class CreateOrderHandler(cqrs.RequestHandler[CreateOrder, OrderCreated]):
    def __init__(self) -> None:
        self._events: list[cqrs.Event] = []

    @property
    def events(self) -> list[cqrs.Event]:
        return self._events

    async def handle(self, request: CreateOrder) -> OrderCreated:
        self._events.append(
            OrderLineAdded(order_id=request.order_id, sku="sku-1"),
        )
        return OrderCreated()

class OnOrderLineAdded(cqrs.EventHandler[OrderLineAdded]):
    async def handle(self, event: OrderLineAdded) -> None:
        return  # e.g. update projection; if process dies, effect may be lost (at-most-once)

def events_mapper(m: cqrs.EventMap) -> None:
    m.bind(OrderLineAdded, OnOrderLineAdded)

mediator = bootstrap.bootstrap(
    di_container=di.Container(),
    commands_mapper=lambda m: m.bind(CreateOrder, CreateOrderHandler),
    domain_events_mapper=events_mapper,
    max_concurrent_event_handlers=4,  # cap parallel domain handlers
)

OutboxedEventMap (integration / durable)

Idea: register the wire name and payload type for NotificationEvent once; the outbox table stores the same event in the DB transaction as your aggregate—no “committed in DB, never reached Kafka” gap. Details: Transactional Outbox.

import cqrs
from pydantic import BaseModel

class UserJoinedPayload(BaseModel, frozen=True):
    user_id: str
    meeting_id: str

cqrs.OutboxedEventMap.register(
    "user_joined",  # topic / routing key name used when publishing
    cqrs.NotificationEvent[UserJoinedPayload],
)
# In a command handler (same transaction as INSERT): append NotificationEvent to outbox repo.
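The guarantee itself does not depend on any particular library: the aggregate row and the outbox row commit in one database transaction, or neither does. A minimal sqlite3 sketch of that invariant (table names and columns are invented for illustration):

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE users (user_id TEXT PRIMARY KEY, email TEXT);
    CREATE TABLE outbox (id INTEGER PRIMARY KEY, topic TEXT, payload TEXT);
""")

def create_user(user_id: str, email: str) -> None:
    with conn:  # one transaction: both rows commit, or neither does
        conn.execute("INSERT INTO users VALUES (?, ?)", (user_id, email))
        conn.execute(
            "INSERT INTO outbox (topic, payload) VALUES (?, ?)",
            ("user_joined", json.dumps({"user_id": user_id})),
        )

create_user("u1", "a@b.c")
# A separate publisher loop later reads `outbox` and pushes rows to the broker.
rows = conn.execute("SELECT topic FROM outbox").fetchall()
assert rows == [("user_joined",)]
```

There is no window where the user exists in the DB but the event was never recorded—that is the whole point of the pattern.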

Outbox → broker (publisher)

Idea: a separate loop reads outbox rows, publishes to Kafka, then commits—at-least-once delivery to the broker (consumers should be idempotent).

import asyncio
import cqrs
from cqrs.message_brokers import kafka
from cqrs.adapters import kafka as kafka_adapters

broker = kafka.KafkaMessageBroker(
    producer=kafka_adapters.kafka_producer_factory(dsn="localhost:9092"),
)
producer = cqrs.EventProducer(
    message_broker=broker,
    repository=outbox_repository,  # must implement the outbox repository protocol
)

async def publish_loop() -> None:
    async for events in producer.event_batch_generator():
        for event in events:
            await producer.send_message(event)  # push to the broker
        await producer.repository.commit()  # only after successful publish
        await asyncio.sleep(10)  # throttle polling (tune in production)
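At-least-once delivery means the same outbox row can reach your consumer twice—e.g. if the publisher crashes after send_message but before commit. Consumers therefore deduplicate by event id. A minimal in-memory sketch (a real service would persist the processed ids in its own DB, in the same transaction as the side effect):

```python
processed: set[str] = set()
side_effects: list[str] = []

def handle(event_id: str, payload: str) -> None:
    if event_id in processed:     # duplicate delivery: skip, don't re-apply
        return
    side_effects.append(payload)  # the actual effect (DB write, email, ...)
    processed.add(event_id)

handle("evt-1", "user_joined")
handle("evt-1", "user_joined")  # redelivery after a publisher crash
assert side_effects == ["user_joined"]  # effect applied exactly once
```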

Orchestrated saga (compensation + async for steps)

Idea: act along ordered steps; on failure, run compensate in reverse. Saga storage and a log let you resume after a crash. Full API: Saga.

import dataclasses
import uuid
import di
import cqrs
from cqrs.saga import bootstrap
from cqrs.saga.saga import Saga
from cqrs.saga.step import SagaStepHandler, SagaStepResult
from cqrs.saga.storage.memory import MemorySagaStorage
from cqrs.saga.models import SagaContext
from cqrs.response import Response

@dataclasses.dataclass
class OrderContext(SagaContext):
    order_id: str
    items: list[str]
    total_amount: float
    reservation_id: str | None = None  # shared mutable context between steps

class ReserveInventoryStep(SagaStepHandler[OrderContext, Response]):
    def __init__(self, inventory) -> None:  # inject a port (interface) via DI
        self._inventory = inventory

    async def act(self, context: OrderContext) -> SagaStepResult:
        context.reservation_id = await self._inventory.reserve(
            context.order_id, context.items,
        )
        return self._generate_step_result(Response())

    async def compensate(self, context: OrderContext) -> None:
        if context.reservation_id:
            await self._inventory.release(context.reservation_id)  # undo act()

class OrderSaga(Saga[OrderContext]):
    steps = [ReserveInventoryStep]

def saga_mapper(m: cqrs.SagaMap) -> None:
    m.bind(OrderContext, OrderSaga)

mediator = bootstrap.bootstrap(
    di_container=di.Container(),  # register `inventory` for ReserveInventoryStep
    sagas_mapper=saga_mapper,
    saga_storage=MemorySagaStorage(),  # or SQLAlchemy storage in prod
)

async def run_saga() -> None:
    ctx = OrderContext(order_id="o1", items=["a", "b"], total_amount=10.0)
    # mediator.stream is not await — it returns an async iterator of step results
    async for step in mediator.stream(context=ctx, saga_id=uuid.uuid4()):
        pass  # forward to logs, metrics, or SSE
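The compensation order matters: only steps whose act succeeded get compensated, newest first. A library-free sketch of that loop (the `Step` class and `run` function here are hypothetical, not the python-cqrs Saga API):

```python
log: list[str] = []

class Step:
    def __init__(self, name: str, fail: bool = False) -> None:
        self.name, self.fail = name, fail

    def act(self) -> None:
        if self.fail:
            raise RuntimeError(self.name)
        log.append(f"act:{self.name}")

    def compensate(self) -> None:
        log.append(f"undo:{self.name}")

def run(steps: list[Step]) -> None:
    done: list[Step] = []
    try:
        for s in steps:
            s.act()
            done.append(s)  # only successful steps are eligible for undo
    except RuntimeError:
        for s in reversed(done):  # compensate in reverse order
            s.compensate()

run([Step("reserve"), Step("charge"), Step("ship", fail=True)])
assert log == ["act:reserve", "act:charge", "undo:charge", "undo:reserve"]
```

What the saga storage adds on top of this loop is persistence: after a crash mid-saga, the recorded step log tells the runner where to resume or what remains to compensate.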

StreamingRequestMediator (progress / SSE)

Idea: yield partial Response chunks; the consumer uses mediator.stream(...) to drive progress (e.g. over SSE). From Streaming mediator.

import typing
import asyncio
import di
import cqrs
from cqrs.requests import bootstrap
from cqrs.requests.request_handler import StreamingRequestHandler
from cqrs.message_brokers import devnull

class ProcessFilesCommand(cqrs.Request):
    file_ids: list[str]

class FileResult(cqrs.Response):
    file_id: str
    status: str

class FileDone(cqrs.DomainEvent):
    file_id: str

class OnFileDone(cqrs.EventHandler[FileDone]):
    async def handle(self, event: FileDone) -> None:
        return

def domain_events(m: cqrs.EventMap) -> None:
    m.bind(FileDone, OnFileDone)

class ProcessFilesHandler(StreamingRequestHandler[ProcessFilesCommand, FileResult]):
    def __init__(self) -> None:
        self._events: list[cqrs.Event] = []

    @property
    def events(self) -> list[cqrs.Event]:
        return self._events

    async def handle(
        self, request: ProcessFilesCommand,
    ) -> typing.AsyncIterator[FileResult]:
        for file_id in request.file_ids:
            await asyncio.sleep(0.01)  # simulate work
            self._events.append(FileDone(file_id=file_id))
            yield FileResult(file_id=file_id, status="ok")  # one chunk per file

mediator = bootstrap.bootstrap_streaming(
    di_container=di.Container(),
    commands_mapper=lambda m: m.bind(ProcessFilesCommand, ProcessFilesHandler),
    domain_events_mapper=domain_events,
    message_broker=devnull.DevnullMessageBroker(),  # swap for real broker if you publish from here
    max_concurrent_event_handlers=5,
)

async def run_stream() -> None:
    async for chunk in mediator.stream(
        ProcessFilesCommand(file_ids=["1", "2", "3"]),
    ):
        if chunk:
            print(chunk.file_id, chunk.status)
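To forward those chunks over SSE, each response becomes a `data:` frame. A minimal framing helper, independent of any web framework (the field names mirror the FileResult DTO above):

```python
import json

def sse_frame(file_id: str, status: str) -> str:
    # One SSE event: a `data:` line with a JSON body, terminated by a blank line.
    return f"data: {json.dumps({'file_id': file_id, 'status': status})}\n\n"

frame = sse_frame("1", "ok")
assert frame == 'data: {"file_id": "1", "status": "ok"}\n\n'
```

In a real endpoint you would yield `sse_frame(chunk.file_id, chunk.status)` from the `mediator.stream(...)` loop into a streaming HTTP response.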

Chain of responsibility

Idea: the first handler that matches returns a result; others call await self.next(request). Register the ordered chain in RequestMap (see Chain of Responsibility).

import cqrs
from cqrs.requests.cor_request_handler import CORRequestHandler

class PayCmd(cqrs.Request):
    amount: float
    method: str
    user_id: str

class PayResult(cqrs.Response):
    ok: bool
    msg: str = ""

class CardHandler(CORRequestHandler[PayCmd, PayResult]):
    @property
    def events(self) -> list[cqrs.Event]:
        return []

    async def handle(self, request: PayCmd) -> PayResult | None:
        if request.method == "card":
            return PayResult(ok=True, msg="card")
        return await self.next(request)  # delegate down the chain

class DefaultPay(CORRequestHandler[PayCmd, PayResult]):
    @property
    def events(self) -> list[cqrs.Event]:
        return []

    async def handle(self, request: PayCmd) -> PayResult | None:
        return PayResult(ok=False, msg=f"unsupported: {request.method}")

EventMediator + FastStream (consume integration events)

Idea: Kafka delivers a NotificationEvent; you forward it to EventMediator.send—same handler types as in-process mapping, different transport. FastStream.

import functools
import di
import cqrs
import faststream
import pydantic
from cqrs.events import bootstrap
from faststream import kafka

class UserCreatedPayload(pydantic.BaseModel):
    user_id: str

class OnUserCreated(cqrs.EventHandler[cqrs.NotificationEvent[UserCreatedPayload]]):
    async def handle(
        self, e: cqrs.NotificationEvent[UserCreatedPayload],
    ) -> None:
        _ = e.payload.user_id  # your integration logic

@functools.lru_cache(maxsize=1)  # one mediator per process
def mediator_factory() -> cqrs.EventMediator:
    def em(m: cqrs.EventMap) -> None:
        m.bind(
            cqrs.NotificationEvent[UserCreatedPayload],
            OnUserCreated,
        )

    return bootstrap.bootstrap(di_container=di.Container(), events_mapper=em)

broker = kafka.KafkaBroker(bootstrap_servers=["localhost:9092"])
app = faststream.FastStream(broker)

@broker.subscriber("user_events", group_id="svc", auto_commit=False)
async def on_message(
    body: cqrs.NotificationEvent[UserCreatedPayload],
    msg: kafka.KafkaMessage,
    mediator: cqrs.EventMediator = faststream.Depends(mediator_factory),
):
    await mediator.send(body)  # same handlers as in your service
    await msg.ack()

HTTP path: emit to Kafka (producer in bootstrap)

Idea: with message_broker=... on bootstrap, notification or domain events from handlers can be published through the same pipeline (see Event producing). Define commands_mapper and domain_events_mapper in your app.

import cqrs
import di
from cqrs.requests import bootstrap
from cqrs.message_brokers import kafka
from cqrs.adapters.kafka import KafkaProducerAdapter

kafka_broker = kafka.KafkaMessageBroker(
    producer=KafkaProducerAdapter(bootstrap_servers=["localhost:9092"]),
)
mediator = bootstrap.bootstrap(
    di_container=di.Container(),
    commands_mapper=commands_mapper,
    domain_events_mapper=events_mapper,
    message_broker=kafka_broker,  # publish path for events produced by handlers
)

Links and calls to action

If you run Python microservices and live in the outbox / saga / CQRS space, what still hurts? I am curious—we might chip away at it together.
