DEV Community

bardonolado
bardonolado

Posted on

NodeJS Non-Blocking Processing

NodeJS Non-Blocking Processing

The problem

Let's say we have a queue and we need to constantly pull items from that. Which is the best approach to that?

In many cases, we can create a piece of code that contains a while call, and for each iteration we pull one or more items from the queue.

function pullFromQueue(queue) {
    while (true) {
        const item = queue.pull();
        doSomething(item);
    }
}
Enter fullscreen mode Exit fullscreen mode

But what if we have something else in our application? Or if we want to do another task too? That piece of code will block everything not relate to the doSomething() call. Every single task will be frozen, just waiting that while loop end. It will end, right?

Unless we get rid of that eternal loop, there is no way to accomplish such pulling process without blocking everything.

We can change doSomething() to be an async function, it will certainly unblock the event loop, but we just created a potential stackoverflow error. Every iteration will fire a doSomething() call without waiting it to ends. Thus, it will continue to accumulate until it reaches the process limit calls, leading it to a real stackoverflow error.

Or we can change that to a recursive function, but it will still block the event loop, as it is almost the same as the the while loop.

function pullFromQueue(queue) {
    const item = queue.pull();
    doSomething(item);

    /* call itself */
    pullFromQueue(queue);
}
Enter fullscreen mode Exit fullscreen mode

The problem is: after we called pullFromQueue(), every single event loop's tick will be in charge of managing it. Leading to a function flood where nothing but that function or something related to it will be executed. If we called something before, it will be frozen. And if we called something after, it won't even start.

The Solution

We aren't letting the event loop breath, and to remedy that we can use the setImmediate(our_call) function. It will schedule our call to some more available position in the event loop. So, every frozen operation will have the opportunity to be executed in the between.

function pullFromQueue(queue) {
    const item = queue.pull();
    doSomething(item);

    /* call itself */
    setImmediate(() => pullFromQueue(queue));
}
Enter fullscreen mode Exit fullscreen mode

Even taking some space in the event loop, the others calls will be executed. Let's try to visualize what is happening:

1. pullFromQueue() called;
2. queue.pull() called;
3. doSomething() called;
4. setImmediate scheduled pullFromQueue();

5. other random call();
6. another random call();

7. pullFromQueue() called;
8. ...
Enter fullscreen mode Exit fullscreen mode

Now we see that there are some other operations being executed between the pullFromQueue() calls, different of what was happening.

We can try to optimize it, said that this can still be event loop consuming. So, when a item is pulled from the queue, we can check if it exists. If not exist (meaning the queue is empty, busy, etc) we can re-schedule the function with a little delay.

function pullFromQueue(queue) {
    const item = queue.pull();
    /* if can't get item, re-schedule this function without calling doSomething() */
    if (!item) {
        return setInterval(() => pullFromQueue(queue), 250);
    }

    doSomething(item);

    /* call itself */
    setImmediate(() => pullFromQueue(queue));
}
Enter fullscreen mode Exit fullscreen mode

This can reduce drastically the event loop consumption when it is not needed (queue empty, timeout, busy, etc).

Now that we have a non-blocking processing function, we can use it to whatever pulling processing we want, without worrying about with our other functionalities in the same NodeJS process.

We can apply that to anything that needs recursive fetching, like: HTTP Requests, Redis Job Queue Pulling, MongoDB Pagination Queries, etc.

Oldest comments (4)

Collapse
 
qm3ster profile image
Mihail Malo • Edited

Any await point yields to the eventloop, much like using setImmediate.
Try the following:

const {log} = console
const proc = x => log('processing', x)
const d = async q => {
  // ⚠️ Attention! Achtung! Attenzione! ⚠️
  // We are awaiting a synchronous value (`undefined`, not a `Promise`) just to create a yield point inside the loop!
  while (q.length) await proc(q.pop())
}
log('starting')
const task = d([3,5,7,9,11,13])
log('started')
await task
log('finished')
Enter fullscreen mode Exit fullscreen mode

You will get started printed before most of the processing lines.

Collapse
 
qm3ster profile image
Mihail Malo • Edited

With rescheduling when empty, consider:

const {log} = console
const sleep = ms => new Promise(res => setTimeout(res, ms))
const q = [3,5,undefined,7,9,undefined,11,13]
// async because we pretend it's a network call or whatever
const try_pull = async () => {
    const x = q.pop()
    log('pulling', x)
    await sleep(100)
    return x
}
const pull = async () => {
    for (let i = 0; i<3; i++) {
        const next = await try_pull()
        if (next) return next
        log('nothing, sleeping 1s')
        await sleep(1000)
    }
    throw new Error('timed out')
}
const proc = x => console.log('processing', x)
const d = async q => {
  while (true) proc(await pull())
}
log('starting')
const task = d()
log('started')
try {
    await task
} catch (err) {
    log(err)
}
log('finished')
Enter fullscreen mode Exit fullscreen mode
Collapse
 
davidroffe profile image
davidroffe

Couldn't help but wonder as to why you made the proc function async? I ran this bit of code with and without it, but spotted no real difference. Just something eating away at my curiosity.

Collapse
 
qm3ster profile image
Mihail Malo

My "bad", it's a carryover from my other, simpler, comment.
There, awaiting proc was the single yield point in the loop, so it did make all the difference. Here there's a second await from the pull inside the loop iteration, so processing can be made sync.
However, and yet again my bad, it seems awaiting a sync value still yields to the event loop as well. So in both examples it is unnecessary.
I'll now edit both comments accordingly.