Introduction
When I am handling processes that can put a heavy load on my server, I like to create a queuing system that handles each process one by one, which helps prevent the server from being overwhelmed. This kind of system can be used on both the client side and the server side.
On the client side, I used this type of queue to upload chunks of a large file to a server in groups of five. On the server side, I used it to pass videos into ffmpeg one by one to be encoded. The encoding runs in the background, so the user can do other things in the application and receives a notification once it has completed.
In today's blog, I am going to give a quick guide on how I set up a basic queue system for an application: one version that deals with items one by one and another that batches them in groups.
Queue System
To get started on the queue system, you can use functions or a class. Today I am going to use a class to demonstrate; to make a function version, you are just splitting up the work into smaller bite-sized pieces of code. First, we are going to create a class called QueueSystem with two values: an array called itemsQueue and a boolean called processActive.
class QueueSystem{
    itemsQueue = [];
    processActive = false;
}
If you like, you can create a constructor to pass in any information you might need, but as we don't need one here, I have left it out.
Next, we will add an asynchronous method named addItem that handles adding items to the queue. It first pushes the value onto the end of the itemsQueue array, then checks whether processActive is false. If it is, we run our main process, but we must set processActive to true before the process starts. That way, when another item is added while processActive is true, the queue knows a run is already in progress, so it only needs to add the new item and not start the main process again.
class QueueSystem{
    itemsQueue = [];
    processActive = false;

    async addItem(item)
    {
        this.itemsQueue.push(item);
        // Only start the main process if it is not already running
        if(!this.processActive)
        {
            this.processActive = true;
            await this.mainProcess();
            this.processActive = false;
        }
        return;
    }
}
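One caveat with the flag: if the main process throws, the line that sets processActive back to false is never reached, and later calls to addItem will only push items without ever starting a run. The sketch below shows one possible guard using try/finally. It is not part of the class above; the name GuardedQueue and the placeholder mainProcess, which deliberately fails on the item "bad", are assumptions made purely for illustration.

```javascript
// Hypothetical variant of QueueSystem: resets the flag even when a task fails.
class GuardedQueue {
    itemsQueue = [];
    processActive = false;

    async addItem(item) {
        this.itemsQueue.push(item);
        if (this.processActive) {
            return; // a run is already in progress and will pick this item up
        }
        this.processActive = true;
        try {
            await this.mainProcess();
        } finally {
            // Runs whether mainProcess resolved or rejected
            this.processActive = false;
        }
    }

    // Placeholder main process (assumption): rejects when it meets "bad"
    async mainProcess() {
        while (this.itemsQueue.length > 0) {
            const item = this.itemsQueue.shift();
            if (item === "bad") {
                throw new Error("task failed for " + item);
            }
        }
    }
}
```

The call to addItem that started the run still rejects, so the caller would normally catch that error, but the queue itself stays usable afterwards.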
The main process is where we loop over each item, and where you add your own code to run tasks one by one or in groups. We will cover the one-by-one approach first.
First, we loop over our queue with a while loop and shift the first value off our array. Then, to mimic a task, we call setTimeout. Because setTimeout doesn't return a promise, we can't await it directly, so we wrap it in a promise. Once the loop has completed the task for every item in the queue, control returns to the addItem method, which sets processActive to false. Items added while the loop is running are picked up too, so it keeps going until the queue is empty.
class QueueSystem{
    itemsQueue = [];
    processActive = false;

    async addItem(item)
    {
        this.itemsQueue.push(item);
        if(!this.processActive)
        {
            this.processActive = true;
            await this.mainProcess();
            this.processActive = false;
        }
        return;
    }

    async mainProcess()
    {
        while(this.itemsQueue.length > 0)
        {
            let item = this.itemsQueue.shift();
            // Replace with your task
            await new Promise((resolve) =>
                setTimeout(() => {
                    console.log("item: ", item);
                    resolve();
                }, 5000));
        }
    }
}
Examples
let queue = new QueueSystem();
queue.addItem("Bob");
queue.addItem("Phil");
queue.addItem("Jenny");
setTimeout(() => {
    queue.addItem("Katie");
    setTimeout(() => {
        queue.addItem("Anne");
    }, 5000);
}, 1000);
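For comparison, here is a rough sketch of what a function version of the one-by-one queue could look like, with the state held in a closure instead of class fields. The names createQueue, task, wait, processed and fnQueue are assumptions chosen for this illustration, and the task is passed in by the caller rather than hard-coded.

```javascript
// Hypothetical function version: the queue state lives in a closure.
// `task` is an async function supplied by the caller (assumption).
function createQueue(task) {
    const itemsQueue = [];
    let processActive = false;

    async function mainProcess() {
        while (itemsQueue.length > 0) {
            const item = itemsQueue.shift();
            await task(item); // one item at a time
        }
    }

    async function addItem(item) {
        itemsQueue.push(item);
        if (!processActive) {
            processActive = true;
            await mainProcess();
            processActive = false;
        }
    }

    return { addItem };
}

// Usage: record the order in which items finish
const processed = [];
const wait = (ms) => new Promise((resolve) => setTimeout(resolve, ms));
const fnQueue = createQueue(async (item) => {
    await wait(10); // stand-in for real work
    processed.push(item);
});
fnQueue.addItem("Bob");
fnQueue.addItem("Phil");
fnQueue.addItem("Jenny");
```

Passing the task in as a parameter keeps the queue logic separate from the work it runs, which makes it easier to reuse the same queue for uploads, encoding, or anything else.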
Grouping tasks is similar to the one-by-one approach; it just needs a bit more code.
First, we have to decide how many tasks we want in a group. I have decided on three for this example. We create an array called itemsGroup that will hold the current group of items. Then we loop over the queue, but this time we check the length of itemsGroup on each pass. If it has fewer than three items, we remove an item from the queue and add it to the group. Once we have a group of three items, we pass the array into a method called handleGroup. This method runs the three tasks in parallel by creating a promise for each item and passing them all to Promise.all, which waits for every promise to complete.
Once complete, we console.log the results, but you can do whatever you want with them. After that, we return an empty array, ready to collect the next group. If the queue empties before the group has three items, we pass whatever items remain in the group to handleGroup.
class QueueSystem{
    itemsQueue = [];
    processActive = false;

    async addItem(item)
    {
        this.itemsQueue.push(item);
        if(!this.processActive)
        {
            this.processActive = true;
            await this.mainProcess();
            this.processActive = false;
        }
        return;
    }

    async mainProcess()
    {
        let itemsGroup = [];
        // Keep going while items are waiting in the queue or in a partial group,
        // so items added while the last group is running are not left behind
        while(this.itemsQueue.length > 0 || itemsGroup.length > 0)
        {
            if(itemsGroup.length < 3 && this.itemsQueue.length > 0)
            {
                let item = this.itemsQueue.shift();
                itemsGroup.push(item);
            }
            else
            {
                // The group is full or the queue is empty, so run what we have
                itemsGroup = await this.handleGroup(itemsGroup);
            }
        }
    }

    async handleGroup(group)
    {
        let taskPromises = [];
        for (const item of group)
        {
            // Replace with your task
            const promise = new Promise((resolve) =>
                setTimeout(() => {resolve("item: "+ item)}, 3000));
            taskPromises.push(promise);
        }
        const results = await Promise.all(taskPromises);
        console.log(results);
        return [];
    }
}
Example
let queue = new QueueSystem();
queue.addItem("Bob");
queue.addItem("Phil");
queue.addItem("Jenny");
setTimeout(() => {
    queue.addItem("Katie");
    setTimeout(() => {
        queue.addItem("Anne");
        queue.addItem("Sam");
        queue.addItem("Tom");
    }, 2000);
}, 1000);
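For this example, I added the first three items directly, one after another. Note that each time I run it, the first item is processed on its own before the next two are added to the queue, because the main process starts as soon as the first item is added.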
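If you would rather have items that are added back to back share the first group, one option is to wait a single microtask before starting the main process. The sketch below is only one way to do that and is not the class used above: the name DeferredGroupQueue, the task and groupSize constructor parameters, and the batches array are all assumptions for illustration. It also uses splice to take a group off the queue, which is a different technique from the shift-and-check loop shown earlier.

```javascript
// Hypothetical variant: defers the start by one microtask so that items
// added back to back are grouped together.
class DeferredGroupQueue {
    itemsQueue = [];
    processActive = false;
    batches = []; // records each group, for illustration only

    constructor(task, groupSize = 3) {
        this.task = task;           // async function run for each item (assumption)
        this.groupSize = groupSize; // how many items run in parallel
    }

    async addItem(item) {
        this.itemsQueue.push(item);
        if (this.processActive) {
            return;
        }
        this.processActive = true;
        try {
            // Yield once so the remaining synchronous addItem calls can push first
            await Promise.resolve();
            await this.mainProcess();
        } finally {
            this.processActive = false;
        }
    }

    async mainProcess() {
        while (this.itemsQueue.length > 0) {
            // splice removes and returns up to groupSize items from the front
            const group = this.itemsQueue.splice(0, this.groupSize);
            this.batches.push(group);
            await Promise.all(group.map((item) => this.task(item)));
        }
    }
}
```

With this variant, calling addItem for "Bob", "Phil" and "Jenny" one after another puts all three in the same group, at the cost of a very small delay before the first task starts.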
Conclusion
You now have a basic queue system that you can change and expand to your preference. If you don't want an item to go through the main process, you can filter it out of the queue first, and if you want to retry a failed item, you can remove the successful items and pass the failed ones back through the main process. This was just a quick stepping stone into writing your own queue system to make your application better.
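As a rough illustration of the retry idea, the sketch below uses Promise.allSettled instead of Promise.all, so that one failed task does not reject the whole group, and then sends only the failed items through again. The function name runGroupWithRetry, the task parameter, and the maxAttempts limit are assumptions for this example and are not part of the class above.

```javascript
// Hypothetical helper: runs a group in parallel and retries only the failures.
// `task` is assumed to be an async function that rejects when an item fails.
async function runGroupWithRetry(group, task, maxAttempts = 3) {
    let pending = group;
    const succeeded = [];

    for (let attempt = 1; attempt <= maxAttempts && pending.length > 0; attempt++) {
        // allSettled waits for every promise and does not reject on failures
        const results = await Promise.allSettled(pending.map((item) => task(item)));
        const failed = [];
        results.forEach((result, index) => {
            if (result.status === "fulfilled") {
                succeeded.push(pending[index]);
            } else {
                failed.push(pending[index]);
            }
        });
        pending = failed; // only the failed items go through again
    }

    // Anything still pending has failed on every attempt
    return { succeeded, failed: pending };
}
```

A helper like this could stand in for the body of handleGroup, with the items that are still failing after the last attempt logged or reported however suits your application.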

