This is a dump of my learnings and experiments while going down a little rabbit hole.
Concurrent rate limiters
I was studying Sidekiq's page on rate limiters. The first type of rate limiting mentioned is the concurrent limiter: only n tasks are allowed to run at any point in time. Note that this is independent of time units (e.g. per second), or how long they take to run. The only limitation is the number of concurrent tasks/requests.
So I asked myself, how would I implement a concurrent rate limiter? I'm fairly familiar with locking (via Redis and the database, for instance), so that was what came to mind, but in its usual form, that only works as a mutex (number of allowed tasks, n = 1). I wasn't sure about how to implement that when n > 1. Decided to dig into it from first principles.
Concurrency control scenarios
In this case, that meant stepping back to think about concurrency control in general, and the scenarios I know of.
The first scenario is process-local: you have multiple threads within a process, and you want to ensure only n threads can access a resource at once. I already knew how to do this:
- when n = 1, use a mutex. Only the thread with the lock on the mutex can execute; others have to wait.
- when n > 1, use a semaphore. I wasn't too familiar with semaphores, so I decided to brush up.
Semaphores are similar to mutexes, but they are less about guaranteeing exclusive access and more about keeping track of who has access. A semaphore starts out with a fixed number of "permits". A thread can request a permit (similar to acquiring a lock), and that reduces the number of available permits. When all permits are in use, any requesting threads will have to wait until one is released. In this sense, a semaphore is kinda like a bouncer at a club—it regulates the number of people who can get in.
Semaphores via mutexes
There are many semaphore implementations available for Ruby. I decided to implement one myself. The key thing is that the semaphore governs access to a resource (the number of permits), so we need a way to ensure the semaphore can do this job safely. I used Ruby's native Mutex to achieve this:
class Semaphore
  def initialize(max_permits)
    @max_permits = max_permits
    @used_permits = 0
    @mutex = Mutex.new
  end

  def permit(&block)
    acquire
    begin
      block.call
    ensure
      release # always give the permit back, even if the block raises
    end
  end

  private

  def acquire
    acquired = false
    until acquired
      @mutex.synchronize do
        acquired = permit_acquired?
      end
      sleep 0.05 unless acquired
    end
  end

  # Must only be called while holding @mutex
  def permit_acquired?
    if @used_permits < @max_permits
      @used_permits += 1
      return true
    end
    false
  end

  def release
    # Read the count inside the lock so the printed number is consistent
    available = @mutex.synchronize do
      @used_permits -= 1 if @used_permits > 0
      @max_permits - @used_permits
    end
    puts "#{available} permit(s) available"
  end
end
Usage:
semaphore = Semaphore.new(2)

t1 = Thread.new do
  semaphore.permit do
    puts 'Thread 1 acquired semaphore'
    sleep rand(1..3)
    p "Thread 1 releasing"
  end
end

t2 = Thread.new do
  semaphore.permit do
    puts 'Thread 2 acquired semaphore'
    sleep rand(1..3)
    p "Thread 2 releasing"
  end
end

t3 = Thread.new do
  semaphore.permit do
    puts 'Thread 3 acquired semaphore'
    sleep rand(1..3)
    p "Thread 3 releasing"
  end
end

[t1, t2, t3].each(&:join)
Sample output:
Thread 1 acquired semaphore
Thread 2 acquired semaphore
"Thread 2 releasing"
1 permit(s) available
Thread 3 acquired semaphore
"Thread 1 releasing"
1 permit(s) available
"Thread 3 releasing"
2 permit(s) available
This approach checks whether there are any available permits and returns one if so. Otherwise, it will sleep for 0.05 seconds and check again. The mutex guarantees that we can safely increment or decrement the number of permits without race conditions. This is a basic implementation, btw; one important thing missing is wait timeouts—we shouldn't have to wait forever.
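As a rough sketch of what a wait timeout could look like on top of this polling approach: the class name TimedPollingSemaphore and the timeout: and poll_interval: parameters are made up for illustration, and permit returning true/false (rather than raising) is just one possible policy.

```ruby
class TimedPollingSemaphore
  def initialize(max_permits, poll_interval: 0.05)
    @max_permits = max_permits
    @used_permits = 0
    @poll_interval = poll_interval
    @mutex = Mutex.new
  end

  # Runs the block and returns true if a permit was acquired within
  # `timeout` seconds; returns false without running the block otherwise.
  def permit(timeout:)
    return false unless acquire(timeout)

    begin
      yield
    ensure
      release
    end
    true
  end

  private

  def acquire(timeout)
    deadline = monotonic_now + timeout
    loop do
      return true if @mutex.synchronize { take_permit }
      return false if monotonic_now >= deadline

      sleep @poll_interval
    end
  end

  # Must only be called while holding @mutex
  def take_permit
    return false unless @used_permits < @max_permits

    @used_permits += 1
    true
  end

  def release
    @mutex.synchronize { @used_permits -= 1 if @used_permits > 0 }
  end

  # Monotonic time is not affected by system clock adjustments
  def monotonic_now
    Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end
end
```

A caller can then decide what to do on false (retry later, raise, skip the work), which is exactly the wait timeout policy decision.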
Also note that there is no expiry on a permit—a client could get a permit and refuse to release it! Apparently, that's by design; semaphores don't control what you do with the permit. The onus is on you to be responsible with it.
Fair semaphores
This approach suffers from unfairness. Suppose thread A has been waiting for a permit to become available, and finally, another thread releases one. If at that moment, thread A is still sleeping, and a new thread (thread B) is launched, B might acquire the permit instead of A. In essence, an unlucky thread could wait for a very long time (or forever!) while newer threads get a permit. This is like if the bouncer selected people at random, instead of who's been waiting the longest.
Also, the constant sleeping and waking up (polling) is suboptimal. We're giving the Ruby interpreter and OS more work to do (constantly scheduling and waking up the thread) when there might not be actually any permits available, and we're potentially taking time away from threads which have actual work to do.
So I decided to try a different approach, using Ruby's Thread::Queue. Thread::Queue is a queue data structure designed for concurrent use; requesting an item (via #pop) will either return an item if one is available, or block until another thread adds one to the queue (via #push). So we model the list of permits as a queue (you can put anything you want in the queue, as long as it's the same number as the permits). Acquiring = popping, releasing = pushing.
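class Semaphore
  def initialize(max_permits)
    # Passing initial items to Queue.new needs Ruby 3.1+
    @permits = Thread::Queue.new([1] * max_permits)
  end

  def permit(&block)
    acquire
    begin
      block.call
    ensure
      release # always give the permit back, even if the block raises
    end
  end

  private

  def acquire
    @permits.pop # blocks until a permit is available
  end

  def release
    @permits.push(1)
    puts "#{@permits.size} permit(s) available"
  end
end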
This works too (and the code is also much shorter). I'm not 100% certain the waiting threads are served in order of who asked first (the docs don't say exactly that), but I think that's the case.
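One way to sanity-check that a semaphore like this really caps concurrency is to count how many threads are inside the block at the same time. The sketch below is self-contained, so it redefines a queue-based semaphore under the made-up name QueueSemaphore; max_observed_concurrency is also just an illustrative helper, not part of any library.

```ruby
class QueueSemaphore
  def initialize(max_permits)
    @permits = Thread::Queue.new
    max_permits.times { @permits.push(:permit) }
  end

  def permit
    @permits.pop # blocks until a permit is available
    begin
      yield
    ensure
      @permits.push(:permit)
    end
  end
end

# Starts `thread_count` threads that all go through the semaphore and
# returns the highest number seen inside the block at the same moment.
def max_observed_concurrency(semaphore, thread_count:)
  running = 0
  peak = 0
  counter_lock = Mutex.new

  threads = thread_count.times.map do
    Thread.new do
      semaphore.permit do
        counter_lock.synchronize do
          running += 1
          peak = [peak, running].max
        end
        sleep 0.02 # pretend to do some work
        counter_lock.synchronize { running -= 1 }
      end
    end
  end

  threads.each(&:join)
  peak
end

peak = max_observed_concurrency(QueueSemaphore.new(2), thread_count: 6)
puts "Peak concurrency: #{peak}"
```

With two permits, the reported peak should never exceed 2, no matter how many threads are started.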
Condition variables
After this, I took a look at the semaphore class in the popular library, concurrent-ruby to see how they implement it, and I learnt about something new: condition variables. And Ruby comes with this included!
The name sounds super technical, but it's quite approachable in reality: a condition variable lets you tell other threads waiting on a resource that it's now available. It's meant to be used with a mutex. Instead of having the thread constantly poll, as in my initial implementation, the thread sleeps forever (or with a timeout), and gets woken up by the condition variable when a new permit is available. Here's the new implementation:
class Semaphore
  def initialize(max_permits)
    @max_permits = max_permits
    @used_permits = 0
    @mutex = Mutex.new
    @condition_variable = ConditionVariable.new
  end

  def permit(&block)
    acquire
    begin
      block.call
    ensure
      release # always give the permit back, even if the block raises
    end
  end

  def acquire
    @mutex.synchronize do
      # wait releases the mutex while sleeping and re-acquires it on wake-up
      until permit_acquired?
        @condition_variable.wait(@mutex)
      end
    end
  end

  # Must only be called while holding @mutex
  def permit_acquired?
    if @used_permits < @max_permits
      @used_permits += 1
      return true
    end
    false
  end

  def release
    available = @mutex.synchronize do
      @used_permits -= 1 if @used_permits > 0
      @max_permits - @used_permits
    end
    @condition_variable.signal
    puts "#{available} permit(s) available"
  end
end
Rather than polling (sleep-wake-check-sleep), we just sleep (@condition_variable.wait), and then when another thread is done, it calls @condition_variable.signal, which will wake up the first waiting thread (so it's mostly fair, yay; a brand-new thread can still sneak in and grab the mutex before the woken thread does). It reminds me a bit of events in JavaScript.
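The "or with a timeout" part can be sketched with the optional second argument to ConditionVariable#wait. This is only an illustration: the class name TimedSemaphore and the timeout: keyword are invented here, and returning false on timeout is one choice among several.

```ruby
class TimedSemaphore
  def initialize(max_permits)
    @max_permits = max_permits
    @used_permits = 0
    @mutex = Mutex.new
    @condition_variable = ConditionVariable.new
  end

  # Returns true once a permit is acquired, or false if `timeout`
  # seconds pass without one becoming available.
  def acquire(timeout:)
    deadline = monotonic_now + timeout
    @mutex.synchronize do
      while @used_permits >= @max_permits
        remaining = deadline - monotonic_now
        return false if remaining <= 0

        # Sleeps until signalled or until `remaining` seconds elapse;
        # the loop re-checks the condition either way.
        @condition_variable.wait(@mutex, remaining)
      end
      @used_permits += 1
      true
    end
  end

  def release
    @mutex.synchronize do
      @used_permits -= 1 if @used_permits > 0
      @condition_variable.signal
    end
  end

  private

  def monotonic_now
    Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end
end
```

The deadline is computed once up front, so a thread that gets woken up and loses the race for the permit only waits for whatever time it has left, not a fresh full timeout.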
Distributed concurrency control
Okay, good diversion; now, back to concurrent scenarios. We've looked at process-local scenarios. The second scenario is system-local, but separate processes. This is, for example, when you have multiple web server processes running on the same machine, and you want to control access to a shared resource.
Processes don't share memory, so our mutex/semaphore can't live inside any single process. We have to use an external datastore, such as a cache (eg Redis), or a database (PostgreSQL). You could even use a file or what-have-you.
A third scenario is in a distributed system: the processes are on separate machines. This is an extension of the above case, so the same approaches as above apply, but obviously in a distributed way (the datastore has to live on an external location all the processes can access).
Okay, so how could we implement mutexes (n = 1) here?
Aside: things to keep in mind
When dealing with locks/mutexes, you want to avoid starvation (ie, an unruly/crashed client holding on to the lock and blocking everyone else). A common (but imperfect) way to avoid this is to set a lock expiry (how long a given client can hold a lock). This way, if a client is unable to release the lock (for instance, if they crashed), it will automatically be released after a while.
You also want to limit contention, typically by setting a wait timeout (how long a client should wait for when another client is using the lock). If you don't set a wait timeout, you could end up with other processes hanging forever because a lock is in use. Sometimes that might be desired, but more likely you probably want to quit and try again later. Either way, you should decide on your wait timeout policy for your clients.
Mutex with Redis
A common pattern is to set a key in Redis, something like SET some-lock-key some-value NX EX expiry-in-seconds.
- NX will set the key only if it doesn't already exist. If the key already exists, it means another process has the lock, and you need to retry.
- EX (or PX) sets a time when the lock expires, so a crashed process doesn't keep on hanging on to the lock.
- To release the lock, you can use DEL to delete the key. (But you shouldn't! See below.)
This works, but has a few flaws:
- Processes need to poll Redis until they get the lock, or give up. We already saw why polling is suboptimal.
- Lock expiry is, at best, a guess. You're hoping all your clients will finish in that time. But if one process somehow doesn't finish in time, the lock would erroneously expire, and you could end up with two concurrent processes (!)
- If the above happens, and the first process tries to release the lock with DEL, it would delete the lock now held by the second process. The Redis docs have details on how to correctly delete a lock.
- In a distributed Redis cluster, a lock could be acquired multiple times. Distributed locking is probably a topic for another day, though. Patterns like Redlock are suggested (but also criticized).
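The safer release from the Redis docs boils down to: store a random token as the lock's value, and only delete the key if it still holds your token (done atomically in a Lua script). Here's a rough Ruby sketch of that idea. The RedisLock class is made up for illustration, and it assumes `redis` is a redis-rb style client whose set accepts nx:/ex: options and whose eval accepts keys:/argv:; check your client's docs before relying on that.

```ruby
require 'securerandom'

class RedisLock
  # Compare-and-delete: only remove the key if it still holds our token.
  RELEASE_SCRIPT = <<~LUA
    if redis.call("get", KEYS[1]) == ARGV[1] then
      return redis.call("del", KEYS[1])
    else
      return 0
    end
  LUA

  # `redis` is assumed to be a redis-rb style client (see note above)
  def initialize(redis, key, expiry_seconds:)
    @redis = redis
    @key = key
    @expiry_seconds = expiry_seconds
    @token = SecureRandom.hex(16) # unique per lock holder
  end

  # SET key token NX EX expiry; true only if we got the lock
  def acquire
    @redis.set(@key, @token, nx: true, ex: @expiry_seconds) ? true : false
  end

  # Returns false if the lock had already expired or now belongs to
  # another client, in which case nothing is deleted.
  def release
    @redis.eval(RELEASE_SCRIPT, keys: [@key], argv: [@token]) == 1
  end
end
```

This doesn't fix the "lock expired while I was still working" problem; it only makes sure a late release can't delete someone else's lock.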
Mutex with PostgreSQL
For SQL, I like Postgres' advisory locks. Running SELECT pg_advisory_lock(123) will give this client a lock called 123, and all other clients who run that same statement will have to wait until the first client releases the lock.
With Postgres' advisory locks, you don't need a lock expiry, since the lock is bound to the session or transaction. If the client crashes, PG will release the lock. Wait timeouts aren't directly supported, but you can achieve this by using the lock_timeout setting (combined with transaction-level locks if you don't want global wait timeouts):
BEGIN; -- Start transaction
SET LOCAL lock_timeout = '10s'; -- Error if no lock acquired after 10s
SELECT pg_advisory_xact_lock(123); -- Get a transaction-level lock
-- Execute the rest of your code
COMMIT; -- End transaction; lock is released automatically
MySQL also has advisory locks (although they are not transaction-bound) and wait timeouts:
SELECT GET_LOCK('my_lock', 10); -- Returns 1 if acquired, 0 if not acquired after 10s
SELECT RELEASE_LOCK('my_lock');
I love Redis, but I think I prefer SQL databases for mutexes. Since the Redis API does not expose any concept of a lock, we try to emulate it with the SET NX pattern or more complicated algorithms, which is why we have to do the "lock expiry" dance. PostgreSQL, on the other hand, has locks as a first-class function, which means it can provide better guarantees, such as the fact that locks will always be released when the session ends.
Semaphore with Redis
How about semaphores (n > 1)? At first I was thinking of something using Redis transactions (MULTI) and WATCH, something like this (not valid code):
WATCH semaphore-permits-used
max = GET semaphore-permits-max
used = GET semaphore-permits-used
return unless used < max # No permits available
MULTI
INCR semaphore-permits-used
EXEC
WATCH will fail the transaction if semaphore-permits-used is modified by another client, so this serves as a mutex for the permits. But this implementation seems pretty complex to me; it involves us switching between our app code and Redis multiple times (or making this a Redis script). I haven't had the chance to try it yet, though.
Turns out, you can implement a semaphore in Redis quite simply with blocking list operations (akin to what we did with Thread::Queue in Ruby):
- First, put n items in a list in Redis (say semaphore-available-permits).
- To acquire a permit, call BLPOP semaphore-available-permits 0. This will pop one item from the list. If there's none available, it will block until some other client pushes one (a timeout of 0 means wait indefinitely). You can also specify a wait timeout in seconds: BLPOP semaphore-available-permits <wait-timeout>.
- To release a permit, call RPUSH semaphore-available-permits <any-value>. If there are clients waiting for a permit, the longest waiting client will automatically get the newly released permit (so it's a fair semaphore).
It's still a bit ugly, because that first step is crucial (otherwise clients would wait forever). You can either have each client check if the semaphore has been initialized (e.g. by checking if a certain key exists) and initialize it themselves if not, or have an explicit initialization step that creates the permits when your app starts up.
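Put together, the list-based semaphore plus the "initialize it yourself if needed" step might look roughly like this in Ruby. The RedisSemaphore class and the key names are invented for illustration, and it assumes `redis` is a redis-rb style client where set takes nx:, rpush accepts an array of values, and blpop takes timeout: and returns nil when it times out.

```ruby
class RedisSemaphore
  # `redis` is assumed to be a redis-rb style client (see note above);
  # the key names below are made up.
  def initialize(redis, name, max_permits)
    @redis = redis
    @permits_key = "#{name}:available-permits"
    @init_key = "#{name}:initialized"
    @max_permits = max_permits
  end

  # SET ... NX succeeds for exactly one client, so only that client
  # pushes the initial permits. Note the two steps are not atomic:
  # a crash in between would leave the list empty.
  def ensure_initialized
    return unless @redis.set(@init_key, '1', nx: true)

    @redis.rpush(@permits_key, Array.new(@max_permits, 'permit'))
  end

  # Blocks for up to `wait_timeout` seconds (0 = wait indefinitely).
  # Returns true if a permit was popped, false on timeout.
  def acquire(wait_timeout:)
    !@redis.blpop(@permits_key, timeout: wait_timeout).nil?
  end

  def release
    @redis.rpush(@permits_key, 'permit')
  end
end
```

Every client would call ensure_initialized before its first acquire. Making the initialization atomic (for example with a Redis script) is left out to keep the sketch short.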
Semaphore with Postgres
Unfortunately, advisory locks don't help here. We need a good ol' database table to keep track of our semaphores.
CREATE TABLE global_semaphores (
  semaphore_name TEXT,
  max_permits INTEGER,
  used_permits INTEGER DEFAULT 0
);
To kick things off, we create the semaphore with capacity n = 4:
INSERT INTO global_semaphores (semaphore_name, max_permits) VALUES ('my_semaphore', 4);
Checking out and releasing a permit is straightforward: update the used_permits count. To acquire one, we increment the count only if a permit is free, all in a single statement:
UPDATE global_semaphores
SET used_permits = used_permits + 1
WHERE semaphore_name = 'my_semaphore'
  AND used_permits < max_permits;
The UPDATE statement will automatically lock the matching row (our semaphore) while it runs, so no explicit transaction is needed (unless we want to specify a local timeout). All we need to do is check if the row was updated; if so, we have our permit.
To release the permit, it's the reverse:
UPDATE global_semaphores
SET used_permits = used_permits - 1
WHERE semaphore_name = 'my_semaphore'
  AND used_permits > 0;
One downside here is that there's no easy way to block while waiting for a new permit to be available. We'll have to rely on polling.
I find the differences in approach between Redis and PG here quite interesting: an explicit list of permit items (Redis) vs a counter (Postgres). I think you could use either approach in both, but it would be more complicated. (I actually had an initial implementation in Postgres that used multiple rows and transactional isolation, but it was def more complex.) Postgres' transactional guarantees make it easier to work with a single counter (= a single row), while Redis' list data structure and blocking options make that approach straightforward.
Putting it all together: concurrent rate limiter
We're almost there! Actually, we're there. A concurrent rate limiter is essentially a semaphore. The max number of permits = the max number of concurrent tasks.
However, the Sidekiq version also includes lock expiry, so I spent some time thinking about it. My conclusion: there's no "nice" way to do it. The permit expiry approach I could think of (or find in the wild) was:
- When a client gets a permit successfully, it must record that permit and its acquisition time (in a hash in Redis, or a row in a table in Postgres)
- When the client releases the permit, it can then delete the hash entry or row
- You either have an external process that regularly checks for permits that are too old and force-releases them, or have a newly-connecting client do that check themselves.
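To make those steps concrete, here's a small in-process sketch where a plain Ruby hash stands in for the Redis hash or Postgres table, and the stale-permit check is done by whoever tries to acquire next. All names (ExpiringPermits, permit_ttl:, clock:) are made up for illustration; a real version would do the same bookkeeping against the shared datastore.

```ruby
require 'securerandom'

class ExpiringPermits
  # `clock` is injectable so the expiry logic can be exercised without sleeping
  def initialize(max_permits, permit_ttl:,
                 clock: -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) })
    @max_permits = max_permits
    @permit_ttl = permit_ttl
    @clock = clock
    @acquired_at = {} # permit id => acquisition time
    @mutex = Mutex.new
  end

  # Returns a permit id, or nil if every permit is held and none has expired.
  def acquire
    @mutex.synchronize do
      reap_expired
      return nil if @acquired_at.size >= @max_permits

      permit_id = SecureRandom.uuid
      @acquired_at[permit_id] = @clock.call
      permit_id
    end
  end

  # Returns false if the permit had already been force-released.
  def release(permit_id)
    @mutex.synchronize { !@acquired_at.delete(permit_id).nil? }
  end

  private

  # Force-release permits held for longer than the TTL
  def reap_expired
    cutoff = @clock.call - @permit_ttl
    @acquired_at.delete_if { |_id, acquired_at| acquired_at <= cutoff }
  end
end
```

Note that a client whose permit was force-released has no idea it happened until its release returns false, so the same "two holders at once" caveat from lock expiry applies here too.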
I think that's it for that exploration. It was quite interesting racking my brain about semaphores and guarantees. My current conclusions are that I'd prefer to use Postgres for mutexes and Redis for semaphores. It was also a revelation that semaphores don't provide any guarantee of expiry, so you must program carefully around that.
Top comments (2)
I built a startup a while back called ratelim.it and this was what we provided. The way we thought about these is that these are all rate limits, the semaphore use case is just a "returnable" limit. This is pretty similar to what you're saying / sort of just different terminology. But it did impact how I built it / thinking about cacheability & clearing.
I'm considering offering this again as a service at Prefab. I'd love input on whether that's a service people would find useful.
I really enjoyed the level of depth put in this article, not only showing different locking mechanisms in code but also applying them to practical real world applications. Some of the points you mentioned do remind me of AWS SQS, which has visibility timeouts for messages. That way if a task fails to complete it becomes visible again in the queue for another worker to pick it up.