Although Node.js runs JavaScript on a single thread, we can still achieve concurrency thanks to its non-blocking nature. In this post we explore a simple implementation of limited concurrency, and then see how to limit concurrent operations with a queue.
First Of All: The Concurrency Algorithm
- Initially, we spawn as many tasks as we can without exceeding the concurrency limit
- Then, every time a task completes, we spawn one or more tasks until we reach the limit again
Simple Implementation Of The Algorithm
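// Illustrative tasks (an assumption, not part of the original post): each task
// receives a callback and calls it when its work is done.
const tasks = [100, 200, 300, 400].map(ms => cb => setTimeout(cb, ms));

const concurrency = 2; // maximum number of tasks running at once
let running = 0;       // tasks currently in progress
let completed = 0;     // tasks that have finished
let index = 0;         // position of the next task to start

const finish = () => console.log('All tasks executed!');

function next() {
  // Start tasks until the limit is reached or no tasks are left.
  while (running < concurrency && index < tasks.length) {
    const task = tasks[index++];
    task(() => {
      if (++completed === tasks.length) return finish();
      running--;
      next(); // a slot is free again, so try to start another task
    });
    running++;
  }
}

next();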
Problem With The Previous Algorithm
The limit above applies only to one set of tasks. If each task spawns tasks of its own using the same pattern, every one of them can start another two, so the total number of concurrent operations grows exponentially. To solve this we need to limit concurrency globally, and the solution is as simple as using a queue.
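To make the problem concrete, here is a minimal sketch. It assumes that every task spawns sub-tasks through the same pattern; `runLimited`, `leaf` and `parent` are hypothetical names introduced only for this illustration.

```javascript
// Hypothetical helper: the limited-concurrency pattern wrapped in a function.
function runLimited (tasks, concurrency, done) {
  let running = 0
  let completed = 0
  let index = 0
  if (tasks.length === 0) return done()
  function next () {
    while (running < concurrency && index < tasks.length) {
      const task = tasks[index++]
      running++
      task(() => {
        running--
        if (++completed === tasks.length) return done()
        next()
      })
    }
  }
  next()
}

let active = 0 // leaf operations in flight across ALL runLimited calls
let peak = 0   // highest value of `active` seen so far

// A leaf operation: simulated async work that records global activity.
const leaf = cb => {
  active++
  peak = Math.max(peak, active)
  setTimeout(() => { active--; cb() }, 10)
}

// A parent task that runs four leaves with its own local limit of 2.
const parent = cb => runLimited([leaf, leaf, leaf, leaf], 2, cb)

// Run four parents, again with a limit of 2.
const demoDone = new Promise(resolve => {
  runLimited([parent, parent, parent, parent], 2, () => {
    console.log(`peak concurrent leaf operations: ${peak}`)
    resolve(peak)
  })
})
```

With two levels of nesting the peak is 4 rather than 2, and every extra level doubles it again. The limit holds per call, not globally.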
We are going to use two things: a queue and an EventEmitter.
Queues To Limit The Concurrency
We want to limit the global number of operations running in parallel, and we are going to use a queue to make that happen.
Concurrency Algorithm + Queues
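export class TaskQueue {
  constructor (concurrency) {
    this.concurrency = concurrency // maximum number of tasks running at once
    this.running = 0               // tasks currently in progress
    this.queue = []                // tasks waiting to be started
  }

  // Add a task and schedule a check for free slots.
  pushTask (task) {
    this.queue.push(task)
    process.nextTick(this.next.bind(this))
    return this
  }

  // Start queued tasks until the concurrency limit is reached.
  next () {
    while (this.running < this.concurrency && this.queue.length) {
      const task = this.queue.shift()
      task(() => {
        this.running--
        process.nextTick(this.next.bind(this))
      })
      this.running++
    }
  }
}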
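As a usage sketch, the snippet below pushes five simulated tasks into a queue with a limit of 2 and records the highest number of tasks that ran at the same time. The class is repeated (without `export`) only so that the snippet runs on its own. The counters `active` and `peakActive` and the timer-based tasks are assumptions made for the illustration.

```javascript
// TaskQueue repeated from the post so that this snippet is self-contained.
class TaskQueue {
  constructor (concurrency) {
    this.concurrency = concurrency
    this.running = 0
    this.queue = []
  }

  pushTask (task) {
    this.queue.push(task)
    process.nextTick(this.next.bind(this))
    return this
  }

  next () {
    while (this.running < this.concurrency && this.queue.length) {
      const task = this.queue.shift()
      task(() => {
        this.running--
        process.nextTick(this.next.bind(this))
      })
      this.running++
    }
  }
}

const queue = new TaskQueue(2)
const TOTAL = 5
const finished = [] // ids of completed tasks
let active = 0      // tasks currently doing work
let peakActive = 0  // highest value of `active` seen so far

const allDone = new Promise(resolve => {
  for (let id = 1; id <= TOTAL; id++) {
    queue.pushTask(done => {
      active++
      peakActive = Math.max(peakActive, active)
      setTimeout(() => { // simulated async work
        active--
        finished.push(id)
        done()
        if (finished.length === TOTAL) resolve({ finished, peakActive })
      }, 20)
    })
  }
})

allDone.then(result => console.log(`peak concurrency: ${result.peakActive}`))
```

Because every task goes through the same queue instance, the limit is shared no matter where `pushTask` is called from.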
Beautify TaskQueue With EventEmitter
We need some information about the operations, such as:
- Whether an operation has failed
- Whether all operations in the queue have completed and the queue is empty
Concurrency Algorithm + Queues + EventEmitter
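import { EventEmitter } from 'events'

export class TaskQueue extends EventEmitter {
  constructor (concurrency) {
    super()
    this.concurrency = concurrency // maximum number of tasks running at once
    this.running = 0               // tasks currently in progress
    this.queue = []                // tasks waiting to be started
  }

  // Add a task and schedule a check for free slots.
  pushTask (task) {
    this.queue.push(task)
    process.nextTick(this.next.bind(this))
    return this
  }

  next () {
    // Nothing running and nothing waiting: the queue has been drained.
    if (this.running === 0 && this.queue.length === 0) {
      return this.emit('empty')
    }
    while (this.running < this.concurrency && this.queue.length) {
      const task = this.queue.shift()
      task((err) => {
        if (err) {
          // Report the failure but keep processing the other tasks.
          // Note: emitting 'error' with no listener registered throws.
          this.emit('error', err)
        }
        this.running--
        process.nextTick(this.next.bind(this))
      })
      this.running++
    }
  }
}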
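A usage sketch of the two events follows. The class is repeated so the snippet is self-contained, and it uses `require` so that it runs as a plain script (in an ES module, keep `import`). The three timer-based tasks and the `errors` array are assumptions made for the illustration.

```javascript
const { EventEmitter } = require('events')

// TaskQueue repeated from the post so that this snippet is self-contained.
class TaskQueue extends EventEmitter {
  constructor (concurrency) {
    super()
    this.concurrency = concurrency
    this.running = 0
    this.queue = []
  }

  pushTask (task) {
    this.queue.push(task)
    process.nextTick(this.next.bind(this))
    return this
  }

  next () {
    if (this.running === 0 && this.queue.length === 0) {
      return this.emit('empty')
    }
    while (this.running < this.concurrency && this.queue.length) {
      const task = this.queue.shift()
      task((err) => {
        if (err) this.emit('error', err)
        this.running--
        process.nextTick(this.next.bind(this))
      })
      this.running++
    }
  }
}

const queue = new TaskQueue(2)
const errors = []

// Always register an 'error' listener: an EventEmitter throws when
// 'error' is emitted and nobody is listening.
queue.on('error', err => errors.push(err.message))

// Resolve once the queue reports that it has been drained.
const drained = new Promise(resolve => {
  queue.once('empty', () => resolve(errors))
})

queue
  .pushTask(done => setTimeout(() => done(), 10))
  .pushTask(done => setTimeout(() => done(new Error('task 2 failed')), 20))
  .pushTask(done => setTimeout(() => done(), 30))

drained.then(errs => console.log('queue drained; errors:', errs))
```

The failing task is reported through `'error'`, the third task still runs, and `'empty'` fires only after all three have finished.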
Important Note
In case of an error we keep the queue running, and we do not stop other tasks in progress. This is quite common in queue-based systems: errors are expected to happen, so it is better to identify them and think about retry or recovery strategies.
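One possible retry strategy, sketched under assumptions: `withRetry` is a hypothetical helper (not part of the TaskQueue above) that wraps a callback-style task and re-runs it a limited number of times before reporting the error.

```javascript
// Hypothetical helper: returns a new task that re-runs `task` until it
// succeeds or `maxAttempts` is reached, then reports the final result.
function withRetry (task, maxAttempts) {
  return done => {
    let attempt = 0
    const run = () => {
      attempt++
      task(err => {
        if (err && attempt < maxAttempts) return run() // try again
        done(err) // success, or no attempts left
      })
    }
    run()
  }
}

// Illustrative flaky task: fails on the first two calls, then succeeds.
let calls = 0
const flaky = cb => {
  calls++
  setImmediate(() => cb(calls < 3 ? new Error('temporary failure') : undefined))
}

const retried = new Promise(resolve => {
  withRetry(flaky, 5)(err => resolve({ err, calls }))
})

retried.then(result => {
  console.log(`finished after ${result.calls} calls, error: ${result.err}`)
})
```

Because the wrapper keeps the same callback signature, it can be pushed to the queue directly, for example `queue.pushTask(withRetry(task, 3))`. The queue then emits `'error'` only once the retries are exhausted.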
References
The book Node.js Design Patterns is the best reference if you want more information about this topic; see Chapter 4 of the book.
https://www.packtpub.com/product/node-js-design-patterns-third-edition/9781839214110