Learn what a concurrent task queue is, the best use cases, and how to write one.
The queue is one of the most used data structures.
You probably use it every day when you shop for groceries (even online) or when you send a text message to your friends.
The concurrent task queue is a powerful pattern that helps you spread work over time or improve throughput by processing several tasks at once.
Let’s start with the basics.
What is a Queue? 🚶🚶🚶
A queue is a linear structure in which values are added at one end and removed from the other. This discipline gives rise to first-in/first-out (FIFO) behavior, which is the defining feature of queues. The two fundamental queue operations are enqueue (add to the back) and dequeue (remove from the front).
Representation of a FIFO (first-in, first-out) queue (Wikipedia)
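To make the two operations concrete, here is a minimal FIFO queue sketch in plain JavaScript (the `Queue` class and its names are illustrative, not from any library):

```javascript
// A minimal FIFO queue: enqueue at the back, dequeue from the front.
class Queue {
  constructor() {
    this.items = [];
  }
  // enqueue: add a value to the back of the queue
  enqueue(value) {
    this.items.push(value);
  }
  // dequeue: remove and return the value at the front
  dequeue() {
    return this.items.shift();
  }
  get size() {
    return this.items.length;
  }
}

const line = new Queue();
line.enqueue("first");
line.enqueue("second");
console.log(line.dequeue()); // → "first" — first in, first out
```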
Ok, when should we use it?
Use a queue when you need to maintain the order of events and process the value by that order.
Great, you convinced me! But why do I need the concurrency thing?
As mentioned above, a queue processes one value at a time, in order. But sometimes that’s not fast enough.
Consider the following case 🙌:
You are at your favorite grocery store and have just arrived at the cashier, but unfortunately, there are many people waiting. To speed things up, the store opens several more registers, and each additional cashier has its own queue, so you just have to choose one. But if your cashier has a technical problem or is simply slow, your queue is delayed even while the other registers sit free.
Concurrent task queue to the rescue! 💪
We will use only one queue for everyone. That way, every time a register becomes free, we dequeue the next person and send them to it.
single concurrent queue (@andreagiuliaderba)
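The same idea can be sketched in plain JavaScript before we bring in redux-saga: one shared FIFO, drained by N concurrent workers, where each free worker immediately pulls the next task. The `runQueue` helper below is illustrative, not from the article’s code:

```javascript
// One shared queue, drained by `workersCount` concurrent workers.
async function runQueue(tasks, workersCount) {
  const queue = [...tasks]; // the single shared FIFO of task functions
  const results = [];

  async function worker() {
    // each worker dequeues the next task as soon as it is free,
    // so one slow task never blocks the whole line
    while (queue.length > 0) {
      const task = queue.shift();
      results.push(await task());
    }
  }

  // start N workers against the same queue and wait for all of them
  await Promise.all(Array.from({ length: workersCount }, () => worker()));
  return results;
}
```

With `workersCount = 1` this degrades to a plain sequential queue; raising it lets fast tasks flow past a slow one.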
Hooray! 🎉
Let’s examine a use case
Last week, I was working on a Google Chrome extension that sniffs and downloads HLS (HTTP Live Streaming) streams.
An HLS stream is composed of many chunks that are fetched one by one and played in your browser as a single video. A stream can have thousands of files, and you need to download them all.
We will use our beloved queue to speed up the process and make sure that one slow fetch won’t hold up the others.
TL;DR: here’s the code
```javascript
import { buffers, channel } from "redux-saga";
import { all, call, fork, put, take } from "redux-saga/effects";

/**
 * creates a concurrent task queue
 *
 * @param {GeneratorFunction} handler request handler
 * @param {number} [workersCount=1] number of workers
 */
function* createConcurrentTaskQueue(handler, workersCount = 1) {
  // a channel to queue incoming actions
  const queueChannel = yield call(channel, buffers.expanding());

  function* watcher() {
    // a channel to dispatch tasks to the workers
    const workersChannel = yield call(channel, buffers.expanding());
    // create n worker "threads"
    yield all(
      Array.from({ length: workersCount }, () => fork(worker, workersChannel))
    );
    // wait for tasks
    while (true) {
      // incoming task
      const { payload } = yield take(queueChannel);
      // hand the task to the first free worker
      yield put(workersChannel, payload);
    }
  }

  // a single worker
  function* worker(chan) {
    while (true) {
      // incoming task
      const payload = yield take(chan);
      // handle it with the given handler
      yield call(handler, payload);
    }
  }

  return {
    watcher,
    queueChannel,
  };
}

export default createConcurrentTaskQueue;
```
Now let’s look at it piece-by-piece.
1. The handler
```javascript
import { call, put, select } from "redux-saga/effects";

function* handler({ uri, index, segmentsCount }) {
  // get the data
  const res = yield call(fetch, uri);
  const blob = yield call([res, res.blob]);
  // report to the store
  yield put({ type: "CHUNK_DONE", payload: { index, blob } });
  // check if all the chunks are ready
  const currentChunkCount = yield select(currentChunkCountSelector);
  if (currentChunkCount === segmentsCount) {
    yield put(allDoneChannel, { type: "DONE" });
  }
}
```
This simple handler gets the URI from the payload and then:
- fetches the chunk
- transforms it to a blob
- dispatches a chunk-ready Redux action
- gets the current count of ready chunks
- checks if it’s “all done”
2. Create the queue
```javascript
const QUEUE_CONCURRENT = 5;

const { watcher, queueChannel } = yield call(
  createConcurrentTaskQueue,
  handler,
  QUEUE_CONCURRENT
);
const watcherTask = yield fork(watcher);
```
Using the handler, we create a new queue with 5 workers and get back the watcher task and a queue channel. Then we fork the watcher so it starts listening for tasks.
3. Push the tasks
```javascript
const segmentsCount = segments.length;
// transform the segments list into a list of put effects
const actions = segments.map((segment, index) =>
  put(queueChannel, { payload: { uri: segment.uri, index, segmentsCount } })
);
// fire them all together
yield all(actions);
```
We map each segment to a put effect (into the queue channel we got back), and then we fire all of them together.
4. Wait for all the chunks to be ready or for the action to be cancelled
```javascript
// the first to fire between "all done" and "cancel" wins
const { cancelDownload, allDone } = yield race({
  allDone: take(allDoneChannel),
  cancelDownload: take("CANCEL_DOWNLOAD"),
});
// stop the queue manager
yield cancel(watcherTask);
// in case of cancellation, just return
if (cancelDownload) {
  return;
}
// in case of "all done", create a link and dispatch a new action
if (allDone) {
  const link = URL.createObjectURL(blobBuilder.build());
  yield put(downloadFinished({ id, link }));
  return;
}
```
Now we wait for whichever comes first: the “all done” action or a cancellation. Either way, we cancel the watcher and then act according to the action we received.
And that’s it!
If you want to see it live, visit https://github.com/puemos/hls-downloader, and download the Chrome extension.
I hope you learned something new! If you have any questions, please comment below so everybody can benefit.