Imagine a distributed application that interacts with a third-party API. Third-party APIs usually enforce a rate limit to prevent clients from sending bursts of requests that could cause downtime on their services. In such a scenario, how can the caller control the rate of outgoing requests to the third-party API when the calls come from many instances at once? This post discusses one possible strategy for this problem.
There are multiple algorithms to control the rate of requests, but here we'll focus on the token bucket algorithm, because it is relatively easy to understand and to implement. The algorithm works as follows: a bucket can hold a maximum of T tokens, and when an application wants to make a request to the third-party API, it has to take 1 token from the bucket. If the bucket is empty, it has to wait until there is at least 1 token in the bucket. Meanwhile, the bucket is refilled at a fixed rate of 1 token every R milliseconds, up to its capacity of T.
The token bucket algorithm is very straightforward, but how can it be used in a distributed environment to control the outgoing requests to a third-party API?
To control the outgoing rate in a distributed environment, a centralized source of truth for the current rate limit is necessary: if each instance kept its own bucket, the combined rate would grow with the number of instances. There are multiple ways to implement this source of truth, and I've sketched one possible implementation in the following diagram:
In the figure above, we have a distributed application running in multiple pods, and each pod can make requests to a third-party API. In the application infrastructure, there is a TCP server that controls the rate limit using the token bucket algorithm. Before making a request to the third-party API, a pod asks the TCP server for a token and waits for the response; the server only replies once a token is available. After receiving the token, the pod makes the request to the third-party API.
The TCP server implementation can be found in the repository https://github.com/rafaquelhodev/rlimit/, and in the next section I'll briefly discuss the token bucket implementation in Go.
Token bucket implementation
Below are the main ideas behind the token bucket implementation. Take a look at the https://github.com/rafaquelhodev/rlimit/ repository for the detailed implementation.
The rate limit control is centralized in the `TokenBucket` struct:
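```go
type TokenBucket struct {
	id           string      // identifies the bucket
	mu           sync.Mutex  // guards tokens and subs
	tokens       int64       // tokens currently in the bucket
	maxTokens    int64       // bucket capacity
	refillPeriod int64       // milliseconds between two refills
	cron         chan bool   // signals the refill goroutine to stop
	subs         []chan bool // FIFO queue of clients waiting for a token
}
```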
Notice the `subs` field in the `TokenBucket` struct. It is a slice of subscribers for a specific token bucket: every time a client requests a token while the bucket is empty, the client is added to `subs` and is notified when a new token is added to the bucket.
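The subscriber idea on its own is a FIFO queue of channels, one per waiting client. Here is a minimal sketch, assuming a hypothetical `waitQueue` type (the post keeps this slice directly inside `TokenBucket`). Each channel is buffered with room for one value, so the notifier can wake a waiter without blocking even if the waiter has not reached its receive yet.

```go
package main

import (
	"fmt"
	"sync"
)

// waitQueue is a hypothetical stand-in for the subs slice: each waiting
// client gets its own channel, and waiters are woken in FIFO order.
type waitQueue struct {
	mu   sync.Mutex
	subs []chan struct{}
}

// subscribe registers a new waiter and returns the channel it should block on.
func (q *waitQueue) subscribe() <-chan struct{} {
	ch := make(chan struct{}, 1) // buffer of 1: the notifier never blocks
	q.mu.Lock()
	q.subs = append(q.subs, ch)
	q.mu.Unlock()
	return ch
}

// notifyOldest wakes the longest-waiting client, if any, and reports
// whether someone was woken.
func (q *waitQueue) notifyOldest() bool {
	q.mu.Lock()
	defer q.mu.Unlock()
	if len(q.subs) == 0 {
		return false
	}
	ch := q.subs[0]
	q.subs = q.subs[1:]
	ch <- struct{}{} // exactly one send per channel, so the buffer always has room
	return true
}

func main() {
	var q waitQueue
	first := q.subscribe()
	second := q.subscribe()

	q.notifyOldest()
	select {
	case <-first:
		fmt.Println("first waiter woken")
	default:
		fmt.Println("first waiter still blocked")
	}
	select {
	case <-second:
		fmt.Println("second waiter woken")
	default:
		fmt.Println("second waiter still blocked")
	}
}
```

Running it prints `first waiter woken` followed by `second waiter still blocked`: one refill wakes exactly one client, the one that has waited longest.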
When creating the bucket, we need to provide the maximum number of tokens the bucket can hold (`maxTokens`) and the interval, in milliseconds, at which a token is added to the bucket (`refillPeriod`):
```go
func newTokenBucket(id string, maxTokens int64, refillPeriod int64) *TokenBucket {
	bucket := &TokenBucket{
		id:           id,
		tokens:       0, // the bucket starts empty
		maxTokens:    maxTokens,
		refillPeriod: refillPeriod,
		cron:         make(chan bool),
		subs:         make([]chan bool, 0),
	}
	fmt.Printf("refill period = %d\n", refillPeriod)
	// Start the background goroutine that adds one token every refillPeriod ms.
	bucket.startCron()
	return bucket
}
```
Now, you might wonder: how is a token added to the bucket? When a bucket is created, a background goroutine (the "cron" in the code) is started, and every `refillPeriod` milliseconds it adds a new token to the bucket:
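```go
func (tb *TokenBucket) startCron() {
	ticker := time.NewTicker(time.Duration(tb.refillPeriod) * time.Millisecond)
	go func() {
		for {
			select {
			case <-tb.cron:
				// Stop signal: release the ticker and exit the goroutine.
				ticker.Stop()
				return
			case <-ticker.C:
				tb.mu.Lock()
				if len(tb.subs) > 0 {
					// Someone is waiting: hand the new token straight to the
					// oldest subscriber (FIFO) instead of storing it.
					sub := tb.subs[0]
					tb.subs = tb.subs[1:]
					tb.mu.Unlock()
					sub <- true
					continue
				}
				if tb.tokens < tb.maxTokens {
					tb.tokens += 1
					fmt.Printf("[TOKEN REFILL] | currTokens = %d\n", tb.tokens)
				}
				tb.mu.Unlock()
			}
		}
	}()
}
```

Finally, when a client wants a token from the bucket, it calls `waitAvailable`. It consumes a token immediately if one is available; otherwise it subscribes to the bucket through `tokenSubscribe` (implemented in the repository) and blocks until a token is handed to it:

```go
func (tb *TokenBucket) waitAvailable() bool {
	tb.mu.Lock()
	if tb.tokens > 0 {
		// Fast path: a token is available, consume it immediately.
		fmt.Printf("[CONSUMING TOKEN] - id = %s\n", tb.id)
		tb.tokens -= 1
		tb.mu.Unlock()
		return true
	}
	// Slow path: the bucket is empty, so join the queue of subscribers.
	fmt.Printf("[WAITING TOKEN] - id %s\n", tb.id)
	ch := tb.tokenSubscribe()
	tb.mu.Unlock()
	// Block until the refill goroutine hands over a token. That token is
	// never stored in tb.tokens, so there is nothing to decrement here.
	<-ch
	fmt.Printf("[NEW TOKEN AVAILABLE] - id %s\n", tb.id)
	return true
}
```

Both the refill goroutine and `waitAvailable` read and modify `tokens` and `subs` while holding the same mutex, and a refilled token goes directly to the oldest waiter instead of being stored, so a concurrent caller cannot take it before the waiter wakes up.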
Inspired by https://github.com/Mohamed-khattab/Token-bucket-rate-limiter