DEV Community

RagLeap

How I Built a Production RAG System with Django and Neo4j — Complete Guide

Most RAG tutorials show you how to build a demo. This is not that.

This is how we built a production RAG system that handles real business queries — connecting to live databases, answering customer questions from actual documents, and running 24/7 across WhatsApp, voice calls, and web chat simultaneously.

Here is everything we learned building RagLeap — a self-hosted AI business platform — from scratch.

Why We Chose Django Over FastAPI

Most teams building AI systems default to FastAPI. We chose Django. Here is why.

When you are building a product — not a microservice — Django's batteries-included philosophy saves months:

Authentication out of the box, with middleware hooks that make multi-tenancy straightforward to add
ORM that handles complex queries without raw SQL
Admin panel for internal tooling
Mature migration system
Signals for event-driven architecture

We built a custom TenantMiddleware that resolves the workspace from every request context. Every database query, every RAG retrieval, every AI response is automatically scoped to the correct workspace without the developer thinking about it.

```python
from .models import Workspace  # assumed location of the Workspace model


class TenantMiddleware:
    def __init__(self, get_response):
        self.get_response = get_response

    def __call__(self, request):
        # Attach the resolved workspace so downstream code can scope by it.
        workspace_id = self.resolve_workspace(request)
        request.workspace = Workspace.objects.get(id=workspace_id)
        response = self.get_response(request)
        return response

    def resolve_workspace(self, request):
        # Resolve from subdomain, token, or session
        if hasattr(request, 'auth'):
            return request.auth.workspace_id
        return request.session.get('workspace_id')
```
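
The middleware attaches the workspace to the request, but scoping every query automatically needs one more piece that the post does not show. Here is a minimal sketch of one way to do it, assuming a context variable holds the active workspace for the duration of a request. The names `workspace_scope` and `scoped` are hypothetical, and plain dictionaries stand in for database rows.

```python
from contextlib import contextmanager
from contextvars import ContextVar

# Hypothetical module-level holder for the active workspace id.
_current_workspace = ContextVar("current_workspace", default=None)


@contextmanager
def workspace_scope(workspace_id):
    """Mark workspace_id as active for the duration of one request."""
    token = _current_workspace.set(workspace_id)
    try:
        yield
    finally:
        # Restore the previous value so requests never leak into each other.
        _current_workspace.reset(token)


def scoped(rows):
    """Return only the rows that belong to the active workspace."""
    workspace_id = _current_workspace.get()
    if workspace_id is None:
        raise RuntimeError("no active workspace; refusing an unscoped query")
    return [row for row in rows if row["workspace_id"] == workspace_id]


rows = [
    {"id": 1, "workspace_id": "acme"},
    {"id": 2, "workspace_id": "globex"},
]
with workspace_scope("acme"):
    visible = scoped(rows)
```

Failing loudly when no workspace is active is a deliberate choice in this sketch: an unscoped query in a multi-tenant system is usually a bug, so it is safer to raise than to return everything.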

Why Neo4j Instead of a Vector Database

This is the question we get asked most.

Standard RAG works like this:

Chunk documents
Embed chunks
Store in vector DB
Retrieve by similarity

It works fine for simple Q&A. It breaks down for complex business queries.

Example: "Which customers complained about delivery last month and what products did they order?"

A vector search returns chunks mentioning "complaints" and "delivery." But it cannot traverse the relationship between a complaint, the customer who made it, and the orders associated with that customer.

That is a graph traversal problem — exactly what Neo4j solves.

In RagLeap, we build a knowledge graph from uploaded documents:

```python
from neo4j import GraphDatabase


class KnowledgeGraphBuilder:
    def __init__(self, uri, user, password):
        self.driver = GraphDatabase.driver(uri, auth=(user, password))

    def create_entity(self, entity_type, properties, workspace_id):
        # Labels cannot be passed as Cypher parameters, so the label is
        # interpolated; reject anything that is not a plain identifier.
        if not entity_type.isidentifier():
            raise ValueError(f"invalid entity type: {entity_type!r}")
        with self.driver.session() as session:
            session.run(
                f"""
                MERGE (e:{entity_type} {{id: $id, workspace_id: $workspace_id}})
                SET e += $properties
                RETURN e
                """,
                id=properties.get('id'),
                workspace_id=workspace_id,
                properties=properties
            )

    def create_relationship(self, from_id, to_id, relationship_type, workspace_id):
        # Both endpoints must belong to the same workspace.
        with self.driver.session() as session:
            session.run(
                """
                MATCH (a {id: $from_id, workspace_id: $workspace_id})
                MATCH (b {id: $to_id, workspace_id: $workspace_id})
                MERGE (a)-[r:RELATES_TO {type: $rel_type}]->(b)
                RETURN r
                """,
                from_id=from_id,
                to_id=to_id,
                workspace_id=workspace_id,
                rel_type=relationship_type
            )
```

Entities (customers, products, orders, policies) become nodes. Relationships become edges. When a query comes in, we combine vector similarity search with graph traversal.

The result: answers that follow the relationships between entities, not just passages that match keywords.
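
To make the delivery-complaint example concrete, here is a sketch of the kind of Cypher traversal it implies. The labels (`Customer`, `Complaint`, `Order`, `Product`), relationship types (`FILED`, `PLACED`, `CONTAINS`), and property names are hypothetical. The builder shown earlier uses a generic `RELATES_TO` edge with a `type` property, so the real schema will differ.

```python
def build_complaint_query(workspace_id, topic, since_iso_date):
    """Return a Cypher query and parameters for the example question.

    All labels, relationship types, and properties here are illustrative.
    """
    query = """
    MATCH (c:Customer {workspace_id: $workspace_id})-[:FILED]->(cm:Complaint)
    WHERE cm.topic = $topic AND cm.created_at >= $since
    MATCH (c)-[:PLACED]->(o:Order)-[:CONTAINS]->(p:Product)
    RETURN c.id AS customer, collect(DISTINCT p.name) AS products
    """
    params = {
        "workspace_id": workspace_id,
        "topic": topic,
        "since": since_iso_date,
    }
    return query, params


query, params = build_complaint_query("acme", "delivery", "2024-05-01")
```

The query and its parameters could then be passed to `session.run(query, params)`. Keeping user-derived values in parameters rather than in the query string avoids Cypher injection.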

The Hybrid Retrieval Pipeline

Here is our actual retrieval pipeline that combines vector search with graph traversal:

```python
class HybridRetriever:
    def __init__(self, vector_store, graph_db, workspace_id):
        self.vector_store = vector_store
        self.graph_db = graph_db
        self.workspace_id = workspace_id

    def retrieve(self, query, top_k=5):
        # Step 1: Vector similarity search
        vector_results = self.vector_store.similarity_search(
            query,
            k=top_k,
            filter={"workspace_id": self.workspace_id}
        )

        # Step 2: Extract entities from query
        entities = self.extract_entities(query)

        # Step 3: Graph traversal from extracted entities
        graph_results = []
        for entity in entities:
            related = self.graph_db.traverse(
                entity,
                max_depth=2,
                workspace_id=self.workspace_id
            )
            graph_results.extend(related)

        # Step 4: Merge and deduplicate results
        combined = self.merge_results(vector_results, graph_results)

        return combined

    def extract_entities(self, query):
        # Simple NER; in production we use spaCy or LLM-based extraction.
        # Returns a list of entity IDs found in the query. This stub returns
        # an empty list so that retrieve() still runs end to end.
        return []

    def merge_results(self, vector_results, graph_results):
        # Deduplicate by id, keeping vector hits ahead of graph hits
        seen = set()
        merged = []
        for result in vector_results + graph_results:
            if result.id not in seen:
                seen.add(result.id)
                merged.append(result)
        return merged[:10]
```
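
The merge step above deduplicates but does not rank across the two sources. One common way to combine ranked lists is reciprocal rank fusion (RRF). It is shown here as a possible substitute, not as what RagLeap actually does. The function name and the constant `k=60` (a conventional default) are assumptions.

```python
from collections import defaultdict


def reciprocal_rank_fusion(result_lists, k=60, limit=10):
    """Fuse several ranked lists of ids into one ranking.

    Each id scores 1 / (k + rank) per list it appears in, with ranks
    starting at 1, so ids found by several retrievers rise to the top.
    """
    scores = defaultdict(float)
    for results in result_lists:
        for rank, doc_id in enumerate(results, start=1):
            scores[doc_id] += 1.0 / (k + rank)
    # Sort by descending score; break ties by id for a stable order.
    ranked = sorted(scores, key=lambda doc_id: (-scores[doc_id], doc_id))
    return ranked[:limit]


vector_ids = ["a", "b", "c"]
graph_ids = ["b", "d"]
fused = reciprocal_rank_fusion([vector_ids, graph_ids])
# fused == ["b", "a", "d", "c"]
```

Here `b` wins because both retrievers returned it, even though neither list ranked it far ahead of the rest.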

Celery for Async — The Backbone of Everything

Almost everything in RagLeap that is not a synchronous API response goes through Celery:

Document ingestion and knowledge graph building
Email monitoring and reply drafting
Voice call management
Lead follow-up automation
WhatsApp message processing

We separate Celery beat (scheduler) from Celery workers (executors). Both run as separate Supervisor processes.

```python
# tasks.py
from celery import shared_task
from django.conf import settings

from .knowledge_graph import KnowledgeGraphBuilder
from .document_processor import DocumentProcessor


@shared_task(bind=True, max_retries=3)
def process_document(self, document_id, workspace_id):
    try:
        processor = DocumentProcessor()
        doc = processor.load(document_id)

        # Extract chunks
        chunks = processor.chunk(doc, chunk_size=512, overlap=50)

        # Embed and store in vector DB
        processor.embed_and_store(chunks, workspace_id)

        # Build knowledge graph. The NEO4J_* setting names are assumed;
        # the builder needs the connection details its constructor expects.
        builder = KnowledgeGraphBuilder(
            settings.NEO4J_URI, settings.NEO4J_USER, settings.NEO4J_PASSWORD
        )
        entities = processor.extract_entities(doc)
        builder.build_graph(entities, workspace_id)

        return {"status": "success", "chunks": len(chunks)}

    except Exception as exc:
        # Retry up to max_retries times, waiting 60 seconds between attempts.
        raise self.retry(exc=exc, countdown=60)
```
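
The `processor.chunk(doc, chunk_size=512, overlap=50)` call is not shown in the post. Below is a minimal sketch of sliding-window chunking with overlap, assuming the document has already been split into a list of tokens. Whether the original counts tokens or characters is not stated, and `chunk_tokens` is a hypothetical name.

```python
def chunk_tokens(tokens, chunk_size=512, overlap=50):
    """Split tokens into windows of chunk_size that share overlap items."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    if not 0 <= overlap < chunk_size:
        raise ValueError("overlap must be in [0, chunk_size)")

    step = chunk_size - overlap
    chunks = []
    for start in range(0, len(tokens), step):
        chunks.append(tokens[start:start + chunk_size])
        # Stop once a window reaches the end, to avoid a redundant tail chunk.
        if start + chunk_size >= len(tokens):
            break
    return chunks


chunks = chunk_tokens(list(range(10)), chunk_size=4, overlap=1)
# chunks == [[0, 1, 2, 3], [3, 4, 5, 6], [6, 7, 8, 9]]
```

The overlap means a sentence that straddles a boundary appears in full in at least one chunk, at the cost of embedding some text twice.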

Supervisor configuration for separate worker processes:

```ini
# /etc/supervisor/conf.d/celery.conf
[program:celery_worker]
command=/path/to/venv/bin/celery -A myproject worker --loglevel=info --concurrency=2
directory=/path/to/project
environment=PATH="/usr/bin:/usr/local/bin"
autostart=true
autorestart=true

[program:celery_beat]
command=/path/to/venv/bin/celery -A myproject beat --loglevel=info
directory=/path/to/project
autostart=true
autorestart=true
```

Multi-Channel Architecture

One AI brain serves WhatsApp, Telegram, voice calls, email, and web chat simultaneously.

The key insight: normalize everything into the same internal format before it hits the AI.

```python
class MessageNormalizer:
    def normalize(self, raw_message, channel):
        # extract_sender, extract_timestamp, and extract_metadata are not
        # shown here; they follow the same per-channel lookup as extract_content.
        return {
            "content": self.extract_content(raw_message, channel),
            "sender_id": self.extract_sender(raw_message, channel),
            "channel": channel,
            "timestamp": self.extract_timestamp(raw_message, channel),
            "metadata": self.extract_metadata(raw_message, channel)
        }

    def extract_content(self, message, channel):
        extractors = {
            "whatsapp": lambda m: m.get("body", ""),
            "telegram": lambda m: m.get("text", ""),
            "voice": lambda m: m.get("transcript", ""),
            "email": lambda m: m.get("text_body", ""),
            "web": lambda m: m.get("message", "")
        }
        return extractors[channel](message)
```

Channel-specific formatting happens on the way out:

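```python
class ResponseFormatter:
    def format(self, ai_response, channel):
        formatters = {
            "whatsapp": self.format_whatsapp,
            "voice": self.format_voice,
            "email": self.format_email,
            "web": self.format_html
        }
        return formatters[channel](ai_response)

    def format_voice(self, response):
        # Remove markdown, make it conversational
        clean = response.replace("**", "").replace("#", "")
        return clean

    def format_whatsapp(self, response):
        # WhatsApp supports basic markdown
        return response

    def format_email(self, response):
        # Placeholder: the original omits this method; pass text through.
        return response

    def format_html(self, response):
        # Placeholder: the original omits this method; pass text through.
        return response
```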
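
Put together, the normalize-in and format-out steps wrap a single channel-agnostic call to the AI. The sketch below shows that round trip with only two channels and a stub in place of the model. `handle_message` and `answer_fn` are hypothetical names, and the explicit check for unknown channels is an addition the original does not show.

```python
# Per-channel readers for inbound payloads (subset, for illustration).
EXTRACTORS = {
    "whatsapp": lambda m: m.get("body", ""),
    "voice": lambda m: m.get("transcript", ""),
}

# Per-channel writers for outbound replies.
FORMATTERS = {
    "whatsapp": lambda text: text,
    "voice": lambda text: text.replace("**", "").replace("#", ""),
}


def handle_message(raw_message, channel, answer_fn):
    """Normalize the inbound message, ask the AI, format the reply."""
    if channel not in EXTRACTORS or channel not in FORMATTERS:
        raise ValueError(f"unsupported channel: {channel!r}")
    content = EXTRACTORS[channel](raw_message)
    reply = answer_fn(content)  # the AI only ever sees normalized text
    return FORMATTERS[channel](reply)


def fake_ai(question):
    # Stand-in for the real model call.
    return f"**Answer** to: {question}"


spoken = handle_message({"transcript": "order status"}, "voice", fake_ai)
# spoken == "Answer to: order status"
```

Because the AI call sits between two lookup tables, adding a channel means adding one entry to each table rather than touching the retrieval or generation code.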

Running Everything on a $20/Month VPS

The entire stack — Django, PostgreSQL, Neo4j, Redis, Celery, Nginx, Gunicorn — runs on a 4GB RAM VPS.

Key optimizations:

```bash
# PostgreSQL: /etc/postgresql/14/main/postgresql.conf
shared_buffers = 512MB
effective_cache_size = 1GB
work_mem = 16MB

# Neo4j: /etc/neo4j/neo4j.conf
server.memory.heap.initial_size=256m
server.memory.heap.max_size=512m
server.memory.pagecache.size=256m

# Gunicorn: 2-3 workers maximum
gunicorn myproject.wsgi:application \
  --workers 2 \
  --worker-class gthread \
  --threads 4 \
  --bind 0.0.0.0:8000
```

Redis memory limit:

```bash
# /etc/redis/redis.conf
maxmemory 256mb
maxmemory-policy allkeys-lru
```

Result: Full AI platform running on 4GB RAM with stable performance under production load.
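
A quick budget check shows why these limits fit. The sketch below adds up the caps configured above and subtracts them from the machine's RAM, assuming 4GB means 4096MB. These are configured ceilings, not measured usage, and PostgreSQL can use more than `shared_buffers` because `work_mem` applies per operation.

```python
TOTAL_RAM_MB = 4096  # assumes "4GB" means 4096 MB

# Memory caps taken from the configuration shown above.
FIXED_BUDGETS_MB = {
    "postgres_shared_buffers": 512,
    "neo4j_heap_max": 512,
    "neo4j_pagecache": 256,
    "redis_maxmemory": 256,
}


def remaining_ram_mb(total_mb, budgets_mb):
    """RAM left for Gunicorn, Celery, Nginx, and the OS after fixed caps."""
    return total_mb - sum(budgets_mb.values())


reserved = sum(FIXED_BUDGETS_MB.values())                  # 1536
headroom = remaining_ram_mb(TOTAL_RAM_MB, FIXED_BUDGETS_MB)  # 2560
```

Roughly 2.5GB is left for the two Gunicorn workers, the Celery worker and beat processes, Nginx, and the operating system.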

Database AI — Connecting to Live Data

One of the hardest and most powerful features: letting AI query the user's actual database in natural language.

The challenge: you cannot just give an LLM a database connection and ask it to write SQL. That is a security nightmare.

Our approach:

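```python
import re

from sqlalchemy import create_engine, inspect


class DatabaseAI:
    # Whole-word match, so column names like updated_at are not rejected.
    FORBIDDEN = re.compile(
        r"\b(INSERT|UPDATE|DELETE|DROP|TRUNCATE|ALTER)\b", re.IGNORECASE
    )

    def __init__(self, connection_string, workspace_id, llm):
        # llm is assumed to be an injected client exposing generate_query();
        # the original snippet uses self.llm without showing where it is set.
        self.engine = create_engine(connection_string)
        self.workspace_id = workspace_id
        self.llm = llm
        self.schema = self.introspect_schema()

    def introspect_schema(self):
        # Map each table name to {column name: column type}.
        inspector = inspect(self.engine)
        schema = {}
        for table_name in inspector.get_table_names():
            columns = inspector.get_columns(table_name)
            schema[table_name] = {
                col['name']: str(col['type'])
                for col in columns
            }
        return schema

    def natural_language_query(self, question):
        # Generate safe parameterized query from schema + question
        query_template = self.llm.generate_query(
            question=question,
            schema=self.schema,
            instruction="Generate a safe READ-ONLY SELECT query. No INSERT, UPDATE, DELETE or DROP."
        )

        # Validate query is read-only.
        # execute_safe() and format_result() are not shown in this snippet.
        if self.is_safe_query(query_template):
            result = self.execute_safe(query_template)
            return self.format_result(result)
        else:
            return "I can only read data, not modify it."

    def is_safe_query(self, query):
        # Allow a single SELECT statement with no write or DDL keywords.
        statement = query.strip().rstrip(";")
        if ";" in statement or not statement.upper().startswith("SELECT"):
            return False
        return not self.FORBIDDEN.search(statement)
```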

This lets a business owner message their AI: "How many orders came from Chennai this week?" — and get a real answer from their actual database.
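
A keyword check is a blocklist, and blocklists are easy to get subtly wrong. A complementary safeguard is to make the connection itself read-only, so a write fails even if validation misses it. The sketch below demonstrates the idea with SQLite's `mode=ro` URI, only because it runs with the standard library. For a server database the equivalent would be a role with SELECT-only privileges, which is an assumption about deployment, not something the post describes.

```python
import os
import sqlite3
import tempfile


def open_read_only(path):
    """Open an SQLite database so that any write attempt fails."""
    return sqlite3.connect(f"file:{path}?mode=ro", uri=True)


def demo():
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "orders.db")

        # Seed a tiny database using a normal read-write connection.
        writer = sqlite3.connect(path)
        writer.execute("CREATE TABLE orders (id INTEGER PRIMARY KEY, city TEXT)")
        writer.execute("INSERT INTO orders (city) VALUES ('Chennai')")
        writer.commit()
        writer.close()

        reader = open_read_only(path)
        try:
            count = reader.execute(
                "SELECT COUNT(*) FROM orders WHERE city = ?", ("Chennai",)
            ).fetchone()[0]
            try:
                reader.execute("DELETE FROM orders")
                write_blocked = False
            except sqlite3.OperationalError:
                # SQLite refuses to write through a read-only connection.
                write_blocked = True
        finally:
            reader.close()
        return count, write_blocked


count, write_blocked = demo()
```

With both layers in place, the validator catches most bad queries early and gives a friendly message, while the database permission is what actually guarantees nothing is modified.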

Memory System

RagLeap maintains persistent memory across conversations using a summarization approach:

```python
from .models import ConversationHistory  # assumed location of the model


class ConversationMemory:
    def __init__(self, workspace_id, customer_id, graph_db, llm):
        # graph_db and llm are assumed to be injected; the original snippet
        # uses self.graph_db and self.llm without showing where they are set.
        self.workspace_id = workspace_id
        self.customer_id = customer_id
        self.graph_db = graph_db
        self.llm = llm

    def get_relevant_context(self, current_query):
        # Get recent messages
        recent = ConversationHistory.objects.filter(
            workspace_id=self.workspace_id,
            customer_id=self.customer_id
        ).order_by('-created_at')[:10]

        # Get relevant historical summaries from graph
        historical = self.graph_db.get_customer_context(
            customer_id=self.customer_id,
            query=current_query,
            workspace_id=self.workspace_id
        )

        return {
            "recent": [msg.to_dict() for msg in recent],
            "historical": historical
        }

    def summarize_and_store(self, conversation_chunk):
        # Periodically compress old conversations
        summary = self.llm.summarize(conversation_chunk)
        self.graph_db.store_conversation_summary(
            customer_id=self.customer_id,
            summary=summary,
            workspace_id=self.workspace_id
        )
```
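
The post does not say when old messages get compressed. One simple policy, sketched here under that assumption, is to keep the most recent N messages verbatim and summarize everything older. `compact_history` is a hypothetical helper, and the `summarize` callable stands in for the LLM call.

```python
def compact_history(messages, summarize, keep_recent=10):
    """Return (summary_of_older_messages, recent_messages).

    The summary is None when there is nothing old enough to compress.
    """
    if keep_recent < 1:
        raise ValueError("keep_recent must be at least 1")
    if len(messages) <= keep_recent:
        return None, list(messages)
    older = messages[:-keep_recent]
    recent = messages[-keep_recent:]
    return summarize(older), list(recent)


def fake_summarize(older):
    # Stand-in for an LLM summarization call.
    return f"{len(older)} earlier messages"


history = [f"m{i}" for i in range(12)]
summary, recent = compact_history(history, fake_summarize, keep_recent=10)
# summary == "2 earlier messages"; recent holds m2 through m11
```

The recent window matches the `[:10]` slice in `get_relevant_context`, so the prompt always contains the last ten messages in full plus a compact view of what came before.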

What We Would Do Differently

  1. Separate voice processing earlier.
    Voice has completely different latency requirements from text. Running it in the same Celery worker pool as document ingestion caused priority issues we had to solve later.

  2. Observability from day one.
    Adding proper logging, metrics, and tracing after the fact is painful. Build it in from the beginning.

  3. Schema design for multi-tenancy from migration 1.
    Adding workspace isolation to an existing schema costs weeks of refactoring.
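
For the first point, Celery's task routing is one way to keep voice work away from document ingestion. The sketch below uses Celery's `task_routes` and `task_default_queue` settings in lowercase form. The task module paths and queue names are hypothetical, and in a Django project that loads Celery settings with the `CELERY` namespace they would be written as `CELERY_TASK_ROUTES` and `CELERY_TASK_DEFAULT_QUEUE`.

```python
# celeryconfig-style settings. Task paths and queue names are illustrative.
task_default_queue = "default"

task_routes = {
    # Latency-sensitive voice tasks get their own queue.
    "voice.tasks.*": {"queue": "voice"},
    # Slow, bursty ingestion work is isolated from everything else.
    "documents.tasks.process_document": {"queue": "ingestion"},
}
```

Each queue then gets its own worker, for example `celery -A myproject worker -Q voice --concurrency=2` as a separate Supervisor program, so a backlog of document ingestion cannot delay a live call.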

Try RagLeap

If you are building AI systems for businesses or looking for a self-hosted AI platform that actually connects to your data, RagLeap has a free self-hosted tier — single workspace, web embed chatbot, and AI Manager included.

GitHub open source launch coming soon — follow along at ragleap.com.
