Coordinating work across multiple processes almost always pops up in one form or another when building applications. There are well-known recipes to perform this coordination on top of both distributed coordination services like etcd or Apache Zookeeper/Curator, and single-machine SQL databases like Postgres. Given the strong consistency of CockroachDB, similar patterns always seemed possible but weren't well documented. This short post tries to provide a recipe to achieve correct, fault-tolerant, distributed coordination on top of CockroachDB.
I want to clarify that true execute-exactly-once semantics or mutual exclusion for work performed outside of the coordinating storage system are impossible to provide in a fault-tolerant, highly available way (see Section 3.2 of the Jepsen etcd evaluation). In an arbitrarily asynchronous network where nodes may pause for unlimited amounts of time, we can’t know whether an operation has failed or is merely slow.
Fortunately, we can get pretty close by making some reasonable assumptions about failure detection. Furthermore, we can provide a different guarantee: exactly-once acknowledgement. With this guarantee, only one task will ever be able to successfully acknowledge completion of a given work item. If completing a task means writing its result to the database, this gives us exactly-once semantics 🎉.
The key concept that enables coordination over work items is the session. The session encapsulates coordinated failure detection. The system will refer to sessions when attempting to perform work. Work items can only be successfully consumed by workers which have a live session throughout the entire processing of a work item.
Imagine we have work items that we want to acknowledge having worked on exactly once. To do this we’ll need two tables, a sessions table and an items table. The trick is that we’re going to use the sessions table to track the liveness of our workers. Each worker will heartbeat its session to indicate its liveness. Workers will lock work items prior to processing by associating the item with their session.
We’ll have workers do three things continuously:

1) Heartbeat the sessions table to keep their session alive.
2) Poll the sessions table to see if there are any expired sessions which need to be deleted.
3) Poll the items table to claim work items.
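The three recurring tasks above can be sketched as a single worker loop. This is a minimal illustration, not CockroachDB client code: the three callables are hypothetical stand-ins for the SQL statements shown later, and in a real worker each task would run on its own timer (heartbeats much more frequently than polling).

```python
def run_worker(heartbeat, reap_expired_sessions, claim_and_process, should_stop):
    """Collapse the three recurring worker tasks into one loop for clarity."""
    while not should_stop():
        heartbeat()               # 1) keep our session alive
        reap_expired_sessions()   # 2) delete sessions that stopped heartbeating
        claim_and_process()       # 3) claim a work item and process it
```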
For each claimed work item, we'll process the item and then run a transaction which records its outcome and removes it from the queue. The code will ensure, transactionally, that the session under which the item was claimed is still valid when recording the result.
CREATE TABLE sessions (
    id UUID DEFAULT gen_random_uuid() PRIMARY KEY,
    heartbeated_at TIMESTAMPTZ NOT NULL
    -- Probably also a good idea to put some metadata about the session,
    -- e.g. the hostname and pid which owns the session and maybe the
    -- timestamp at which it was created.
);
When a process starts up, it will need to create a session:
INSERT INTO sessions (heartbeated_at) VALUES (now()) RETURNING id
Then, the process will need to kick off a thread to periodically update the heartbeated_at timestamp to now(). This should generally happen quite a bit more frequently than the session expiration (an interval which you’ll also need to pick).
-- Here $1 is the session ID for the process.
UPDATE sessions SET heartbeated_at = now() WHERE id = $1
If the process fails to heartbeat, likely because its session has expired and been deleted, the code should exit the worker process, as it won’t be able to acknowledge any work that is currently claimed. This is the case where the system believes the worker's session has failed.
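A heartbeat loop along these lines might be sketched as follows. The `do_heartbeat` and `on_failure` callables are assumptions: `do_heartbeat` would run the UPDATE above and raise if no row was updated.

```python
import threading

def heartbeat_loop(do_heartbeat, interval_seconds, stop_event, on_failure):
    """Heartbeat until stopped; tear the worker down if a beat fails.

    `stop_event.wait` doubles as the sleep between beats, so a graceful
    shutdown can interrupt the loop immediately.
    """
    while not stop_event.wait(interval_seconds):
        try:
            do_heartbeat()
        except Exception:
            # The session has likely expired and been deleted; we can no
            # longer acknowledge claimed work, so shut the worker down.
            on_failure()
            return
```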
We want to ensure that live sessions are eventually able to claim work items that had previously been claimed by now-dead sessions. We do this by detecting expired sessions and deleting them.
-- Here $1 is the chosen expiration as an INTERVAL.
DELETE FROM sessions WHERE heartbeated_at < now() - $1
Reasonable values for the heartbeat interval and expiration might be 1 and 5 seconds respectively. With these settings, it would take roughly 5 seconds after a node fails for its items to be claimable by another node. Note that during graceful shutdown, the process can cooperatively remove its session before it expires, meaning its work can be claimed immediately.
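To make the arithmetic concrete, here is the expiration predicate from the DELETE above mirrored in Python, using the example values just mentioned (the constant names are illustrative, not part of the schema):

```python
from datetime import datetime, timedelta

HEARTBEAT_INTERVAL = timedelta(seconds=1)   # how often each worker beats
SESSION_EXPIRATION = timedelta(seconds=5)   # the INTERVAL passed as $1

def is_expired(heartbeated_at, now):
    """Mirrors the DELETE predicate: heartbeated_at < now() - $1."""
    return heartbeated_at < now - SESSION_EXPIRATION
```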
We'll need an items table to store work items. We could use an index to order the items or just grab them arbitrarily.
CREATE TABLE items (
    id UUID NOT NULL PRIMARY KEY,
    claim UUID REFERENCES sessions (id) ON DELETE SET NULL
    -- Other fields with the details of the item, etc.
    -- You could add a score column to prioritize items,
    -- or order them by the time at which they were added.
    -- You could also create an index to support that ordering.
);
In our work-claiming loop, while we have capacity for outstanding work items, we'll periodically poll the database for a new item. The polling rate should be chosen based on the latency requirements of the application. Another consideration is the total number of worker nodes: the more nodes polling for items, the less frequent each node's polling should be. Something on the order of 100-200ms seems reasonable for many applications with tens of worker nodes.
We'll use the query below to find and claim a work item. We could add an ORDER BY clause to prioritize work items based on their properties.
-- Here $1 is the process's session ID.
UPDATE items SET claim = $1 WHERE claim IS NULL LIMIT 1 RETURNING id
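The claiming loop might look like the following sketch, where `claim_item` is an assumed callable that runs the UPDATE ... RETURNING statement above and returns an item id, or None when no unclaimed items exist:

```python
import time

def claim_loop(claim_item, process, poll_interval_seconds, should_stop):
    """Poll for unclaimed items, processing each one we win."""
    while not should_stop():
        item_id = claim_item()
        if item_id is None:
            # No unclaimed work right now; back off before polling again.
            time.sleep(poll_interval_seconds)
            continue
        process(item_id)
```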
Here’s the magic that achieves exactly-once semantics: when processing an item, you run a transaction that records the item's outcome and also deletes the item from the queue. It’s critical that this transaction verifies that the process’s session still holds the claim on the item. That looks like:
-- Do some processing of the item.
BEGIN;
-- ... Record the results of processing the item.
-- Here $1 is the item ID and $2 is the process's session ID.
DELETE FROM items WHERE id = $1 AND claim = $2;
-- SQL clients give access to the number of rows affected by a statement.
-- Ensure that the DELETE affected 1 row. If it didn't, then your session
-- has lost its claim and the transaction needs to be aborted with a
-- ROLLBACK. Otherwise, proceed to COMMIT.
COMMIT;
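In client code, the rowcount check might look like this Python sketch. Here `conn` is assumed to be a DB-API-style connection and `record_result` a hypothetical callback that writes the outcome inside the same transaction; the fakes in any real test stand in for a live database.

```python
def acknowledge(conn, item_id, session_id, record_result):
    """Record the outcome and delete the claimed item in one transaction.

    Returns True only if we still held the claim, i.e. the DELETE
    affected exactly one row; otherwise the transaction is rolled back.
    """
    cur = conn.cursor()
    try:
        record_result(cur)  # e.g. INSERT the result of processing
        cur.execute(
            "DELETE FROM items WHERE id = %s AND claim = %s",
            (item_id, session_id),
        )
        if cur.rowcount != 1:
            conn.rollback()  # our session lost its claim
            return False
        conn.commit()
        return True
    except Exception:
        conn.rollback()
        raise
```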
If the above transaction commits, then because of the properties of serializable transactions, you’ll be sure that the item was processed exactly once!