Beyond basic CRUD work, there is a lot more to learn and implement in the backend to build scalable, powerful, and reliable systems. Most of us full-stack and backend developers build APIs for user-facing operations: the frontend triggers a request, and the response informs the user, moves their work forward, navigates them to the next page, or fetches the data they need.
But not every workflow can be packed into a single API call and finished while the user waits. The user expects a quick response to the task they triggered; it is the backend that knows how many actions actually have to be performed behind the scenes, and in which order.
The Solution: Work Queues
For this we use work queues: the heavy work is handed off to workers, while a success or work-in-progress response is sent back to the user right away, letting them move on rather than waiting for the whole set of backend operations to finish.
My Implementation Journey
To explain and demonstrate, I implemented this concept in one of my own projects that involves a set of follow-up tasks. Since I had already worked with Redux for state management in my 'Nur Fashions' e-commerce web app, I decided that upgrading this project further with queues and jobs would be good both for learning and for adding real value to the project.
Technology Stack
The library I used for this was BullMQ, with Redis as the storage and execution engine, since BullMQ is built on top of Redis. I used Docker to run a Redis image and allow connections from localhost.
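For reference, the standard way to do that is to run the official Redis image and publish its default port; the container name here is just an example, not taken from the project:

docker run -d --name redis -p 6379:6379 redis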
Use Case: Order Processing
In my Nur Fashions e-commerce web app I picked order processing as the use case. The backend was already saving the order to the database and returning a success response, and, as discussed above, there was no need to overload this API with the other required tasks, such as:
- Update Inventory for selected ordered items
- Send Email to customer (user)
- Generate Invoice
These tasks I instead handed off to the queue workers.
System Architecture & Design
Since I always try to design systems to be scalable and modular, with clean and helpful abstractions, I set up the following pieces:
Queue Config File
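The config file itself isn't reproduced here, so below is a minimal sketch of what it can look like, assuming the queue is named "orderQueue" and exported for the controller to use; the file layout and names are my assumptions, not the project's exact code:

import { Queue } from "bullmq";

// Redis connection shared by queues and workers (same host/port the worker uses later)
const connection = { host: "127.0.0.1", port: 6379 };

// Queue that order-processing jobs are pushed onto from the order controller
export const orderQueue = new Queue("orderQueue", { connection });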
Job Addition From Controller Code:
await orderQueue.add("processOrder", {
  orderId: orderData.orderId,
  orderEmail: orderDetails.customer.email,
  items: orderDetails.items
}, {
  attempts: 5,
  backoff: {
    type: "exponential",
    delay: 5000
  },
  removeOnComplete: 100,
  removeOnFail: false,
});
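A quick note on these options: attempts: 5 means a failing job is attempted up to 5 times in total, backoff with type "exponential" and a 5000 ms delay makes the wait between retries grow exponentially starting at 5 seconds, removeOnComplete: 100 keeps only the latest 100 completed jobs in Redis, and removeOnFail: false keeps every failed job around so it can be inspected later.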
Worker Task Handling File along with limiter and retry logic
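The full worker file isn't shown here, so the following is a minimal sketch of its shape, assuming the same "orderQueue" name as above; the processor body is where the per-task handling code shown further below lives, and the options object matches the limiter configuration listed later:

import { Worker } from "bullmq";

const connection = { host: "127.0.0.1", port: 6379 };

// Worker that consumes "processOrder" jobs from the order queue
export const orderWorker = new Worker(
  "orderQueue",
  async (job) => {
    const { orderId, orderEmail, items } = job.data;
    // inventory update, customer email, and invoice generation run here
    // (see the task handling and error handling snippets below)
  },
  {
    connection,
    concurrency: 5,
    limiter: { max: 10, duration: 10000 },
  }
);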
Running Workers Separately
I learned that workers run as a separate process from the API server that enqueues the jobs, so they have to be started separately. Since this is a crucial part of the backend, I used a package called 'concurrently', which runs a list of commands at the same time.
I created a single index.ts worker entry file that imports the order worker file, which leaves room to start more workers in the future, as sketched below.
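A sketch of that setup, assuming the worker entry point lives at src/workers/index.ts and the API has its own dev script; the paths and script names are my assumptions:

// package.json (scripts) - run the API and the workers side by side with concurrently:
// "dev": "concurrently \"npm run dev:api\" \"npm run dev:workers\""

// src/workers/index.ts - single entry point that boots every worker
import "./orderWorker"; // importing the file is enough to start the order worker
// future workers (emails, reports, ...) can simply be imported here as well

console.log("Workers started");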
Handling Job Failures: The Critical Part
This is where the question hit me: what if a task/job fails? I weighed different approaches, and the most obvious one was to keep a detailed schema for the order table/collection. So I added properties such as:
inventoryProcessed: Boolean,
inventoryProcessedAt: Date,
emailSent: Boolean,
emailSentAt: Date,
invoiceGenerated: Boolean,
invoiceGeneratedAt: Date,
processing: {
  inventoryError: String,
  inventoryLastAttempt: Date,
  emailError: String,
  emailLastAttempt: Date,
  invoiceError: String,
  invoiceLastAttempt: Date
}
Dual Storage Strategy
This let me record progress for handling dead jobs and for debugging. Alongside the database, I used Redis to store flags marking the completion or failure of each job step, so a retried job can skip work that already succeeded:
try {
  const inventoryProcessed = await redis.hget(`order:${orderId}:flags`, "inventoryProcessed");
  if (inventoryProcessed !== "1") {
    console.log("Updating Inventory: ", orderId);
    await InventoryController.updateInventoryPostOrder(items);
    // run both writes in parallel; no inner awaits, otherwise Promise.all gains nothing
    await Promise.all([
      redis.hset(`order:${orderId}:flags`, "inventoryProcessed", "1"),
      order.updateOne({orderId}, {
        inventoryProcessed: true,
        inventoryProcessedAt: new Date()
      })
    ]);
  } else {
    console.log("Inventory Already Processed, ", orderId);
  }
} catch (error: any) {
  errors.inventory = error.message;
  console.log(`Error While Updating Inventory, ${orderId}, `, error.message);
  await order.updateOne({orderId}, {
    "processing.inventoryError": error.message,
    "processing.inventoryLastAttempt": new Date()
  });
}
The email and invoice tasks in this order process follow the same pattern.
I also configured the rate limiter as shown below, and added error handling that notifies the admin about failed jobs:
{
  connection: { host: "127.0.0.1", port: 6379 },
  limiter: {
    max: 10,
    duration: 10000,
  },
  concurrency: 5
}
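In plain terms, concurrency: 5 lets this worker process up to 5 jobs in parallel, while the limiter caps processing at 10 jobs per 10 seconds, so a burst of orders cannot hammer the inventory database and the mail provider all at once.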
///////////
worker.on("completed", (job) => {
console.log(`Job ${job.id} completed Successfully`);
});
worker.on("failed", async (job, error) => {
console.error(`Job ${job?.id} failed, `, error.message);
if (job && job.attemptsMade >= 3) {
await AlertController.sendAlertToAdmin({
orderId: job.data.orderId,
error: error.message,
attempts: job.attemptsMade
});
}
});
worker.on("error", (error) => {
console.error("Worker Error, ", error);
});
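The two error hooks serve different purposes: "failed" fires whenever a job attempt throws, and the attemptsMade check makes sure the admin is only alerted once a job has already failed several times, while "error" catches worker-level problems, such as a lost Redis connection, that are not tied to any particular job.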
Conclusion
This is how I learned and implemented work queues with BullMQ, adding scalable, valuable, and reliable features to my backend, upgrading my backend skill stack, and keeping users moving along on the frontend.
Real engineers:
- Design for scale from the start
- Handle failures gracefully
- Keep users informed without making them wait