DEV Community

eslamelkholy


Node.js: Building Concurrent Operations With a Queue

Node.js can still achieve concurrency thanks to its non-blocking nature. In this post we will explore a simple way to run operations concurrently, and then use a queue to limit how many of them run at the same time.

First Of All, The Concurrency Algorithm

  • Initially, we spawn as many tasks as we can without exceeding the concurrency limit
  • Then, every time a task completes, we spawn one or more tasks until we reach the limit again




Simple Implementation Of The Algorithm



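// Assumption: `tasks` is an array of functions that each take a completion callback.
// These timer-based tasks are placeholders for real asynchronous work.
const tasks = [100, 50, 80].map((ms) => (done) => setTimeout(done, ms));

const concurrency = 2; // maximum number of tasks running at once
let running = 0;       // tasks currently in progress
let completed = 0;     // tasks that have finished
let index = 0;         // position of the next task to start

const finish = () => console.log('All tasks executed!');

function next() {
  // Start tasks until the limit is reached or none are left
  while (running < concurrency && index < tasks.length) {
    const task = tasks[index++];
    task(() => {
      if (++completed === tasks.length) return finish();

      running--;
      next(); // a slot was freed, try to start another task
    });
    running++;
  }
}

next();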
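
To see the limit in action, the same pattern can be wrapped in a function and run against a few timer-based tasks. This is only a sketch: `runWithLimit`, `makeTask`, and the `peak` counter are hypothetical names added for illustration and are not part of the original snippet.

```javascript
// Sketch: the limited-concurrency pattern wrapped in a reusable function.
// `runWithLimit`, `makeTask` and `peak` are illustrative names (assumptions).
function runWithLimit(tasks, concurrency, finish) {
  let running = 0;
  let completed = 0;
  let index = 0;
  let peak = 0; // highest number of tasks observed running at once

  if (tasks.length === 0) return finish({ completed, peak });

  function next() {
    while (running < concurrency && index < tasks.length) {
      const task = tasks[index++];
      running++;
      peak = Math.max(peak, running);
      task(() => {
        running--;
        if (++completed === tasks.length) return finish({ completed, peak });
        next();
      });
    }
  }
  next();
}

// Each task simulates asynchronous work with a timer, then calls its callback.
const makeTask = (ms) => (done) => setTimeout(done, ms);
const tasks = [30, 10, 20, 10, 30].map(makeTask);

// Expose the outcome as a promise so it is easy to inspect.
const result = new Promise((resolve) => runWithLimit(tasks, 2, resolve));
result.then(({ completed, peak }) => {
  console.log(`All ${completed} tasks executed, peak concurrency: ${peak}`);
});
```

With a limit of 2, the `peak` counter should never go above 2, no matter how many tasks are in the list.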



Problem With The Previous Algorithm
The pattern above only limits a single, fixed list of tasks. If every finished task spawns two more tasks, each applying its own limit, the total number of running operations grows exponentially. To solve this we need to limit concurrency globally, and the solution is as simple as using a queue.

Queues To Limit The Concurrency

We want to limit the global number of operations that can run in parallel, and we're going to use a queue to make it happen.

Concurrency Algorithm + Queues



export class TaskQueue {
  constructor (concurrency) {
    this.concurrency = concurrency
    this.running = 0
    this.queue = []
  }

  pushTask (task) {
    this.queue.push(task)
    process.nextTick(this.next.bind(this))
    return this
  }

  next () {
    while (this.running < this.concurrency && this.queue.length) {
      const task = this.queue.shift()
      task(() => {
        this.running--
        process.nextTick(this.next.bind(this))
      })
      this.running++
    }
  }
}
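
As a rough illustration of the global limit, the sketch below pushes tasks that each spawn two more tasks onto the same queue. The class is repeated without `export` so the snippet runs as a plain script; `spawn`, `started`, `peak`, and `pending` are hypothetical names used only for this demonstration.

```javascript
// Same TaskQueue as above, repeated without `export` so this runs as a script.
class TaskQueue {
  constructor(concurrency) {
    this.concurrency = concurrency;
    this.running = 0;
    this.queue = [];
  }

  pushTask(task) {
    this.queue.push(task);
    process.nextTick(this.next.bind(this));
    return this;
  }

  next() {
    while (this.running < this.concurrency && this.queue.length) {
      const task = this.queue.shift();
      task(() => {
        this.running--;
        process.nextTick(this.next.bind(this));
      });
      this.running++;
    }
  }
}

// Hypothetical workload: every task spawns two children until depth reaches 0.
const queue = new TaskQueue(2);
let started = 0; // tasks that have begun executing
let peak = 0;    // highest number of tasks running at once
let pending = 0; // tasks pushed but not yet finished

const finished = new Promise((resolve) => {
  function spawn(depth) {
    pending++;
    queue.pushTask((done) => {
      started++;
      // `running` is incremented only after the task starts, hence the + 1
      peak = Math.max(peak, queue.running + 1);
      setTimeout(() => {
        if (depth > 0) {
          spawn(depth - 1);
          spawn(depth - 1);
        }
        done();
        if (--pending === 0) resolve({ started, peak });
      }, 5);
    });
  }
  spawn(3);
});

finished.then(({ started, peak }) => {
  console.log(`${started} tasks ran, never more than ${peak} at a time`);
});
```

Because every spawned task goes through the same queue, the number of tasks in progress stays at or below the queue's `concurrency`, even though the total number of tasks keeps growing.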



Beautify TaskQueue With EventEmitter

We need some information about every operation, such as:

  • Whether an operation has failed
  • Whether all operations in the queue have completed and the queue is empty

Concurrency Algorithm + Queues + EventEmitter


import { EventEmitter } from 'events'

export class TaskQueue extends EventEmitter {
  constructor (concurrency) {
    super()
    this.concurrency = concurrency
    this.running = 0
    this.queue = []
  }

  pushTask (task) {
    this.queue.push(task)
    process.nextTick(this.next.bind(this))
    return this
  }

  next () {
    if (this.running === 0 && this.queue.length === 0) {
      return this.emit('empty')
    }

    while (this.running < this.concurrency && this.queue.length) {
      const task = this.queue.shift()
      task((err) => {
        if (err) {
          this.emit('error', err)
        }
        this.running--
        process.nextTick(this.next.bind(this))
      })
      this.running++
    }
  }
}
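
A possible way to consume these events is sketched below. It repeats the class using `require` instead of `import`/`export` so that it runs as a plain script; the `ok` and `failing` task factories and the `errors` array are hypothetical helpers for this example only.

```javascript
const { EventEmitter } = require('events');

// Same TaskQueue as above, repeated so the sketch is self-contained.
class TaskQueue extends EventEmitter {
  constructor(concurrency) {
    super();
    this.concurrency = concurrency;
    this.running = 0;
    this.queue = [];
  }

  pushTask(task) {
    this.queue.push(task);
    process.nextTick(this.next.bind(this));
    return this;
  }

  next() {
    if (this.running === 0 && this.queue.length === 0) {
      return this.emit('empty');
    }
    while (this.running < this.concurrency && this.queue.length) {
      const task = this.queue.shift();
      task((err) => {
        if (err) this.emit('error', err);
        this.running--;
        process.nextTick(this.next.bind(this));
      });
      this.running++;
    }
  }
}

// Illustrative task factories (assumptions, not part of the original post).
const ok = (ms) => (done) => setTimeout(done, ms);
const failing = (ms, message) => (done) =>
  setTimeout(() => done(new Error(message)), ms);

const queue = new TaskQueue(2);
const errors = [];

// Collect failures instead of stopping the queue.
queue.on('error', (err) => errors.push(err.message));

// Resolve once the queue reports that nothing is running or waiting.
const drained = new Promise((resolve) => {
  queue.once('empty', () => resolve(errors));
});

queue
  .pushTask(ok(10))
  .pushTask(failing(5, 'task 2 failed'))
  .pushTask(ok(10));

drained.then((errs) => {
  console.log(`Queue empty, ${errs.length} error(s): ${errs.join(', ')}`);
});
```

Note that an `EventEmitter` throws when an `'error'` event is emitted and no listener is registered for it, so it is worth attaching the `'error'` handler before pushing any tasks.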





Important Note
When an error occurs, we keep the queue running and do not stop the other tasks in progress. This is quite common in queue-based systems: errors are expected to happen, so it's better to identify them and think about retry or recovery strategies.
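
One possible retry strategy, shown here only as a minimal sketch, is to wrap a task so that it is attempted again when it reports an error. `withRetry` and the `flaky` task are hypothetical names introduced for this example; they are not part of the original post or of any library.

```javascript
// Hypothetical helper: wraps a callback-style task so it is retried on error.
function withRetry(task, maxAttempts) {
  return (done) => {
    let attempt = 0;
    const run = () => {
      attempt++;
      task((err) => {
        if (err && attempt < maxAttempts) return run(); // try again
        done(err); // either success, or the last attempt also failed
      });
    };
    run();
  };
}

// Illustrative flaky task: fails on the first two calls, then succeeds.
let calls = 0;
const flaky = (done) =>
  setTimeout(() => {
    calls++;
    done(calls < 3 ? new Error(`attempt ${calls} failed`) : undefined);
  }, 5);

// Run the wrapped task directly and expose the outcome as a promise.
const outcome = new Promise((resolve) => {
  withRetry(flaky, 3)((err) => resolve({ err, calls }));
});

outcome.then(({ err, calls }) => {
  console.log(err ? `gave up: ${err.message}` : `succeeded after ${calls} calls`);
});
```

Since the wrapped function still takes a single completion callback, it should be usable anywhere a plain task is, for example `queue.pushTask(withRetry(task, 3))`.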

References

The book Node.js Design Patterns is the best reference if you want more information about this topic; see Chapter 4.
https://www.packtpub.com/product/node-js-design-patterns-third-edition/9781839214110
