Introduction
Today we will explore the key concepts behind the CQRS pattern and its benefits. Then we will dive into its implementation in Python, using the Diator library as our framework of choice.
What is CQRS?
CQRS (Command Query Responsibility Segregation) is a pattern that separates the model for updating information (writes) from the model for reading information (reads). The concept has been explained in depth in Udi Dahan's article "Clarified CQRS", in Martin Fowler's "CQRS", and in Greg Young's outstanding talk. We will only briefly discuss the key concepts and how to apply them with the Diator library.
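To make the separation concrete, here is a minimal sketch in plain Python, without Diator. The class names (MeetingWriteModel, MeetingReadModel, MeetingSummary) are made up for illustration, and both sides share one in-memory store for brevity; a real system might keep a separate read store.

```python
from dataclasses import dataclass, field


@dataclass
class MeetingWriteModel:
    """Write side: owns the state and validates changes."""

    participants: dict[int, set[int]] = field(default_factory=dict)

    def join(self, meeting_id: int, user_id: int) -> None:
        members = self.participants.setdefault(meeting_id, set())
        if user_id in members:
            raise ValueError("user already joined")
        members.add(user_id)


@dataclass(frozen=True)
class MeetingSummary:
    """Read side: an immutable view shaped for display."""

    meeting_id: int
    participant_count: int


class MeetingReadModel:
    """Answers questions about the data and never changes it."""

    def __init__(self, write_model: MeetingWriteModel) -> None:
        self._source = write_model

    def summary(self, meeting_id: int) -> MeetingSummary:
        members = self._source.participants.get(meeting_id, set())
        return MeetingSummary(meeting_id=meeting_id, participant_count=len(members))
```

The write model exposes only state-changing operations, and the read model exposes only views, which is the split that Commands and Queries formalize.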
Why Diator?
Diator is a new library for implementing the CQRS pattern in Python. It provides a set of abstractions and utilities that help you separate your read and write concerns. Diator offers a flexible API for implementing Commands, Queries, and their Handlers, and supports several types of events (Domain Event, Notification Event, and ECSTEvent). The library also has built-in integration with message brokers (Redis Pub/Sub and Azure Service Bus).
Implementing CQRS with Diator
To use the Diator library, there are several components to familiarize yourself with. These include Commands and Command Handlers, Queries and Query Handlers, Events and Event Handlers, and Message Brokers. By utilizing these components, you can effectively manage the exchange of information between the read and write models in your application.
Installation
The Diator library can be installed using pip. There are several options for installation:
Common installation:
pip install diator
Azure installation:
pip install diator[azure]
This also installs the azure-servicebus library to support the Azure Service Bus message broker.
Redis installation:
pip install diator[redis]
This also installs the redis library to support Redis Pub/Sub.
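If you are unsure which extras are present in an environment, a small check like the one below can help. It is only a sketch: the helper name is_importable is made up, and it simply tests whether the modules those packages provide (redis and azure.servicebus) can be imported.

```python
from importlib import import_module


def is_importable(module_name: str) -> bool:
    """Return True if the named module can be imported in this environment."""
    try:
        import_module(module_name)
    except ImportError:
        return False
    return True


if __name__ == "__main__":
    # Map each Diator extra to the module its dependency provides.
    extras = {"redis": "redis", "azure": "azure.servicebus"}
    for extra, module_name in extras.items():
        status = "available" if is_importable(module_name) else "missing"
        print(f"diator[{extra}]: {module_name} is {status}")
```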
Commands and Command Handlers
A Command represents an intention to perform an action or change the state of an application. Here is an example of a Command:
import dataclasses
from diator.requests import Request
# frozen=True makes the command immutable; kw_only=True forces keyword arguments.
@dataclasses.dataclass(frozen=True, kw_only=True)
class JoinMeetingCommand(Request):
    meeting_id: int = dataclasses.field(default=1)
    user_id: int = dataclasses.field(default=1)
A Command Handler is a component responsible for handling a Command and executing the corresponding action:
from datetime import datetime
from diator.events import Event
from diator.requests import RequestHandler
# MeetingAPI is an application-specific client that this article does not define.
class JoinMeetingCommandHandler(RequestHandler[JoinMeetingCommand, None]):
    def __init__(self, meeting_api: MeetingAPI) -> None:
        self._meeting_api = meeting_api
        self._events: list[Event] = []
    @property
    def events(self) -> list[Event]:
        # Events produced while handling the command.
        return self._events
    async def handle(self, request: JoinMeetingCommand) -> None:
        await self._meeting_api.join(request.meeting_id, request.user_id)
        self._events.append(
            UserJoinedDomainEvent(user_id=request.user_id, timestamp=datetime.utcnow(), meeting_id=request.meeting_id)
        )
        self._events.append(
            UserJoinedNotificationEvent(user_id=request.user_id)
        )
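The handlers in this article depend on a MeetingAPI that is never defined. For experimenting locally, a hypothetical in-memory stand-in could look like the sketch below. The class name, its storage, and the link format are all assumptions; only the three method names (join, get_link, notify) come from the calls made in the examples.

```python
import asyncio


class InMemoryMeetingAPI:
    """Hypothetical stand-in for the MeetingAPI used in the examples."""

    def __init__(self) -> None:
        self.participants: dict[int, set[int]] = {}
        self.notifications: list[tuple[int, str]] = []

    async def join(self, meeting_id: int, user_id: int) -> None:
        self.participants.setdefault(meeting_id, set()).add(user_id)

    async def get_link(self, meeting_id: int) -> str:
        # The URL format is made up for illustration.
        return f"https://meetings.example.com/{meeting_id}"

    async def notify(self, meeting_id: int, message: str) -> None:
        self.notifications.append((meeting_id, message))


async def demo() -> InMemoryMeetingAPI:
    api = InMemoryMeetingAPI()
    await api.join(57, 1)
    await api.notify(57, "New user joined!")
    return api
```

Because it records calls instead of talking to a real service, a stub like this is also handy for unit-testing handlers.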
Queries and Query Handlers
A Query represents a request for information or data from the application's read model. Handling a query SHOULD NOT modify the state of the application:
import dataclasses
from diator.requests import Request
@dataclasses.dataclass(frozen=True, kw_only=True)
class ReadMeetingQuery(Request):
    meeting_id: int = dataclasses.field(default=1)
A Query Result is an object that contains the data requested by a Query. A Query Handler returns it after processing the Query against the read model:
import dataclasses
from diator.response import Response
@dataclasses.dataclass(frozen=True, kw_only=True)
class ReadMeetingQueryResult(Response):
    meeting_id: int = dataclasses.field(default=1)
    link: str = dataclasses.field()
A Query Handler is a component responsible for processing a Query against the read model and returning the requested data as a Query Result:
from diator.events import Event
from diator.requests import RequestHandler
class ReadMeetingQueryHandler(RequestHandler[ReadMeetingQuery, ReadMeetingQueryResult]):
    def __init__(self, meeting_api: MeetingAPI) -> None:
        self._meeting_api = meeting_api
        self._events: list[Event] = []
    @property
    def events(self) -> list[Event]:
        # A query only reads data, so this list stays empty.
        return self._events
    async def handle(self, request: ReadMeetingQuery) -> ReadMeetingQueryResult:
        link = await self._meeting_api.get_link(request.meeting_id)
        return ReadMeetingQueryResult(
            meeting_id=request.meeting_id,
            link=link
        )
Events and Event Handlers
An Event represents a fact that has occurred in the application. It typically describes a significant change in the application's state that is of interest to other parts of the application or to external systems. There are several event types:
- Domain Event - a message describing a significant event that has occurred in the business domain.
- Notification Event - a message about a change in the business domain that other components will react to.
- Event-carried state transfer (ECST) - a message that notifies subscribers about changes in the producer's internal state.
import dataclasses
from datetime import datetime
from diator.events import DomainEvent, NotificationEvent, ECSTEvent
@dataclasses.dataclass(frozen=True, kw_only=True)
class UserJoinedDomainEvent(DomainEvent):  # will be handled by an event handler
    user_id: int = dataclasses.field()
    meeting_id: int = dataclasses.field()
    timestamp: datetime = dataclasses.field()
@dataclasses.dataclass(frozen=True, kw_only=True)
class UserJoinedNotificationEvent(NotificationEvent):  # will be sent to a message broker
    user_id: int = dataclasses.field()
@dataclasses.dataclass(frozen=True, kw_only=True)
class UserChangedECSTEvent(ECSTEvent):  # will be sent to a message broker
    user_id: int = dataclasses.field()
    new_username: str = dataclasses.field()
An Event Handler is a component responsible for processing an Event that has occurred in the application:
from diator.events import EventHandler
class UserJoinedDomainEventHandler(EventHandler[UserJoinedDomainEvent]):
    def __init__(self, meeting_api: MeetingAPI) -> None:
        self._meeting_api = meeting_api
    async def handle(self, event: UserJoinedDomainEvent) -> None:
        # React to the domain event by notifying the meeting.
        await self._meeting_api.notify(event.meeting_id, "New user joined!")
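The difference between event types comes down to where an event ends up: domain events go to in-process handlers, while notification events go to a message broker. The sketch below illustrates that routing idea with stand-in classes. It is not Diator's implementation; ToyEmitter and the event classes are invented for this example, and a plain list plays the role of the broker.

```python
import asyncio
from dataclasses import dataclass


@dataclass(frozen=True)
class DomainEvent:
    """Stand-in base: handled in-process."""


@dataclass(frozen=True)
class NotificationEvent:
    """Stand-in base: published to a broker."""


@dataclass(frozen=True)
class UserJoined(DomainEvent):
    user_id: int


@dataclass(frozen=True)
class UserJoinedNotification(NotificationEvent):
    user_id: int


class ToyEmitter:
    def __init__(self) -> None:
        self.handlers = {}   # event type -> list of async callables
        self.published = []  # stands in for a message broker

    def bind(self, event_type, handler) -> None:
        self.handlers.setdefault(event_type, []).append(handler)

    async def emit(self, event) -> None:
        if isinstance(event, DomainEvent):
            # Domain events are dispatched to the handlers bound to their type.
            for handler in self.handlers.get(type(event), []):
                await handler(event)
        elif isinstance(event, NotificationEvent):
            # Notification events leave the process via the broker.
            self.published.append(event)
```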
Setting up the Mediator class
Let's go through the steps of the mediator configuration:
- Configure dependency injection:
from di import Container, bind_by_type  # using the di library as the DI framework
from di.dependent import Dependent
from diator.container.di import DIContainer
def setup_di() -> DIContainer:
    external_container = Container()
    # Register each handler so the container can build it per request.
    external_container.bind(
        bind_by_type(
            Dependent(UserJoinedDomainEventHandler, scope="request"),
            UserJoinedDomainEventHandler,
        )
    )
    external_container.bind(
        bind_by_type(
            Dependent(JoinMeetingCommandHandler, scope="request"),
            JoinMeetingCommandHandler,
        )
    )
    external_container.bind(
        bind_by_type(
            Dependent(ReadMeetingQueryHandler, scope="request"),
            ReadMeetingQueryHandler,
        )
    )
    container = DIContainer()
    container.attach_external_container(external_container)
    return container
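If dependency injection containers are new to you, the sketch below shows the core idea in a few lines: the container builds an object by looking at its constructor's type hints and building those dependencies first. ToyContainer is invented for this illustration and is far simpler than the di library or Diator's DIContainer; it has no scopes and assumes annotations are real classes rather than strings.

```python
import inspect
from typing import Any, Callable


class ToyContainer:
    """Resolves a class by building its constructor arguments from type hints."""

    def __init__(self) -> None:
        self._factories: dict[type, Callable[[], Any]] = {}

    def register(self, cls: type, factory: Callable[[], Any]) -> None:
        # An explicit factory overrides automatic construction.
        self._factories[cls] = factory

    def resolve(self, cls: type) -> Any:
        if cls in self._factories:
            return self._factories[cls]()
        params = inspect.signature(cls.__init__).parameters
        kwargs = {
            name: self.resolve(param.annotation)
            for name, param in params.items()
            if name != "self" and param.annotation is not inspect.Parameter.empty
        }
        return cls(**kwargs)


class MeetingAPI:
    """Placeholder dependency for the example."""


class Handler:
    def __init__(self, meeting_api: MeetingAPI) -> None:
        self.meeting_api = meeting_api


container = ToyContainer()
handler = container.resolve(Handler)  # MeetingAPI is created and injected
```

This is why the handlers above can simply declare meeting_api in their constructors: the container supplies it when the mediator asks for a handler.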
- Map events to event handlers:
from diator.mediator import Mediator
from diator.events import EventMap
def build_mediator() -> Mediator:
    container = setup_di()
    event_map = EventMap()
    event_map.bind(UserJoinedDomainEvent, UserJoinedDomainEventHandler)
- Map requests (commands and queries) to request handlers:
from diator.mediator import Mediator
from diator.events import EventMap
from diator.requests import RequestMap
def build_mediator() -> Mediator:
    container = setup_di()
    event_map = EventMap()
    event_map.bind(UserJoinedDomainEvent, UserJoinedDomainEventHandler)
    request_map = RequestMap()
    request_map.bind(JoinMeetingCommand, JoinMeetingCommandHandler)
    request_map.bind(ReadMeetingQuery, ReadMeetingQueryHandler)
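Conceptually, a request map is a lookup table from request type to handler type, and the mediator uses it to decide who handles what. The sketch below shows that idea with invented names (ToyMediator, Ping, PingHandler); it is an illustration of the pattern, not Diator's code.

```python
import asyncio
from dataclasses import dataclass


@dataclass(frozen=True)
class Ping:
    text: str


class PingHandler:
    async def handle(self, request: Ping) -> str:
        return request.text.upper()


class ToyMediator:
    def __init__(self) -> None:
        self._request_map: dict[type, type] = {}

    def bind(self, request_type: type, handler_type: type) -> None:
        self._request_map[request_type] = handler_type

    async def send(self, request):
        # Look up the handler by the request's type, then delegate to it.
        handler_type = self._request_map[type(request)]
        handler = handler_type()  # a real setup would ask the DI container
        return await handler.handle(request)
```

The caller only knows about the request object and the mediator, which keeps it decoupled from the handler that does the work.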
- Configure EventEmitter:
from redis import asyncio as redis
from diator.mediator import Mediator
from diator.events import EventMap, EventEmitter
from diator.message_brokers.redis import RedisMessageBroker
from diator.requests import RequestMap
def build_mediator() -> Mediator:
    container = setup_di()
    event_map = EventMap()
    event_map.bind(UserJoinedDomainEvent, UserJoinedDomainEventHandler)
    request_map = RequestMap()
    request_map.bind(JoinMeetingCommand, JoinMeetingCommandHandler)
    request_map.bind(ReadMeetingQuery, ReadMeetingQueryHandler)
    redis_client = redis.Redis.from_url("redis://localhost:6379/0")
    event_emitter = EventEmitter(
        message_broker=RedisMessageBroker(redis_client),
        event_map=event_map,
        container=container,
    )
- Add logging middleware:
from redis import asyncio as redis
from diator.mediator import Mediator
from diator.events import EventMap, EventEmitter
from diator.message_brokers.redis import RedisMessageBroker
from diator.requests import RequestMap
from diator.middlewares import MiddlewareChain
from diator.middlewares.logging import LoggingMiddleware
def build_mediator() -> Mediator:
    container = setup_di()
    event_map = EventMap()
    event_map.bind(UserJoinedDomainEvent, UserJoinedDomainEventHandler)
    request_map = RequestMap()
    request_map.bind(JoinMeetingCommand, JoinMeetingCommandHandler)
    request_map.bind(ReadMeetingQuery, ReadMeetingQueryHandler)
    redis_client = redis.Redis.from_url("redis://localhost:6379/0")
    event_emitter = EventEmitter(
        message_broker=RedisMessageBroker(redis_client),
        event_map=event_map,
        container=container,
    )
    middleware_chain = MiddlewareChain()
    middleware_chain.add(LoggingMiddleware())
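A middleware chain wraps the handler call so that cross-cutting work such as logging runs before and after it. The sketch below shows one common way to build such a chain from plain async functions. The names (wrap, logging_middleware, call_next) and the middleware signature are assumptions made for this example and are not taken from Diator's middleware interface.

```python
import asyncio
import logging
from typing import Any, Awaitable, Callable

Handle = Callable[[Any], Awaitable[Any]]
Middleware = Callable[[Any, Handle], Awaitable[Any]]

logger = logging.getLogger("toy_mediator")


async def logging_middleware(request: Any, call_next: Handle) -> Any:
    logger.info("handling %s", type(request).__name__)
    response = await call_next(request)
    logger.info("handled %s", type(request).__name__)
    return response


def wrap(handler: Handle, middlewares: list[Middleware]) -> Handle:
    """Return a callable that runs the middlewares in order, then the handler."""
    wrapped = handler
    # Wrap from the inside out so the first middleware ends up outermost.
    for middleware in reversed(middlewares):
        def make(mw: Middleware, nxt: Handle) -> Handle:
            async def call(request: Any) -> Any:
                return await mw(request, nxt)
            return call
        wrapped = make(middleware, wrapped)
    return wrapped
```

Each middleware decides when to call call_next, so it can log, time, or short-circuit a request without the handler knowing about it.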
- Finally, set up the mediator class:
from redis import asyncio as redis
from diator.mediator import Mediator
from diator.events import EventMap, EventEmitter
from diator.message_brokers.redis import RedisMessageBroker
from diator.requests import RequestMap
from diator.middlewares import MiddlewareChain
from diator.middlewares.logging import LoggingMiddleware
def build_mediator() -> Mediator:
    container = setup_di()
    event_map = EventMap()
    event_map.bind(UserJoinedDomainEvent, UserJoinedDomainEventHandler)
    request_map = RequestMap()
    request_map.bind(JoinMeetingCommand, JoinMeetingCommandHandler)
    request_map.bind(ReadMeetingQuery, ReadMeetingQueryHandler)
    redis_client = redis.Redis.from_url("redis://localhost:6379/0")
    event_emitter = EventEmitter(
        message_broker=RedisMessageBroker(redis_client),
        event_map=event_map,
        container=container,
    )
    middleware_chain = MiddlewareChain()
    middleware_chain.add(LoggingMiddleware())
    return Mediator(
        request_map=request_map,
        event_emitter=event_emitter,
        container=container,
        middleware_chain=middleware_chain,  # pass the configured instance, not the class
    )
Now we have a ready-to-use mediator instance!
Usage of the Mediator
Since we have a ready mediator builder (build_mediator), we can send our queries and commands to the mediator it returns.
Here are several use cases:
- Sending a query:
import asyncio
async def main() -> None:
    mediator = build_mediator()
    response = await mediator.send(ReadMeetingQuery(meeting_id=57))
    assert isinstance(response, ReadMeetingQueryResult)
if __name__ == "__main__":
    asyncio.run(main())
- Sending a command:
import asyncio
async def main() -> None:
    mediator = build_mediator()
    response = await mediator.send(JoinMeetingCommand(meeting_id=57, user_id=1))
    assert not response
if __name__ == "__main__":
    asyncio.run(main())
After a command is sent, it is handled by the command handler bound to it (JoinMeetingCommandHandler in this case). Since JoinMeetingCommandHandler also produces UserJoinedDomainEvent, that domain event is processed by UserJoinedDomainEventHandler. The command handler also produces UserJoinedNotificationEvent, which is sent to a Redis Pub/Sub channel. Example of a message in Redis:
{
    "message_type": "notification_event",
    "message_name": "UserJoinedNotificationEvent",
    "message_id": "9f62e977-73f7-462b-92cb-8ea658d3bcb5",
    "payload": {
        "event_id": "9f62e977-73f7-462b-92cb-8ea658d3bcb5",
        "event_timestamp": "2023-03-07T09:26:02.588855",
        "user_id": 1
    }
}
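On the consuming side, a subscriber would need to turn such a message back into something typed. The sketch below parses the JSON shown above using only the standard library. The ReceivedNotification class and parse_notification function are hypothetical names, and the field layout is assumed to match the sample message exactly.

```python
import json
from dataclasses import dataclass
from datetime import datetime


@dataclass(frozen=True)
class ReceivedNotification:
    name: str
    event_id: str
    occurred_at: datetime
    user_id: int


def parse_notification(raw: str) -> ReceivedNotification:
    """Parse a broker message shaped like the sample above."""
    message = json.loads(raw)
    if message["message_type"] != "notification_event":
        raise ValueError(f"unexpected message type: {message['message_type']}")
    payload = message["payload"]
    return ReceivedNotification(
        name=message["message_name"],
        event_id=payload["event_id"],
        occurred_at=datetime.fromisoformat(payload["event_timestamp"]),
        user_id=payload["user_id"],
    )
```

Checking message_type first lets a subscriber ignore or reject messages it does not understand, such as ECST events arriving on the same channel.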
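Generally, the flow looks like this: the mediator routes each request to its handler, and the events the handler produces go to the event emitter, which dispatches domain events to their event handlers and publishes notification events to the message broker.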
Conclusion
It's important to note that CQRS is not a silver bullet and may not be the right choice for your project. It's crucial to carefully weigh the tradeoffs and design considerations.
Hope that this article has provided you with a useful guide to understanding and implementing CQRS with Diator in Python!