Building distributed systems in Python? Here is how python-cqrs tackles consistency with orchestrated sagas, the mediator pattern, and a transactional outbox—without preaching theory for ten pages first.
TL;DR
- Commands and queries stay in plain handlers: nothing in the handler depends on HTTP, Kafka, or CLI.
- Sagas: persisted state, automatic compensation, recovery after crashes; see the docs for fallback and circuit-breaker-style options.
- Transactional outbox: integration events in the same DB transaction as writes (at-least-once to the broker). In-process domain events are a different contract (see the table at the end of the cheat sheet).
Links: GitHub · Docs · PyPI python-cqrs
A story in three crashes
- A user places an order. Payment succeeds.
- A downstream inventory service is supposed to reserve stock—but the process dies (OOM, redeploy, node loss).
- The warehouse never heard about the order, yet money moved.
That is the boring side of distributed systems: not “microservices are trendy,” but consistency when a commit in one place does not magically align the rest of the world.
Hi, I am Vadim, former tech lead of the financial infrastructure team at Timeweb Cloud. This post is a short intro to the library we use under our Python microservices, without the epic “how we got here.” If you are tired of reinventing the transactional outbox, sagas, and mediator wiring, this may save you time.
python-cqrs is a framework for CQRS and event-driven architecture. It grew out of a fork of diator and has since become its own stack for reliable service design.
See it in code (before the feature list)
Idea: one command → one handler → mediator.send. Everything else (saga, outbox, Kafka) reuses the same wiring via bootstrap.
import asyncio
import di
import cqrs
from cqrs.requests import bootstrap

# 1) Contract: what goes in / what comes out (CQRS: this is a “write” path).
class CreateUserCommand(cqrs.Request):
    email: str
    name: str

class CreateUserResponse(cqrs.Response):
    user_id: str

# 2) Pure handler: no FastAPI, no Kafka, only business logic.
class CreateUserHandler(cqrs.RequestHandler[CreateUserCommand, CreateUserResponse]):
    async def handle(self, request: CreateUserCommand) -> CreateUserResponse:
        user_id = f"user_{request.email}"
        return CreateUserResponse(user_id=user_id)

# 3) Mediator: map command type → handler, resolve via DI.
mediator = bootstrap.bootstrap(
    di_container=di.Container(),
    commands_mapper=lambda m: m.bind(CreateUserCommand, CreateUserHandler),
)

async def main() -> None:
    # `await` is only valid inside a coroutine, so the call is wrapped here.
    result = await mediator.send(
        CreateUserCommand(email="user@example.com", name="John"),
    )
    print(result.user_id)

asyncio.run(main())
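To make the "one command → one handler → mediator.send" idea concrete, here is a stdlib-only sketch of the dispatch step. It is not how python-cqrs is implemented: `TinyMediator`, `bind`, `Greet`, and `GreetHandler` are hypothetical names, and the real library resolves handlers through the DI container rather than calling a plain factory.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Callable


@dataclass(frozen=True)
class Greet:
    """Hypothetical command used only for this sketch."""

    name: str


class GreetHandler:
    async def handle(self, request: Greet) -> str:
        return f"hello, {request.name}"


class TinyMediator:
    """Maps a request type to a handler factory and dispatches by type."""

    def __init__(self) -> None:
        self._handlers: dict[type, Callable[[], Any]] = {}

    def bind(self, request_type: type, handler_factory: Callable[[], Any]) -> None:
        self._handlers[request_type] = handler_factory

    async def send(self, request: Any) -> Any:
        factory = self._handlers.get(type(request))
        if factory is None:
            raise LookupError(f"no handler bound for {type(request).__name__}")
        # A fresh handler per request keeps handlers free of shared state.
        return await factory().handle(request)


def demo() -> str:
    mediator = TinyMediator()
    mediator.bind(Greet, GreetHandler)
    return asyncio.run(mediator.send(Greet(name="Ada")))
```

The caller only knows the request type; swapping the handler (or the transport in front of `send`) does not touch the calling code, which is the property the library's handlers rely on.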
More wiring options: Bootstrap. Below: one minimal snippet per feature; comments carry the point.
Feature cheat sheet (code)
Query (read model)
Idea: same RequestMediator and send—only the handler returns a read DTO instead of mutating state.
import asyncio
import cqrs
import di
from cqrs.requests import bootstrap

class GetUserByEmail(cqrs.Request):
    email: str

class UserDto(cqrs.Response):
    user_id: str
    name: str

class GetUserHandler(cqrs.RequestHandler[GetUserByEmail, UserDto]):
    async def handle(self, request: GetUserByEmail) -> UserDto:
        return UserDto(user_id="u1", name="Ada")  # e.g. DB / cache read

mediator = bootstrap.bootstrap(
    di_container=di.Container(),
    commands_mapper=lambda m: m.bind(GetUserByEmail, GetUserHandler),
)

async def main() -> None:
    # Wrapped in a coroutine because `await` is not valid at module level.
    user = await mediator.send(GetUserByEmail(email="a@b.c"))
    print(user.name)

asyncio.run(main())
Domain events (in-process)
Idea: the command emits DomainEvents; the mediator dispatches them to EventHandlers in the same turn—fast and in-memory, not a public integration contract.
import cqrs
import di
from cqrs.requests import bootstrap

class CreateOrder(cqrs.Request):
    order_id: str

class OrderLineAdded(cqrs.DomainEvent):
    order_id: str
    sku: str

class OrderCreated(cqrs.Response):
    pass

class CreateOrderHandler(cqrs.RequestHandler[CreateOrder, OrderCreated]):
    def __init__(self) -> None:
        self._events: list[cqrs.Event] = []

    @property
    def events(self) -> list[cqrs.Event]:
        return self._events

    async def handle(self, request: CreateOrder) -> OrderCreated:
        self._events.append(
            OrderLineAdded(order_id=request.order_id, sku="sku-1"),
        )
        return OrderCreated()

class OnOrderLineAdded(cqrs.EventHandler[OrderLineAdded]):
    async def handle(self, event: OrderLineAdded) -> None:
        return  # e.g. update projection; if process dies, effect may be lost (at-most-once)

def events_mapper(m: cqrs.EventMap) -> None:
    m.bind(OrderLineAdded, OnOrderLineAdded)

mediator = bootstrap.bootstrap(
    di_container=di.Container(),
    commands_mapper=lambda m: m.bind(CreateOrder, CreateOrderHandler),
    domain_events_mapper=events_mapper,
    max_concurrent_event_handlers=4,  # cap parallel domain handlers
)
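A cap like `max_concurrent_event_handlers` can be pictured as a semaphore around handler calls. The sketch below shows that general technique with plain asyncio; it is an assumption about how such a cap can work, not the library's code, and `dispatch_capped` and `measure_peak` are hypothetical helpers.

```python
import asyncio
from typing import Awaitable, Callable, Iterable


async def dispatch_capped(
    events: Iterable[str],
    handler: Callable[[str], Awaitable[None]],
    max_concurrent: int,
) -> None:
    """Run handler(event) for every event, at most max_concurrent at a time."""
    semaphore = asyncio.Semaphore(max_concurrent)

    async def run_one(event: str) -> None:
        async with semaphore:
            await handler(event)

    await asyncio.gather(*(run_one(e) for e in events))


async def measure_peak(n_events: int, cap: int) -> int:
    """Return the highest number of handlers observed running at once."""
    running = 0
    peak = 0

    async def handler(_event: str) -> None:
        nonlocal running, peak
        running += 1
        peak = max(peak, running)
        await asyncio.sleep(0.01)  # simulate I/O so handlers overlap
        running -= 1

    await dispatch_capped([f"e{i}" for i in range(n_events)], handler, cap)
    return peak
```

Calling `asyncio.run(measure_peak(10, 4))` never observes more than four handlers in flight, which is the effect a concurrency cap is meant to have on a burst of domain events.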
OutboxedEventMap (integration / durable)
Idea: register the wire name and payload type for NotificationEvent once; the outbox table stores the same event in the DB transaction as your aggregate—no “committed in DB, never reached Kafka” gap. Details: Transactional Outbox.
import cqrs
from pydantic import BaseModel

class UserJoinedPayload(BaseModel, frozen=True):
    user_id: str
    meeting_id: str

cqrs.OutboxedEventMap.register(
    "user_joined",  # topic / routing key name used when publishing
    cqrs.NotificationEvent[UserJoinedPayload],
)
# In a command handler (same transaction as INSERT): append NotificationEvent to outbox repo.
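The "same DB transaction" guarantee is easiest to see against a raw database. The following stdlib sketch uses sqlite3 with hypothetical `orders` and `outbox` tables; python-cqrs ships its own outbox repositories, so treat the schema and the function names (`init_db`, `place_order`, `pending_events`) as assumptions made for illustration.

```python
import json
import sqlite3


def init_db() -> sqlite3.Connection:
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE orders (id TEXT PRIMARY KEY)")
    conn.execute(
        "CREATE TABLE outbox ("
        "id INTEGER PRIMARY KEY AUTOINCREMENT, "
        "event_name TEXT NOT NULL, "
        "payload TEXT NOT NULL, "
        "published INTEGER NOT NULL DEFAULT 0)"
    )
    conn.commit()
    return conn


def place_order(conn: sqlite3.Connection, order_id: str, payload: dict) -> None:
    """Write the aggregate row and its integration event atomically."""
    with conn:  # commits on success, rolls back if anything inside raises
        conn.execute("INSERT INTO orders (id) VALUES (?)", (order_id,))
        conn.execute(
            "INSERT INTO outbox (event_name, payload) VALUES (?, ?)",
            ("order_placed", json.dumps({"order_id": order_id, **payload})),
        )


def pending_events(conn: sqlite3.Connection) -> list[tuple[int, str, str]]:
    """Rows a publisher loop would still need to send to the broker."""
    return conn.execute(
        "SELECT id, event_name, payload FROM outbox "
        "WHERE published = 0 ORDER BY id"
    ).fetchall()
```

If building the outbox row fails (for example, the payload cannot be serialized), the order row is rolled back with it, so the database never holds an order without its event or an event without its order.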
Outbox → broker (publisher)
Idea: a separate loop reads outbox rows, publishes to Kafka, then commits—at-least-once delivery to the broker (consumers should be idempotent).
import asyncio
import cqrs
from cqrs.message_brokers import kafka
from cqrs.adapters import kafka as kafka_adapters

broker = kafka.KafkaMessageBroker(
    producer=kafka_adapters.kafka_producer_factory(dsn="localhost:9092"),
)
producer = cqrs.EventProducer(
    message_broker=broker,
    repository=outbox_repository,  # must implement the outbox repository protocol
)

async def publish_loop() -> None:
    async for events in producer.event_batch_generator():
        for event in events:
            await producer.send_message(event)  # push to the broker
        await producer.repository.commit()  # only after successful publish
        await asyncio.sleep(10)  # throttle polling (tune in production)
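At-least-once means a consumer can see the same event twice, for example when the publisher crashes after sending but before committing the outbox rows. A common remedy is to deduplicate on an event ID. The sketch below shows one such approach with hypothetical names (`IdempotentConsumer`, `event_id`); it keeps the seen-set in memory for brevity, whereas a real service would store it durably, ideally in the same transaction as the side effect.

```python
from dataclasses import dataclass, field
from typing import Callable


@dataclass
class IdempotentConsumer:
    """Applies each event at most once, keyed by its event_id."""

    apply: Callable[[dict], None]
    _seen: set[str] = field(default_factory=set, init=False)

    def handle(self, event_id: str, payload: dict) -> bool:
        """Return True if the event was applied, False if it was a duplicate."""
        if event_id in self._seen:
            return False
        self.apply(payload)
        # Mark as seen only after apply() succeeds, so a failure can be retried.
        self._seen.add(event_id)
        return True
```

With this in place, a redelivered `evt-1` is acknowledged without running the side effect a second time.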
Orchestrated saga (compensation + async for steps)
Idea: run each step's act in order; on failure, run compensate in reverse. Saga storage and a log let you resume after a crash. Full API: Saga.
import dataclasses
import uuid
import di
import cqrs
from cqrs.saga import bootstrap
from cqrs.saga.saga import Saga
from cqrs.saga.step import SagaStepHandler, SagaStepResult
from cqrs.saga.storage.memory import MemorySagaStorage
from cqrs.saga.models import SagaContext
from cqrs.response import Response

@dataclasses.dataclass
class OrderContext(SagaContext):
    order_id: str
    items: list[str]
    total_amount: float
    reservation_id: str | None = None  # shared mutable context between steps

class ReserveInventoryStep(SagaStepHandler[OrderContext, Response]):
    def __init__(self, inventory) -> None:  # inject a port (interface) via DI
        self._inventory = inventory

    async def act(self, context: OrderContext) -> SagaStepResult:
        context.reservation_id = await self._inventory.reserve(
            context.order_id, context.items,
        )
        return self._generate_step_result(Response())

    async def compensate(self, context: OrderContext) -> None:
        if context.reservation_id:
            await self._inventory.release(context.reservation_id)  # undo act()

class OrderSaga(Saga[OrderContext]):
    steps = [ReserveInventoryStep]

def saga_mapper(m: cqrs.SagaMap) -> None:
    m.bind(OrderContext, OrderSaga)

mediator = bootstrap.bootstrap(
    di_container=di.Container(),  # register `inventory` for ReserveInventoryStep
    sagas_mapper=saga_mapper,
    saga_storage=MemorySagaStorage(),  # or SQLAlchemy storage in prod
)

async def run_saga() -> None:
    ctx = OrderContext(order_id="o1", items=["a", "b"], total_amount=10.0)
    # mediator.stream is not awaited; it returns an async iterator of step results
    async for step in mediator.stream(context=ctx, saga_id=uuid.uuid4()):
        pass  # forward to logs, metrics, or SSE
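The "compensate in reverse" rule can be shown without the library. This is a stdlib sketch under stated assumptions: only steps whose `act` completed are compensated, nothing is persisted, and `Step`, `make_step`, and `run_saga_sketch` are hypothetical names. The library's saga storage and recovery are deliberately left out.

```python
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable

Action = Callable[[dict], Awaitable[None]]


@dataclass(frozen=True)
class Step:
    name: str
    act: Action
    compensate: Action


async def run_saga_sketch(steps: list[Step], context: dict) -> bool:
    """Run steps in order; on failure, compensate completed steps in reverse."""
    completed: list[Step] = []
    for step in steps:
        try:
            await step.act(context)
        except Exception:
            for done in reversed(completed):
                await done.compensate(context)
            return False
        completed.append(step)
    return True


def make_step(name: str, log: list[str], fail: bool = False) -> Step:
    """Build a step that records what it did, optionally failing in act()."""

    async def act(context: dict) -> None:
        if fail:
            raise RuntimeError(f"{name} failed")
        log.append(f"act:{name}")

    async def compensate(context: dict) -> None:
        log.append(f"undo:{name}")

    return Step(name, act, compensate)
```

Running three steps where the third fails leaves a log of two `act` entries followed by their `undo` entries in the opposite order, which mirrors the order-payment-inventory story at the top of the post.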
StreamingRequestMediator (progress / SSE)
Idea: the handler yields partial Response chunks; the consumer iterates mediator.stream(...) to drive progress (e.g. over SSE). See Streaming mediator.
import typing
import asyncio
import di
import cqrs
from cqrs.requests import bootstrap
from cqrs.requests.request_handler import StreamingRequestHandler
from cqrs.message_brokers import devnull

class ProcessFilesCommand(cqrs.Request):
    file_ids: list[str]

class FileResult(cqrs.Response):
    file_id: str
    status: str

class FileDone(cqrs.DomainEvent):
    file_id: str

class OnFileDone(cqrs.EventHandler[FileDone]):
    async def handle(self, event: FileDone) -> None:
        return

def domain_events(m: cqrs.EventMap) -> None:
    m.bind(FileDone, OnFileDone)

class ProcessFilesHandler(StreamingRequestHandler[ProcessFilesCommand, FileResult]):
    def __init__(self) -> None:
        self._events: list[cqrs.Event] = []

    @property
    def events(self) -> list[cqrs.Event]:
        return self._events

    async def handle(
        self, request: ProcessFilesCommand,
    ) -> typing.AsyncIterator[FileResult]:
        for file_id in request.file_ids:
            await asyncio.sleep(0.01)  # simulate work
            self._events.append(FileDone(file_id=file_id))
            yield FileResult(file_id=file_id, status="ok")  # one chunk per file

mediator = bootstrap.bootstrap_streaming(
    di_container=di.Container(),
    commands_mapper=lambda m: m.bind(ProcessFilesCommand, ProcessFilesHandler),
    domain_events_mapper=domain_events,
    message_broker=devnull.DevnullMessageBroker(),  # swap for real broker if you publish from here
    max_concurrent_event_handlers=5,
)

async def run_stream() -> None:
    async for chunk in mediator.stream(
        ProcessFilesCommand(file_ids=["1", "2", "3"]),
    ):
        if chunk:
            print(chunk.file_id, chunk.status)
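To forward streamed chunks over SSE, each chunk has to be framed as a `data:` line followed by a blank line. The sketch below assumes chunks are plain dicts and uses hypothetical helpers (`fake_chunks` stands in for `mediator.stream(...)`, `to_sse` does the framing); wiring the result into a web framework's streaming response is left out.

```python
import asyncio
import json
from typing import AsyncIterator


async def fake_chunks(file_ids: list[str]) -> AsyncIterator[dict]:
    """Stand-in for mediator.stream(...): yields one result per file."""
    for file_id in file_ids:
        await asyncio.sleep(0)  # yield control, as real work would
        yield {"file_id": file_id, "status": "ok"}


async def to_sse(chunks: AsyncIterator[dict]) -> AsyncIterator[str]:
    """Frame each chunk as a Server-Sent Events message."""
    async for chunk in chunks:
        yield f"data: {json.dumps(chunk)}\n\n"


async def collect(file_ids: list[str]) -> list[str]:
    """Drain the SSE stream into a list (handy for tests and debugging)."""
    return [frame async for frame in to_sse(fake_chunks(file_ids))]
```

Because both stages are async generators, a frame is produced as soon as its chunk is ready, so the client sees progress per file instead of one response at the end.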
Chain of responsibility
Idea: the first handler that matches returns a result; others call await self.next(request). Register the ordered chain in RequestMap (see Chain of Responsibility).
import cqrs
from cqrs.requests.cor_request_handler import CORRequestHandler

class PayCmd(cqrs.Request):
    amount: float
    method: str
    user_id: str

class PayResult(cqrs.Response):
    ok: bool
    msg: str = ""

class CardHandler(CORRequestHandler[PayCmd, PayResult]):
    @property
    def events(self) -> list[cqrs.Event]:
        return []

    async def handle(self, request: PayCmd) -> PayResult | None:
        if request.method == "card":
            return PayResult(ok=True, msg="card")
        return await self.next(request)  # delegate down the chain

class DefaultPay(CORRequestHandler[PayCmd, PayResult]):
    @property
    def events(self) -> list[cqrs.Event]:
        return []

    async def handle(self, request: PayCmd) -> PayResult | None:
        return PayResult(ok=False, msg=f"unsupported: {request.method}")
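How the links get connected is not shown above, so here is a stdlib-only sketch of the pattern itself. `Link`, `set_next`, `Card`, `Fallback`, and `build_chain` are hypothetical names; in python-cqrs the ordered chain is registered through the request map instead, as described in the Chain of Responsibility docs.

```python
import asyncio
from typing import Optional


class Link:
    """Base link: subclasses either answer or delegate to the next link."""

    def __init__(self) -> None:
        self._next: Optional["Link"] = None

    def set_next(self, nxt: "Link") -> "Link":
        self._next = nxt
        return nxt  # returned so calls can be chained

    async def next(self, method: str) -> Optional[str]:
        if self._next is None:
            return None  # end of chain, nobody handled the request
        return await self._next.handle(method)

    async def handle(self, method: str) -> Optional[str]:
        raise NotImplementedError


class Card(Link):
    async def handle(self, method: str) -> Optional[str]:
        if method == "card":
            return "paid by card"
        return await self.next(method)


class Fallback(Link):
    async def handle(self, method: str) -> Optional[str]:
        return f"unsupported: {method}"


def build_chain() -> Link:
    head = Card()
    head.set_next(Fallback())
    return head
```

Order matters: the catch-all link goes last, otherwise it answers before the more specific handlers get a chance.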
EventMediator + FastStream (consume integration events)
Idea: Kafka delivers a NotificationEvent; you forward it to EventMediator.send. The handler types are the same as in the in-process mapping; only the transport differs. See FastStream.
import functools
import di
import cqrs
import faststream
import pydantic
from cqrs.events import bootstrap
from faststream import kafka

class UserCreatedPayload(pydantic.BaseModel):
    user_id: str

class OnUserCreated(cqrs.EventHandler[cqrs.NotificationEvent[UserCreatedPayload]]):
    async def handle(
        self, e: cqrs.NotificationEvent[UserCreatedPayload],
    ) -> None:
        _ = e.payload.user_id  # your integration logic

@functools.lru_cache(maxsize=1)  # one mediator per process
def mediator_factory() -> cqrs.EventMediator:
    def em(m: cqrs.EventMap) -> None:
        m.bind(
            cqrs.NotificationEvent[UserCreatedPayload],
            OnUserCreated,
        )
    return bootstrap.bootstrap(di_container=di.Container(), events_mapper=em)

broker = kafka.KafkaBroker(bootstrap_servers=["localhost:9092"])
app = faststream.FastStream(broker)

@broker.subscriber("user_events", group_id="svc", auto_commit=False)
async def on_message(
    body: cqrs.NotificationEvent[UserCreatedPayload],
    msg: kafka.KafkaMessage,
    mediator: cqrs.EventMediator = faststream.Depends(mediator_factory),
):
    await mediator.send(body)  # same handlers as in your service
    await msg.ack()
HTTP path: emit to Kafka (producer in bootstrap)
Idea: with message_broker=... on bootstrap, notification or domain events from handlers can be published through the same pipeline (see Event producing). Define commands_mapper and domain_events_mapper in your app.
import cqrs
import di
from cqrs.requests import bootstrap
from cqrs.message_brokers import kafka
from cqrs.adapters.kafka import KafkaProducerAdapter

kafka_broker = kafka.KafkaMessageBroker(
    producer=KafkaProducerAdapter(bootstrap_servers=["localhost:9092"]),
)
mediator = bootstrap.bootstrap(
    di_container=di.Container(),
    commands_mapper=commands_mapper,
    domain_events_mapper=events_mapper,
    message_broker=kafka_broker,  # publish path for events produced by handlers
)
Links and calls to action
- Star the repo, open issues, or comment if this matches what you are building: python-cqrs
- Migrations and v5: Discussion #57
- Deep dives: python-cqrs documentation
If you run Python microservices and live in the outbox / saga / CQRS space, what still hurts? I am curious—we might chip away at it together.
