Seenivasa Ramadurai

Building a Smart RAG System: How LangChain's SQLRecordManager Eliminates Duplicate Processing and Keeps Your Vector Store Clean

Introduction

Have you ever wondered how to build a document processing system that's smart enough to handle any file type, efficient enough to avoid duplicate work, and sophisticated enough to understand the meaning behind your content? Well, I've been wrestling with exactly this problem, and I want to share what I learned while building a hybrid semantic processor that combines the best of both worlds.

The Problem with Traditional RAG Systems

Let's be honest - most RAG (Retrieval Augmented Generation) systems are pretty basic. They chunk your documents into fixed-size pieces, throw them into a vector database, and hope for the best. But this approach has some serious limitations:

  • Dumb chunking: Splitting text every 1000 characters doesn't respect semantic boundaries
  • No change detection: Re-processing everything when one file changes
  • Poor metadata: Limited search context beyond basic similarity
  • File format chaos: Different loaders for different file types

What if we could do better? What if we could build a system that understands document structure, tracks changes intelligently, and provides rich search context? That's exactly what this hybrid semantic processor does.

The Architecture: Best of Both Worlds

The system I built combines three powerful approaches:

1. Intelligent Document Loading

Instead of treating all documents the same, the system uses specialized loaders for different file types. The UniversalDocumentLoader class automatically detects file extensions and routes them to appropriate handlers:

@staticmethod
def load_document(file_path: str) -> List[Document]:
    """Load a document from file path, automatically detecting the format."""
    if not os.path.exists(file_path):
        raise FileNotFoundError(f"File not found: {file_path}")

    file_ext = os.path.splitext(file_path)[1].lower()

    if file_ext == '.pdf':
        return UniversalDocumentLoader._load_pdf(file_path)
    elif file_ext in ['.txt', '.md']:
        return UniversalDocumentLoader._load_text(file_path)
    elif file_ext == '.docx':
        return UniversalDocumentLoader._load_word(file_path)
    elif file_ext == '.xlsx':
        return UniversalDocumentLoader._load_excel(file_path)
    elif file_ext == '.pptx':
        return UniversalDocumentLoader._load_powerpoint(file_path)
    elif file_ext == '.html':
        return UniversalDocumentLoader._load_html(file_path)
    else:
        raise ValueError(f"Unsupported file type: {file_ext}")
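Using the loader is a one-liner. Here's a quick sketch (the file name is a placeholder; any supported extension works):

from app.document_loaders import UniversalDocumentLoader

# Hypothetical file under the default raw_data folder; .pdf, .docx, .xlsx, .pptx, .html, .txt and .md are all routed automatically.
docs = UniversalDocumentLoader.load_document("raw_data/quarterly_report.pdf")
print(f"Loaded {len(docs)} document(s)")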

2. Smart Chunking Strategy

This is where things get interesting. The system doesn't just use one chunking method - it chooses the best approach for each situation:

def _process_document(self, file_path: str, force_semantic: bool = False) -> Tuple[List[Document], str]:
    """Process a single document into chunks; returns the chunks and the chunking method used."""
    documents = UniversalDocumentLoader.load_document(file_path)
    all_chunks = []

    # Check if we should use semantic chunking
    use_semantic = force_semantic or (self.semantic_chunker and not self._is_file_changed(file_path))

    for i, doc in enumerate(documents):
        # Choose chunking method
        if use_semantic and self.semantic_chunker:
            try:
                chunks = self.semantic_chunker.split_documents([doc])
                method = "semantic"
                print(f"   🧠 Using semantic chunking: {len(chunks)} chunks")
            except Exception as e:
                print(f"   ⚠️  Semantic chunking failed, using text: {e}")
                chunks = self.text_splitter.split_documents([doc])
                method = "text_fallback"
        else:
            chunks = self.text_splitter.split_documents([doc])
            method = "text"
            print(f"   πŸ“„ Using text chunking: {len(chunks)} chunks")

The semantic chunker uses embeddings to understand where natural break points occur in the text, creating chunks that respect the document's semantic structure rather than arbitrary character limits.
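For reference, here is how the chunker is wired up in the processor's constructor (condensed from the full source at the end of this post). The SentenceTransformer model is wrapped in a small adapter so SemanticChunker can call embed_documents and embed_query:

from langchain_experimental.text_splitter import SemanticChunker
from sentence_transformers import SentenceTransformer

class SentenceTransformerEmbeddings:
    """Minimal adapter exposing the embedding interface SemanticChunker expects."""
    def __init__(self, model):
        self.model = model
    def embed_documents(self, texts):
        return self.model.encode(texts).tolist()
    def embed_query(self, text):
        return self.model.encode([text]).tolist()[0]

semantic_chunker = SemanticChunker(
    embeddings=SentenceTransformerEmbeddings(SentenceTransformer("all-MiniLM-L6-v2")),
    breakpoint_threshold_type="percentile",  # split where the embedding distance between sentences spikes
)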

3. Rich Metadata Enhancement

Every chunk gets enhanced with a wealth of searchable metadata. The system analyzes each chunk to extract:

# Extract keywords and important points
keywords = self._extract_keywords(chunk.page_content)
important_points = self._extract_important_points(chunk.page_content)

enhanced_metadata = self.metadata_enhancer.create_enhanced_metadata(
    doc_id=chunk.metadata.get("source", file_path),
    text=chunk.page_content,
    file_path=file_path,
    chunk_index=j,
    total_chunks=len(chunks),
    chunking_method=method
)

# Add enhanced search metadata
enhanced_metadata.update({
    "keywords": ", ".join(keywords),
    "important_points": ", ".join(important_points),
    "page_number": i + 1 if len(documents) > 1 else 1,
    "total_pages": len(documents),
    "chunk_keywords": ", ".join(keywords[:10]),
    "search_terms": ", ".join(keywords + important_points[:5]),
    "processing_method": method
})

The Magic of SQLRecordManager: Never Duplicate Work Again

Now, here's where things get really smart. The system uses LangChain's SQLRecordManager, and this component is absolutely crucial for production RAG systems. Let me explain why.

What SQLRecordManager Actually Does

Think of SQLRecordManager as the "memory" of your RAG system. It tracks which documents have been indexed and uses timestamps to determine what's changed. But it's much more sophisticated than a simple file tracker.

Here's what happens under the hood:

Record Tracking: Every time a document chunk gets indexed, the RecordManager creates a record with:

  • The document's unique identifier
  • A timestamp of when it was processed
  • A hash of the content
  • Metadata about the source

Smart Updates: When you run incremental indexing, it automatically cleans up outdated document versions while minimizing the time both old and new versions exist in the system. This means:

  • No duplicate embeddings cluttering your vector store
  • Automatic removal of outdated content
  • Efficient processing that only handles changed files

Cleanup Modes: The system offers different cleanup strategies:

  • Incremental: Continuously cleans up documents during indexing, removing outdated versions associated with source IDs that were processed
  • Full: Complete cleanup after indexing, ensuring only the latest versions remain
  • None: No automatic cleanup (useful for append-only scenarios)
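Setting this up is only a few lines; the cleanup mode is just an argument to the index() call (this mirrors what process_all does in the full source below, which is where chunks and vectorstore come from):

from langchain.indexes import SQLRecordManager, index

record_manager = SQLRecordManager(
    namespace="hybrid_semantic",                      # one namespace per collection
    db_url="sqlite:///hybrid_record_manager.sqlite",  # where the indexing state lives
)
record_manager.create_schema()  # idempotent: creates the tracking table on first run

result = index(
    chunks,                  # the enhanced Document chunks prepared earlier
    record_manager,
    vectorstore,             # e.g. the Chroma store created in the processor
    cleanup="incremental",   # or "full", or None
    source_id_key="source",  # metadata key used to group chunks by originating file
)
# result is a dict: {'num_added': ..., 'num_updated': ..., 'num_skipped': ..., 'num_deleted': ...}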

Why This Matters in Practice

Without proper record management, your RAG system becomes a mess:

  • Duplicate content confuses retrieval
  • Outdated information pollutes results
  • Every re-run processes everything from scratch
  • Vector store grows indefinitely

With SQLRecordManager, you get:

  • Efficiency: Only changed files get reprocessed
  • Consistency: No duplicate or stale content
  • Scalability: System performance doesn't degrade over time
  • Reliability: Automatic cleanup prevents database bloat

File Change Detection: The Efficiency Game-Changer

One of my favorite features is the intelligent file change detection system. Instead of blindly reprocessing everything, the system maintains its own SQLite database tracking:

  • File content hashes: Detects actual content changes, not just timestamp updates
  • Modification timestamps: Quick checks for obvious changes
  • Processing history: What method was used, how many chunks were created
  • Performance metrics: Enables optimization over time

When you run the processor, it checks each file against this tracking database:

📄 document.pdf
   ⏭️  No changes detected
📄 updated_guide.docx
   🔄 File content changed
   🧠 Using semantic chunking: 15 chunks

This approach can dramatically reduce processing time on large document collections where only a few files have changed.

The Hybrid Search: Combining Vector and Cross-Encoder Reranking

The search component is where the "hybrid" really shines. Here's the two-stage process:

Stage 1: Vector Similarity Search

First, the system performs traditional semantic search using the SentenceTransformer embeddings. This gives us a broad set of potentially relevant documents based on semantic similarity.
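In the code this stage is a single call against the Chroma store, with k doubled so the reranker has a wider pool of candidates to work with (taken from hybrid_search in the full source below):

# Stage 1: broad candidate retrieval
vector_results = self.vectorstore.similarity_search_with_score(query, k=k * 2)
all_results = [
    {"document": doc, "score": score, "type": "vector"}
    for doc, score in vector_results
]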

Stage 2: Cross-Encoder Reranking

Here's where it gets sophisticated. The system uses a cross-encoder model (BAAI/bge-reranker-v2-m3) to rerank the results. Unlike bi-encoders that create separate embeddings for queries and documents, cross-encoders process query-document pairs together, leading to much more accurate relevance scoring.

The final score combines both approaches:

# Rerank with cross-encoder
if all_results:
    try:
        pairs = [(query, result["document"].page_content) for result in all_results]
        rerank_scores = self.reranker.predict(pairs)

        for i, result in enumerate(all_results):
            result["rerank_score"] = rerank_scores[i]
            result["final_score"] = (result["score"] + result["rerank_score"]) / 2

        all_results.sort(key=lambda x: x["final_score"], reverse=True)
        print(f"   βœ… Found {len(all_results)} results, reranked with cross-encoder")
    except Exception as e:
        print(f"   ⚠️  Reranking failed: {e}")
        all_results.sort(key=lambda x: x["score"], reverse=True)

This gives you the speed of vector search with the accuracy of cross-encoder ranking.
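The reranker itself is just a sentence-transformers CrossEncoder, loaded once in the constructor and fed (query, passage) pairs:

from sentence_transformers import CrossEncoder

reranker = CrossEncoder("BAAI/bge-reranker-v2-m3")

# Each pair is scored jointly, so the model sees the query and the passage together.
scores = reranker.predict([
    ("MCP transport types", "MCP supports stdio and HTTP-based transports..."),  # illustrative pair, not real data
])
print(scores)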

Rich Metadata: Making Search Actually Work

The metadata enhancement system is designed around a simple principle: the more context you provide, the better your search results. For each chunk, the system extracts:

Technical Metadata:

  • Programming language detection
  • Code presence indicators
  • URL and number detection
  • File type classification

Content Analysis:

  • Keyword extraction using frequency analysis
  • Domain-specific term recognition (AI, business, technical terms)
  • Important sentence identification
  • Sentiment indicators

Processing Context:

  • Chunking method used
  • File structure information
  • Processing timestamps
  • Chunk positioning data

This rich metadata enables much more precise search queries. Instead of just finding semantically similar text, you can search for "Python code examples" or "business strategy documents with financial data."
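As a sketch of what that enables: because the flags and keywords are stored as chunk metadata, a Chroma metadata filter can be combined with the semantic query (the field names come from the enhancer; the query string is just an example):

# Only consider chunks the enhancer flagged as containing code
results = vectorstore.similarity_search(
    "python code examples",
    k=5,
    filter={"has_code": True},
)
for doc in results:
    print(doc.metadata.get("file_name"), doc.metadata.get("keywords"))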

Performance Optimizations That Actually Matter

The system includes several performance optimizations that make it production-ready:

Batch Processing

Documents are processed in batches of 100, balancing memory usage with processing speed.

Adaptive Chunking

The system chooses the chunking method based on how a run was triggered: semantic chunking when a full reprocess is forced, and fast recursive text splitting for routine incremental updates to new or changed files.

Efficient Storage

Multiple database backends handle different concerns:

  • ChromaDB for vector storage
  • SQLite for file tracking
  • SQLRecordManager for indexing state

Memory Management

The embedding models are loaded once and reused across all processing operations.
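In the processor this simply means both models live on the instance and are created once in __init__ (condensed from the full source below):

from sentence_transformers import SentenceTransformer, CrossEncoder

class HybridSemanticProcessor:
    def __init__(self):
        # Loaded once; every embed and rerank call afterwards reuses these instances.
        self.embedding_model = SentenceTransformer("all-MiniLM-L6-v2")
        self.reranker = CrossEncoder("BAAI/bge-reranker-v2-m3")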

Real-World Usage Patterns

In practice, this system excels at several common scenarios:

Content Management: Marketing teams can maintain large libraries of content with automatic deduplication and change tracking.

Technical Documentation: Development teams can index code repositories, documentation, and tutorials with proper semantic chunking that respects code structure.

Research Applications: Academic or business research teams can process mixed document types with rich metadata for precise search.

Knowledge Management: Organizations can build comprehensive knowledge bases that stay up-to-date automatically.

The Code Walkthrough: Key Implementation Details

Let me highlight some particularly elegant parts of the implementation:

Universal Document Loading

The UniversalDocumentLoader class automatically detects file types and routes them to appropriate handlers. Looking at the actual implementation, the Excel loader preserves table structure by joining cells with separators:

# Extract text content from the sheet
content_parts = []
for row in worksheet.iter_rows(values_only=True):
    row_content = []
    for cell_value in row:
        if cell_value is not None:
            row_content.append(str(cell_value))
    if row_content:
        content_parts.append(" | ".join(row_content))

if content_parts:
    # Create document for this sheet
    content = "\n".join(content_parts)
    metadata = {
        'source': self.file_path,
        'file_type': 'excel',
        'document_type': 'xlsx',
        'sheet_name': sheet_name,
        'total_rows': len(content_parts)
    }
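Used on its own, the loader returns one Document per sheet (the workbook name below is hypothetical):

from app.document_loaders import CustomExcelLoader

docs = CustomExcelLoader("raw_data/sales_summary.xlsx").load()
for doc in docs:
    print(doc.metadata["sheet_name"], doc.metadata["total_rows"])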

The system supports multiple file formats through specialized loaders:

if file_ext == '.pdf':
    return UniversalDocumentLoader._load_pdf(file_path)
elif file_ext in ['.txt', '.md']:
    return UniversalDocumentLoader._load_text(file_path)
elif file_ext == '.docx':
    return UniversalDocumentLoader._load_word(file_path)
elif file_ext in ['.xlsx', '.xls', '.csv']:
    return UniversalDocumentLoader._load_excel(file_path)
elif file_ext == '.pptx':
    return UniversalDocumentLoader._load_powerpoint(file_path)
elif file_ext == '.html':
    return UniversalDocumentLoader._load_html(file_path)

Metadata Enhancement Pipeline

The metadata enhancer analyzes content across multiple dimensions. Keyword extraction, for example, is handled by a frequency-based helper in the processor:

def _extract_keywords(self, text: str) -> List[str]:
    """Extract important keywords from text."""
    import re
    from collections import Counter

    text = re.sub(r'[^\w\s]', ' ', text.lower())
    words = text.split()

    stop_words = {
        'the', 'a', 'an', 'and', 'or', 'but', 'in', 'on', 'at', 'to', 'for', 'of', 'with', 'by',
        'is', 'are', 'was', 'were', 'be', 'been', 'being', 'have', 'has', 'had', 'do', 'does', 'did',
        'will', 'would', 'could', 'should', 'may', 'might', 'must', 'can', 'this', 'that', 'these', 'those'
    }

    word_freq = Counter(words)
    keywords = [word for word, freq in word_freq.most_common(20) 
               if word not in stop_words and len(word) > 3 and freq > 1]

    return keywords[:15]

The system also extracts important points by analyzing sentence structure:

def _extract_important_points(self, text: str) -> List[str]:
    """Extract important points from text."""
    sentences = re.split(r'[.!?]+', text)
    important_sentences = []

    for sentence in sentences:
        sentence = sentence.strip()
        if len(sentence) < 20:
            continue

        importance_score = 0
        importance_indicators = [
            'important', 'key', 'critical', 'essential', 'significant', 'major',
            'primary', 'main', 'core', 'fundamental', 'crucial', 'vital',
            'definition', 'concept', 'principle', 'method', 'technique', 'approach'
        ]

        sentence_lower = sentence.lower()
        for indicator in importance_indicators:
            if indicator in sentence_lower:
                importance_score += 1

        # Technical terms boost importance
        tech_terms = re.findall(r'\b[A-Z][a-zA-Z]*\b', sentence)
        importance_score += len(tech_terms) * 0.5

        # Numbers boost importance  
        numbers = re.findall(r'\d+', sentence)
        importance_score += len(numbers) * 0.3

        if importance_score >= 1:
            clean_sentence = re.sub(r'\s+', ' ', sentence).strip()
            if len(clean_sentence) > 100:
                clean_sentence = clean_sentence[:100] + "..."
            important_sentences.append(clean_sentence)

    return important_sentences[:5]

Integrated Change Tracking

The file tracking system uses content hashes rather than just timestamps. Here's the actual implementation:

def _get_file_hash(self, file_path: str) -> str:
    """Get file content hash."""
    hash_obj = hashlib.sha256()
    with open(file_path, 'rb') as f:
        for chunk in iter(lambda: f.read(4096), b""):
            hash_obj.update(chunk)
    return hash_obj.hexdigest()

def _is_file_changed(self, file_path: str) -> bool:
    """Check if file has changed."""
    conn = sqlite3.connect(self.file_tracker_db)
    cursor = conn.cursor()

    # Get current file info
    current_info = self._get_file_info(file_path)

    # Check stored info
    cursor.execute(
        "SELECT file_hash, last_modified FROM file_tracker WHERE file_path = ?",
        (file_path,)
    )
    result = cursor.fetchone()
    conn.close()

    if not result:
        print(f"   πŸ†• New file detected")
        return True

    stored_hash, stored_modified = result

    if current_info["hash"] != stored_hash:
        print(f"   πŸ”„ File content changed")
        return True

    if current_info["modified"] > stored_modified:
        print(f"   πŸ”„ File modified")
        return True

    print(f"   ⏭️  No changes detected")
    return False

The system maintains a SQLite database to track file processing:

def _init_file_tracker(self):
    """Initialize file tracking database."""
    conn = sqlite3.connect(self.file_tracker_db)
    cursor = conn.cursor()
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS file_tracker (
            file_path TEXT PRIMARY KEY,
            file_hash TEXT NOT NULL,
            last_modified REAL,
            last_processed REAL,
            chunk_count INTEGER,
            processing_method TEXT
        )
    ''')
    conn.commit()
    conn.close()

Lessons Learned and Future Directions

Building this system taught me several important lessons:

Semantic Chunking Isn't Always Better: While semantic chunking produces more coherent chunks, it's computationally expensive. The hybrid approach of using it selectively provides the best balance.

Metadata Quality Matters More Than Quantity: Rich, well-structured metadata dramatically improves search accuracy. The investment in metadata enhancement pays off quickly.

Change Detection Is Critical: In production systems, efficient change detection is often more important than perfect initial processing. Users care more about quick updates than perfect accuracy.

Cross-Encoder Reranking Is Worth It: The improvement in search relevance from reranking is substantial and worth the additional computational cost.

Getting Started with Your Own Implementation

If you want to build something similar, here are the key components you'll need (a minimal end-to-end sketch follows the list):

  1. Document Loaders: Invest in good loaders for your file types
  2. Chunking Strategy: Implement both semantic and traditional chunking
  3. Record Management: Use SQLRecordManager or build similar change tracking
  4. Rich Metadata: Design metadata schemas that support your search patterns
  5. Hybrid Search: Combine vector search with reranking for better results
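Here is a minimal end-to-end sketch that strings these five pieces together using the same libraries as this project. It assumes the document_loaders module from this post is on your path; the file, collection, and database names are placeholders:

from langchain.indexes import SQLRecordManager, index
from langchain_chroma import Chroma
from langchain_text_splitters import RecursiveCharacterTextSplitter
from sentence_transformers import SentenceTransformer, CrossEncoder
from app.document_loaders import UniversalDocumentLoader  # from this post

class STEmbeddings:
    """Adapter so Chroma can call a SentenceTransformer model."""
    def __init__(self, model_name="all-MiniLM-L6-v2"):
        self.model = SentenceTransformer(model_name)
    def embed_documents(self, texts):
        return self.model.encode(texts).tolist()
    def embed_query(self, text):
        return self.model.encode([text]).tolist()[0]

# 1-2. Load and chunk (placeholder file; swap in a SemanticChunker here for adaptive chunking)
docs = UniversalDocumentLoader.load_document("raw_data/example.pdf")
chunks = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200).split_documents(docs)

# 3-4. Vector store plus record manager for change tracking (rich metadata would be added per chunk here)
vectorstore = Chroma(collection_name="demo", embedding_function=STEmbeddings(),
                     persist_directory="demo_chroma")
record_manager = SQLRecordManager("demo", db_url="sqlite:///demo_records.sqlite")
record_manager.create_schema()
print(index(chunks, record_manager, vectorstore, cleanup="incremental", source_id_key="source"))

# 5. Hybrid search: vector retrieval followed by cross-encoder reranking
query = "example query"
hits = vectorstore.similarity_search(query, k=10)
reranker = CrossEncoder("BAAI/bge-reranker-v2-m3")
scores = reranker.predict([(query, h.page_content) for h in hits])
for score, hit in sorted(zip(scores, hits), key=lambda p: p[0], reverse=True)[:3]:
    print(round(float(score), 3), hit.page_content[:80])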

The complete system demonstrates that with thoughtful architecture, you can build RAG systems that are both powerful and efficient. The key is combining multiple complementary approaches rather than relying on any single technique.

Conclusion

Building effective RAG systems requires thinking beyond simple vector similarity. The hybrid semantic processor shows how combining intelligent document loading, adaptive chunking, comprehensive metadata enhancement, and smart change detection creates a system that's both powerful and practical.

The SQLRecordManager component, in particular, transforms a basic RAG system into a production-ready solution that can handle real-world complexity without becoming inefficient or unreliable.

Whether you're building a knowledge management system, a documentation assistant, or a research tool, these patterns and techniques can help you create something that actually works reliably at scale. The key is understanding that modern RAG systems need to be intelligent about much more than just semantic similarity - they need to understand document structure, track changes efficiently, and provide rich context for search.

The future of RAG isn't just about better embeddings or larger context windows - it's about building systems that truly understand and adapt to the content they're processing. This hybrid approach points the way toward that future.

Source Code files & Data Flow

1. hybrid_semantic_processor.py

#Author-Sreeni Ramadorai
"""
Hybrid Semantic Processor - Best of both worlds
"""

import os
import hashlib
import sqlite3
from typing import List, Dict, Any, Tuple
from datetime import datetime
import chromadb
from sentence_transformers import SentenceTransformer, CrossEncoder
from langchain_core.documents import Document
from langchain.indexes import index
from langchain.indexes import SQLRecordManager
from langchain_chroma import Chroma
from langchain_experimental.text_splitter import SemanticChunker
from langchain_text_splitters import RecursiveCharacterTextSplitter
from app.document_loaders import UniversalDocumentLoader
from rag_metadata_enhancer import RAGMetadataEnhancer

class HybridSemanticProcessor:
    def __init__(self):
        """Initialize hybrid semantic processor."""
        print("πŸš€ Hybrid Semantic Processor")

        # Initialize components
        self.embedding_model = SentenceTransformer('all-MiniLM-L6-v2')
        self.reranker = CrossEncoder('BAAI/bge-reranker-v2-m3')
        self.metadata_enhancer = RAGMetadataEnhancer()

        # Initialize semantic chunker
        class SentenceTransformerEmbeddings:
            def __init__(self, model):
                self.model = model
            def embed_documents(self, texts):
                return self.model.encode(texts).tolist()
            def embed_query(self, text):
                return self.model.encode([text]).tolist()[0]

        try:
            self.semantic_chunker = SemanticChunker(
                embeddings=SentenceTransformerEmbeddings(self.embedding_model),
                breakpoint_threshold_type="percentile"
            )
            print("   🧠 Semantic chunking enabled")
        except Exception as e:
            print(f"   ⚠️  Semantic chunking disabled: {e}")
            self.semantic_chunker = None

        # Fallback text splitter
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=200
        )

        # Initialize ChromaDB
        self.client = chromadb.PersistentClient(path="hybrid_chroma_data")
        self.vectorstore = Chroma(
            client=self.client,
            collection_name="hybrid_collection",
            embedding_function=self._get_embedding_function()
        )

        # Initialize Record Manager
        self.record_manager = SQLRecordManager(
            namespace="hybrid_semantic",
            db_url="sqlite:///hybrid_record_manager.sqlite"
        )
        self.record_manager.create_schema()

        # File tracking database
        self.file_tracker_db = "hybrid_file_tracker.sqlite"
        self._init_file_tracker()

        print(f"   πŸ“Š Database: {self.vectorstore._collection.count()} documents")
        print("βœ… Ready!")

    def _init_file_tracker(self):
        """Initialize file tracking database."""
        conn = sqlite3.connect(self.file_tracker_db)
        cursor = conn.cursor()
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS file_tracker (
                file_path TEXT PRIMARY KEY,
                file_hash TEXT NOT NULL,
                last_modified REAL,
                last_processed REAL,
                chunk_count INTEGER,
                processing_method TEXT
            )
        ''')
        conn.commit()
        conn.close()

    def _get_file_hash(self, file_path: str) -> str:
        """Get file content hash."""
        hash_obj = hashlib.sha256()
        with open(file_path, 'rb') as f:
            for chunk in iter(lambda: f.read(4096), b""):
                hash_obj.update(chunk)
        return hash_obj.hexdigest()

    def _get_file_info(self, file_path: str) -> Dict[str, Any]:
        """Get file information."""
        stat = os.stat(file_path)
        return {
            "hash": self._get_file_hash(file_path),
            "modified": stat.st_mtime,
            "size": stat.st_size
        }

    def _is_file_changed(self, file_path: str) -> bool:
        """Check if file has changed."""
        conn = sqlite3.connect(self.file_tracker_db)
        cursor = conn.cursor()

        # Get current file info
        current_info = self._get_file_info(file_path)

        # Check stored info
        cursor.execute(
            "SELECT file_hash, last_modified FROM file_tracker WHERE file_path = ?",
            (file_path,)
        )
        result = cursor.fetchone()
        conn.close()

        if not result:
            print(f"   πŸ†• New file detected")
            return True

        stored_hash, stored_modified = result

        if current_info["hash"] != stored_hash:
            print(f"   πŸ”„ File content changed")
            return True

        if current_info["modified"] > stored_modified:
            print(f"   πŸ”„ File modified")
            return True

        print(f"   ⏭️  No changes detected")
        return False

    def _update_file_tracker(self, file_path: str, chunk_count: int, method: str):
        """Update file tracker."""
        file_info = self._get_file_info(file_path)
        conn = sqlite3.connect(self.file_tracker_db)
        cursor = conn.cursor()
        cursor.execute('''
            INSERT OR REPLACE INTO file_tracker 
            (file_path, file_hash, last_modified, last_processed, chunk_count, processing_method)
            VALUES (?, ?, ?, ?, ?, ?)
        ''', (
            file_path, 
            file_info["hash"], 
            file_info["modified"], 
            datetime.now().timestamp(),
            chunk_count,
            method
        ))
        conn.commit()
        conn.close()

    def _get_embedding_function(self):
        """Get embedding function for Chroma."""
        class ChromaEmbeddingFunction:
            def __init__(self, model):
                self.model = model
            def __call__(self, texts):
                return self.model.encode(texts).tolist()
            def embed_query(self, text):
                return self.model.encode([text]).tolist()[0]
            def embed_documents(self, texts):
                return self.model.encode(texts).tolist()

        return ChromaEmbeddingFunction(self.embedding_model)

    def _process_document(self, file_path: str, force_semantic: bool = False) -> Tuple[List[Document], str]:
        """Process a single document into chunks; returns the chunks and the chunking method used."""
        documents = UniversalDocumentLoader.load_document(file_path)
        all_chunks = []

        # Check if we should use semantic chunking
        use_semantic = force_semantic or (self.semantic_chunker and not self._is_file_changed(file_path))

        for i, doc in enumerate(documents):
            # Choose chunking method
            if use_semantic and self.semantic_chunker:
                try:
                    chunks = self.semantic_chunker.split_documents([doc])
                    method = "semantic"
                    print(f"   🧠 Using semantic chunking: {len(chunks)} chunks")
                except Exception as e:
                    print(f"   ⚠️  Semantic chunking failed, using text: {e}")
                    chunks = self.text_splitter.split_documents([doc])
                    method = "text_fallback"
            else:
                chunks = self.text_splitter.split_documents([doc])
                method = "text"
                print(f"   πŸ“„ Using text chunking: {len(chunks)} chunks")

            # Enhance metadata for each chunk
            for j, chunk in enumerate(chunks):
                # Extract keywords and important points
                keywords = self._extract_keywords(chunk.page_content)
                important_points = self._extract_important_points(chunk.page_content)

                enhanced_metadata = self.metadata_enhancer.create_enhanced_metadata(
                    doc_id=chunk.metadata.get("source", file_path),
                    text=chunk.page_content,
                    file_path=file_path,
                    chunk_index=j,
                    total_chunks=len(chunks),
                    chunking_method=method
                )

                # Add enhanced search metadata
                enhanced_metadata.update({
                    "keywords": ", ".join(keywords),
                    "important_points": ", ".join(important_points),
                    "page_number": i + 1 if len(documents) > 1 else 1,
                    "total_pages": len(documents),
                    "chunk_keywords": ", ".join(keywords[:10]),
                    "search_terms": ", ".join(keywords + important_points[:5]),
                    "processing_method": method
                })

                # Merge with original metadata
                chunk.metadata.update(enhanced_metadata)
                chunk.metadata["source"] = file_path

                all_chunks.append(chunk)

        return all_chunks, method

    def _extract_keywords(self, text: str) -> List[str]:
        """Extract important keywords from text."""
        import re
        from collections import Counter

        text = re.sub(r'[^\w\s]', ' ', text.lower())
        words = text.split()

        stop_words = {
            'the', 'a', 'an', 'and', 'or', 'but', 'in', 'on', 'at', 'to', 'for', 'of', 'with', 'by',
            'is', 'are', 'was', 'were', 'be', 'been', 'being', 'have', 'has', 'had', 'do', 'does', 'did',
            'will', 'would', 'could', 'should', 'may', 'might', 'must', 'can', 'this', 'that', 'these', 'those'
        }

        word_freq = Counter(words)
        keywords = [word for word, freq in word_freq.most_common(20) 
                   if word not in stop_words and len(word) > 3 and freq > 1]

        return keywords[:15]

    def _extract_important_points(self, text: str) -> List[str]:
        """Extract important points from text."""
        import re

        sentences = re.split(r'[.!?]+', text)
        important_sentences = []

        for sentence in sentences:
            sentence = sentence.strip()
            if len(sentence) < 20:
                continue

            importance_score = 0
            importance_indicators = [
                'important', 'key', 'critical', 'essential', 'significant', 'major',
                'primary', 'main', 'core', 'fundamental', 'crucial', 'vital',
                'definition', 'concept', 'principle', 'method', 'technique', 'approach'
            ]

            sentence_lower = sentence.lower()
            for indicator in importance_indicators:
                if indicator in sentence_lower:
                    importance_score += 1

            tech_terms = re.findall(r'\b[A-Z][a-zA-Z]*\b', sentence)
            importance_score += len(tech_terms) * 0.5

            numbers = re.findall(r'\d+', sentence)
            importance_score += len(numbers) * 0.3

            if importance_score >= 1:
                clean_sentence = re.sub(r'\s+', ' ', sentence).strip()
                if len(clean_sentence) > 100:
                    clean_sentence = clean_sentence[:100] + "..."
                important_sentences.append(clean_sentence)

        return important_sentences[:5]

    def process_all(self, folder_path: str = "raw_data", force_reprocess: bool = False) -> Dict[str, Any]:
        """Process all files using hybrid approach."""
        print(f"πŸš€ Processing: {folder_path}")

        if not os.path.exists(folder_path):
            print(f"❌ Folder not found")
            return {}

        # Collect all documents
        all_documents = []
        file_stats = {}

        for filename in os.listdir(folder_path):
            if filename.startswith('.') or filename == '.gitkeep':
                continue

            file_path = os.path.join(folder_path, filename)
            if os.path.isfile(file_path):
                print(f"πŸ“„ {filename}")

                # Check if file needs processing
                if not force_reprocess and not self._is_file_changed(file_path):
                    print(f"   ⏭️  Skipping unchanged file")
                    continue

                try:
                    documents, method = self._process_document(file_path, force_semantic=force_reprocess)
                    all_documents.extend(documents)
                    file_stats[filename] = {
                        "chunks": len(documents),
                        "method": method
                    }
                    print(f"   πŸ“Š {len(documents)} chunks prepared ({method})")

                    # Update file tracker
                    self._update_file_tracker(file_path, len(documents), method)

                except Exception as e:
                    print(f"   ❌ Error: {e}")

        if not all_documents:
            print("❌ No documents to process")
            return {}

        print(f"\nπŸ”„ Indexing {len(all_documents)} total chunks...")

        # Use LangChain indexing API
        try:
            result = index(
                all_documents,
                self.record_manager,
                self.vectorstore,
                cleanup="incremental",
                source_id_key="source",
                key_encoder="blake2b",
                batch_size=100
            )

            print(f"\nπŸ“Š Indexing Results:")
            print(f"   βœ… Added: {result['num_added']} chunks")
            print(f"   πŸ”„ Updated: {result['num_updated']} chunks")
            print(f"   ⏭️  Skipped: {result['num_skipped']} chunks")
            print(f"   πŸ—‘οΈ  Deleted: {result['num_deleted']} chunks")
            print(f"   πŸ“Š Database: {self.vectorstore._collection.count()} total documents")

            # Show file processing summary
            print(f"\nπŸ“‹ File Processing Summary:")
            for filename, stats in file_stats.items():
                print(f"   πŸ“„ {filename}: {stats['chunks']} chunks ({stats['method']})")

            return result

        except Exception as e:
            print(f"❌ Indexing error: {e}")
            return {"status": "error", "error": str(e)}

    def hybrid_search(self, query: str, k: int = 10) -> List[Dict[str, Any]]:
        """Perform hybrid search with reranking."""
        print(f"πŸ” Hybrid search: '{query}'")

        # Vector search
        vector_results = self.vectorstore.similarity_search_with_score(query, k=k*2)

        # Combine results
        all_results = []
        for doc, score in vector_results:
            all_results.append({
                "document": doc,
                "score": score,
                "type": "vector"
            })

        # Rerank with cross-encoder
        if all_results:
            try:
                pairs = [(query, result["document"].page_content) for result in all_results]
                rerank_scores = self.reranker.predict(pairs)

                for i, result in enumerate(all_results):
                    result["rerank_score"] = rerank_scores[i]
                    result["final_score"] = (result["score"] + result["rerank_score"]) / 2

                all_results.sort(key=lambda x: x["final_score"], reverse=True)
                print(f"   βœ… Found {len(all_results)} results, reranked with cross-encoder")
            except Exception as e:
                print(f"   ⚠️  Reranking failed: {e}")
                all_results.sort(key=lambda x: x["score"], reverse=True)

        return all_results[:k]

def main():
    """Main function."""
    print("πŸš€ Hybrid Semantic Processor")
    print("="*50)

    processor = HybridSemanticProcessor()

    # Process files
    result = processor.process_all()

    # Test search
    print(f"\nπŸ” Testing enhanced search...")
    test_query = "MCP transport types"
    search_results = processor.hybrid_search(test_query, k=3)

    if search_results:
        print(f"\nπŸ“‹ Search Results for '{test_query}':")
        for i, result in enumerate(search_results):
            doc = result["document"]
            print(f"\n{i+1}. Score: {result['final_score']:.3f} ({result['type']})")
            print(f"   πŸ“„ {doc.metadata.get('file_name', 'Unknown')} - Page {doc.metadata.get('page_number', 'N/A')}")
            print(f"   πŸ”‘ Keywords: {doc.metadata.get('keywords', 'N/A')[:80]}...")
            print(f"   πŸ’‘ Content: {doc.page_content[:200]}...")

    print(f"\nπŸŽ‰ Complete!")

if __name__ == "__main__":
    main()

2. Document Loader document_loaders.py

"""
Document loaders for different file types.
Supports text files, PDF files, Word documents, Excel files, PowerPoint, HTML, and Markdown.
"""

import os
from typing import List, Dict, Any
from langchain_core.documents import Document

# Core loaders
from langchain_community.document_loaders import TextLoader, PyPDFLoader

# New loaders for additional file types
from langchain_community.document_loaders import (
    UnstructuredWordDocumentLoader,
    UnstructuredPowerPointLoader,
    UnstructuredHTMLLoader,
    UnstructuredMarkdownLoader
)

# Custom Excel loader using openpyxl
import openpyxl


class CustomExcelLoader:
    """Custom Excel loader using openpyxl for better compatibility."""

    def __init__(self, file_path: str):
        self.file_path = file_path

    def load(self) -> List[Document]:
        """Load Excel file and convert to documents."""
        try:
            workbook = openpyxl.load_workbook(self.file_path, data_only=True)
            documents = []

            for sheet_name in workbook.sheetnames:
                worksheet = workbook[sheet_name]

                # Extract text content from the sheet
                content_parts = []
                for row in worksheet.iter_rows(values_only=True):
                    row_content = []
                    for cell_value in row:
                        if cell_value is not None:
                            row_content.append(str(cell_value))
                    if row_content:
                        content_parts.append(" | ".join(row_content))

                if content_parts:
                    # Create document for this sheet
                    content = "\n".join(content_parts)
                    metadata = {
                        'source': self.file_path,
                        'file_type': 'excel',
                        'document_type': 'xlsx',
                        'sheet_name': sheet_name,
                        'total_rows': len(content_parts)
                    }

                    documents.append(Document(
                        page_content=content,
                        metadata=metadata
                    ))

            return documents

        except Exception as e:
            raise RuntimeError(f"Failed to load Excel file {self.file_path}: {str(e)}")


class UniversalDocumentLoader:
    """
    A universal document loader that can handle multiple file formats.
    Automatically detects file type and uses appropriate loader.
    """

    @staticmethod
    def load_document(file_path: str) -> List[Document]:
        """
        Load a document from file path, automatically detecting the format.

        Args:
            file_path: Path to the document file

        Returns:
            List of Document objects (usually 1 for most files, multiple for PDFs with pages)
        """
        if not os.path.exists(file_path):
            raise FileNotFoundError(f"File not found: {file_path}")

        file_ext = os.path.splitext(file_path)[1].lower()

        if file_ext == '.pdf':
            return UniversalDocumentLoader._load_pdf(file_path)
        elif file_ext in ['.txt', '.md']:
            return UniversalDocumentLoader._load_text(file_path)
        elif file_ext == '.docx':
            return UniversalDocumentLoader._load_word(file_path)
        elif file_ext == '.xlsx':
            return UniversalDocumentLoader._load_excel(file_path)
        elif file_ext == '.pptx':
            return UniversalDocumentLoader._load_powerpoint(file_path)
        elif file_ext == '.html':
            return UniversalDocumentLoader._load_html(file_path)
        else:
            raise ValueError(f"Unsupported file type: {file_ext}")

    @staticmethod
    def _load_pdf(file_path: str) -> List[Document]:
        """Load PDF file using PyPDFLoader."""
        try:
            loader = PyPDFLoader(file_path)
            documents = loader.load()

            # Add source metadata to each document
            for doc in documents:
                doc.metadata['source'] = file_path
                doc.metadata['file_type'] = 'pdf'
                doc.metadata['page_number'] = doc.metadata.get('page', 0)

            return documents
        except Exception as e:
            raise RuntimeError(f"Failed to load PDF {file_path}: {str(e)}")

    @staticmethod
    def _load_text(file_path: str) -> List[Document]:
        """Load text file using TextLoader."""
        try:
            loader = TextLoader(file_path)
            documents = loader.load()

            # Add source metadata to each document
            for doc in documents:
                doc.metadata['source'] = file_path
                doc.metadata['file_type'] = 'text'

            return documents
        except Exception as e:
            raise RuntimeError(f"Failed to load text file {file_path}: {str(e)}")

    @staticmethod
    def _load_word(file_path: str) -> List[Document]:
        """Load Word document using UnstructuredWordDocumentLoader."""
        try:
            loader = UnstructuredWordDocumentLoader(file_path)
            documents = loader.load()

            # Add source metadata to each document
            for doc in documents:
                doc.metadata['source'] = file_path
                doc.metadata['file_type'] = 'word'
                doc.metadata['document_type'] = 'docx'

            return documents
        except Exception as e:
            raise RuntimeError(f"Failed to load Word document {file_path}: {str(e)}")

    @staticmethod
    def _load_excel(file_path: str) -> List[Document]:
        """Load Excel file using custom openpyxl-based loader."""
        try:
            loader = CustomExcelLoader(file_path)
            documents = loader.load()

            # Add source metadata to each document
            for doc in documents:
                doc.metadata['source'] = file_path
                doc.metadata['file_type'] = 'excel'
                doc.metadata['document_type'] = 'xlsx'

            return documents
        except Exception as e:
            raise RuntimeError(f"Failed to load Excel file {file_path}: {str(e)}")

    @staticmethod
    def _load_powerpoint(file_path: str) -> List[Document]:
        """Load PowerPoint file using UnstructuredPowerPointLoader."""
        try:
            loader = UnstructuredPowerPointLoader(file_path)
            documents = loader.load()

            # Add source metadata to each document
            for doc in documents:
                doc.metadata['source'] = file_path
                doc.metadata['file_type'] = 'powerpoint'
                doc.metadata['document_type'] = 'pptx'
                # PowerPoint-specific metadata
                if 'slide_number' in doc.metadata:
                    doc.metadata['slide_number'] = doc.metadata['slide_number']

            return documents
        except Exception as e:
            raise RuntimeError(f"Failed to load PowerPoint file {file_path}: {str(e)}")

    @staticmethod
    def _load_html(file_path: str) -> List[Document]:
        """Load HTML file using UnstructuredHTMLLoader."""
        try:
            loader = UnstructuredHTMLLoader(file_path)
            documents = loader.load()

            # Add source metadata to each document
            for doc in documents:
                doc.metadata['source'] = file_path
                doc.metadata['file_type'] = 'html'
                doc.metadata['document_type'] = 'html'

            return documents
        except Exception as e:
            raise RuntimeError(f"Failed to load HTML file {file_path}: {str(e)}")

    @staticmethod
    def get_supported_extensions() -> List[str]:
        """Get list of supported file extensions."""
        return ['.txt', '.md', '.pdf', '.docx', '.xlsx', '.pptx', '.html']


def load_documents_from_directory(directory_path: str) -> List[Document]:
    """
    Load all supported documents from a directory.

    Args:
        directory_path: Path to directory containing documents

    Returns:
        List of all Document objects from the directory
    """
    if not os.path.exists(directory_path):
        raise FileNotFoundError(f"Directory not found: {directory_path}")

    all_documents = []
    supported_extensions = UniversalDocumentLoader.get_supported_extensions()

    for filename in os.listdir(directory_path):
        file_path = os.path.join(directory_path, filename)

        if os.path.isfile(file_path):
            file_ext = os.path.splitext(filename)[1].lower()

            if file_ext in supported_extensions:
                try:
                    documents = UniversalDocumentLoader.load_document(file_path)
                    all_documents.extend(documents)
                except Exception as e:
                    print(f"Warning: Failed to load {filename}: {str(e)}")
                    continue

    return all_documents

3. RAG Metadata Enhancer rag_metadata_enhancer.py

#!/usr/bin/env python3
"""
RAG Metadata Enhancer - Key fields to increase search accuracy
"""

import os
import re
import hashlib
from typing import List, Dict, Any
from datetime import datetime
from collections import Counter

class RAGMetadataEnhancer:
    def __init__(self):
        self.technical_terms = {
            'ai', 'ml', 'machine learning', 'deep learning', 'neural network', 'algorithm',
            'data science', 'python', 'javascript', 'api', 'database', 'cloud', 'aws',
            'docker', 'kubernetes', 'microservices', 'rest', 'graphql', 'sql', 'nosql'
        }

        self.business_terms = {
            'strategy', 'business', 'market', 'customer', 'revenue', 'profit', 'growth',
            'investment', 'finance', 'management', 'leadership', 'team', 'project'
        }

    def extract_keywords(self, text: str) -> List[str]:
        """Extract relevant keywords."""
        words = re.findall(r'\b\w+\b', text.lower())
        words = [w for w in words if len(w) > 2]

        # Get frequency
        word_freq = Counter(words)

        # Add technical/business terms
        keywords = []
        text_lower = text.lower()

        for term_set in [self.technical_terms, self.business_terms]:
            for term in term_set:
                if term in text_lower:
                    keywords.append(term)

        # Add high-frequency words
        for word, freq in word_freq.most_common(10):
            if freq > 1 and word not in keywords:
                keywords.append(word)

        return keywords[:15]

    def analyze_content(self, text: str, file_path: str) -> Dict[str, Any]:
        """Analyze content for metadata."""
        # Basic stats
        words = text.split()
        sentences = text.split('.')

        # Content type
        content_type = self._detect_content_type(text, file_path)

        # Keywords
        keywords = self.extract_keywords(text)

        # Features
        has_code = bool(re.search(r'```|def |class |import ', text))
        has_numbers = bool(re.search(r'\d+', text))
        has_urls = bool(re.search(r'http[s]?://', text))

        # Sentiment (simple)
        positive_words = {'good', 'great', 'excellent', 'amazing', 'best'}
        negative_words = {'bad', 'terrible', 'awful', 'worst', 'problem'}

        text_lower = text.lower()
        positive_count = sum(1 for word in positive_words if word in text_lower)
        negative_count = sum(1 for word in negative_words if word in text_lower)
        sentiment = 'positive' if positive_count > negative_count else 'negative' if negative_count > positive_count else 'neutral'

        return {
            "word_count": len(words),
            "sentence_count": len(sentences),
            "content_type": content_type,
            "keywords": keywords,
            "has_code": has_code,
            "has_numbers": has_numbers,
            "has_urls": has_urls,
            "sentiment": sentiment,
            "text_hash": hashlib.sha256(text.encode()).hexdigest()
        }

    def _detect_content_type(self, text: str, file_path: str) -> str:
        """Detect content type."""
        text_lower = text.lower()
        file_ext = os.path.splitext(file_path)[1].lower()

        if file_ext in ['.py', '.js', '.java']:
            return 'code'
        elif file_ext in ['.md', '.txt']:
            return 'documentation'
        elif re.search(r'```|def |class |import ', text):
            return 'code'
        elif re.search(r'# |## |### ', text):
            return 'documentation'
        elif re.search(r'tutorial|guide|how to', text_lower):
            return 'tutorial'

        return 'general'

    def create_enhanced_metadata(self, doc_id: str, text: str, file_path: str, 
                                chunk_index: int, total_chunks: int, 
                                chunking_method: str) -> Dict[str, Any]:
        """Create enhanced metadata for RAG accuracy."""
        analysis = self.analyze_content(text, file_path)
        file_info = self._get_file_info(file_path)

        return {
            # Core chunk info
            "source": doc_id,
            "chunk_index": chunk_index,
            "total_chunks": total_chunks,
            "chunking_method": chunking_method,

            # File info
            "file_name": file_info["file_name"],
            "file_extension": file_info["file_extension"],
            "file_size_mb": file_info["file_size_mb"],
            "file_type": file_info["file_type"],

            # Content analysis
            "word_count": analysis["word_count"],
            "sentence_count": analysis["sentence_count"],
            "content_type": analysis["content_type"],
            "keywords": ", ".join(analysis["keywords"]),
            "has_code": analysis["has_code"],
            "has_numbers": analysis["has_numbers"],
            "has_urls": analysis["has_urls"],
            "sentiment": analysis["sentiment"],
            "text_hash": analysis["text_hash"],

            # Processing info
            "processed_at": datetime.now().isoformat()
        }

    def _get_file_info(self, file_path: str) -> Dict[str, Any]:
        """Get file information."""
        try:
            stat = os.stat(file_path)
            return {
                "file_name": os.path.basename(file_path),
                "file_extension": os.path.splitext(file_path)[1],
                "file_size_mb": round(stat.st_size / (1024 * 1024), 2),
                "file_type": self._get_file_type(file_path)
            }
        except:
            return {
                "file_name": os.path.basename(file_path),
                "file_extension": os.path.splitext(file_path)[1],
                "file_size_mb": 0,
                "file_type": "unknown"
            }

    def _get_file_type(self, file_path: str) -> str:
        """Get file type category."""
        ext = os.path.splitext(file_path)[1].lower()

        if ext in ['.pdf', '.docx', '.doc', '.txt', '.md']:
            return 'document'
        elif ext in ['.xlsx', '.xls', '.csv']:
            return 'spreadsheet'
        elif ext in ['.py', '.js', '.java', '.html']:
            return 'code'
        elif ext in ['.jpg', '.png', '.gif']:
            return 'image'

        return 'other'


4. Q&A Chatbot client

#!/usr/bin/env python3
"""
Hybrid QA Chatbot - Integrates with Hybrid Semantic Processor
"""

import os
from typing import List, Dict, Any
from dotenv import load_dotenv
from hybrid_semantic_processor import HybridSemanticProcessor

# Load environment variables
load_dotenv()

class HybridQAChatbot:
    def __init__(self):
        """Initialize the hybrid QA chatbot."""
        print("πŸ€– Hybrid QA Chatbot")
        print("="*40)

        # Initialize the hybrid processor
        self.processor = HybridSemanticProcessor()

        # Check if OpenAI API key is available
        self.openai_api_key = os.getenv('OPENAI_API_KEY')
        if self.openai_api_key:
            try:
                from openai import OpenAI
                self.openai_client = OpenAI(api_key=self.openai_api_key)
                print("   βœ… OpenAI integration enabled")
            except ImportError:
                print("   ⚠️  OpenAI package not installed")
                self.openai_api_key = None
        else:
            print("   ⚠️  OpenAI API key not found - using basic search only")
            self.openai_api_key = None

        print("   🧠 Semantic processor ready")
        print("   πŸ” Enhanced search enabled")
        print("βœ… Chatbot ready!")

    def search_documents(self, query: str, k: int = 8) -> List[Dict[str, Any]]:
        """Search documents using hybrid search."""
        try:
            results = self.processor.hybrid_search(query, k=k)
            return results
        except Exception as e:
            print(f"   ❌ Search error: {e}")
            return []

    def format_context(self, search_results: List[Dict[str, Any]]) -> str:
        """Format search results into context for AI."""
        if not search_results:
            return "No relevant documents found."

        context_parts = []
        for i, result in enumerate(search_results[:5]):  # Use top 5 results
            doc = result["document"]
            metadata = doc.metadata

            # Format source information
            file_name = metadata.get('file_name', 'Unknown')
            page_num = metadata.get('page_number', 'N/A')
            method = metadata.get('processing_method', 'unknown')

            # Format content
            content = doc.page_content.strip()
            if len(content) > 300:
                content = content[:300] + "..."

            # Add to context
            context_parts.append(f"""
=== Source {i+1}: {file_name} (Page {page_num}) ===
Processing Method: {method}
Content: {content}
""")

        return "\n".join(context_parts)

    def ask_openai(self, query: str, context: str) -> str:
        """Ask OpenAI for an answer based on context."""
        if not self.openai_api_key:
            return "OpenAI integration not available."

        try:
            prompt = f"""You are a helpful AI assistant. Answer the user's question based on the provided context from documents.

Context from documents:
{context}

User Question: {query}

Instructions:
1. Answer the question based ONLY on the provided context
2. Be concise but comprehensive
3. If the context doesn't contain enough information, say so
4. Cite specific sources when possible (e.g., "According to Source 1...")
5. If you're unsure, acknowledge the limitations

Answer:"""

            response = self.openai_client.chat.completions.create(
                model="gpt-4",
                messages=[
                    {"role": "system", "content": "You are a helpful AI assistant that answers questions based on provided document context."},
                    {"role": "user", "content": prompt}
                ],
                temperature=0.1,
                max_tokens=500
            )

            return response.choices[0].message.content.strip()

        except Exception as e:
            return f"Error getting AI response: {e}"

    def chat(self):
        """Main chat loop."""
        print(f"\nπŸ€– Welcome to the Hybrid QA Chatbot!")
        print(f"πŸ’‘ Ask me anything about your documents")
        print(f"πŸ” I'll search through your knowledge base and provide intelligent answers")
        print(f"πŸ“ Type 'quit' or 'exit' to end the chat")
        print(f"πŸ“Š Type 'stats' to see database statistics")
        print(f"πŸ” Type 'search <query>' for direct search results")
        print("-" * 60)

        while True:
            try:
                # Get user input
                user_input = input("\n🤔 You: ").strip()

                if user_input.lower() in ['quit', 'exit', 'bye']:
                    print("πŸ‘‹ Goodbye! Thanks for using the Hybrid QA Chatbot!")
                    break

                if user_input.lower() == 'stats':
                    self.show_stats()
                    continue

                if user_input.lower().startswith('search '):
                    query = user_input[7:].strip()
                    self.show_search_results(query)
                    continue

                if not user_input:
                    continue

                print(f"\nπŸ” Searching for: '{user_input}'")

                # Search documents
                search_results = self.search_documents(user_input, k=8)

                if not search_results:
                    print("❌ No relevant documents found.")
                    continue

                # Format context
                context = self.format_context(search_results)

                # Get AI answer
                print(f"\nπŸ€– AI Answer:")
                ai_answer = self.ask_openai(user_input, context)
                print(ai_answer)

                # Show source summary
                print(f"\nπŸ“š Sources ({len(search_results)} found):")
                for i, result in enumerate(search_results[:3]):
                    doc = result["document"]
                    metadata = doc.metadata
                    file_name = metadata.get('file_name', 'Unknown')
                    page_num = metadata.get('page_number', 'N/A')
                    score = result.get('final_score', 0)
                    print(f"   {i+1}. {file_name} (Page {page_num}) - Score: {score:.3f}")

                if len(search_results) > 3:
                    print(f"   ... and {len(search_results) - 3} more results")

            except KeyboardInterrupt:
                print("\n\nπŸ‘‹ Chat interrupted. Goodbye!")
                break
            except Exception as e:
                print(f"❌ Error: {e}")

    def show_stats(self):
        """Show database statistics."""
        print(f"\nπŸ“Š Database Statistics:")
        print("-" * 30)

        collection = self.processor.vectorstore._collection
        count = collection.count()
        print(f"   πŸ“„ Total Documents: {count}")

        # Show file tracker info
        try:
            import sqlite3
            conn = sqlite3.connect(self.processor.file_tracker_db)
            cursor = conn.cursor()
            cursor.execute("SELECT COUNT(*) FROM file_tracker")
            file_count = cursor.fetchone()[0]

            cursor.execute("SELECT file_name, chunk_count, processing_method FROM file_tracker")
            files = cursor.fetchall()

            print(f"   πŸ“ Tracked Files: {file_count}")
            print(f"\nπŸ“‹ File Details:")
            for file_name, chunk_count, method in files:
                print(f"   πŸ“„ {file_name}: {chunk_count} chunks ({method})")

            conn.close()
        except Exception as e:
            print(f"   ⚠️  Could not retrieve file details: {e}")

    def show_search_results(self, query: str):
        """Show detailed search results."""
        print(f"\nπŸ” Detailed Search Results for: '{query}'")
        print("-" * 50)

        search_results = self.search_documents(query, k=5)

        if not search_results:
            print("❌ No results found.")
            return

        for i, result in enumerate(search_results):
            doc = result["document"]
            metadata = doc.metadata

            print(f"\nπŸ“„ Result {i+1}:")
            print(f"   πŸ“ File: {metadata.get('file_name', 'Unknown')}")
            print(f"   πŸ“– Page: {metadata.get('page_number', 'N/A')}")
            print(f"   🏷️  Method: {metadata.get('processing_method', 'Unknown')}")
            print(f"   πŸ“Š Score: {result.get('final_score', 0):.3f}")
            print(f"   πŸ”‘ Keywords: {metadata.get('keywords', 'N/A')[:100]}...")
            print(f"   πŸ’‘ Content: {doc.page_content[:200]}...")

def main():
    """Main function."""
    print("πŸ€– Hybrid QA Chatbot")
    print("="*50)

    chatbot = HybridQAChatbot()
    chatbot.chat()

if __name__ == "__main__":
    main()


Output: document loader run

Output: chatbot Q&A console app
