So I recently had to add a new feature to an existing app. The feature was data heavy: it processed large documents whose contents had to be saved into a database.
Naturally, I queued the data from the file, consumed the queue in a forked child process, and saved the info to the database there. To report progress on the processing, I decided to use Socket.IO to fire events to the client. This approach presented several problems: for one, the processing was fast and the Socket.IO instance didn't capture most of the events; another was how to share the same Socket.IO instance between the parent and child processes.
The approach I eventually settled on was to use Redis Pub/Sub: fire events from the child processes, listen on the main process, and send those events on to the client. This approach works, scales well, and gives really good performance.
Now for some code
I will assume you have an existing Node.js app and that the data has been queued already. We need to install the following:

- The Redis Node.js client (I use https://www.npmjs.com/package/redis)
- Socket.IO

Both can be installed using npm:

```bash
npm i -S socket.io redis
```
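If you haven't used Redis Pub/Sub before, here is a minimal sketch of the pattern everything below builds on. This is my own example, not part of the app, and it assumes a Redis server running on localhost:

```javascript
const redis = require('redis');
const pub = redis.createClient(); // publisher connection
const sub = redis.createClient(); // a connection in subscriber mode can't issue other commands, so use a second one

sub.on('message', (channel, message) => {
  console.log(`got "${message}" on ${channel}`);
});
sub.on('subscribe', () => {
  // publish only after the subscription is confirmed
  pub.publish('channel:demo', 'hello'); // logs: got "hello" on channel:demo
});
sub.subscribe('channel:demo');
```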
RabbitMqHelper
Though writing it is out of scope for this article, here is the RabbitMQ helper I use in my apps.
```javascript
const rabbitMq = require('amqplib');

class RabbitMqHelper {
  constructor (channel_name) {
    this.channel = null;
    this.connection = null;
    this.queueAssert = null;
    this.channel_name = channel_name;
  }

  async setup () {
    const conn = await rabbitMq.connect(process.env.RABBITMQ_URL);
    const ch = await conn.createChannel();
    this.queueAssert = ch.assertQueue(this.channel_name, { durable: true });
    this.connection = conn;
    this.channel = ch;
    return this.queueAssert;
  }

  sendToQueue (message) {
    this.channel.sendToQueue(this.channel_name, Buffer.from(message), { persistent: true });
  }
}

module.exports = RabbitMqHelper;
```
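For completeness, here is roughly how the producing side could use this helper. This is a hypothetical sketch (I'm assuming your data has been queued already); `rows` stands for the records parsed from the uploaded document:

```javascript
const RabbitMqHelper = require('./util/rabbitmq.helper');

// Hypothetical producer: push each parsed record onto the queue a worker will consume.
// Each row should include an item_id, which the worker reports in its progress events.
async function queueUsers (rows) {
  const helper = new RabbitMqHelper('users:queue'); // must match the worker's `${type}:queue`
  await helper.setup();
  for (const row of rows) {
    helper.sendToQueue(JSON.stringify(row));
  }
}
```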
The Child Process
The feature required processing different queues holding different types of information, but they all required the same underlying action: saving into the database. So I wrote a base child process, and each specific child process extends it.
Base Worker
```javascript
require('dotenv').config();
const redis = require('redis');
const QueueHelper = require('../../util/rabbitmq.helper');
const pub = redis.createClient(); // connects to localhost by default

class BaseWorker {
  constructor (type, model) {
    this.type = type;
    this.QueueName = `${type}:queue`; // named the queue after the type
    this.rb = new QueueHelper(this.QueueName);
    this.model = model;
  }

  async runQueue () {
    let count = 0;
    await this.rb.setup();
    const { messageCount } = await this.rb.queueAssert;
    const type = this.type;
    this.rb.channel.consume(
      this.QueueName,
      msg => {
        // Arrow functions keep `this` bound to the worker instance
        setImmediate(async () => {
          try {
            const strContent = msg.content.toString();
            const queuedData = JSON.parse(strContent);
            const { item_id } = queuedData;
            await this.model.create(queuedData);
            this.rb.channel.ack(msg);
            ++count;
            pub.publish(`channel:${type}`, JSON.stringify({ count, item_id, messageCount }));
            if (count === messageCount) {
              // At this point the queue has finished and you can perform any clean up etc. here
            }
          } catch (e) {
            console.error(e); // don't swallow processing errors silently
          }
        });
      }
    );
  }
}

module.exports = BaseWorker;
```
User Worker
```javascript
const BaseQueue = require('./base_worker');
const userModel = require('../models/userModel');

class UserWorker extends BaseQueue {
  constructor () {
    super('users', userModel);
  }
}

const userQ = new UserWorker();
userQ.runQueue();
// You can run any other queue with a similar implementation
```
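As that last comment says, any other queue follows the same pattern. Purely as an illustration (`documentModel` here is hypothetical), a second worker could look like this; name the file something like `document.worker.js` so the glob in the main process below picks it up:

```javascript
const BaseQueue = require('./base_worker');
const documentModel = require('../models/documentModel'); // hypothetical model

class DocumentWorker extends BaseQueue {
  constructor () {
    // consumes documents:queue and publishes progress on channel:documents
    super('documents', documentModel);
  }
}

new DocumentWorker().runQueue();
```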
The Main Process
The main (parent) process forks the child processes whenever it starts. Starting a few child processes isn't very difficult, but imagine having many of them: it gets tedious to work out the path to each one and run them one after the other. So I like to use a glob to find all the child processes.
For that, I will use an npm module called glob.

```bash
npm i -S glob
```
The code for the main process looks like this.
```javascript
// Note: I skipped a lot of steps here, like starting a server etc.
require('dotenv').config();
const redis = require('redis');
const sub = redis.createClient();
const cp = require('child_process');
const glob = require('glob');

// Ignore workers inside node_modules and pick up only those defined within the app
glob('!(node_modules)/**/*.worker.js', { cwd: __dirname }, function (er, files) {
  for (let f of files) {
    cp.fork(`${__dirname}/${f}`);
  }
});

// Subscribe to the Redis channel and, any time an event is fired, forward it to the client
const type = 'channel:users'; // must match `channel:${type}` from the UserWorker above
sub.subscribe(type);
sub.on('message', function (channel, message) {
  // Send a socketio event and listen on the client
});
```
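To complete the picture, here is one way to fill in that last comment. This is just a sketch: it assumes you have already attached Socket.IO to your HTTP server as `io`, and the event name `queue:progress` is my own choice, not a fixed convention.

```javascript
// Server side: forward every Redis progress message to connected clients.
// Assumes `io` is your Socket.IO server instance.
sub.on('message', function (channel, message) {
  io.emit('queue:progress', JSON.parse(message)); // {count, item_id, messageCount}
});
```

```javascript
// Client side: listen for the progress event and update the UI.
const socket = io();
socket.on('queue:progress', ({ count, messageCount }) => {
  console.log(`Processed ${count} of ${messageCount}`);
});
```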
And that is it. Please leave your comments and opinions. Enjoy!