DEV Community

Discussion on: Quick and Easy, Exactly-Once, Distributed Work Queues Using Serializable Transactions

Andrew Werner

I feel like I should start by saying that this scheme is especially well suited to tasks where the processing time is much larger than the time it takes to acquire the lease. Hopefully that contextualizes some of the scalability concerns you raise, as they are generally valid.

  1. When performing a LIMIT 1 query here we'll always scan the primary index in its sorted order. As you pointed out, this will require going to at least one leaseholder and potentially many. Furthermore, concurrent acquirers will contend with one another. In the best case, concurrent requests to acquire will serialize in memory due to the SELECT FOR UPDATE read locks added in 20.1. The point of this design is not to optimize concurrent lease acquisition. We'll explore how to better optimize concurrency for a different set of assumptions below.

  2. I was imagining that in the scenario with the FIFO, to make it efficient, we'd either add the ordering condition to the primary key or add a secondary index. Perhaps the best would be to add a partial secondary index (new feature in 20.1) over unclaimed items. That way you'd at least only need to go to one leaseholder rather than all of them.

  3. We can split secondary index ranges so I don't think this is really a problem. It is the case that you'll be constantly contending to read and update that index which is likely to lead to serialization of concurrent requests. Fortunately the more pessimistic locking should lead to reasonable behavior here rather than the backoff retry loops that might have occurred in previous versions.

As I noted at the beginning, none of these scalability concerns really come up if the work being done for each task takes orders of magnitude longer than acquiring the lease. That was the envisioned use case for this article and I should have spelled that out more clearly. A different set of requirements would be an event processing system that wants to process short events at high throughput. In that setting, any contention on reading or processing events would severely hurt scalability. It is also the case that in that world, one is likely to be willing to accept slightly different performance characteristics. For this set of requirements I'd encourage us to look towards pub-sub systems like Kafka. Kafka provides parallelism by statically sharding the topic. This means that each partition becomes a FIFO but the whole topic is no longer totally ordered. The scheme as initially presented in this article didn't provide ordering anyway, so adding a level of coarse-grained locking above the work pool is easy to imagine and would help to avoid contention.

Let's for simplicity just talk about static partitioning. We could imagine later talking about dynamic partitioning but it'd be slightly more involved. Even this is going to appear somewhat kludgy given we're just talking pseudo-code but I hope you get the picture.

To avoid contention we're going to want to add another layer of indirection whereby sessions grab locks on portions of the items table.

CREATE TABLE session_parts (
    id UUID REFERENCES sessions (id) ON DELETE CASCADE, part INT8,
    PRIMARY KEY (id, part)
);
CREATE TABLE items (
    id UUID NOT NULL PRIMARY KEY, claim UUID, claim_part INT8,
    CONSTRAINT fk_session_parts
        FOREIGN KEY (claim, claim_part)
        REFERENCES session_parts (id, part)
        ON DELETE SET NULL
);
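To make the release behavior of this schema concrete, here is a minimal sketch that runs the same foreign-key layout against an in-memory SQLite database from Python. SQLite is only a stand-in so the example is self-contained; it is not CockroachDB, the column types are swapped for SQLite ones, and the one-column `sessions` table is a simplified, hypothetical version of the one in the article.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("PRAGMA foreign_keys = ON")  # SQLite leaves foreign keys off by default
conn.executescript("""
    CREATE TABLE sessions (id TEXT PRIMARY KEY);  -- simplified, hypothetical
    CREATE TABLE session_parts (
        id TEXT REFERENCES sessions (id) ON DELETE CASCADE,
        part INTEGER,
        PRIMARY KEY (id, part)
    );
    CREATE TABLE items (
        id TEXT NOT NULL PRIMARY KEY,
        claim TEXT,
        claim_part INTEGER,
        FOREIGN KEY (claim, claim_part)
            REFERENCES session_parts (id, part)
            ON DELETE SET NULL
    );
""")

# One session holding parts 0 and 1, with one claimed item in each part.
conn.execute("INSERT INTO sessions VALUES ('s1')")
conn.executemany("INSERT INTO session_parts VALUES ('s1', ?)", [(0,), (1,)])
conn.executemany(
    "INSERT INTO items VALUES (?, 's1', ?)", [("item-a", 0), ("item-b", 1)]
)

QUERY = "SELECT id, claim, claim_part FROM items ORDER BY id"

# Dropping part 1 releases only the claims held through that part.
conn.execute("DELETE FROM session_parts WHERE id = 's1' AND part = 1")
after_drop = conn.execute(QUERY).fetchall()

# Deleting the session cascades to its remaining parts and releases the rest.
conn.execute("DELETE FROM sessions WHERE id = 's1'")
after_expiry = conn.execute(QUERY).fetchall()
```

The point of the composite foreign key is that a session can give up a single part (by deleting one `session_parts` row) without touching its claims in other parts, while deleting the session itself still releases everything.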

So as written I'm leaving a lot of logic up to the client, but I envision that we'd have the client configured with the total number of parts to be claimed. Then the sessions will claim parts by keeping track of the set of existing sessions and choosing their parts based on the lexical order of the session IDs. For example, imagine there are 2 sessions and we want to split the item keyspace into 8 parts.

INSERT INTO sessions VALUES ('b4b3722f-ac65-4495-afca-5883da1ff861'), ('c11f8e9b-1d67-493a-98d1-a7a3f645ab2a');

These two sessions would notice each other and the first would grab parts 0-3 and the second would grab parts 4-7. Then, each session would use its parts to break up the item keyspace. For example, for part 0 you might do:

UPDATE
    items
SET
    claim = $1, claim_part = $2
WHERE
    claim IS NULL
    AND id >= '00000000-0000-0000-0000-000000000000'
    AND id < '20000000-0000-0000-0000-000000000000'
LIMIT
    1
RETURNING
    id;
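The bounds in that query don't need to be hard-coded. As a rough sketch, assuming the 128-bit UUID keyspace is split into equal-width parts, a client could compute them like this. The helper name `part_bounds` is hypothetical and not part of the original scheme.

```python
import uuid

UUID_SPACE = 1 << 128  # number of distinct 128-bit UUID values


def part_bounds(part, num_parts):
    """Return (lower, upper) UUID bounds for one part of the keyspace.

    lower is inclusive and upper is exclusive. upper is None for the
    last part, because 2**128 itself is not a valid UUID; the query
    for that part should simply omit the `id < ...` condition.
    """
    if not 0 <= part < num_parts:
        raise ValueError("part must be in [0, num_parts)")
    lower = uuid.UUID(int=part * UUID_SPACE // num_parts)
    upper_int = (part + 1) * UUID_SPACE // num_parts
    upper = uuid.UUID(int=upper_int) if upper_int < UUID_SPACE else None
    return lower, upper


lower, upper = part_bounds(0, 8)
print(lower)  # 00000000-0000-0000-0000-000000000000
print(upper)  # 20000000-0000-0000-0000-000000000000
```

The two values would then be passed as extra query parameters in place of the literal UUIDs above.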

Let's say a new session popped up. The other sessions would eventually detect that (let's say with polling though there could be other means). Then they'd determine the need to drop some parts.

INSERT INTO sessions VALUES ('c006dd7c-1880-40ce-a8e6-ac07aa22163d');

In this case the first session would drop parts 2 and 3 and the second session would drop part 4, leaving parts 2-4 for the new session to pick up. Ordered by session ID, the breakdown would be:
(0, 1), (2, 3, 4), (5, 6, 7)
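The client-side assignment could look something like the sketch below. It assumes every session sees the same set of session IDs and hands out contiguous runs of parts in lexical order of ID. The function names are hypothetical, and the rounding rule is just one choice that happens to reproduce the breakdowns in this example.

```python
def assign_parts(session_ids, num_parts):
    """Map each session ID to a contiguous run of parts, in lexical ID order."""
    ordered = sorted(session_ids)
    n = len(ordered)
    return {
        sid: list(range(i * num_parts // n, (i + 1) * num_parts // n))
        for i, sid in enumerate(ordered)
    }


def parts_to_drop(session_id, before, after):
    """Parts a session held under `before` that it no longer owns under `after`."""
    return sorted(set(before.get(session_id, [])) - set(after.get(session_id, [])))


first = "b4b3722f-ac65-4495-afca-5883da1ff861"
second = "c11f8e9b-1d67-493a-98d1-a7a3f645ab2a"
new = "c006dd7c-1880-40ce-a8e6-ac07aa22163d"

before = assign_parts([first, second], 8)
after = assign_parts([first, second, new], 8)

print(before[first], before[second])           # [0, 1, 2, 3] [4, 5, 6, 7]
print(after[first], after[new], after[second])  # [0, 1] [2, 3, 4] [5, 6, 7]
print(parts_to_drop(first, before, after))      # [2, 3]
print(parts_to_drop(second, before, after))     # [4]
```

Note that the new session sorts between the two existing ones, which is why it ends up with the middle run of parts. A session should only start claiming items in a newly assigned part once the previous owner's `session_parts` row for it is gone.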

That way each part's claims would be completely uncontended. The drawbacks are slower failover when picking up new claims and increased latency due to imbalance across the partitions. For what it's worth, this is exactly how Kafka consumer groups work.

Hope this all made sense!