Betty Waiyego

How I Designed an Enterprise RAG System Using AWS Bedrock, Pinecone & Neo4j

AI Agent Building

Project Overview

In my role as an AI Solutions Engineer at Prescott Data, I built an enterprise-grade document Q&A chatbot for a client whose internal policy documents were difficult to search, inconsistent in structure, and costly for teams to navigate manually. Employees struggled to find accurate answers quickly, especially across long, multi-column PDFs with cross-referenced policies.

The solution was a retrieval-augmented generation (RAG) system that combines vector similarity search (Pinecone) with knowledge graph reasoning (Neo4j) to deliver accurate, context-aware responses.

Tech Stack:

  • Backend: AWS Lambda (Python), AWS Bedrock (Titan Embeddings & DeepSeek-R1)
  • Databases: Pinecone (vector store), Neo4j (knowledge graph)
  • Document Processing: AWS Textract with LAYOUT feature for multi-column PDFs
  • Frontend: React with Markdown rendering
  • Session Management: DynamoDB for conversation history

System Architecture

The pipeline follows this flow (an end-to-end orchestration sketch follows the list):

  1. Document Ingestion → AWS Textract extracts text from complex multi-column PDFs
  2. Intelligent Chunking → Token-aware semantic chunking (~500 tokens with 50-token overlap)
  3. Embedding Generation → AWS Bedrock Titan creates 1536-dimensional vectors
  4. Dual Storage
    • Pinecone for vector similarity search
    • Neo4j for entity relationships and knowledge graph
  5. Hybrid Retrieval → Queries search both databases simultaneously
  6. Response Generation → DeepSeek-R1 model generates contextual answers with conversation memory
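
Concretely, the two entry points below show how these steps map onto the helpers covered in the rest of this post. This is an orchestration sketch only; the wrapper names (ingest_document, answer_query) are illustrative, not the exact production handlers.

# Orchestration sketch: ties the numbered steps above to the helpers defined
# later in this post. Wrapper names here are illustrative.

def ingest_document(bucket, key):
    job_id = start_textract_job(bucket, key)                  # Step 1: Textract with LAYOUT
    wait_for_job(job_id)
    text = sort_blocks_by_reading_order(get_job_results(job_id))

    splitter = TokenAwareTextSplitter(max_tokens=500, overlap=50)
    chunks = splitter.split_text(text)                        # Step 2: token-aware chunking

    for i, chunk in enumerate(chunks):                        # Steps 3-4: embed and store
        pinecone_index.upsert([(f"{key}_{i}", get_embedding(chunk), {"text": chunk})])
    create_knowledge_graph(chunks, key)                       # Step 4: graph side

def answer_query(query, session_id):
    chunks = dual_retrieval(query)                            # Step 5: hybrid retrieval
    context = "\n\n".join(c["text"] for c in chunks)
    history = get_conversation_history(session_id)
    return generate_response(query, context, history)         # Step 6: grounded generation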

Challenge #1: Multi-Column PDF Processing

The Problem

TradeMark Africa's policy documents use complex multi-column layouts. Traditional PDF parsers (PyPDF2, pdfplumber) read left-to-right across columns, destroying the reading order:

Column 1:              Column 2:
"Section A talks      "Section B covers
about policies"       different topics"

❌ Wrong extraction: "Section A talks Section B covers about policies different topics"
✅ Correct: "Section A talks about policies. Section B covers different topics."

The Solution

AWS Textract's LAYOUT feature analyzes document structure:

def start_textract_job(bucket, key):
    """Start Textract job with LAYOUT feature for multi-column detection"""
    response = textract_client.start_document_analysis(
        DocumentLocation={"S3Object": {"Bucket": bucket, "Name": key}},
        FeatureTypes=["LAYOUT"]  # Critical for preserving reading order
    )
    return response["JobId"]

def sort_blocks_by_reading_order(blocks):
    """Sort text blocks respecting column layout"""
    lines = []
    for block in blocks:
        if block["BlockType"] == "LINE":
            bbox = block["Geometry"]["BoundingBox"]
            lines.append({
                "text": block["Text"],
                "top": bbox["Top"],
                "left": bbox["Left"]
            })

    # Sort by vertical position, then horizontal within columns
    lines.sort(key=lambda x: (x["top"], x["left"]))
    return " ".join([line["text"] for line in lines])
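Because start_document_analysis is asynchronous, the pipeline also needs to poll for completion and page through the results. The process_document snippet later in this post references wait_for_job and get_job_results; here is a minimal sketch of those helpers, assuming the standard boto3 Textract pagination API:

import time
import boto3

textract_client = boto3.client("textract")

def wait_for_job(job_id, delay=5):
    """Poll the asynchronous Textract job until it succeeds or fails."""
    while True:
        status = textract_client.get_document_analysis(JobId=job_id)["JobStatus"]
        if status == "SUCCEEDED":
            return
        if status == "FAILED":
            raise RuntimeError(f"Textract job {job_id} failed")
        time.sleep(delay)

def get_job_results(job_id):
    """Collect all blocks across Textract's paginated responses."""
    blocks, next_token = [], None
    while True:
        kwargs = {"JobId": job_id}
        if next_token:
            kwargs["NextToken"] = next_token
        page = textract_client.get_document_analysis(**kwargs)
        blocks.extend(page["Blocks"])
        next_token = page.get("NextToken")
        if not next_token:
            return blocks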

Result: 95%+ accuracy in maintaining document context and flow.


Challenge #2: Token-Aware Chunking

The Problem

Our AWS Bedrock calls are constrained by a 2,048-token context window per request. Naive chunking by character count leads to:

  • Token overflow errors (rejected API calls)
  • Lost context when chunks split mid-sentence
  • Inefficient use of context window

The Solution

Token-aware chunking with semantic boundaries:

import tiktoken
from langchain.text_splitter import RecursiveCharacterTextSplitter  # or langchain_text_splitters in newer releases

class TokenAwareTextSplitter:
    def __init__(self, max_tokens=500, overlap=50):
        self.max_tokens = max_tokens
        self.overlap = overlap
        self.tokenizer = tiktoken.get_encoding("cl100k_base")

    def split_text(self, text: str):
        # First split by semantic boundaries
        base_splitter = RecursiveCharacterTextSplitter(
            chunk_size=2000,
            chunk_overlap=0,
            separators=["\n\n", "\n", ".", "!", "?", ",", " "]
        )
        paragraphs = base_splitter.split_text(text)

        chunks = []
        current_chunk = []
        current_tokens = 0

        for paragraph in paragraphs:
            paragraph_tokens = len(self.tokenizer.encode(paragraph))

            if current_tokens + paragraph_tokens > self.max_tokens:
                if current_chunk:
                    chunk_text = " ".join(current_chunk)
                    chunks.append(chunk_text)
                    # Carry the last `overlap` tokens into the next chunk so
                    # context is preserved across the boundary
                    tail_tokens = self.tokenizer.encode(chunk_text)[-self.overlap:]
                    tail = self.tokenizer.decode(tail_tokens)
                    current_chunk = [tail, paragraph]
                    current_tokens = len(tail_tokens) + paragraph_tokens
                else:
                    current_chunk = [paragraph]
                    current_tokens = paragraph_tokens
            else:
                current_chunk.append(paragraph)
                current_tokens += paragraph_tokens

        if current_chunk:
            chunks.append(" ".join(current_chunk))

        return chunks

Key Features:

  • Uses tiktoken (same tokenizer as GPT models) for accurate counting
  • Respects semantic boundaries (paragraphs, sentences)
  • 50-token overlap preserves context between chunks
  • Guarantees no token overflow
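
A quick way to confirm the splitter above stays within budget is to print the token count per chunk (document_text here stands in for the Textract output):

# Sanity check: token counts should cluster at or below max_tokens.
splitter = TokenAwareTextSplitter(max_tokens=500, overlap=50)
chunks = splitter.split_text(document_text)  # document_text = Textract output

for i, chunk in enumerate(chunks):
    print(f"chunk {i}: {len(splitter.tokenizer.encode(chunk))} tokens")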

Results:

  • Zero API rejections due to token limits
  • Better retrieval accuracy (context preserved)
  • ~30% improvement in answer quality

Challenge #3: Dual Retrieval Strategy

The Problem

Pure vector search misses:

  • Exact terminology matches (acronyms, policy numbers)
  • Relationships between entities
  • Hierarchical document structure

Pure keyword search misses:

  • Semantic similarity ("employee benefits" vs "staff perks")
  • Paraphrased questions

The Solution

Hybrid retrieval combining both approaches:

def dual_retrieval(query, top_k=5):
    # 1. Generate embedding for vector search
    query_embedding = get_embedding(query)

    # 2. Vector search in Pinecone (semantic similarity)
    vector_results = pinecone_index.query(
        vector=query_embedding,
        top_k=top_k,
        include_metadata=True
    )

    # 3. Graph search in Neo4j (relationships & exact matches)
    cypher_query = """
    MATCH (doc:Document)-[:CONTAINS]->(chunk:Chunk)
    WHERE chunk.text CONTAINS $keyword 
       OR chunk.section = $section
    RETURN chunk.id AS chunk_id, chunk.text AS text, doc.title AS title
    ORDER BY chunk.relevance_score DESC
    LIMIT $top_k
    """
    graph_results = neo4j_session.run(
        cypher_query, 
        keyword=extract_keywords(query),
        section=identify_section(query),
        top_k=top_k
    )

    # 4. Merge results with weighted scoring
    merged = merge_and_rank(vector_results, graph_results)
    return merged[:top_k]

def merge_and_rank(vector_results, graph_results):
    """Combine results with weighted scoring"""
    scored_chunks = {}

    # Vector results (weight: 0.6)
    for match in vector_results["matches"]:
        chunk_id = match["id"]
        scored_chunks[chunk_id] = {
            "text": match["metadata"]["text"],
            "score": match["score"] * 0.6
        }

    # Graph results (weight: 0.4)
    for record in graph_results:
        chunk_id = record["chunk_id"]
        if chunk_id in scored_chunks:
            scored_chunks[chunk_id]["score"] += 0.4
        else:
            scored_chunks[chunk_id] = {
                "text": record["chunk"]["text"],
                "score": 0.4
            }

    # Sort by combined score
    return sorted(scored_chunks.values(), 
                  key=lambda x: x["score"], 
                  reverse=True)
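The query above relies on two small helpers that aren't shown, extract_keywords and identify_section. One possible sketch of the keyword side, assuming a simple heuristic rather than the production logic:

import re

def extract_keywords(query):
    """Pick the most specific term in the query (acronyms, codes, capitalized words).

    A sketch only: the production helper could use NER or a curated vocabulary.
    """
    candidates = re.findall(r"\b(?:[A-Z]{2,}\d*|[A-Z][a-z]+\d+)\b", query)
    return candidates[0] if candidates else query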

Results:

  • 40% improvement in retrieval accuracy
  • Better handling of acronyms and specific terminology
  • More relevant results for complex queries

Challenge #4: Conversation Memory

The Problem

Users expect conversational context:

User: "What's the leave policy?"
Bot: "Employees get 20 days annual leave..."
User: "What about sick leave?" 
Bot (without memory): ❌ "What are you referring to?"
Bot (with memory): ✅ "For sick leave, the policy states..."

The Solution

Session-based conversation history with DynamoDB:

Frontend (React):

function getSessionId() {
  let sessionId = sessionStorage.getItem('chat_session_id');
  if (!sessionId) {
    sessionId = crypto.randomUUID();
    sessionStorage.setItem('chat_session_id', sessionId);
  }
  return sessionId;
}

const sessionId = getSessionId();

const handleSend = async () => {
  const response = await fetch(API_ENDPOINT, {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({
      query: input,
      history: chatHistory,  // Previous conversation
      session_id: sessionId
    })
  });
  const data = await response.json();  // { answer, sources, confidence } rendered by the chat UI
};

Backend (Lambda):

import json
import time
from boto3.dynamodb.conditions import Key

def get_conversation_history(session_id, limit=10):
    """Retrieve conversation from DynamoDB"""
    response = chat_table.query(
        KeyConditionExpression=Key('session_id').eq(session_id),
        ScanIndexForward=False,  # Most recent first
        Limit=limit
    )
    return response['Items']

def save_message(session_id, role, content):
    """Store message in DynamoDB"""
    chat_table.put_item(
        Item={
            'session_id': session_id,
            'timestamp': int(time.time()),
            'role': role,
            'content': content
        }
    )

def generate_response(query, context, history):
    """Generate response with conversation context"""
    messages = [
        {
            "role": "system",
            "content": """You are a helpful assistant for TradeMark Africa.
                         Use the provided context and conversation history."""
        }
    ]

    # Add conversation history
    for msg in history[-5:]:  # Last 5 messages
        messages.append({
            "role": msg["role"],
            "content": msg["content"]
        })

    # Add current query with context
    messages.append({
        "role": "user",
        "content": f"Context: {context}\n\nQuestion: {query}"
    })

    response = bedrock_runtime.invoke_model(
        modelId="us.amazon.nova-lite-v1:0",
        body=json.dumps({"messages": messages})
    )

    # invoke_model returns a streaming body; parse it before returning
    return json.loads(response["body"].read())
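These handlers assume a DynamoDB table keyed by session_id with a numeric timestamp sort key, which is what makes the per-session, time-ordered query possible. A minimal table definition sketch (the table name here is illustrative; the Lambda reads it from DYNAMODB_TABLE_NAME):

import boto3

dynamodb = boto3.client("dynamodb")

# session_id (partition key) + timestamp (sort key) lets the Lambda fetch a
# session's messages in time order with a single Query call.
dynamodb.create_table(
    TableName="chat_history",  # illustrative name
    KeySchema=[
        {"AttributeName": "session_id", "KeyType": "HASH"},
        {"AttributeName": "timestamp", "KeyType": "RANGE"},
    ],
    AttributeDefinitions=[
        {"AttributeName": "session_id", "AttributeType": "S"},
        {"AttributeName": "timestamp", "AttributeType": "N"},
    ],
    BillingMode="PAY_PER_REQUEST",
)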

Results:

  • Natural multi-turn conversations
  • 70% reduction in clarifying questions
  • Better user experience (feels like talking to a human)

Deployment Architecture

AWS Lambda Function:

  • Runtime: Python 3.11
  • Memory: 512 MB
  • Timeout: 30 seconds
  • Environment Variables: PINECONE_API_KEY, PINECONE_INDEX_NAME, DYNAMODB_TABLE_NAME, AWS_REGION (see the initialization sketch below)
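For reference, this is roughly how the function can initialize its clients from those variables at cold start; a sketch assuming the current Pinecone Python SDK:

import os
import boto3
from pinecone import Pinecone

# Initialized once per cold start, reused across invocations
pc = Pinecone(api_key=os.environ["PINECONE_API_KEY"])
pinecone_index = pc.Index(os.environ["PINECONE_INDEX_NAME"])

chat_table = boto3.resource("dynamodb").Table(os.environ["DYNAMODB_TABLE_NAME"])
bedrock_runtime = boto3.client("bedrock-runtime")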

API Gateway:

  • REST API endpoint
  • CORS enabled for React frontend
  • Request/response format:
  // Request
  {
    "query": "What's the remote work policy?",
    "history": [...],
    "session_id": "uuid"
  }

  // Response
  {
    "answer": "According to the policy...",
    "sources": ["doc_1_chunk_5", "doc_3_chunk_12"],
    "confidence": 0.89
  }
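Tying it together, a sketch of how the Lambda behind that endpoint can map the proxy-integration event to this contract. Answer extraction from the model output, source attribution, and confidence scoring are elided; the helpers are the ones defined earlier:

import json

def lambda_handler(event, context):
    """Sketch of the API Gateway proxy handler behind the endpoint above."""
    body = json.loads(event["body"])
    query = body["query"]
    session_id = body.get("session_id", "anonymous")

    history = get_conversation_history(session_id)
    retrieved = dual_retrieval(query)
    context_text = "\n\n".join(chunk["text"] for chunk in retrieved)

    # Extracting the final answer text from the model response is
    # model-specific and omitted here.
    answer = generate_response(query, context_text, history)

    save_message(session_id, "user", query)
    save_message(session_id, "assistant", str(answer))

    return {
        "statusCode": 200,
        "headers": {"Access-Control-Allow-Origin": "*"},  # CORS for the React app
        "body": json.dumps({"answer": answer, "session_id": session_id}, default=str),
    }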

React Frontend:

  • Markdown rendering for formatted responses
  • Session management with sessionStorage
  • Loading states and error handling
  • Deployed on Vercel for testing
  • Integrated with the client's Microsoft Teams via Entra ID so employees can access it

Performance Metrics

Retrieval Performance:

  • Average response time: 2.3 seconds
  • Pinecone query: ~200ms
  • Neo4j query: ~150ms
  • Bedrock inference: ~1.8s
  • Total: ~2.3s

Accuracy Metrics:

  • Retrieval accuracy: 87% (hybrid) vs 62% (vector only)
  • Answer relevance: 4.2/5 (user feedback)
  • Context utilization: 95% of retrieved chunks used in responses

Cost Efficiency:

  • Bedrock embeddings: $0.0001 per 1K tokens
  • Pinecone: $70/month (1M vectors, 100 queries/sec)
  • Lambda: ~$5/month (1K invocations)
  • Total: ~$75/month for production workload

Key Lessons Learned

1. Multi-column PDFs require specialized handling

Don't waste time with PyPDF2 or pdfplumber for complex layouts. AWS Textract's LAYOUT feature saved weeks of manual parsing logic.

2. Token counting matters

Always use the actual tokenizer (tiktoken) instead of character approximations. Prevented hundreds of failed API calls.

3. Hybrid search > Pure vector search

Combining semantic search with graph relationships improved accuracy by 40%. Users rarely phrase questions exactly like documentation.

4. Conversation memory is non-negotiable

Session management transformed the UX. Users now complete tasks in 2-3 messages vs 5-7 without context.

5. Chunk overlap is critical

50-token overlap between chunks prevented context loss at boundaries. Worth the 10% storage increase.

6. Start with smaller models

Initially tried Claude Sonnet ($15/1M tokens) → Switched to DeepSeek-R1 ($0.50/1M tokens). 30x cost reduction, same quality for this use case.


Future Improvements

Planned Enhancements:

  • [ ] Citation sources: Show which document sections informed the answer
  • [ ] Query rewriting: Automatically rephrase vague questions
  • [ ] Feedback loop: Track 👍/👎 reactions to fine-tune retrieval
  • [ ] Multi-language support: Add French/Swahili for regional offices
  • [ ] Voice interface: Integrate AWS Transcribe for audio queries
  • [ ] Admin dashboard: Analytics on common questions and gap analysis

Tech Stack Summary

| Component | Technology | Why? |
| --- | --- | --- |
| Document Processing | AWS Textract | Best-in-class multi-column PDF handling |
| Embeddings | AWS Bedrock Titan | 1536-dim vectors, $0.0001/1K tokens |
| Vector DB | Pinecone | Managed, fast (<200ms queries) |
| Graph DB | Neo4j | Entity relationships, Cypher queries |
| LLM | DeepSeek-R1 | Cost-effective, great reasoning |
| Backend | AWS Lambda | Serverless, auto-scaling |
| Session Store | DynamoDB | NoSQL, fast key-value lookups |
| Frontend | React | Component-based, easy state management |
| API | AWS API Gateway | REST API with CORS support |

Code Snippets

Full embedding pipeline:

def process_document(bucket, pdf_path):
    # 1. Extract text with Textract
    job_id = start_textract_job(bucket, pdf_path)
    wait_for_job(job_id)
    blocks = get_job_results(job_id)
    text = sort_blocks_by_reading_order(blocks)

    # 2. Chunk intelligently
    splitter = TokenAwareTextSplitter(max_tokens=500, overlap=50)
    chunks = splitter.split_text(text)

    # 3. Generate embeddings
    embeddings = []
    for chunk in chunks:
        embedding = get_embedding(chunk)
        embeddings.append(embedding)

    # 4. Store in Pinecone
    vectors = [
        (f"doc_{i}", emb, {"text": chunk, "source": pdf_path})
        for i, (chunk, emb) in enumerate(zip(chunks, embeddings))
    ]
    pinecone_index.upsert(vectors)

    # 5. Build knowledge graph in Neo4j
    create_knowledge_graph(chunks, pdf_path)
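The pipeline above also relies on get_embedding and create_knowledge_graph, which earlier sections reference but don't define. Minimal sketches of both, assuming Titan Embeddings G1 - Text (amazon.titan-embed-text-v1, 1536 dimensions), the official Neo4j Python driver, and connection settings in assumed environment variables; the graph mirrors the Document-[:CONTAINS]->Chunk pattern used in the hybrid retrieval query:

import json
import os
import boto3
from neo4j import GraphDatabase

bedrock_runtime = boto3.client("bedrock-runtime")
neo4j_driver = GraphDatabase.driver(
    os.environ["NEO4J_URI"],  # assumed variable names
    auth=(os.environ["NEO4J_USER"], os.environ["NEO4J_PASSWORD"]),
)

def get_embedding(text):
    """Return a 1536-dimensional Titan embedding for a chunk of text."""
    response = bedrock_runtime.invoke_model(
        modelId="amazon.titan-embed-text-v1",
        body=json.dumps({"inputText": text}),
    )
    return json.loads(response["body"].read())["embedding"]

def create_knowledge_graph(chunks, source):
    """Create a Document node and CONTAINS edges to each Chunk node."""
    with neo4j_driver.session() as session:
        session.run("MERGE (d:Document {title: $title})", title=source)
        for i, chunk in enumerate(chunks):
            session.run(
                """
                MATCH (d:Document {title: $title})
                MERGE (c:Chunk {id: $chunk_id})
                SET c.text = $text
                MERGE (d)-[:CONTAINS]->(c)
                """,
                title=source,
                chunk_id=f"doc_{i}",
                text=chunk,
            )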

🤝 Acknowledgments

Special thanks to the AWS Community for excellent Bedrock documentation.


💬 Questions?

Have questions about RAG architecture, AWS Bedrock, or hybrid retrieval? Drop them in the comments! 👇

Tags: #aws #machinelearning #python #react #rag #ai #bedrock #pinecone #neo4j
