Prithvi Rajan

Posted on • Originally published at prithvi-rajan.vercel.app

My RAG Pipeline Took an Hour. Here's How I Got It Down to 30 Seconds.

A content ingestion job used to take over an hour. Now it finishes in 30 seconds. No change in hardware — just better utilization of what was already there, a smarter queue system, and hours spent debugging how CUDA and multiprocessing work together. Here's how I got there.

I was building a RAG application with Django, using Milvus as the vector database. My initial document-ingestion pipeline was as simple as it gets.

Old Pipeline

Create a celery task → Fetch the page → Chunk the page → Create vector embeddings → Upload them to Milvus.

This worked fine, with one problem: it was slow. Ingesting the entire Django docs took over an hour.
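In code, the old pipeline looked roughly like this — a minimal sketch with stand-in helpers (`fetch`, `chunk`, `embed` are illustrative, not the real ingestion code):

```python
from typing import List

def fetch(url: str) -> str:
    # stand-in for the HTTP GET that fetches a docs page
    return f"<html>docs for {url}</html>"

def chunk(html: str, size: int = 20) -> List[str]:
    # stand-in for the real chunker: naive fixed-size splitting
    return [html[i:i + size] for i in range(0, len(html), size)]

def embed(chunks: List[str]) -> List[List[float]]:
    # stand-in for the BGE-M3 model; the real code returns dense vectors
    return [[float(len(c))] for c in chunks]

def ingest_page(url: str) -> int:
    # The whole pipeline runs inside ONE Celery task, sequentially:
    # fetch -> chunk -> embed -> upload. The worker that fetched the
    # page sits idle while the slow embedding step runs.
    vectors = embed(chunk(fetch(url)))
    # upload_to_milvus(vectors) would happen here, one page at a time
    return len(vectors)
```

Every page pays the full latency of every step before the next page can even be fetched.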

Can we do better?

I run everything on my personal computer, which has a CUDA-capable GPU (an Nvidia RTX 4070 Super), so I wanted to see whether that could speed up the process. I switched the embedding model to use the GPU, tweaked some of the Docker images, and got the GPU creating embeddings in my test code.

import logging

logger = logging.getLogger(__name__)

# Lazily-initialized singleton: the model is loaded at most once per process
embedding_model = None


def get_embedding_model(force_cpu: bool = False):
    global embedding_model
    if embedding_model is None:
        import torch
        from FlagEmbedding import BGEM3FlagModel

        if torch.cuda.is_available() and not force_cpu:
            device = "cuda:0"
        else:
            device = "cpu"

        logger.info(f"Loading BGE-M3 on {device}")

        embedding_model = BGEM3FlagModel(
            "/models/bge-m3",  # local path baked into the Docker image
            device=device,
        )

        logger.info(f"Model loaded on {next(embedding_model.model.parameters()).device}")

    return embedding_model


Looks simple, right?

This change ended up breaking my Celery setup, and forced me to actually optimize my code.

A CUDA context cannot survive fork(), so Celery's default prefork worker pool was off the table. Each forked child tries to re-initialize the CUDA context and throws this error:

RuntimeError: Cannot re-initialize CUDA in forked subprocess. To use CUDA with multiprocessing, you must use the 'spawn' start method

I changed how Celery creates new processes to use the "spawn" start method, but that ended up crashing my system. Each spawned worker loaded its own copy of the embedding model, so my GPU ran out of VRAM quickly, and shut everything down.
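For reference, forcing the start method looks something like this. This is a sketch using the standard library; Celery's pool actually sits on top of billiard, so the exact hook in a real Celery app differs — and as described above, this route led straight to the VRAM problem anyway:

```python
import multiprocessing

# Force "spawn" before any worker processes are created, e.g. at the
# top of the Celery app module. Each spawned child starts a fresh
# interpreter, so CUDA can initialize cleanly -- but each child also
# loads its own copy of the embedding model, multiplying VRAM usage.
try:
    multiprocessing.set_start_method("spawn", force=True)
except RuntimeError:
    # the start method may already be fixed for this interpreter
    pass
```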

My PC crashing scared me a bit and made me take a step back. What were the bottlenecks I was actually facing, and what would solve them?

There were two distinct bottlenecks. The first was fetching pages: in the old pipeline, after fetching a page a worker had to wait through chunking, embedding, and the upload to Milvus before it could fetch the next one. The second was VRAM: loading the embedding model onto the GPU is expensive, so you cannot afford one model per worker.

Luckily, the solution to both bottlenecks was the same: decouple the I/O work from the GPU work.

Optimized architecture

The solution was two Celery queues with very different personalities.

New Pipeline

The first is a general-purpose CPU queue. Multiple workers run freely in parallel, each one fetching a page from the API, chunking the content, and passing it downstream. Concurrency here is a feature; the more workers, the faster the pages come in.

The second is a GPU queue, locked to a single worker by design. That one worker does nothing but take chunks from the CPU queue, run them through the embedding model, and push the results to a Redis queue. One process, one model loaded in memory, running continuously without interruption.
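Wiring this up in Celery is mostly routing configuration plus how you start the workers. A sketch with illustrative task names (not my exact module layout):

```python
# celeryconfig.py -- route tasks to the two queues by name
task_routes = {
    "ingest.fetch_and_chunk": {"queue": "cpu"},  # I/O + chunking, many workers
    "ingest.embed_chunks": {"queue": "gpu"},     # embedding only, one worker
}

# Started as two separate worker processes, e.g.:
#   celery -A app worker -Q cpu --concurrency=8
#   celery -A app worker -Q gpu --concurrency=1   # one process, one model
```

The `--concurrency=1` on the GPU queue is the whole trick: exactly one process ever holds the model, so VRAM usage stays flat no matter how many pages are in flight.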

The final piece is a Celery Beat job that drains the Redis queue every minute, batching up the accumulated embeddings and writing them to Milvus in bulk rather than one document at a time.
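The drain step itself is simple: pop everything that has accumulated, write it in one call. A runnable sketch, with a deque standing in for the Redis list so it runs without a server (in production the GPU worker pushes JSON blobs onto a Redis list and the beat task pops them):

```python
import json
from collections import deque

# stand-in for the Redis list; avoids needing a live server here
pending = deque()

def enqueue_embedding(doc_id: str, vector: list) -> None:
    # called by the GPU worker after embedding a chunk
    pending.append(json.dumps({"id": doc_id, "vector": vector}))

def drain_to_milvus(max_items: int = 1000) -> int:
    # called by Celery Beat every minute: batch up everything queued
    # since the last run and write it to Milvus in one bulk insert
    batch = []
    while pending and len(batch) < max_items:
        batch.append(json.loads(pending.popleft()))
    if batch:
        # with pymilvus this would be a single bulk insert on the
        # collection, instead of one insert call per document
        pass
    return len(batch)
```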

What makes this architecture satisfying is how cleanly each component maps to a resource. CPU workers are cheap to spin up, so you run many. GPU memory is precious, so you protect it with a single process. Milvus writes are expensive per-call, so you batch them. Every design decision follows directly from the constraint it's solving.

And because each layer is independent, scaling is straightforward. If fetching becomes the bottleneck, add CPU workers. If embedding becomes the bottleneck, add a GPU container. Neither change requires touching the other.

Final results

An embedding job that previously took over an hour now finishes in 30 seconds. It could be even faster, but I rate-limit the calls to GitHub's API to comply with their policy. That's a 120x improvement over the naive strategy.
