lostghost

Distributed Applications. Part 2 - Distributed Design Patterns

This blog is part of a series on designing distributed applications. In this chapter, we will examine the design patterns commonly employed in the industry for distributed applications.

First, let's address a widely heard but poorly understood concept - microservices. What is a microservice, and how is it useful? For some reason, the term has come to mean "your program should be many small programs that send JSON over HTTP". But what makes a program small? Why JSON and HTTP specifically? It seems like some sort of cargo cult.

A microservice is a service owned as completely as possible by one team, one that can be deployed without coordination with any other team. Microservices are about office politics (namely, Conway's law), not any particular technical solution.

With that out of the way, let's discuss CQRS, a widely adopted and useful pattern. CQRS stands for "Command-Query Responsibility Segregation", and it means that interactions with a software system should be separated into commands and queries. Commands change the state of the system and produce no results, while queries produce results but don't change the state. This has many implications.

Commands require special handling - once accepted, they have to be processed, and in doing so, their effect should only be applied once. The first part means that a command is durably recorded under a unique ID; the second means that a command is either carefully tracked under that ID, or is idempotent.
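
To make that concrete, here is a minimal sketch (hypothetical schema, SQLite as the durable store) of recording a command under a unique ID and applying its effect only once:

```python
import sqlite3
import uuid

db = sqlite3.connect(":memory:")
db.executescript("""
    CREATE TABLE commands (id TEXT PRIMARY KEY, payload INTEGER, processed INTEGER DEFAULT 0);
    CREATE TABLE accounts (name TEXT PRIMARY KEY, balance INTEGER);
""")
db.execute("INSERT INTO accounts VALUES ('alice', 100)")

def accept_command(cmd_id, amount):
    # Durably record the command under its unique ID; a redelivered duplicate
    # with the same ID is a no-op thanks to the primary-key constraint.
    db.execute("INSERT OR IGNORE INTO commands (id, payload) VALUES (?, ?)", (cmd_id, amount))
    db.commit()

def process_pending():
    # Apply each recorded command exactly once: the effect and the 'processed'
    # flag are updated in the same transaction.
    pending = db.execute("SELECT id, payload FROM commands WHERE processed = 0").fetchall()
    for cmd_id, amount in pending:
        db.execute("UPDATE accounts SET balance = balance + ? WHERE name = 'alice'", (amount,))
        db.execute("UPDATE commands SET processed = 1 WHERE id = ?", (cmd_id,))
    db.commit()

cmd_id = str(uuid.uuid4())
accept_command(cmd_id, 25)
accept_command(cmd_id, 25)   # the same command delivered twice: recorded once
process_pending()
print(db.execute("SELECT balance FROM accounts").fetchone())   # (125,)
```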

Queries require much less handling - they don't need to be recorded and then executed, they can be executed right away and have their results returned on the same connection. If the connection breaks, they can be retried without worry, since they are idempotent by nature.

And lastly, both commands and queries are themselves stateless. State lives in a higher-level container called a "workflow" - a sequence of commands and queries responsible for an entire business-relevant operation - which exists either in a dedicated piece of software or ends up scattered across different services.

Let us now go over widespread communication patterns. They matter less if you are only building the "stateless compute" part of a distributed system yourself, which needs no coordination within itself, and rely on the database for all state handling. But if you do need reliable communication over the network, how do you arrange for it?

Direct RPC is a good option - with your own framing over TCP, or even your own reliability over UDP. Maybe JSON over HTTP. Either way, you decide whether state lives only within one session (then you can put the other end behind a load balancer, but you lose the benefit of data locality) or persists between sessions (then you need to expose all IPs and ports, and worry about correct failover).
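
As an illustration, here is a minimal sketch of one possible framing scheme over TCP - a 4-byte big-endian length prefix before every message (a common convention, not something the pattern prescribes):

```python
import socket
import struct

def send_msg(sock: socket.socket, payload: bytes) -> None:
    # Frame = 4-byte big-endian length prefix, then the payload itself.
    sock.sendall(struct.pack(">I", len(payload)) + payload)

def recv_exactly(sock: socket.socket, n: int) -> bytes:
    # TCP is a byte stream, so keep reading until the full frame arrives.
    buf = b""
    while len(buf) < n:
        chunk = sock.recv(n - len(buf))
        if not chunk:
            raise ConnectionError("peer closed the connection mid-frame")
        buf += chunk
    return buf

def recv_msg(sock: socket.socket) -> bytes:
    # Read the length header first, then exactly that many payload bytes.
    (length,) = struct.unpack(">I", recv_exactly(sock, 4))
    return recv_exactly(sock, length)
```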

But direct RPC requires a degree of availability - at least one instance of every service should always be reachable. If that constraint isn't met, you need a sort of "socket activation" intermediary to hold on to messages. Further, with direct RPC, every service needs to durably record the commands it sends in its own database - but what if they were recorded in a shared place instead? This brings us to queues and message brokers. They aren't strictly better than direct RPC, and they introduce a single point of failure, but they are a solid option, widely adopted in the industry.

Both direct RPC and queues commonly have at-least-once delivery guarantees - if reception of a message could not be confirmed, the message will be resent. This means commands need to be idempotent.
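
Here is a toy, everything-in-process model of at-least-once delivery (all names hypothetical): the broker keeps a message until it is acknowledged, a lost ack leads to a redelivery, and the consumer has to swallow the duplicate idempotently:

```python
import queue
import uuid

broker = queue.Queue()
unacked = {}     # message id -> payload, still awaiting acknowledgement
seen = set()     # consumer-side record of already-applied message ids
balance = 0

def publish(payload):
    msg_id = str(uuid.uuid4())
    unacked[msg_id] = payload
    broker.put((msg_id, payload))

def redeliver():
    # A real broker does this on an ack timeout; here we trigger it by hand.
    for msg_id, payload in unacked.items():
        broker.put((msg_id, payload))

def consume(lose_ack=False):
    global balance
    msg_id, payload = broker.get()
    if msg_id not in seen:           # idempotent apply: duplicates change nothing
        balance += payload
        seen.add(msg_id)
    if not lose_ack:
        unacked.pop(msg_id, None)    # the ack reaches the broker

publish(10)
consume(lose_ack=True)   # effect applied, but the ack is lost
redeliver()              # broker resends the same message
consume()                # duplicate detected, applied only once
print(balance)           # 10
```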

Let's now scale this out to multiple independent services (possibly owned and operated by different teams). What if you need to perform an operation that spans multiple services - say, check a condition in two services, then perform a command on both? There can be a race condition: while you are checking the second service, the condition in the first service may become false. You need a transaction. There are two options.

The first is a distributed transaction: a transaction manager, the X/Open XA protocol, and two-phase commit. The issue is that if you have no control over a third-party service, you can't force them to adopt your transaction manager, and they may not want to let you lock their database.
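
A toy sketch of the two-phase commit idea only (in-memory participants, nothing like a real XA implementation):

```python
class Participant:
    """Toy resource manager: it can promise (prepare) and then commit or roll back."""
    def __init__(self, name):
        self.name = name
        self.prepared = False
        self.committed = False

    def prepare(self, ok=True):
        # Phase 1: lock what is needed and promise that commit will succeed.
        self.prepared = ok
        return ok

    def commit(self):
        # Phase 2a: only reached if every participant voted yes.
        self.committed = True

    def rollback(self):
        # Phase 2b: any "no" vote aborts everyone.
        self.prepared = False

def two_phase_commit(participants):
    if all(p.prepare() for p in participants):
        for p in participants:
            p.commit()
        return True
    for p in participants:
        p.rollback()
    return False

print(two_phase_commit([Participant("orders"), Participant("billing")]))   # True
```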

The second option is the Saga pattern. Check the condition on the first service, perform the operation, then check the condition again. If the result is not what you want, roll the operation back with a compensating operation - every forward operation should be paired with one. You no longer need to stop the world to coordinate different services - letting go of the illusion of control buys you better performance.
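
A minimal sketch of the saga shape - every forward step paired with a compensation, and completed steps undone in reverse order on failure (the two services and their operations are hypothetical):

```python
def run_saga(steps):
    """Each step is (action, compensation). On failure, undo completed steps in reverse."""
    done = []
    try:
        for action, compensate in steps:
            action()
            done.append(compensate)
    except Exception:
        for compensate in reversed(done):
            compensate()   # best effort: compensations should themselves be retried if they fail
        raise

# A hypothetical business operation spanning two services:
run_saga([
    (lambda: print("reserve seat on the flights service"),
     lambda: print("release the seat reservation")),
    (lambda: print("charge the card on the billing service"),
     lambda: print("refund the charge")),
])
```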

The issue is that if a complex business process spans multiple services, its code may also get scattered across those services, and the whole process becomes hard to make sense of, especially when things go wrong. First, consider centralised logging and tracing, so you can follow a request between services. Second, factor the process out into a separate service, so the code is all in one place, and keep the other services dumb. Third, you may want a centralised dashboard of logs and traces for all operations, with the option of cancelling or restarting any of them - a workflow engine like Temporal can help here. Fourth, you may want to let non-technical people program and observe workflows at a high level - a BPMN engine like Camunda or Flowable is a good option.

Another pattern is eventual consistency. Programs rely on forward progress, on a monotonically increasing time and instruction pointer. Programs shouldn't "jump backwards" - otherwise logic, cause and effect, breaks down. This won't happen as much if you have stateful and sticky sessions - the same node will be receiving both reads and writes. But what if you use stateless backends behind a load balancer, and separate read replicas? It may take time for writes to propagate to where you're reading from - so you won't see the effects of your own commands. There are two options: either specify "wait for propagation" in all of your commands, or accept eventual consistency by way of versioning:

  • Give every entity a version
  • Show the user the entity, remember its version N
  • If the user updates the entity - specify in the command "if version is still N", and additionally increment the version (in the same command, atomically)
  • If the version is no longer N - it was updated by somebody else - either check with business logic that the update still makes sense, or ask the user again, based on the new data

Many databases allow you to specify conditions in the update. If you know that the user's decision was based not on the whole entity but on specific conditions, you can skip versioning and include those conditions in the command: "if the price is still under 100$, order the phone". There is no need to ask the user again just because the price changed, if it only went down.
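
Both variants boil down to a condition on the update itself, sketched here against SQLite with a hypothetical phones table - a row count of zero means somebody else got there first:

```python
import sqlite3

db = sqlite3.connect(":memory:")
db.execute("CREATE TABLE phones (id INTEGER PRIMARY KEY, price INTEGER, stock INTEGER, version INTEGER)")
db.execute("INSERT INTO phones VALUES (1, 90, 5, 1)")

# Variant 1: optimistic locking with an explicit version column.
# The user saw version 1; the update succeeds only if that is still true.
cur = db.execute(
    "UPDATE phones SET price = ?, version = version + 1 WHERE id = ? AND version = ?",
    (95, 1, 1))
if cur.rowcount == 0:
    print("conflict: the phone was updated by somebody else, re-read and retry")

# Variant 2: encode the user's actual assumption as a condition on the command,
# e.g. "order the phone only if the price is still under 100$".
cur = db.execute(
    "UPDATE phones SET stock = stock - 1 WHERE id = ? AND price < 100 AND stock > 0",
    (1,))
print("ordered" if cur.rowcount == 1 else "price changed or out of stock, ask the user again")
```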

Another thing to consider is the sharding of your backend.

At a high level, the unit of sharding is either the user request or the application entity. Sharding per user request means your entities are passive stores of data, and coordinating their manipulation requires locking. But if application entities are living participants in the communication, you get the actor model, which does not need locks.
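
A toy sketch of the per-entity actor idea with asyncio (a hypothetical account entity): each actor drains its own mailbox one message at a time, so its state needs no locks:

```python
import asyncio

class AccountActor:
    """One actor per application entity; its state is touched only from its own task."""
    def __init__(self):
        self.balance = 0
        self.mailbox = asyncio.Queue()

    async def run(self):
        while True:
            amount, reply = await self.mailbox.get()
            self.balance += amount   # no locks: messages are processed one at a time
            reply.set_result(self.balance)

    async def deposit(self, amount):
        reply = asyncio.get_running_loop().create_future()
        await self.mailbox.put((amount, reply))
        return await reply

async def main():
    actor = AccountActor()
    worker = asyncio.create_task(actor.run())
    print(await asyncio.gather(*(actor.deposit(10) for _ in range(5))))   # [10, 20, 30, 40, 50]
    worker.cancel()

asyncio.run(main())
```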

At a low level, your application can be single-threaded, multi-process, multi-threaded, async with coroutines, or async with virtual threads. Consider that Linux tracks memory (and performs the OOM kill) per process, and that async coroutines introduce function coloring.

And lastly, Event Sourcing. Event Sourcing is the observation that the state of your application is the result of processing a series of inputs, and the conclusion that the state should be split into a "source of truth" - an append-only event log - and many read-friendly representations of it. Journaled file systems and journaled databases are examples of event sourcing; shipping the database WAL to a separate analytical database is an example of creating a new read-friendly representation for a particular need. The downside is that you need to evolve (and version) multiple sets of entities - the ones in the event log and the ones in every read representation - which can quickly become overwhelming. The upside is that you can destroy any read-friendly representation: it can always be recreated from the event log.
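
A minimal sketch of the idea with a hypothetical shopping-cart domain: the append-only log is the source of truth, and the read-friendly view is rebuilt by replaying it:

```python
from collections import Counter

event_log = []   # append-only source of truth

def append(event_type, item):
    event_log.append({"type": event_type, "item": item})

def rebuild_cart_view(log):
    """A read-friendly representation; throw it away and replay the log to recreate it."""
    cart = Counter()
    for e in log:
        if e["type"] == "item_added":
            cart[e["item"]] += 1
        elif e["type"] == "item_removed":
            cart[e["item"]] -= 1
    return +cart   # drop items whose count fell to zero

append("item_added", "phone")
append("item_added", "case")
append("item_removed", "case")
print(rebuild_cart_view(event_log))   # Counter({'phone': 1})
```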

This concludes our exploration of application-level design patterns for distributed systems. Let me know if I missed any! In the next blog we will discuss the internal organisation and tradeoffs of different databases. Thanks for reading!
