DEV Community

Leon Radley

Originally published at leon.id

RxJS Global Rate Limit

At work we are building an integration against a third-party API that has a rate limit of 200 requests per minute. The API is paginated and the page responses do not contain all the data, so a separate request for each item is required.

The following code shows how this can be solved using RxJS.

Let's get started!

import {
  BehaviorSubject,
  filter,
  map,
  mergeMap,
  MonoTypeOperatorFunction,
  Observable,
  take,
  timer,
} from 'rxjs'

// global state, shared by every stream that uses globalRateLimit
let tokens = 200
let slidingWindowTime = 60_000
const tokenChangedSubject = new BehaviorSubject(tokens)

/**
 * Rate limit an observable to x number of requests within the slidingWindowTime.
 * The limit is global: every stream piped through this operator shares the same tokens.
 *
 * @param setOptions.parallel number of requests allowed within one sliding window
 * @param setOptions.slidingWindowTime length of the sliding window in milliseconds
 * @returns rate limited observable
 */
export function globalRateLimit<T>(setOptions?: {
  parallel?: number
  slidingWindowTime?: number
}): MonoTypeOperatorFunction<T> {
  // initialize at first or at options reset
  if (setOptions) {
    tokens = setOptions.parallel ?? 200
    slidingWindowTime = setOptions.slidingWindowTime ?? 60_000
    tokenChangedSubject.next(tokens)
  }

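  // take one token and notify waiting values about the new count
  const consumeToken = () => tokenChangedSubject.next(--tokens)
  // hand the token back once the sliding window has passed
  const renewToken = () => tokenChangedSubject.next(++tokens)
  // emits on subscribe and on every change, but only while tokens are left
  const availableTokens = tokenChangedSubject.pipe(filter(() => tokens > 0))

  return mergeMap<T, Observable<T>>((value: T) =>
    availableTokens.pipe(
      // wait until a token is available, then stop listening
      take(1),
      map(() => {
        consumeToken()
        // the token becomes available again after slidingWindowTime
        timer(slidingWindowTime).subscribe(renewToken)
        return value
      }),
    ),
  )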
}

How to use it

import { of } from 'rxjs'

const source$ = of(1, 2, 3, 4, 5, 6)
source$
  .pipe(globalRateLimit({ parallel: 2, slidingWindowTime: 1000 }))
  .subscribe((v) => console.log(v))

// output
// 1
// 2
// wait for 1000ms
// 3
// 4
// wait for 1000ms
// 5
// 6
// complete
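
The timings in the output above follow from simple arithmetic. As a rough sketch, assuming all values arrive at the same moment and every timer fires exactly on time, the value at index i is released after floor(i / parallel) windows. The helper name `expectedReleaseTimes` is made up for illustration and is not part of RxJS or of the operator.

```typescript
// Hypothetical helper: the time in ms at which each queued value would be
// released, assuming all values arrive at t = 0 and timers have no jitter.
function expectedReleaseTimes(count: number, parallel: number, windowMs: number): number[] {
  if (parallel < 1) throw new Error('parallel must be at least 1')
  // the first `parallel` values pass immediately, the next batch one window later, and so on
  return Array.from({ length: count }, (_, i) => Math.floor(i / parallel) * windowMs)
}

console.log(expectedReleaseTimes(6, 2, 1000)) // [ 0, 0, 1000, 1000, 2000, 2000 ]
```

With the real limit of 200 requests per minute, the same arithmetic says that request number 201 would wait one full minute.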

What good does it do to have a global rate limit?

When multiple observable streams need to respect the same limit, you configure the limit in the first call and use the operator without options in all subsequent calls.
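
To see what the shared state buys you, here is a small dependency-free model of the token bookkeeping. It is not the RxJS operator itself: `WindowBudget`, `reserve` and `startedInFirstWindow` are hypothetical names, and the model assumes requests are reserved in order of arrival. It compares two streams drawing from one budget with two streams that each get their own.

```typescript
// Hypothetical model of a token budget; all names are illustrative only.
class WindowBudget {
  // times (ms) at which the consumed tokens are handed back, oldest first
  private renewals: number[] = []
  private readonly tokens: number
  private readonly windowMs: number

  constructor(tokens: number, windowMs: number) {
    if (tokens < 1) throw new Error('tokens must be at least 1')
    this.tokens = tokens
    this.windowMs = windowMs
  }

  // Returns the time at which a request arriving at `arrival` may start.
  // Assumes calls are made in non-decreasing order of `arrival`.
  reserve(arrival: number): number {
    let start = arrival
    if (this.renewals.length === this.tokens) {
      // every token is in use: wait for the oldest one to be renewed
      start = Math.max(arrival, this.renewals.shift()!)
    }
    this.renewals.push(start + this.windowMs)
    return start
  }
}

// counts how many requests start within the first window [0, windowMs)
const startedInFirstWindow = (starts: number[], windowMs: number): number =>
  starts.filter((t) => t < windowMs).length

// two streams that each want to fire three requests at t = 0, sharing one budget
const shared = new WindowBudget(2, 1000)
const sharedStarts = [0, 0, 0, 0, 0, 0].map((t) => shared.reserve(t))

// the same two streams, each with a budget of its own
const budgetA = new WindowBudget(2, 1000)
const budgetB = new WindowBudget(2, 1000)
const separateStarts = [
  ...[0, 0, 0].map((t) => budgetA.reserve(t)),
  ...[0, 0, 0].map((t) => budgetB.reserve(t)),
]

console.log(startedInFirstWindow(sharedStarts, 1000)) // 2
console.log(startedInFirstWindow(separateStarts, 1000)) // 4
```

With separate budgets the API would see four requests in the first second even though each stream believes it is within the limit of two. The shared budget keeps the total at two.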

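import { concatMap, delay, EMPTY, expand, from, Observable, of } from 'rxjs'

// configure the shared limit once; passing options on every call
// would reset the token count each time a page is requested
globalRateLimit({ parallel: 2, slidingWindowTime: 1000 })

interface Page {
  page: number
  totalPages: number
  items: number[]
}
const pageRequest = (page: number): Observable<Page> => {
  // fake api call that returns a page of item ids
  return of({
    page,
    totalPages: 10,
    items: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
  }).pipe(
    // the page requests should be rate limited
    globalRateLimit(),
  )
}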

// a stream of page responses, where we comply to the rate limit
const pages$ = pageRequest(1).pipe(
  expand((response) =>
    response.page < response.totalPages ? pageRequest(response.page + 1) : EMPTY,
  ),
)

// now we want a stream of all items in the pages
// so we flatten the page.items into a stream of items
const items$ = pages$.pipe(concatMap((page) => from(page.items)))

// now we want to call the api and get each item and still keep to the rate limit
interface Item {
  id: number
  name: string
}
const itemRequest = (item: number): Observable<Item> => {
  // fake api call that simulates taking 50ms to complete
  return of({
    id: item,
    name: `item ${item}`,
  }).pipe(delay(50))
}

const itemDetails$ = items$.pipe(
  globalRateLimit(),
  concatMap((item) => itemRequest(item)),
)

// will get all pages, then call itemRequest for each item in page.items
// and all of this shares the globally configured rate limit
itemDetails$.subscribe((item) => console.log(item))

Conclusion

Many complex situations can be solved with RxJS. I hope this helps someone in the same situation.

Happy coding!
