No More Cutting in Line: Crafting Fairness in Queues


So what is queue fairness, and why should you care about it?

Recently we published an article on how to manage long-running tasks using a worker and a queue. If you haven't read it yet, I suggest you start there; you can read it here.

Queuing systems offer reliability and are very intuitive, but they are also easy to mess up.

Before coming back to the main question, let's consider a simple scenario. You have set up the infrastructure for managing long-running tasks using a queue: whenever a user performs an operation on your platform, a job gets enqueued, and an available worker picks the job from the queue and processes it. You might be wondering, what's the issue here?
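To make the setup concrete, here is a minimal sketch of that naive design. The queue name `job_queue`, the payload shape, and the use of the redis-py client are illustrative assumptions, not the exact code from the earlier article.

```python
# A minimal sketch of the naive setup: one shared FIFO queue, one worker loop.
# (Queue name and payload shape are illustrative assumptions.)
import json
from redis import Redis

redis_client = Redis(host='localhost', port=6379)

def enqueue(job):
    # Every user's job lands in the same queue, in arrival order.
    redis_client.lpush('job_queue', json.dumps(job))

def worker_loop():
    while True:
        # Block until a job is available, then process it FIFO.
        _, raw = redis_client.brpop('job_queue')
        job = json.loads(raw)
        print("Processing:", job)
```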


Now imagine that, in some parallel universe, your app gets a lot of traction and you onboard a bunch of users who use it concurrently. Your queue now has multiple jobs enqueued, and because Karen decided to perform a bunch of operations at once (well, she's not wrong here), every other user's job is stuck waiting behind hers. Those users might not find this experience FAIR.
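You can see this head-of-line blocking with nothing more than a plain FIFO queue (the job names below are made up):

```python
# Head-of-line blocking in a plain FIFO queue.
from collections import deque

queue = deque()
queue.extend(f"karen-job-{i}" for i in range(5))  # Karen's burst arrives first
queue.append("bob-job-1")                         # Bob enqueues a single job

while queue:
    print(queue.popleft())
# Prints karen-job-0 ... karen-job-4 before bob-job-1:
# Bob's one job waits behind Karen's entire burst.
```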


Let's look at the definition of a fair queue:

> Fair queuing is a family of scheduling algorithms used in some process and network schedulers. The algorithm is designed to achieve fairness when a limited resource is shared, for example, to prevent flows with large packets or processes that generate small jobs from consuming more throughput or CPU time than other flows or processes.

In layman's terms: we don't want users to wait on each other, and we don't want to throw more hardware at the problem, so we need some mechanism that keeps the queue fair and avoids resource starvation.

### Enough Talk, show me the ~~code~~ algorithm

There are multiple ways to solve this problem. The easiest is to throw more hardware at it, but that is not sustainable, and you will still face resource starvation eventually. So let's put on our fancy computer science hats and talk about Round Robin.


Let's try to understand how round-robin works. In simpler terms, it's just taking turns: your job gets the resources for a fixed slice of time, and then the resources pass on to some other job.
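Before wiring this into Redis, here is a tiny in-memory sketch of the idea, with one queue per user and the scheduler taking one job per user per turn (the per-user split and the job names are assumptions for illustration):

```python
# A minimal in-memory round-robin sketch: one FIFO queue per user,
# and the scheduler hands out one job per user per turn.
from collections import deque

queues = {
    "karen": deque(["k1", "k2", "k3"]),
    "bob":   deque(["b1"]),
    "alice": deque(["a1", "a2"]),
}

def round_robin(queues):
    turn = deque(queues)  # rotation order of users
    while turn:
        user = turn.popleft()
        if queues[user]:
            yield queues[user].popleft()  # one job on this user's turn
        if queues[user]:
            turn.append(user)  # user still has work; back of the rotation

print(list(round_robin(queues)))
# ['k1', 'b1', 'a1', 'k2', 'a2', 'k3']
# Bob's job is no longer stuck behind Karen's entire burst.
```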

Let's devise a dead simple algorithm and then we will try to write code for the same.

We initialize multiple Redis clients to represent different shards of the job queue, which allows for horizontal scaling and load distribution. When a job is enqueued via the /enqueue endpoint, we calculate a shard index from the job data and push the job onto the corresponding shard's queue. The /process endpoint then sweeps the shards in round-robin fashion, starting from where the previous call left off, and pops the first job it finds. This way no single shard can monopolize the workers, which helps ensure fairness and avoid resource starvation.

```python
from flask import Flask, request, jsonify
from redis import Redis
import hashlib
import json

app = Flask(__name__)
redis_clients = [Redis(host='localhost', port=6379, db=i) for i in range(3)]  # Assuming 3 Redis shards
next_shard = 0  # Cursor: where the next /process call starts its sweep (fine for a single-process demo)

# Calculate the shard index for a given job.
# hashlib gives a stable hash; Python's built-in hash() is randomized per process,
# so it would map the same job to different shards across restarts.
def get_shard_index(job_data):
    digest = hashlib.sha256(job_data.encode()).hexdigest()
    return int(digest, 16) % len(redis_clients)

# Endpoint for adding jobs to the queue
@app.route("/enqueue", methods=["POST"])
def enqueue_job():
    job_data = request.json  # Assuming JSON data containing job details
    shard_index = get_shard_index(json.dumps(job_data))
    redis_clients[shard_index].lpush('job_queue', json.dumps(job_data))
    return jsonify({"message": "Job enqueued successfully"}), 200

# Endpoint for processing jobs using round-robin scheduling
@app.route("/process", methods=["GET"])
def process_jobs():
    global next_shard
    # Sweep all shards, starting from where the previous call left off,
    # so shard 0 is not permanently favored over the others.
    for i in range(len(redis_clients)):
        shard = (next_shard + i) % len(redis_clients)
        job_data = redis_clients[shard].rpop('job_queue')
        if job_data:
            next_shard = (shard + 1) % len(redis_clients)  # Next call resumes at the following shard
            job_data = json.loads(job_data)
            # Process the job (replace this with actual job processing logic)
            print("Processing job:", job_data)
            return jsonify({"message": "Job processed successfully", "job_data": job_data}), 200

    return jsonify({"message": "No jobs to process"}), 404

if __name__ == "__main__":
    app.run(debug=True)
```

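Assuming the app is running locally on Flask's default port 5000 and the `requests` package is installed, a quick smoke test might look like this:

```python
# Quick smoke test for the endpoints above (payloads are illustrative).
import requests

# Enqueue a couple of jobs
requests.post("http://localhost:5000/enqueue", json={"user": "karen", "task": "export_report"})
requests.post("http://localhost:5000/enqueue", json={"user": "bob", "task": "resize_image"})

# Each call pops one job, sweeping the shards round-robin
print(requests.get("http://localhost:5000/process").json())
print(requests.get("http://localhost:5000/process").json())
```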

There are several other ways to approach this problem; we will talk about them in future articles.
