DEV Community

Murad Akhundov


Implementing CQRS in Python

Introduction

Today we will explore the key concepts behind the CQRS pattern and its benefits. Then we will dive into its implementation in Python, using the Diator library as our framework of choice.

What is CQRS?

CQRS (Command Query Responsibility Segregation) is a pattern that separates the model used for updating information (writes) from the model used for reading it (reads). The concept has been widely explained elsewhere, for example in Udi Dahan's article "Clarified CQRS", Martin Fowler's "CQRS", and Greg Young's outstanding talk. We will only briefly discuss the key concepts and how to apply them with the Diator library.

Source: https://martinfowler.com/bliki/CQRS.html
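
To make the separation concrete, here is a minimal sketch in plain Python that does not use any library. All names (`MeetingWriteModel`, `MeetingReadModel`, `project`) are hypothetical and exist only for illustration; a real system would usually keep the two models in separate stores and update the read side asynchronously.

```python
from dataclasses import dataclass, field


@dataclass
class MeetingWriteModel:
    """Write side: enforces rules and records changes (hypothetical)."""

    participants: dict[int, set[int]] = field(default_factory=dict)

    def join(self, meeting_id: int, user_id: int) -> None:
        members = self.participants.setdefault(meeting_id, set())
        if user_id in members:
            raise ValueError("user already joined")
        members.add(user_id)


@dataclass
class MeetingReadModel:
    """Read side: a denormalized view shaped for queries (hypothetical)."""

    participant_counts: dict[int, int] = field(default_factory=dict)

    def count(self, meeting_id: int) -> int:
        return self.participant_counts.get(meeting_id, 0)


def project(write: MeetingWriteModel, read: MeetingReadModel) -> None:
    """Rebuild the read view from the current state of the write model."""
    read.participant_counts = {
        meeting_id: len(members)
        for meeting_id, members in write.participants.items()
    }


write_model = MeetingWriteModel()
read_model = MeetingReadModel()
write_model.join(meeting_id=57, user_id=1)
write_model.join(meeting_id=57, user_id=2)
project(write_model, read_model)
print(read_model.count(57))
```

Commands only ever touch the write model, and queries only ever touch the read model; the projection step is the single place where the two meet.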

Why Diator?

Diator is a new library for implementing the CQRS pattern in Python. It provides a set of abstractions and utilities to help you separate your read and write concerns. Diator offers a flexible API for implementing Commands, Queries, and their Handlers, and it supports several types of events (Domain Event, Notification Event, and ECST Event). Furthermore, the library has built-in integrations with message brokers (Redis Pub/Sub and Azure Service Bus).

Implementing CQRS with Diator

To use the Diator library, there are several components to familiarize yourself with. These include Commands and Command Handlers, Queries and Query Handlers, Events and Event Handlers, and Message Brokers. By utilizing these components, you can effectively manage the exchange of information between the read and write models in your application.

Installation

The Diator library can be installed using pip. There are several options for installation:

Common Installation:



pip install diator



Azure Installation:



pip install diator[azure]



This also installs the azure-servicebus library, which is needed to support the Azure Service Bus message broker.

Redis Installation:



pip install diator[redis]



This also installs the redis library, which is needed to support Redis Pub/Sub.

Commands and Command Handlers

A Command represents an intention to perform an action or change the state of the application. Here is an example of a Command:



import dataclasses

from diator.requests import Request


@dataclasses.dataclass(frozen=True, kw_only=True)
class JoinMeetingCommand(Request):
    meeting_id: int = dataclasses.field(default=1)
    user_id: int = dataclasses.field(default=1)



A Command Handler is a component responsible for handling a Command and executing the corresponding action:



from datetime import datetime

from diator.events import Event  # assumed to be exported alongside EventHandler
from diator.requests import RequestHandler


class JoinMeetingCommandHandler(RequestHandler[JoinMeetingCommand, None]):
    def __init__(self, meeting_api: MeetingAPI) -> None:
        self._meeting_api = meeting_api
        self._events: list[Event] = []

    @property
    def events(self) -> list[Event]:
        return self._events

    async def handle(self, request: JoinMeetingCommand) -> None:
        await self._meeting_api.join(request.meeting_id, request.user_id)
        self._events.append(
            UserJoinedDomainEvent(user_id=request.user_id, timestamp=datetime.utcnow(), meeting_id=request.meeting_id)
        )
        self._events.append(
            UserJoinedNotificationEvent(user_id=request.user_id)
        )



Queries and Query Handlers

A Query represents a request for information or data from the application's read model. Handling a query SHOULD NOT modify the state of the application:



import dataclasses

from diator.requests import Request


@dataclasses.dataclass(frozen=True, kw_only=True)
class ReadMeetingQuery(Request):
    meeting_id: int = dataclasses.field(default=1)



A Query Result is an object that contains the data requested by a Query. It is returned by a Query Handler after it processes a Query against the read model:



import dataclasses

from diator.response import Response


@dataclasses.dataclass(frozen=True, kw_only=True)
class ReadMeetingQueryResult(Response):
    meeting_id: int = dataclasses.field(default=1)
    link: str = dataclasses.field()



A Query Handler is a component responsible for processing a Query against the read model and returning the requested data as a Query Result:



from diator.events import Event  # assumed to be exported alongside EventHandler
from diator.requests import RequestHandler


class ReadMeetingQueryHandler(RequestHandler[ReadMeetingQuery, ReadMeetingQueryResult]):
    def __init__(self, meeting_api: MeetingAPI) -> None:
        self._meeting_api = meeting_api
        self._events: list[Event] = []

    @property
    def events(self) -> list[Event]:
        return self._events

    async def handle(self, request: ReadMeetingQuery) -> ReadMeetingQueryResult:
        link = await self._meeting_api.get_link(request.meeting_id)
        return ReadMeetingQueryResult(
            meeting_id=request.meeting_id,
            link=link
        )



Events and Event Handlers

An Event represents a fact that has occurred in the application. It typically describes a significant change in the application's state that is of interest to other parts of the application or to external systems. There are several event types:

  • Domain Event - a message describing a significant event that has occurred in the business domain.
  • Notification Event - a message about a change in the business domain that other components will react to.
  • Event-Carried State Transfer (ECST) Event - a message that notifies subscribers about changes in the producer's internal state.


import dataclasses
from datetime import datetime

from diator.events import DomainEvent, NotificationEvent, ECSTEvent


@dataclasses.dataclass(frozen=True, kw_only=True)
class UserJoinedDomainEvent(DomainEvent):  # will be handled by an event handler
    user_id: int = dataclasses.field()
    meeting_id: int = dataclasses.field()
    timestamp: datetime = dataclasses.field()


@dataclasses.dataclass(frozen=True, kw_only=True)
class UserJoinedNotificationEvent(NotificationEvent):  # will be sent to a message broker
    user_id: int = dataclasses.field()


@dataclasses.dataclass(frozen=True, kw_only=True)
class UserChangedECSTEvent(ECSTEvent):  # will be sent to a message broker
    user_id: int = dataclasses.field()
    new_username: str = dataclasses.field()



An Event Handler is a component responsible for processing an Event that has occurred in the application:



from diator.events import EventHandler


class UserJoinedDomainEventHandler(EventHandler[UserJoinedDomainEvent]):
    def __init__(self, meeting_api: MeetingAPI) -> None:
        self._meeting_api = meeting_api

    async def handle(self, event: UserJoinedDomainEvent) -> None:
        await self._meeting_api.notify(event.meeting_id, "New user joined!")



Setting up the Mediator class

Let's go through the steps of the mediator configuration:

  • Configure dependency injection:


from di import Container, bind_by_type  # using the di library as the DI framework
from di.dependent import Dependent
from diator.container.di import DIContainer


def setup_di() -> DIContainer:
    external_container = Container()

    external_container.bind(
        bind_by_type(
            Dependent(UserJoinedDomainEventHandler, scope="request"), 
            UserJoinedDomainEventHandler
        )
    )
    external_container.bind(
        bind_by_type(
            Dependent(JoinMeetingCommandHandler, scope="request"),
            JoinMeetingCommandHandler,
        )
    )
    external_container.bind(
        bind_by_type(
            Dependent(ReadMeetingQueryHandler, scope="request"),
            ReadMeetingQueryHandler,
        )
    )

    container = DIContainer()
    container.attach_external_container(external_container)

    return container


  • Map events to event handlers:


from diator.mediator import Mediator
from diator.events import EventMap

def setup_mediator() -> Mediator:
    container = setup_di()

    event_map = EventMap()
    event_map.bind(UserJoinedDomainEvent, UserJoinedDomainEventHandler)


  • Map requests (commands and queries) to request handlers:


from diator.mediator import Mediator
from diator.events import EventMap
from diator.requests import RequestMap

def setup_mediator() -> Mediator:
    container = setup_di()

    event_map = EventMap()
    event_map.bind(UserJoinedDomainEvent, UserJoinedDomainEventHandler)
    request_map = RequestMap()
    request_map.bind(JoinMeetingCommand, JoinMeetingCommandHandler)
    request_map.bind(ReadMeetingQuery, ReadMeetingQueryHandler)


  • Configure EventEmitter:


from redis import asyncio as redis
from diator.mediator import Mediator
from diator.events import EventMap, EventEmitter
from diator.message_brokers.redis import RedisMessageBroker
from diator.requests import RequestMap


def setup_mediator() -> Mediator:
    container = setup_di()

    event_map = EventMap()
    event_map.bind(UserJoinedDomainEvent, UserJoinedDomainEventHandler)
    request_map = RequestMap()
    request_map.bind(JoinMeetingCommand, JoinMeetingCommandHandler)
    request_map.bind(ReadMeetingQuery, ReadMeetingQueryHandler)

    redis_client = redis.Redis.from_url("redis://localhost:6379/0")
    event_emitter = EventEmitter(
        message_broker=RedisMessageBroker(redis_client),
        event_map=event_map,
        container=container,
    )


  • Add logging middleware:


from redis import asyncio as redis
from diator.mediator import Mediator
from diator.events import EventMap, EventEmitter
from diator.message_brokers.redis import RedisMessageBroker
from diator.requests import RequestMap
from diator.middlewares import MiddlewareChain
from diator.middlewares.logging import LoggingMiddleware


def setup_mediator() -> Mediator:
    container = setup_di()

    event_map = EventMap()
    event_map.bind(UserJoinedDomainEvent, UserJoinedDomainEventHandler)
    request_map = RequestMap()
    request_map.bind(JoinMeetingCommand, JoinMeetingCommandHandler)
    request_map.bind(ReadMeetingQuery, ReadMeetingQueryHandler)

    redis_client = redis.Redis.from_url("redis://localhost:6379/0")
    event_emitter = EventEmitter(
        message_broker=RedisMessageBroker(redis_client),
        event_map=event_map,
        container=container,
    )

    middleware_chain = MiddlewareChain()
    middleware_chain.add(LoggingMiddleware())


  • Finally, set up the mediator class:


from redis import asyncio as redis
from diator.mediator import Mediator
from diator.events import EventMap, EventEmitter
from diator.message_brokers.redis import RedisMessageBroker
from diator.requests import RequestMap
from diator.middlewares import MiddlewareChain
from diator.middlewares.logging import LoggingMiddleware


def build_mediator() -> Mediator:
    container = setup_di()

    event_map = EventMap()
    event_map.bind(UserJoinedDomainEvent, UserJoinedDomainEventHandler)
    request_map = RequestMap()
    request_map.bind(JoinMeetingCommand, JoinMeetingCommandHandler)
    request_map.bind(ReadMeetingQuery, ReadMeetingQueryHandler)

    redis_client = redis.Redis.from_url("redis://localhost:6379/0")
    event_emitter = EventEmitter(
        message_broker=RedisMessageBroker(redis_client),
        event_map=event_map,
        container=container,
    )

    middleware_chain = MiddlewareChain()
    middleware_chain.add(LoggingMiddleware())

    return Mediator(
        request_map=request_map, 
        event_emitter=event_emitter, 
        container=container, 
        middleware_chain=middleware_chain
    )



Now we have a function that builds a ready-to-use mediator instance!
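
To give an intuition for what a middleware chain does around a handler, here is a small standalone sketch. It is an assumption about the general technique, not Diator's actual `MiddlewareChain` or `LoggingMiddleware`: `SketchMiddlewareChain`, `logging_middleware`, and `handler` are hypothetical names.

```python
import asyncio
from typing import Any, Awaitable, Callable

Handle = Callable[[Any], Awaitable[Any]]
Middleware = Callable[[Any, Handle], Awaitable[Any]]


class SketchMiddlewareChain:
    def __init__(self) -> None:
        self._middlewares: list[Middleware] = []

    def add(self, middleware: Middleware) -> None:
        self._middlewares.append(middleware)

    def wrap(self, handle: Handle) -> Handle:
        # Wrap from the last middleware inwards so the first one added runs first.
        for middleware in reversed(self._middlewares):
            handle = self._bind(middleware, handle)
        return handle

    @staticmethod
    def _bind(middleware: Middleware, next_handle: Handle) -> Handle:
        async def wrapped(request: Any) -> Any:
            return await middleware(request, next_handle)

        return wrapped


log: list[str] = []


async def logging_middleware(request: Any, handle: Handle) -> Any:
    log.append(f"handling {type(request).__name__}")
    response = await handle(request)
    log.append(f"handled {type(request).__name__}")
    return response


async def handler(request: str) -> str:
    log.append("handler ran")
    return request.upper()


chain = SketchMiddlewareChain()
chain.add(logging_middleware)
response = asyncio.run(chain.wrap(handler)("ping"))
```

Each middleware receives the request together with the next callable in line, so it can run code both before and after the handler without the handler knowing about it.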

Usage of the Mediator

Since we have a builder for the mediator instance (build_mediator), we can now send our queries and commands to the mediator.

Here are several use cases:

  • Sending a query:


import asyncio


async def main() -> None:
    mediator = build_mediator()

    response = await mediator.send(ReadMeetingQuery(meeting_id=57))

    assert isinstance(response, ReadMeetingQueryResult)

if __name__ == "__main__":
    asyncio.run(main())


  • Sending a command:


import asyncio


async def main() -> None:
    mediator = build_mediator()

    response = await mediator.send(JoinMeetingCommand(meeting_id=57, user_id=1))

    assert not response

if __name__ == "__main__":
    asyncio.run(main())



After a command is sent, it is handled by the corresponding command handler (JoinMeetingCommandHandler in this case). Since JoinMeetingCommandHandler also produces UserJoinedDomainEvent, this domain event will be processed by UserJoinedDomainEventHandler. The command handler also produces UserJoinedNotificationEvent, which will be sent to a Redis Pub/Sub channel. Example of a message in Redis:



{
   "message_type":"notification_event",
   "message_name":"UserJoinedNotificationEvent",
   "message_id":"9f62e977-73f7-462b-92cb-8ea658d3bcb5",
   "payload":{
      "event_id":"9f62e977-73f7-462b-92cb-8ea658d3bcb5",
      "event_timestamp":"2023-03-07T09:26:02.588855",
      "user_id":1
   }
}



Generally, the flow looks like this:

Figure: Flow

Conclusion

It's important to note that CQRS is not a silver bullet and may not be the right choice for your project. Carefully consider the trade-offs and design implications before adopting it.

Hope that this article has provided you with a useful guide to understanding and implementing CQRS with Diator in Python!
