DEV Community


Discussion on: Quick and Easy, Exactly-Once, Distributed Work Queues Using Serializable Transactions

Georgy Savva • Edited

Thanks a lot for shedding light on this interesting topic!
I have a few questions about scalability and indexes in this type of workflow. Maybe you could help me figure them out.

  1. Let's imagine that we have our queue (CockroachDB) and workers handling a throughput of N tasks per second. For the sake of this example, the items table has only one index, on the id field (primary key).
    Now the load increases and we need to handle 4N tasks per second. Scaling workers is simple: I just increase the number of workers 4x. I should also increase the number of CockroachDB nodes 4x to scale the queue itself. This works well when we put to the queue, because each insert is routed by item id to a single leaseholder, so requests are spread evenly. But I am a bit concerned about what happens when we get from the queue, i.e. this query:

    UPDATE items SET claim = $1 WHERE claim IS NULL LIMIT 1 RETURNING id

    Does LIMIT 1 mean that CockroachDB can serve data from the gateway node if it holds a suitable record, without calling all the other leaseholders in the cluster? If yes, that's great. As far as I understand, without LIMIT 1 CockroachDB would need to search on all leaseholders, i.e. involve all nodes, and increasing the number of nodes 4x wouldn't scale this part very well. Or am I missing something?

  2. You mentioned that we can add a priority for items. Let's imagine the items table has a created_at timestamp field and still has only one index, on the id field.
    We want our queue to act as FIFO, i.e. we get items with the lowest created_at:

    UPDATE items SET claim = $1 WHERE claim IS NULL ORDER BY created_at LIMIT 1 RETURNING id

    Does that mean that, again, CockroachDB needs to get items from all leaseholders to pick the one with the lowest created_at, and hence involve all nodes in the cluster? Is it linearly scalable with the number of nodes?

  3. Let's assume that we do have an index on the claim field in the items table. All new records that we insert into the items table have NULL in the claim field. Doesn't it mean that we always insert into the same range, like when we have an index on an always increasing field? Can it become a bottleneck? And what happens if this single range for NULL value exceeds the limit (64 MB), can it be split?

Andrew Werner Author

I feel like I should start by saying that this scheme is especially well suited to tasks where the processing time is much larger than the time it takes to acquire the lease. Hopefully that contextualizes some of the scalability concerns you raise, as they are generally valid.

  1. When performing a LIMIT 1 query here we'll always scan the primary index in its sorted order. As you pointed out, this will require going to at least one leaseholder and potentially many. Furthermore, it will get involved in contention. In the best case, concurrent requests to acquire will serialize in memory due to the SELECT FOR UPDATE read locks added in 20.1. The point of this design is not to optimize concurrent lease acquisition. We'll explore how to better optimize concurrency for a different set of assumptions below.

  2. I was imagining that in the scenario with the FIFO, to make it efficient, we'd either add the ordering condition to the primary key or add a secondary index. Perhaps the best would be to add a partial secondary index (new feature in 20.1) over unclaimed items. That way you'd at least only need to go to one leaseholder rather than all of them.

  3. We can split secondary index ranges so I don't think this is really a problem. It is the case that you'll be constantly contending to read and update that index which is likely to lead to serialization of concurrent requests. Fortunately the more pessimistic locking should lead to reasonable behavior here rather than the backoff retry loops that might have occurred in previous versions.

As I noted at the beginning, none of these scalability concerns really come up if the work being done for each task takes orders of magnitude longer than acquiring the lease. That was the envisioned use case for this article and I should have spelled that out more clearly. A different set of requirements would be an event processing system that wants to process short events at high throughput. In that setting, any contention reading or processing events would be severely detrimental to scalability. It is also the case that in that world, one is likely to be willing to accept slightly different performance characteristics. For that set of requirements I'd encourage us to look towards pub-sub systems like Kafka. Kafka provides parallelism by statically sharding the topic. This means that each partition becomes a FIFO but the whole topic is no longer totally ordered. In the initial presentation of this article, we didn't have order, and so adding a level of coarse-grained locking above the work pool is easy to imagine and would help to avoid contention.

Let's for simplicity just talk about static partitioning. We could imagine later talking about dynamic partitioning but it'd be slightly more involved. Even this is going to appear somewhat kludgy given we're just talking pseudo-code but I hope you get the picture.

To avoid contention we're going to want to add another layer of indirection whereby sessions grab locks on portions of the items table.

CREATE TABLE session_parts (
    id UUID REFERENCES sessions (id) ON DELETE CASCADE, part INT8,
    PRIMARY KEY (id, part)
);

CREATE TABLE items (
    id UUID NOT NULL PRIMARY KEY, claim UUID, claim_part INT8,
    CONSTRAINT fk_session_parts
        FOREIGN KEY (claim, claim_part)
        REFERENCES session_parts (id, part)
);

So as written I'm leaving a lot of logic up to the client but I envision that we'd have the client configured to have a certain number of parts that should be claimed. Then the sessions will claim parts by keeping track of the set of existing sessions and then choosing the parts based on lexical order. For example, imagine there are 2 sessions and we want to split it into 8 parts.

INSERT INTO sessions VALUES ('b4b3722f-ac65-4495-afca-5883da1ff861'), ('c11f8e9b-1d67-493a-98d1-a7a3f645ab2a');

These two sessions would notice each other and the first would grab parts 0-3 and the second would grab parts 4-7. Then, each session would use its parts to break up the item keyspace. For example, for part 0 you might do:

UPDATE items SET
    claim = $1, claim_part = $2
WHERE
    claim IS NULL
    AND id >= '00000000-0000-0000-0000-000000000000'
    AND id < '20000000-0000-0000-0000-000000000000'
LIMIT 1 RETURNING id
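
As a rough illustration of how a client might derive those id bounds for any part, here is a minimal Python sketch. It assumes the parts split the 128-bit UUID space evenly and that ids sort in the same order as their hex text; `part_bounds` is a hypothetical helper name, not something from the article.

```python
import uuid
from typing import Optional, Tuple

UUID_SPACE = 1 << 128  # number of distinct 128-bit UUID values


def part_bounds(part: int, num_parts: int) -> Tuple[str, Optional[str]]:
    """Return (inclusive lower, exclusive upper) id bounds for one part.

    The upper bound is None for the last part, meaning "no upper bound",
    because 2**128 itself cannot be written as a UUID.
    """
    if not 0 <= part < num_parts:
        raise ValueError("part must be in [0, num_parts)")
    lo = part * UUID_SPACE // num_parts
    hi = (part + 1) * UUID_SPACE // num_parts
    lower = str(uuid.UUID(int=lo))
    upper = str(uuid.UUID(int=hi)) if hi < UUID_SPACE else None
    return lower, upper


# Part 0 of 8 gives the same bounds as the query above.
print(part_bounds(0, 8))
```

For the last part the client would simply leave the `id <` condition out of the query.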

Let's say a new session popped up. The other sessions would eventually detect that (let's say with polling though there could be other means). Then they'd determine the need to drop some parts.

INSERT INTO sessions VALUES ('c006dd7c-1880-40ce-a8e6-ac07aa22163d')

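In this case the new session sorts between the two existing ones, so the breakdown would be:
(0, 1), (2, 3, 4), (5, 6, 7)
The first session would drop parts 2 and 3, the second session would drop part 4, and the new session would pick up parts 2, 3 and 4.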
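
Here is a minimal Python sketch of how each client could compute this breakdown on its own, assuming every session eventually sees the same set of session ids. The helper name `assign_parts` and the floor-division split are assumptions, picked because they reproduce the example above; the scheme only needs all sessions to apply the same deterministic rule.

```python
from typing import Dict, List


def assign_parts(session_ids: List[str], num_parts: int) -> Dict[str, List[int]]:
    """Give each session a contiguous run of parts, in lexical order of id."""
    ordered = sorted(session_ids)
    n = len(ordered)
    return {
        sid: list(range(i * num_parts // n, (i + 1) * num_parts // n))
        for i, sid in enumerate(ordered)
    }


first = "b4b3722f-ac65-4495-afca-5883da1ff861"
second = "c11f8e9b-1d67-493a-98d1-a7a3f645ab2a"
newcomer = "c006dd7c-1880-40ce-a8e6-ac07aa22163d"

before = assign_parts([first, second], 8)
after = assign_parts([first, second, newcomer], 8)

# Parts each existing session has to give up once it notices the newcomer.
dropped = {
    sid: sorted(set(before[sid]) - set(after[sid])) for sid in before
}
print(after)
print(dropped)
```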

That way each part's claims would be completely uncontended. The problems with this are the failover required to pick up new claims and increased latency due to imbalance in the partitions. For what it's worth, this is exactly how Kafka consumer groups work.

Hope this all made sense!