DEV Community

Deyan Petrov


CQRS+

TLDR; Splitting your "services" into several "microservices" based on technical responsibility simplifies development and code management and gives you deployment flexibility and isolation.

Introduction

This post is intended to demonstrate one concrete way of implementing microservices using CQRS, but going beyond the standard Command/Query separation. It is a follow-up to a previous post about Current State + Last Event as an alternative to Event Sourcing, at the end of which I outlined this architectural pattern.
Note that even though some examples may refer to F#/.NET/Azure, the CQRS+ approach can be used with any tech stack and programming language.

Problem Statement

You have split your system into several "bounded contexts", each one consisting of a single or a few relatively encapsulated "services" (e.g. Customer Registration, Order Processing, Notification Service, etc.), where every "service" contains a few (1-5) "domain entities" (e.g. Notification Service contains an entity called "Notification"). For each "service" you have created a .NET/F# solution, but now you are wondering how to organize the functionality inside each solution internally - do you need only a single project (+1 test project)?

On the one hand, you have numerous requirements for every "service":

  1. Getting some user input via the UI, wrapping it as "command" which results in the creation/update of some "entities" or simply data records, validating some business rules or maintaining some invariants along the way
  2. Displaying the existing data to the user in a UI, or returning it to an external system
  3. Handling events raised by other internal/external systems, based on which you potentially trigger internal commands (=> point 1.)
  4. Maybe you need to publish some events yourself, so other internal systems can react to them?
  5. Maybe you need to publish some events externally via webhooks, so other external systems can react to them?
  6. Write some audit trail entries for some critical entities, or calculate some balance asynchronously.

On the other hand, you have heard about "microservices" with just a "few" LOCs, and you have been wondering how the hell do the "wise guys" manage to produce such microservices, while yours always end up being fat services with thousands of LOCs (or you have a single monolith).

Ideally you would modularize/split your "service" in such a way that you end up with small microservices which can be written/re-written/compiled/tested/deployed/scaled/monitored/restarted individually.

Solution

The simple structure I follow is the following:

  • Your company's overall software
    • Bounded Context 1
      • Service 1
        • Microservice 1
        • Microservice 2

It could well be that you have only a single "service" in a bounded context.

The important point is that a service is split into microservices based on technical responsibilities - a microservice responsible for command processing, another one for query processing, a third for event handling, etc.

So a CQRS+ solution could look like this:


Diagram 1: Xyz Service and its microservices

You would create a solution for the "service" with multiple projects (one per technical microservice), and multiple test projects (again, one per technical microservice). Your solution structure will then look like this:

XyzService.sln
  src/
    XyzService.CommandHandling.fsproj
    XyzService.QueryHandling.fsproj
    XyzService.EventHandling.fsproj
    XyzService.EventPublishing.fsproj
    XyzService.ExternalEventPublishing.fsproj
    XyzService.ExternalEventHandling.fsproj
    XyzService.ChangeHandling.fsproj
    XyzService.Shared.fsproj
  tests/
    XyzService.CommandHandling.Tests.fsproj
    XyzService.QueryHandling.Tests.fsproj
    XyzService.EventHandling.Tests.fsproj
    XyzService.EventPublishing.Tests.fsproj
    XyzService.ExternalEventPublishing.Tests.fsproj
    XyzService.ExternalEventHandling.Tests.fsproj
    XyzService.ChangeHandling.Tests.fsproj

Additionally, as you see from the above, a XyzService.Shared.fsproj project is needed as well, containing the common Persistence, Api Dtos and even Domain Types used by several projects - e.g. CommandHandling (write) and QueryHandling (read).

The deployed applications (e.g. pods in K8s) will look like this:

xyz-cmdh-aaaaaaaaaa-bbbbb
xyz-cmdh-aaaaaaaaaa-ccccc
xyz-qryh-bbbbbbbbbb-ddddd
xyz-qryh-bbbbbbbbbb-eeeee
xyz-qryh-bbbbbbbbbb-fffff
xyz-evh-cccccccccc-ggggg
xyz-evp-dddddddddd-hhhhh
xyz-extevp-eeeeeeeeee-kkkkk
xyz-extevh-ffffffffff-lllll
xyz-chgh-gggggggggg-mmmmm

Note above that cmdh and qryh replica sets have multiple replicas, whereas the rest could have a single replica configured.

K8s is ideal for deploying such fine-grained applications, as it allows for a really high density of pods per node. .NET is not that ideal though, as the memory used per pod is usually 100 MB+, so the nodes need to have more memory (Rust/Go would be better in this regard).

Benefits

  1. Clarity on how to break down bounded contexts/services into microservices. A "service" is more or less a bounded context or a big part of one, and a "microservice" is a part of the "service" focused on a particular technical responsibility
  2. Possibility to have small microservices of a couple of hundred LOCs => easy to grasp and "cache" in your brain
  3. The microservices are standalone processes/apps/pods, can be deployed, scaled, restarted, monitored individually
  4. The place of the domain logic is clear - only in the Command Handling microservice. All the other microservices contain no business logic

Drawbacks

  1. Too much unnecessary segregation => accidental complexity?
    • Pays off once you start hunting for memory leaks, or need to quickly fix a bug by deploying only a small microservice without affecting the rest
    • Is also offset by the standardization of the microservices, and the smaller context each microservice represents => simplicity
  2. The (technical) microservices are sharing some data model or API contract, hence several of them may need to be deployed together
    • Data models and API contracts are generally difficult to change (due to so many other reasons), and backwards compatibility must be highly respected anyway.

Implementation Details

The structure and contents of each "microservice" as well as the Shared project will be described below.

XyzService.CommandHandling (CmdH)

CmdH microservice is responsible for processing all commands which arrive in the form of PUT, POST, PATCH and DELETE requests. Automatic Jobs (e.g. cron jobs/Timer Triggers) are also hosted in this application, as usually these invoke some business logic.

The standard command processing pipeline consists of the following steps:

  1. Deserialize request into Api.Requests.XxxxCommandDto and furthermore into XyzCommand record type, in the process of which the fields are validated (incl. enum value lookup, boundary checks, etc.)
  2. Fetch something from the database if needed (especially in case of executing an action on an existing entity)
  3. Invoke the domain logic for creating a new entity, or updating an existing one, considering all business rules (e.g. Blocked Customer can be Closed or put back to Active, but cannot be set to PendingKYC for example).
  4. After the business logic has been executed and a result has been produced, the domain entity is mapped to a Persistence Dto, which is stored in the database. The entity is then mapped to an Api DTO, which is serialized to JSON and returned by the API.
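
The four steps above can be sketched roughly as follows. This is only a minimal illustration in Python (the approach is not tied to any language), assuming a hypothetical ChangeStatusCommand, a plain dict standing in for the database, and a made-up transition table in which only the Blocked rules come from the example in step 3.

```python
from dataclasses import dataclass
from enum import Enum


class CustomerStatus(Enum):
    PENDING_KYC = "PendingKYC"
    ACTIVE = "Active"
    BLOCKED = "Blocked"
    CLOSED = "Closed"


# Hypothetical transition table; only the Blocked row is taken from the text.
ALLOWED = {
    CustomerStatus.PENDING_KYC: {CustomerStatus.ACTIVE, CustomerStatus.CLOSED},
    CustomerStatus.ACTIVE: {CustomerStatus.BLOCKED, CustomerStatus.CLOSED},
    CustomerStatus.BLOCKED: {CustomerStatus.ACTIVE, CustomerStatus.CLOSED},
    CustomerStatus.CLOSED: set(),
}


@dataclass(frozen=True)
class ChangeStatusCommand:
    customer_id: str
    new_status: CustomerStatus


def parse_command(dto: dict) -> ChangeStatusCommand:
    """Step 1: turn the raw request DTO into a validated command."""
    customer_id = str(dto.get("customerId", "")).strip()
    if not customer_id:
        raise ValueError("customerId is required")
    try:
        new_status = CustomerStatus(dto.get("newStatus"))  # enum value lookup
    except ValueError:
        raise ValueError(f"unknown status: {dto.get('newStatus')!r}") from None
    return ChangeStatusCommand(customer_id, new_status)


def handle(dto: dict, db: dict) -> dict:
    cmd = parse_command(dto)                      # 1. deserialize + validate
    stored = db.get(cmd.customer_id)              # 2. fetch the existing entity
    if stored is None:
        raise KeyError(cmd.customer_id)
    current = CustomerStatus(stored["status"])
    if cmd.new_status not in ALLOWED[current]:    # 3. domain rule (state machine)
        raise ValueError(f"{current.value} -> {cmd.new_status.value} is not allowed")
    persisted = {                                 # 4a. map to persistence DTO and store
        "id": cmd.customer_id,
        "status": cmd.new_status.value,
        "version": stored["version"] + 1,
    }
    db[cmd.customer_id] = persisted
    return {"customerId": persisted["id"], "status": persisted["status"]}  # 4b. API DTO
```

Calling handle with a Blocked customer and newStatus "Active" succeeds, while newStatus "PendingKYC" is rejected by the transition table before anything is written.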

CmdH is the beefiest project/app of all, as it contains the domain logic, e.g. validation, calculation, etc. More often than not the domain entities are defined as state machines, whereby every state transition is validated, some flags are set, etc.

CmdH is focused on writing stuff to a Write Data Model (optimized for writing), always respecting entity (aggregate root) consistency boundaries. NoSQL/Document databases are a good fit for a write model and remove the need for an ORM, or any additional work splitting a domain entity across several tables (not a responsibility of CmdH!).

XyzService.QueryHandling (QryH)

QryH microservice is responsible for handling GET queries for fetching REST resources.

The query processing pipeline usually consists of:

  1. Deserialize JSON into Api.Requests.XyzQueryDto
  2. Validate some of the properties (query parameters) by trying to parse them into domain types (this is why some domain types have to be in Shared.Domain.Types!)
  3. Execute database query
  4. Map Persistence.Dtos.XyzDto to Api.Responses.XyzDto and return to caller
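
As a rough sketch of this pipeline (not the actual implementation), the Python below assumes a hypothetical CustomerQuery with a status filter and a limit, and uses a list of dicts in place of a real database query.

```python
from dataclasses import dataclass
from typing import Optional

# Hypothetical shared domain values (would live in Shared.Domain.Types).
VALID_STATUSES = {"PendingKYC", "Active", "Blocked", "Closed"}


@dataclass(frozen=True)
class CustomerQuery:
    status: Optional[str]
    limit: int


def parse_query(params: dict) -> CustomerQuery:
    """Steps 1-2: read the query parameters and validate them against domain types."""
    status = params.get("status")
    if status is not None and status not in VALID_STATUSES:
        raise ValueError(f"unknown status: {status!r}")
    limit = int(params.get("limit", "20"))
    if not 1 <= limit <= 100:
        raise ValueError("limit must be between 1 and 100")
    return CustomerQuery(status, limit)


def to_api_dto(row: dict) -> dict:
    """Step 4: map the persistence DTO to the API response DTO."""
    return {"customerId": row["_id"], "status": row["status"]}


def handle_query(params: dict, rows: list) -> list:
    query = parse_query(params)
    # Step 3: the list comprehension stands in for the database query.
    matches = [r for r in rows if query.status is None or r["status"] == query.status]
    return [to_api_dto(r) for r in matches[: query.limit]]
```

Note that no domain logic is invoked here: the only domain knowledge QryH needs is the set of valid filter values.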

QryH does not care about domain entities in general; it is also happy to serve flattened/"joined" data of several entities, or a subset of the data of an entity. It relies either on the same Data Model used for writing (in case of fetching a single entity by id for example, or any filtering based on simple indexes) or requires its own Read Model for optimized searching/filtering by an arbitrary combination of filters, where the Read Model is usually filled asynchronously => eventually consistent. Column store databases can be a pretty good fit for a dedicated high-performance Read Model.

XyzService.EventPublishing (EvP)

EvP is responsible for publishing events to a message bus, so that internal subscribers (e.g. xyz2-evh, or a DWH) can receive them.

The standard event publishing pipeline looks like this (in case database Change Streams/Feed technology is used):

  1. Receive database change stream event, deserialize it to a Persistence.Dtos.XyzDto
  2. Map the persistence dto to a Api.Dtos.XyzDto (event schema is part of the public service contract!)
  3. Publish dto to a message bus topic
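
A minimal Python sketch of these three steps is shown below. The shape of the change stream event (operation plus document), the event type names, and the InMemoryBus class are all assumptions made for illustration; a real deployment would use an actual message bus client.

```python
from collections import defaultdict


class InMemoryBus:
    """Hypothetical stand-in for a message bus with topics."""

    def __init__(self):
        self._subscribers = defaultdict(list)

    def subscribe(self, topic, handler):
        self._subscribers[topic].append(handler)

    def publish(self, topic, message):
        for handler in self._subscribers[topic]:
            handler(message)


# Assumed mapping from change stream operation to public event type.
EVENT_TYPES = {"insert": "CustomerCreated", "update": "CustomerUpdated"}


def to_event_dto(change: dict) -> dict:
    """Steps 1-2: read the persistence DTO from the change and map it to the public event DTO."""
    doc = change["document"]
    return {
        "eventType": EVENT_TYPES[change["operation"]],
        "customerId": doc["_id"],
        "status": doc["status"],
    }


def publish_change(change: dict, bus: InMemoryBus, topic: str = "xyz-events") -> None:
    """Step 3: publish the event DTO to a topic."""
    bus.publish(topic, to_event_dto(change))
```

Keeping to_event_dto separate from the persistence shape means internal fields (here the "_id" name) never leak into the public event contract.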

EvP also exposes an API for subscribing to and unsubscribing from specific event types. Additionally, EvP allows (again via a dedicated API) for the replay of events.

XyzService.EventHandling (EvH)

EvH microservice is handling events received from other internal microservices (published by some other xyz2-evp, see above), received usually from a message bus like Kafka, Azure Event Hubs, etc.

The standard event processing pipeline looks like this:

  1. Deserialize received event JSON into Api.Dtos.XyzDto
  2. Handle the parsed event by e.g. creating a command and sending it to CmdH

EvH is taking care of cases when a certain event cannot be processed, due to some internal event handling issues, or inability to connect to CmdH for example. In that case EvH has the choice to either stop/restart the processing of all events for the specific consumer group of the topic, or "park" the problematic event on a message queue for further retrying later on, which comes with the downside of losing strict ordering of the consumer group-based event processing.
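
The "park" option can be sketched as below. This is an illustrative Python snippet under assumptions: send_command is a hypothetical callable that forwards a command to CmdH, and a deque stands in for the message queue used for parking.

```python
from collections import deque


def process_events(events, send_command, parked: deque) -> int:
    """Handle events in order; park the ones that fail instead of blocking the stream.

    Returns the number of events handled successfully.
    """
    handled = 0
    for event in events:
        try:
            # Hypothetical command shape; the real one depends on the CmdH API.
            send_command({"type": "ApplyEvent", "payload": event})
            handled += 1
        except Exception:
            # Parked events are retried later, so later events overtake them:
            # this is the loss of strict ordering mentioned above.
            parked.append(event)
    return handled
```

The alternative (stopping the consumer until the problem is fixed) keeps ordering intact but blocks every event behind the failing one.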

XyzService.ExternalEventPublishing (ExtEvp)

ExtEvp is responsible for pushing events via webhooks/HTTPS to external subscribers (including retry in case the subscriber is down).

The standard external event publishing pipeline looks like this (in case database Change Streams/Feed technology is used):

  1. Receive database change stream event, deserialize it to Persistence.Dtos.XyzDto
  2. Map the persistence dto to Api.Dtos.XyzDto (event schema is part of the public service contract!)
  3. Send it to a collection of (subscribed in advance) HTTP endpoints

ExtEvp must take care of temporary unavailability of some (out of all) subscribers. Parking messages in "retry message queues" with an invisibility period is usually a good approach to handling this problem.
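
To make the retry idea concrete, here is a small Python sketch. RetryQueue is a hypothetical in-memory stand-in for a message queue with an invisibility period, and post is an assumed callable that performs the HTTP call and raises OSError (e.g. ConnectionError) when the subscriber is unreachable.

```python
import heapq
from itertools import count


class RetryQueue:
    """In-memory stand-in for a queue whose messages stay invisible until a given time."""

    def __init__(self):
        self._heap = []
        self._seq = count()  # tie-breaker so messages themselves are never compared

    def park(self, message, visible_at: float) -> None:
        heapq.heappush(self._heap, (visible_at, next(self._seq), message))

    def take_visible(self, now: float) -> list:
        """Return (and remove) all messages whose invisibility period has elapsed."""
        due = []
        while self._heap and self._heap[0][0] <= now:
            due.append(heapq.heappop(self._heap)[2])
        return due


def deliver(event, subscribers, post, queue: RetryQueue, now: float, delay: float = 30.0) -> None:
    """Send the event to every subscriber; park failed deliveries per subscriber."""
    for url in subscribers:
        try:
            post(url, event)
        except OSError:
            # Only the failing subscriber is retried; the others are not affected.
            queue.park({"url": url, "event": event}, now + delay)
```

A retry worker would periodically call take_visible with the current time and attempt delivery again, typically with a growing delay and a maximum number of attempts.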

XyzService.ExternalEventHandling (ExtEvh)

ExtEvh is responsible for handling events received from external systems, usually via webhooks => public listener endpoints.

The standard external event handling pipeline looks like this:

  1. Receive e.g. HTTPS request, map it to Api.Dtos.XyzDto (= initial schema validation)
  2. Convert the request to an internal command to CmdH, and invoke CmdH.
  3. Respond with "200 OK" back to the webhook call

ExtEvh's endpoints must be secured, so that they can be invoked only by trusted external callers.
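
One common way to secure such an endpoint is a shared-secret HMAC signature over the request body. The text above does not prescribe a mechanism, so the Python sketch below is just one possible option; the payload fields, the command shape and send_command are hypothetical.

```python
import hashlib
import hmac
import json


def verify_signature(secret: bytes, body: bytes, signature_hex: str) -> bool:
    """Check that the caller signed the body with the shared secret (HMAC-SHA256)."""
    expected = hmac.new(secret, body, hashlib.sha256).hexdigest()
    return hmac.compare_digest(expected.encode(), signature_hex.encode())


def handle_webhook(secret: bytes, body: bytes, signature_hex: str, send_command) -> int:
    """Return the HTTP status code for an incoming webhook call."""
    if not verify_signature(secret, body, signature_hex):
        return 401  # untrusted caller
    try:
        payload = json.loads(body)  # 1. initial schema validation
        command = {                 # 2. convert to an internal command (assumed shape)
            "type": "RecordExternalEvent",
            "externalId": payload["id"],
            "kind": payload["kind"],
        }
    except (ValueError, KeyError, TypeError):
        return 400
    send_command(command)           # 2. invoke CmdH
    return 200                      # 3. acknowledge the webhook
```

Other options such as mutual TLS or IP allow-listing would serve the same purpose; which one applies usually depends on what the external system supports.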

XyzService.ChangeHandling (ChgH)

ChgH is responsible for handling database Change Streams/Feed changes and calculating/creating something. It can be regarded as a more general version of EvP/ExtEvp. Originally it was used only for creating/calculating audit trail entries (e.g. a diff between 2 versions of an entity) and writing them to the database, but some other calculation logic can be put here as well (e.g. maintaining balance entities or similar).

The standard change handling pipeline looks like this (in case database Change Streams/Feed technology is used):

  1. Receive database change stream event, deserialize it to Persistence.Dtos.XyzDto
  2. Do some processing, e.g. calculate a diff between 2 entity states and store an audit trail entry, or trigger a command to CmdH, etc.
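
The audit trail case from step 2 could look roughly like the Python sketch below. It assumes the change handler has access to both the previous and the new entity state as dicts, and uses a plain list in place of the audit trail collection.

```python
def diff_states(before: dict, after: dict) -> dict:
    """Return the fields whose values differ between two entity states."""
    changes = {}
    for key in sorted(before.keys() | after.keys()):
        old, new = before.get(key), after.get(key)
        if old != new:
            changes[key] = {"old": old, "new": new}
    return changes


def handle_change(before: dict, after: dict, audit_trail: list) -> None:
    """Store an audit trail entry if anything changed (hypothetical entry shape)."""
    changes = diff_states(before, after)
    if changes:
        audit_trail.append({"entityId": after["_id"], "changes": changes})
```

Because this runs off the change stream, CmdH does not need to know that an audit trail exists at all.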

ChgH, together with EvP and ExtEvp, is used to offload any additional duties from CmdH. CmdH focuses on performing business logic and storing data into the write model. All the other services react in near real time to the data change and perform their additional duties.

It is possible though that some of the other microservices also directly subscribe to database change streams - such is the case for example when an in-memory (= in-process) cache has to be built in QryH for example.
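
A minimal sketch of such an in-process cache, again in Python and with an assumed change event shape (operation, documentId, document), might look like this:

```python
class EntityCache:
    """In-process cache kept up to date from change stream events (assumed shape)."""

    def __init__(self):
        self._items = {}

    def apply(self, change: dict) -> None:
        doc_id = change["documentId"]
        if change["operation"] == "delete":
            self._items.pop(doc_id, None)
        else:  # insert, update or replace: keep the latest document
            self._items[doc_id] = change["document"]

    def get(self, doc_id: str):
        return self._items.get(doc_id)
```

Every QryH replica would maintain its own copy, so the cache is eventually consistent with the write model, just like a dedicated Read Model.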

Additional Solution Files

The solution contains the following central files:

  1. Open API (Swagger) yaml file - combines all PUT/POST/PATCH/DELETE/GET operations from all microservices in a single API

Additional Project Files

Every fsproj contains the following deployment-related files:

  1. Dockerfile
  2. Kubernetes deployment yaml file
  3. Azure DevOps deployment yaml file + bash script
  4. Bicep templates + bash script (in case of using Azure, similar for AWS/GCP)
  5. DB DDL/DML Scripts for setting up and upgrading the database

Solution containing reusable code

What if you have 10 bounded contexts with 1 service each, and then you have to split these 10 services into microservices? What to do with repetitive code, e.g. for reading or writing messages from/to a message bus, and writing to a message queue in case something goes wrong?

In case all microservices are using the same technology, some generic/reusable functionality used in all microservices of the same type (e.g. EventHandling) can be extracted into a separate solution/project (e.g. Framework.Services.EventHandling).

You may easily end up with 2 solutions containing reusable "Framework"/helper code:

  • Framework - containing projects/code reusable across all types of microservices, e.g. string manipulation, date helpers etc
  • Framework.Services - containing projects/code reusable across a certain class of microservices, e.g. *.EventHandling

Conclusion

Splitting services into microservices based on CQRS+ clearly explains how to break down your code into manageable components. This approach allows you to focus on a single technical responsibility in each microservice, as well as write, compile, deploy, scale and monitor the latter individually. Starting a new service and breaking it down into microservices is then a straightforward story which can also be done by less-experienced developers.
