Welcome to the third and final part of my series on replication in distributed systems. If you have missed the first two parts of the series, I would advise you to read them first, as this comes off as a storyline. Click here and here for the previous blogs. In this blog, we will discuss... well, let's dive right in without formalities!
Leaderless Replication
In a leaderless implementation, the client directly sends the writes to several replicas and waits for the majority of them to acknowledge, compared to a setup with one or more leaders where a leader exists to coordinate the writes.
What happens when a node goes down?
In a leader-based configuration with one leader and two replicas, we need to perform failover if we want to continue to process writes. But in a leaderless configuration, failover doesn't exist. So when the unavailable node comes back online, it has missing writes, meaning it has stale data.
This problem can easily be solved by sending the read request from the client to not just one replica but to several nodes in parallel. The client may receive different versions of the data that may be up-to-date or stale. Version numbers are used to determine which value is newer.
When the unavailable node comes back online, it has stale data. There are two mechanisms to catch up on the writes
- Read Repair - When a client makes a read request from several nodes in parallel, it can detect the node/s having stale data. The client sees the node which has stale data and writes back the newer value. This approach works well for values frequently read.
- Anti-Entropy - A background process runs constantly to look for differences in the data between the replicas. This approach doesn't copy writes in the same order they are written to the up-to-date replica
Quorums Consistency
Quorums aren't necessarily the majority -- the only thing that matters is that the sets of the nodes used by the read and write operations overlap in at least one node. If there are n replicas, every write must be confirmed by w nodes to be considered successful, and we must query at least r nodes for each read. As long as w + r > n, we expect to get up-to-date values.
Reads and writes that obey these r and w values are called quorum reads and writes. The quorum condition w + r > n allows the system to tolerate unavailable nodes
- if w < n, we can still process writes if a node is unavailable
- if r < n, we can still process reads if a node is unavailable
But even with quorum consistency, there are edge cases where stale values returned
- If a sloppy quorum is used, the w writes can end up on different nodes than the r reads, so there is no longer a guaranteed overlap between the r and w nodes.
- If two writes occur concurrently, it's not clear which one happened first. The only safe solution is to merge the concurrent writes. If a winner is picked based on the timestamp (LWW - last write wins), writes can be lost due to clock skew.
- If a write happens concurrently with a read, the write may be reflected on only some of the replicas. It's undetermined whether the reads return the old or new value.
- If the write succeeds on fewer than w nodes and fails on the remaining ones, it is not rolled back on the replicas where it succeeded. It means that if a write is reported as failed, subsequent reads may or may not return the value from that write.
Sloppy Quorums & Hinted Handoff
In large clusters with significantly large n nodes, the client can likely connect to some database nodes during the network interruption, just not the nodes that needed to assemble a quorum for a particular write or read. Is it better to return errors to all requests when we can't reach a quorum, or should we accept the writes anyway and write them to some nodes that are reachable, but aren't among the n nodes on which the value lives?
This is called sloppy quorums. Writes and reads still require w and r successful responses, but those may include nodes that aren't among the designated n "home" nodes for a value. Once the network interruption is fixed, any writes that one node temporarily accepted on behalf of another node are sent to the appropriate "home" nodes. It is called hinted handoff.
Sloppy quorums are particularly useful for increasing write availability. As long as any w nodes are available, the database can accept the writes. However, this means that even when w + r > n, you cannot be sure to read the latest value for a key because the latest value may have been temporarily written to some nodes outside of n.
Last Write Wins (LWW)
Leaderless replication is designed to tolerate conflicting concurrent writes. One approach to achieving eventual convergence is to declare that each replica needs only to store the most "recent" value and allow "older" values to be overwritten. We can attach a timestamp to each write, pick the biggest timestamp as the most "recent" value, and discard any writes with an earlier timestamp. This conflict resolution algorithm is called last write wins.
LWW achieves the goal of eventual convergence but at the cost of durability. If there are several concurrent writes to the same key, even if they were all reported to be successful by the quorum to the client, only one of the writes will be able to survive, and the others will be silently discarded. It may cause LWW to drop writes that aren't concurrent. Caching is one use case where lost writes are acceptable, and LWW is a good choice. But if losing data isn't acceptable, then LWW is a poor choice for conflict resolution.
Defining Concurrency & "Happens-Before" Relationship
An operation A happens before another operation B if B knows about A, B depends on A, or B builds upon A. The key to defining concurrency is to know whether one operation happens before another operation. In concurrency, the timestamp doesn't matter. We call two operations concurrent if they are unaware of each other, regardless of the physical time at which they occurred.
Capturing the Happens-Before Relationship
The server can determine whether two operations are concurrent by looking at the version numbers.
- The server maintains the version number for every key and increments the version number every time that key is written. The server stores the new version along with the value written.
- When a client reads a key, the server returns all the values that haven't been overwritten with the latest version number. A client must read the key before writing.
- When a client writes a key, it must include the version number from the prior read and must merge all the values it received in the prior read.
- When the server receives a write with a particular version number, it can overwrite all values with that version number or below, but it must keep all values with a higher version number.
Version Vectors
Using a single version number to capture dependencies between operations isn't sufficient when multiple replicas are accepting writes concurrently. Instead, we need to use a version number per replica per key. Each replica increments its own version number when processing a write and keeps track of the version numbers it has seen from each of the other replicas. This information indicates which values to overwrite and which values to keep as siblings (concurrent values are called siblings). The collection of version numbers from all replicas is called version vector.
Version vectors are sent from the database replicas to clients when values are read and need to be sent back to the database when a value is subsequently written. It helps the database to distinguish between overwrites and concurrent writes.
Parting Words
The third part and final part of the blog series introduced quorum consistency for leaderless replication, the LWW concurrency model, and version vectors. This rounds up my blog series on replication in distributed systems.
If you liked the blog, don't forget to share it and follow me on Twitter, LinkedIn, Peerlist, and other social platforms, and tag me. You may also write to me at guptaamanthan01@gmail.com to share your thoughts about the blog.
References
- Designing Data-Intensive Applications: The Big Ideas Behind Reliable, Scalable, and Maintainable Systems by Martin Kleppmann
Top comments (0)