Sending Realtime Events To The Client From Node child process

So I recently had to add a new feature to an existing app. The feature did some data-heavy work, like processing large documents whose contents had to be saved into a database.

Naturally, I queued the data from the file, consumed the queue in a forked child process, and saved the info to the database there. To report progress on the status of the processing, I decided to use Socket.IO to fire events to the client. This approach presented several problems: for one, the processing was fast and the Socket.IO instance didn't capture most of the events; another problem was how to share the same Socket.IO instance between the parent and child processes.

The approach I eventually settled on was to use Redis Pub/Sub: fire events from the child processes, listen for them on the main process, and send those events on to the client. The approach works, scales well, and delivers really good performance.
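In a nutshell, the flow looks like this (a minimal sketch, assuming a local Redis server and an io Socket.IO server instance; the full implementation follows below):

// child.js – runs in a forked process and publishes progress
const redis = require('redis');
const pub = redis.createClient();
pub.publish('progress', JSON.stringify({ done: 10, total: 100 }));

// main.js – subscribes to the same channel and relays events to the browser
const sub = redis.createClient();
sub.subscribe('progress');
sub.on('message', (channel, message) => {
  io.emit(channel, JSON.parse(message)); // io is your Socket.IO server instance
});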

Now for some code

I will assume you have an existing Node.js app and that the data has been queued already. We need to install socket.io and redis. Both can be installed using npm:

npm i -S socket.io redis

RabbitMqHelper

Though this is out of scope for this article, I wrote a RabbitMQ helper (rabbitMqHelper.js) which I use in my apps.

const rabbitMq = require('amqplib');

class RabbitMqHelper {
  constructor (channel_name) {
    this.channel = null;
    this.connection = null;
    this.queueAssert = null;
    this.channel_name = channel_name;
  }

  // Connect, open a channel, and assert that the queue exists
  async setup () {
    const conn = await rabbitMq.connect(process.env.RABBITMQ_URL);
    const ch = await conn.createChannel();
    this.queueAssert = ch.assertQueue(this.channel_name, { durable: true });
    this.connection = conn;
    this.channel = ch;
    return this.queueAssert;
  }

  // Push a message onto the queue; persistent so it survives a broker restart
  sendToQueue (message) {
    this.channel.sendToQueue(this.channel_name, Buffer.from(message), { persistent: true });
  }
}

module.exports = RabbitMqHelper;
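For completeness, here is how the data might be queued in the first place using this helper (a hypothetical producer; the queue name and payload shape are just examples):

const RabbitMqHelper = require('./util/rabbitmq.helper');

async function queueUsers (users) {
  const helper = new RabbitMqHelper('users:queue'); // same name the worker consumes
  await helper.setup();
  for (const user of users) {
    helper.sendToQueue(JSON.stringify(user)); // messages are marked persistent by the helper
  }
}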

The Child Process

The feature required processing different queues that held different types of information, but both required the same underlying action: saving into the database. So I wrote a base child process, and the specifics of each child process extended it.

Base Worker (base_worker.js)

require('dotenv').config()
const redis = require('redis')
const QueueHelper = require('../../util/rabbitmq.helper');

const pub = redis.createClient(); // connects to localhost by default

class BaseWorker {
  constructor (type, model) {
    this.type = type
    this.QueueName = `${type}:queue` // the queue is named after the type
    this.rb = new QueueHelper(this.QueueName);
    this.model = model
  }

  async runQueue () {
    let count = 0;
    await this.rb.setup();
    const { messageCount } = await this.rb.queueAssert
    const type = this.type
    this.rb.channel.consume(
      this.QueueName,
      msg => {
        // Defer the heavy work so the consumer callback returns quickly;
        // an arrow function keeps `this` bound to the worker instance
        setImmediate(async () => {
          try {
            const strContent = msg.content.toString()
            const queuedData = JSON.parse(strContent);
            const { item_id } = queuedData
            await this.model.create(queuedData)
            this.rb.channel.ack(msg);
            ++count
            pub.publish(`channel:${type}`, JSON.stringify({ count, item_id, messageCount }))
            if (count === messageCount) {
              // At this point the queue has finished and you can perform any cleanup etc. here
            }
          } catch (e) {
            console.error(`Failed to process ${type} message:`, e)
          }
        })
      }
    )
  }
}

module.exports = BaseWorker

User Worker (user.worker.js)

const BaseWorker = require('./base_worker')
const userModel = require('../models/userModel')

class UserWorker extends BaseWorker {
  constructor () {
    super('users', userModel)
  }
}

const userQ = new UserWorker()
userQ.runQueue()
// You can run any other queue with a similar implementation
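For example, a second worker for a hypothetical documents queue (document.worker.js, with a matching documentModel) would differ only in its type and model:

const BaseWorker = require('./base_worker')
const documentModel = require('../models/documentModel')

class DocumentWorker extends BaseWorker {
  constructor () {
    super('documents', documentModel) // publishes progress on channel:documents
  }
}

new DocumentWorker().runQueue()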

The Main Process

The main or parent process forks the child processes whenever it starts. Starting a few child processes isn't very difficult, but imagine having several of them; it can be stressful getting the path to each one and running them one after the other. So I like to use a glob to find all the child processes.

For that, I will use an npm module called glob.

npm i -S glob

The code for the main process (main.js) looks like this.

// Note: I skipped a lot of steps here, like starting a server etc.
require('dotenv').config()
const redis = require('redis')
const cp = require('child_process');
const glob = require('glob');

const sub = redis.createClient();

// Ignore workers in node_modules and fork only those defined within the app
glob('!(node_modules)/**/*.worker.js', function (er, files) {
  for (let f of files) {
    cp.fork(`${__dirname}/${f}`);
  }
});

// Subscribe to the Redis channel; whenever an event is fired, send it to the client
const type = `channel:users` // listen on the users channel (the UserWorker's type is 'users')
sub.subscribe(type)
sub.on('message', function (channel, message) {
  // Send a Socket.IO event and listen on the client
})
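The Socket.IO side is only hinted at above; fleshing out that last comment could look something like this (a sketch, assuming a plain HTTP server with Socket.IO attached and a browser client connected to it):

const http = require('http');
const server = http.createServer();
const io = require('socket.io')(server);

sub.on('message', function (channel, message) {
  // Forward the progress payload to every connected client
  io.emit(channel, JSON.parse(message));
});

server.listen(3000);

// On the client:
// const socket = io('http://localhost:3000');
// socket.on('channel:users', ({ count, item_id, messageCount }) => {
//   updateProgressBar(count, messageCount); // hypothetical UI helper
// });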

And that is it. Please leave your comments and opinions. Enjoy!


Top comments (1)

moose: thanks for the writeup, it helps me understand this better.
