Introduction
Have you ever wondered how to build a document processing system that's smart enough to handle any file type, efficient enough to avoid duplicate work, and sophisticated enough to understand the meaning behind your content? Well, I've been wrestling with exactly this problem, and I want to share what I learned while building a hybrid semantic processor that combines the best of both worlds.
The Problem with Traditional RAG Systems
Let's be honest - most RAG (Retrieval-Augmented Generation) systems are pretty basic. They chunk your documents into fixed-size pieces, throw them into a vector database, and hope for the best. But this approach has some serious limitations:
- Dumb chunking: Splitting text every 1000 characters doesn't respect semantic boundaries
- No change detection: Re-processing everything when one file changes
- Poor metadata: Limited search context beyond basic similarity
- File format chaos: Different loaders for different file types
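To make the fixed-size baseline concrete, here's a minimal sketch of what most of those systems do under the hood (character counts only, no notion of meaning; the document_text variable is just a placeholder for whatever string you've loaded):

from langchain_text_splitters import RecursiveCharacterTextSplitter

# The "dumb chunking" baseline: split roughly every 1000 characters,
# regardless of sentence or section boundaries
naive_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
chunks = naive_splitter.split_text(document_text)  # document_text: any long string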
What if we could do better? What if we could build a system that understands document structure, tracks changes intelligently, and provides rich search context? That's exactly what this hybrid semantic processor does.
The Architecture: Best of Both Worlds
The system I built combines three powerful approaches:
1. Intelligent Document Loading
Instead of treating all documents the same, the system uses specialized loaders for different file types. The UniversalDocumentLoader class automatically detects file extensions and routes them to appropriate handlers:
@staticmethod
def load_document(file_path: str) -> List[Document]:
"""Load a document from file path, automatically detecting the format."""
if not os.path.exists(file_path):
raise FileNotFoundError(f"File not found: {file_path}")
file_ext = os.path.splitext(file_path)[1].lower()
if file_ext == '.pdf':
return UniversalDocumentLoader._load_pdf(file_path)
elif file_ext in ['.txt', '.md']:
return UniversalDocumentLoader._load_text(file_path)
elif file_ext == '.docx':
return UniversalDocumentLoader._load_word(file_path)
elif file_ext == '.xlsx':
return UniversalDocumentLoader._load_excel(file_path)
elif file_ext == '.pptx':
return UniversalDocumentLoader._load_powerpoint(file_path)
elif file_ext == '.html':
return UniversalDocumentLoader._load_html(file_path)
else:
raise ValueError(f"Unsupported file type: {file_ext}")
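Using it is a one-liner; for example (the file name below is just a placeholder):

# Hypothetical usage: returns a list of LangChain Document objects
docs = UniversalDocumentLoader.load_document("raw_data/product_guide.pdf")
print(len(docs), docs[0].metadata.get("file_type"))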
2. Smart Chunking Strategy
This is where things get interesting. The system doesn't just use one chunking method - it chooses the best approach for each situation:
def _process_document(self, file_path: str, force_semantic: bool = False) -> Tuple[List[Document], str]:
"""Process a single document into chunks."""
documents = UniversalDocumentLoader.load_document(file_path)
all_chunks = []
# Check if we should use semantic chunking
use_semantic = force_semantic or (self.semantic_chunker and not self._is_file_changed(file_path))
for i, doc in enumerate(documents):
# Choose chunking method
if use_semantic and self.semantic_chunker:
try:
chunks = self.semantic_chunker.split_documents([doc])
method = "semantic"
print(f" 🧠 Using semantic chunking: {len(chunks)} chunks")
except Exception as e:
print(f" ⚠️ Semantic chunking failed, using text: {e}")
chunks = self.text_splitter.split_documents([doc])
method = "text_fallback"
else:
chunks = self.text_splitter.split_documents([doc])
method = "text"
print(f" 📝 Using text chunking: {len(chunks)} chunks")
The semantic chunker uses embeddings to understand where natural break points occur in the text, creating chunks that respect the document's semantic structure rather than arbitrary character limits.
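Under the hood this is LangChain's experimental SemanticChunker wrapped around the same SentenceTransformer used for retrieval. A minimal sketch of that wiring (mirroring the constructor shown in the full source later):

from langchain_experimental.text_splitter import SemanticChunker
from sentence_transformers import SentenceTransformer

model = SentenceTransformer("all-MiniLM-L6-v2")

class SentenceTransformerEmbeddings:
    """Thin adapter so SemanticChunker can call a SentenceTransformer."""
    def __init__(self, model):
        self.model = model
    def embed_documents(self, texts):
        return self.model.encode(texts).tolist()
    def embed_query(self, text):
        return self.model.encode([text]).tolist()[0]

semantic_chunker = SemanticChunker(
    embeddings=SentenceTransformerEmbeddings(model),
    breakpoint_threshold_type="percentile",  # split where embedding distance between sentences spikes
)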
3. Rich Metadata Enhancement
Every chunk gets enhanced with a wealth of searchable metadata. The system analyzes each chunk to extract:
# Extract keywords and important points
keywords = self._extract_keywords(chunk.page_content)
important_points = self._extract_important_points(chunk.page_content)
enhanced_metadata = self.metadata_enhancer.create_enhanced_metadata(
doc_id=chunk.metadata.get("source", file_path),
text=chunk.page_content,
file_path=file_path,
chunk_index=j,
total_chunks=len(chunks),
chunking_method=method
)
# Add enhanced search metadata
enhanced_metadata.update({
"keywords": ", ".join(keywords),
"important_points": ", ".join(important_points),
"page_number": i + 1 if len(documents) > 1 else 1,
"total_pages": len(documents),
"chunk_keywords": ", ".join(keywords[:10]),
"search_terms": ", ".join(keywords + important_points[:5]),
"processing_method": method
})
The Magic of SQLRecordManager: Never Duplicate Work Again
Now, here's where things get really smart. The system uses LangChain's SQLRecordManager, and this component is absolutely crucial for production RAG systems. Let me explain why.
What SQLRecordManager Actually Does
Think of SQLRecordManager as the "memory" of your RAG system. It tracks which documents have been indexed and uses timestamps to determine what's changed. But it's much more sophisticated than a simple file tracker.
Here's what happens under the hood:
Record Tracking: Every time a document chunk gets indexed, the RecordManager creates a record with:
- The document's unique identifier
- A timestamp of when it was processed
- A hash of the content
- Metadata about the source
Smart Updates: When you run incremental indexing, it automatically cleans up outdated document versions while minimizing the time both old and new versions exist in the system. This means:
- No duplicate embeddings cluttering your vector store
- Automatic removal of outdated content
- Efficient processing that only handles changed files
Cleanup Modes: The system offers different cleanup strategies:
- Incremental: Continuously cleans up documents during indexing, removing outdated versions associated with source IDs that were processed
- Full: Complete cleanup after indexing, ensuring only the latest versions remain
- None: No automatic cleanup (useful for append-only scenarios)
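Wiring this up takes only a few lines. Here's a minimal sketch of the record manager plus LangChain's indexing API - essentially what the processor does internally, shown in full later (chunks and vectorstore stand in for the processed documents and the Chroma store):

from langchain.indexes import SQLRecordManager, index

record_manager = SQLRecordManager(
    namespace="hybrid_semantic",
    db_url="sqlite:///hybrid_record_manager.sqlite",
)
record_manager.create_schema()  # one-time setup of the bookkeeping tables

result = index(
    chunks,                  # list of Document objects to index
    record_manager,
    vectorstore,             # the Chroma vector store
    cleanup="incremental",   # or "full" / None, as described above
    source_id_key="source",  # metadata key that groups chunks by source file
)
print(result)  # {'num_added': ..., 'num_updated': ..., 'num_skipped': ..., 'num_deleted': ...}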
Why This Matters in Practice
Without proper record management, your RAG system becomes a mess:
- Duplicate content confuses retrieval
- Outdated information pollutes results
- Every re-run processes everything from scratch
- Vector store grows indefinitely
With SQLRecordManager, you get:
- Efficiency: Only changed files get reprocessed
- Consistency: No duplicate or stale content
- Scalability: System performance doesn't degrade over time
- Reliability: Automatic cleanup prevents database bloat
File Change Detection: The Efficiency Game-Changer
One of my favorite features is the intelligent file change detection system. Instead of blindly reprocessing everything, the system maintains its own SQLite database tracking:
- File content hashes: Detects actual content changes, not just timestamp updates
- Modification timestamps: Quick checks for obvious changes
- Processing history: What method was used, how many chunks were created
- Performance metrics: Enables optimization over time
When you run the processor, it checks each file against this tracking database:
📄 document.pdf
   ✔️ No changes detected
📄 updated_guide.docx
   🔄 File content changed
   🧠 Using semantic chunking: 15 chunks
This approach can dramatically reduce processing time on large document collections where only a few files have changed.
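Inside the processing loop this boils down to a cheap guard before any expensive work happens; a condensed sketch of the process_all loop shown in full later:

for filename in os.listdir(folder_path):
    file_path = os.path.join(folder_path, filename)
    if not os.path.isfile(file_path):
        continue
    # Skip files whose stored content hash and mtime still match the tracker database
    if not force_reprocess and not self._is_file_changed(file_path):
        continue
    chunks, method = self._process_document(file_path)
    self._update_file_tracker(file_path, len(chunks), method)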
The Hybrid Search: Combining Vector and Cross-Encoder Reranking
The search component is where the "hybrid" really shines. Here's the two-stage process:
Stage 1: Vector Similarity Search
First, the system performs traditional semantic search using the SentenceTransformer embeddings. This gives us a broad set of potentially relevant documents based on semantic similarity.
Stage 2: Cross-Encoder Reranking
Here's where it gets sophisticated. The system uses a cross-encoder model (BAAI/bge-reranker-v2-m3) to rerank the results. Unlike bi-encoders that create separate embeddings for queries and documents, cross-encoders process query-document pairs together, leading to much more accurate relevance scoring.
The final score combines both approaches:
# Rerank with cross-encoder
if all_results:
try:
pairs = [(query, result["document"].page_content) for result in all_results]
rerank_scores = self.reranker.predict(pairs)
for i, result in enumerate(all_results):
result["rerank_score"] = rerank_scores[i]
result["final_score"] = (result["score"] + result["rerank_score"]) / 2
all_results.sort(key=lambda x: x["final_score"], reverse=True)
print(f" ✅ Found {len(all_results)} results, reranked with cross-encoder")
except Exception as e:
print(f" ⚠️ Reranking failed: {e}")
all_results.sort(key=lambda x: x["score"], reverse=True)
This gives you the speed of vector search with the accuracy of cross-encoder ranking.
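Calling it from your own code is straightforward (the query string is just an example):

# Returns the top-k chunks, each carrying the vector score, rerank score, and combined final_score
results = processor.hybrid_search("How does incremental indexing work?", k=5)
for r in results:
    print(round(r["final_score"], 3), r["document"].metadata.get("file_name"))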
Rich Metadata: Making Search Actually Work
The metadata enhancement system is designed around a simple principle: the more context you provide, the better your search results. For each chunk, the system extracts:
Technical Metadata:
- Programming language detection
- Code presence indicators
- URL and number detection
- File type classification
Content Analysis:
- Keyword extraction using frequency analysis
- Domain-specific term recognition (AI, business, technical terms)
- Important sentence identification
- Sentiment indicators
Processing Context:
- Chunking method used
- File structure information
- Processing timestamps
- Chunk positioning data
This rich metadata enables much more precise search queries. Instead of just finding semantically similar text, you can search for "Python code examples" or "business strategy documents with financial data."
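Because these fields are stored as ordinary Chroma metadata, they can also serve as hard filters alongside semantic similarity. A minimal sketch using the standard LangChain/Chroma filter argument against fields this processor writes (the queries are just examples):

# Only retrieve chunks the enhancer flagged as containing code
code_hits = vectorstore.similarity_search(
    "python example for loading excel files",
    k=5,
    filter={"has_code": True},
)

# Or restrict results to a particular detected content type
tutorial_hits = vectorstore.similarity_search(
    "getting started guide",
    k=5,
    filter={"content_type": "tutorial"},
)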
Performance Optimizations That Actually Matter
The system includes several performance optimizations that make it production-ready:
Batch Processing
Documents are processed in batches of 100, balancing memory usage with processing speed.
Adaptive Chunking
The system chooses chunking methods based on file state - semantic for new/changed files, text splitting for quick updates.
Efficient Storage
Multiple database backends handle different concerns:
- ChromaDB for vector storage
- SQLite for file tracking
- SQLRecordManager for indexing state
Memory Management
The embedding models are loaded once and reused across all processing operations.
Real-World Usage Patterns
In practice, this system excels at several common scenarios:
Content Management: Marketing teams can maintain large libraries of content with automatic deduplication and change tracking.
Technical Documentation: Development teams can index code repositories, documentation, and tutorials with proper semantic chunking that respects code structure.
Research Applications: Academic or business research teams can process mixed document types with rich metadata for precise search.
Knowledge Management: Organizations can build comprehensive knowledge bases that stay up-to-date automatically.
The Code Walkthrough: Key Implementation Details
Let me highlight some particularly elegant parts of the implementation:
Universal Document Loading
The UniversalDocumentLoader class automatically detects file types and routes them to appropriate handlers. Looking at the actual implementation, the Excel loader preserves table structure by joining cells with separators:
# Extract text content from the sheet
content_parts = []
for row in worksheet.iter_rows(values_only=True):
row_content = []
for cell_value in row:
if cell_value is not None:
row_content.append(str(cell_value))
if row_content:
content_parts.append(" | ".join(row_content))
if content_parts:
# Create document for this sheet
content = "\n".join(content_parts)
metadata = {
'source': self.file_path,
'file_type': 'excel',
'document_type': 'xlsx',
'sheet_name': sheet_name,
'total_rows': len(content_parts)
}
The system supports multiple file formats through specialized loaders:
if file_ext == '.pdf':
return UniversalDocumentLoader._load_pdf(file_path)
elif file_ext in ['.txt', '.md']:
return UniversalDocumentLoader._load_text(file_path)
elif file_ext == '.docx':
return UniversalDocumentLoader._load_word(file_path)
elif file_ext in ['.xlsx', '.xls', '.csv']:
return UniversalDocumentLoader._load_excel(file_path)
elif file_ext == '.pptx':
return UniversalDocumentLoader._load_powerpoint(file_path)
elif file_ext == '.html':
return UniversalDocumentLoader._load_html(file_path)
Metadata Enhancement Pipeline
The metadata enhancer analyzes content across multiple dimensions. Here's how it actually works in the code:
def _extract_keywords(self, text: str) -> List[str]:
"""Extract important keywords from text."""
import re
from collections import Counter
text = re.sub(r'[^\w\s]', ' ', text.lower())
words = text.split()
stop_words = {
'the', 'a', 'an', 'and', 'or', 'but', 'in', 'on', 'at', 'to', 'for', 'of', 'with', 'by',
'is', 'are', 'was', 'were', 'be', 'been', 'being', 'have', 'has', 'had', 'do', 'does', 'did',
'will', 'would', 'could', 'should', 'may', 'might', 'must', 'can', 'this', 'that', 'these', 'those'
}
word_freq = Counter(words)
keywords = [word for word, freq in word_freq.most_common(20)
if word not in stop_words and len(word) > 3 and freq > 1]
return keywords[:15]
The system also extracts important points by analyzing sentence structure:
def _extract_important_points(self, text: str) -> List[str]:
"""Extract important points from text."""
import re
sentences = re.split(r'[.!?]+', text)
important_sentences = []
for sentence in sentences:
sentence = sentence.strip()
if len(sentence) < 20:
continue
importance_score = 0
importance_indicators = [
'important', 'key', 'critical', 'essential', 'significant', 'major',
'primary', 'main', 'core', 'fundamental', 'crucial', 'vital',
'definition', 'concept', 'principle', 'method', 'technique', 'approach'
]
sentence_lower = sentence.lower()
for indicator in importance_indicators:
if indicator in sentence_lower:
importance_score += 1
# Technical terms boost importance
tech_terms = re.findall(r'\b[A-Z][a-zA-Z]*\b', sentence)
importance_score += len(tech_terms) * 0.5
# Numbers boost importance
numbers = re.findall(r'\d+', sentence)
importance_score += len(numbers) * 0.3
if importance_score >= 1:
clean_sentence = re.sub(r'\s+', ' ', sentence).strip()
if len(clean_sentence) > 100:
clean_sentence = clean_sentence[:100] + "..."
important_sentences.append(clean_sentence)
return important_sentences[:5]
Integrated Change Tracking
The file tracking system uses content hashes rather than just timestamps. Here's the actual implementation:
def _get_file_hash(self, file_path: str) -> str:
"""Get file content hash."""
hash_obj = hashlib.sha256()
with open(file_path, 'rb') as f:
for chunk in iter(lambda: f.read(4096), b""):
hash_obj.update(chunk)
return hash_obj.hexdigest()
def _is_file_changed(self, file_path: str) -> bool:
"""Check if file has changed."""
conn = sqlite3.connect(self.file_tracker_db)
cursor = conn.cursor()
# Get current file info
current_info = self._get_file_info(file_path)
# Check stored info
cursor.execute(
"SELECT file_hash, last_modified FROM file_tracker WHERE file_path = ?",
(file_path,)
)
result = cursor.fetchone()
conn.close()
if not result:
print(f" 🆕 New file detected")
return True
stored_hash, stored_modified = result
if current_info["hash"] != stored_hash:
print(f" 🔄 File content changed")
return True
if current_info["modified"] > stored_modified:
print(f" 🔄 File modified")
return True
print(f" ✔️ No changes detected")
return False
The system maintains a SQLite database to track file processing:
def _init_file_tracker(self):
"""Initialize file tracking database."""
conn = sqlite3.connect(self.file_tracker_db)
cursor = conn.cursor()
cursor.execute('''
CREATE TABLE IF NOT EXISTS file_tracker (
file_path TEXT PRIMARY KEY,
file_hash TEXT NOT NULL,
last_modified REAL,
last_processed REAL,
chunk_count INTEGER,
processing_method TEXT
)
''')
conn.commit()
conn.close()
Lessons Learned and Future Directions
Building this system taught me several important lessons:
Semantic Chunking Isn't Always Better: While semantic chunking produces more coherent chunks, it's computationally expensive. The hybrid approach of using it selectively provides the best balance.
Metadata Quality Matters More Than Quantity: Rich, well-structured metadata dramatically improves search accuracy. The investment in metadata enhancement pays off quickly.
Change Detection Is Critical: In production systems, efficient change detection is often more important than perfect initial processing. Users care more about quick updates than perfect accuracy.
Cross-Encoder Reranking Is Worth It: The improvement in search relevance from reranking is substantial and worth the additional computational cost.
Getting Started with Your Own Implementation
If you want to build something similar, here are the key components you'll need:
- Document Loaders: Invest in good loaders for your file types
- Chunking Strategy: Implement both semantic and traditional chunking
- Record Management: Use SQLRecordManager or build similar change tracking
- Rich Metadata: Design metadata schemas that support your search patterns
- Hybrid Search: Combine vector search with reranking for better results
The complete system demonstrates that with thoughtful architecture, you can build RAG systems that are both powerful and efficient. The key is combining multiple complementary approaches rather than relying on any single technique.
Conclusion
Building effective RAG systems requires thinking beyond simple vector similarity. The hybrid semantic processor shows how combining intelligent document loading, adaptive chunking, comprehensive metadata enhancement, and smart change detection creates a system that's both powerful and practical.
The SQLRecordManager component, in particular, transforms a basic RAG system into a production-ready solution that can handle real-world complexity without becoming inefficient or unreliable.
Whether you're building a knowledge management system, a documentation assistant, or a research tool, these patterns and techniques can help you create something that actually works reliably at scale. The key is understanding that modern RAG systems need to be intelligent about much more than just semantic similarity - they need to understand document structure, track changes efficiently, and provide rich context for search.
The future of RAG isn't just about better embeddings or larger context windows - it's about building systems that truly understand and adapt to the content they're processing. This hybrid approach points the way toward that future.
Source Code Files & Data Flow
1. hybrid_semantic_processor.py
#Author-Sreeni Ramadorai
"""
Hybrid Semantic Processor - Best of both worlds
"""
import os
import hashlib
import sqlite3
from typing import List, Dict, Any, Tuple
from datetime import datetime
import chromadb
from sentence_transformers import SentenceTransformer, CrossEncoder
from langchain_core.documents import Document
from langchain.indexes import index
from langchain.indexes import SQLRecordManager
from langchain_chroma import Chroma
from langchain_experimental.text_splitter import SemanticChunker
from langchain_text_splitters import RecursiveCharacterTextSplitter
from app.document_loaders import UniversalDocumentLoader
from rag_metadata_enhancer import RAGMetadataEnhancer
class HybridSemanticProcessor:
def __init__(self):
"""Initialize hybrid semantic processor."""
print("🚀 Hybrid Semantic Processor")
# Initialize components
self.embedding_model = SentenceTransformer('all-MiniLM-L6-v2')
self.reranker = CrossEncoder('BAAI/bge-reranker-v2-m3')
self.metadata_enhancer = RAGMetadataEnhancer()
# Initialize semantic chunker
class SentenceTransformerEmbeddings:
def __init__(self, model):
self.model = model
def embed_documents(self, texts):
return self.model.encode(texts).tolist()
def embed_query(self, text):
return self.model.encode([text]).tolist()[0]
try:
self.semantic_chunker = SemanticChunker(
embeddings=SentenceTransformerEmbeddings(self.embedding_model),
breakpoint_threshold_type="percentile"
)
print(" 🧠 Semantic chunking enabled")
except Exception as e:
print(f" ⚠️ Semantic chunking disabled: {e}")
self.semantic_chunker = None
# Fallback text splitter
self.text_splitter = RecursiveCharacterTextSplitter(
chunk_size=1000,
chunk_overlap=200
)
# Initialize ChromaDB
self.client = chromadb.PersistentClient(path="hybrid_chroma_data")
self.vectorstore = Chroma(
client=self.client,
collection_name="hybrid_collection",
embedding_function=self._get_embedding_function()
)
# Initialize Record Manager
self.record_manager = SQLRecordManager(
namespace="hybrid_semantic",
db_url="sqlite:///hybrid_record_manager.sqlite"
)
self.record_manager.create_schema()
# File tracking database
self.file_tracker_db = "hybrid_file_tracker.sqlite"
self._init_file_tracker()
print(f" 📊 Database: {self.vectorstore._collection.count()} documents")
print("✅ Ready!")
def _init_file_tracker(self):
"""Initialize file tracking database."""
conn = sqlite3.connect(self.file_tracker_db)
cursor = conn.cursor()
cursor.execute('''
CREATE TABLE IF NOT EXISTS file_tracker (
file_path TEXT PRIMARY KEY,
file_hash TEXT NOT NULL,
last_modified REAL,
last_processed REAL,
chunk_count INTEGER,
processing_method TEXT
)
''')
conn.commit()
conn.close()
def _get_file_hash(self, file_path: str) -> str:
"""Get file content hash."""
hash_obj = hashlib.sha256()
with open(file_path, 'rb') as f:
for chunk in iter(lambda: f.read(4096), b""):
hash_obj.update(chunk)
return hash_obj.hexdigest()
def _get_file_info(self, file_path: str) -> Dict[str, Any]:
"""Get file information."""
stat = os.stat(file_path)
return {
"hash": self._get_file_hash(file_path),
"modified": stat.st_mtime,
"size": stat.st_size
}
def _is_file_changed(self, file_path: str) -> bool:
"""Check if file has changed."""
conn = sqlite3.connect(self.file_tracker_db)
cursor = conn.cursor()
# Get current file info
current_info = self._get_file_info(file_path)
# Check stored info
cursor.execute(
"SELECT file_hash, last_modified FROM file_tracker WHERE file_path = ?",
(file_path,)
)
result = cursor.fetchone()
conn.close()
if not result:
print(f" 🆕 New file detected")
return True
stored_hash, stored_modified = result
if current_info["hash"] != stored_hash:
print(f" 🔄 File content changed")
return True
if current_info["modified"] > stored_modified:
print(f" 🔄 File modified")
return True
print(f" ✔️ No changes detected")
return False
def _update_file_tracker(self, file_path: str, chunk_count: int, method: str):
"""Update file tracker."""
file_info = self._get_file_info(file_path)
conn = sqlite3.connect(self.file_tracker_db)
cursor = conn.cursor()
cursor.execute('''
INSERT OR REPLACE INTO file_tracker
(file_path, file_hash, last_modified, last_processed, chunk_count, processing_method)
VALUES (?, ?, ?, ?, ?, ?)
''', (
file_path,
file_info["hash"],
file_info["modified"],
datetime.now().timestamp(),
chunk_count,
method
))
conn.commit()
conn.close()
def _get_embedding_function(self):
"""Get embedding function for Chroma."""
class ChromaEmbeddingFunction:
def __init__(self, model):
self.model = model
def __call__(self, texts):
return self.model.encode(texts).tolist()
def embed_query(self, text):
return self.model.encode([text]).tolist()[0]
def embed_documents(self, texts):
return self.model.encode(texts).tolist()
return ChromaEmbeddingFunction(self.embedding_model)
def _process_document(self, file_path: str, force_semantic: bool = False) -> Tuple[List[Document], str]:
"""Process a single document into chunks."""
documents = UniversalDocumentLoader.load_document(file_path)
all_chunks = []
# Check if we should use semantic chunking
use_semantic = force_semantic or (self.semantic_chunker and not self._is_file_changed(file_path))
for i, doc in enumerate(documents):
# Choose chunking method
if use_semantic and self.semantic_chunker:
try:
chunks = self.semantic_chunker.split_documents([doc])
method = "semantic"
print(f" 🧠 Using semantic chunking: {len(chunks)} chunks")
except Exception as e:
print(f" ⚠️ Semantic chunking failed, using text: {e}")
chunks = self.text_splitter.split_documents([doc])
method = "text_fallback"
else:
chunks = self.text_splitter.split_documents([doc])
method = "text"
print(f" 📝 Using text chunking: {len(chunks)} chunks")
# Enhance metadata for each chunk
for j, chunk in enumerate(chunks):
# Extract keywords and important points
keywords = self._extract_keywords(chunk.page_content)
important_points = self._extract_important_points(chunk.page_content)
enhanced_metadata = self.metadata_enhancer.create_enhanced_metadata(
doc_id=chunk.metadata.get("source", file_path),
text=chunk.page_content,
file_path=file_path,
chunk_index=j,
total_chunks=len(chunks),
chunking_method=method
)
# Add enhanced search metadata
enhanced_metadata.update({
"keywords": ", ".join(keywords),
"important_points": ", ".join(important_points),
"page_number": i + 1 if len(documents) > 1 else 1,
"total_pages": len(documents),
"chunk_keywords": ", ".join(keywords[:10]),
"search_terms": ", ".join(keywords + important_points[:5]),
"processing_method": method
})
# Merge with original metadata
chunk.metadata.update(enhanced_metadata)
chunk.metadata["source"] = file_path
all_chunks.append(chunk)
return all_chunks, method
def _extract_keywords(self, text: str) -> List[str]:
"""Extract important keywords from text."""
import re
from collections import Counter
text = re.sub(r'[^\w\s]', ' ', text.lower())
words = text.split()
stop_words = {
'the', 'a', 'an', 'and', 'or', 'but', 'in', 'on', 'at', 'to', 'for', 'of', 'with', 'by',
'is', 'are', 'was', 'were', 'be', 'been', 'being', 'have', 'has', 'had', 'do', 'does', 'did',
'will', 'would', 'could', 'should', 'may', 'might', 'must', 'can', 'this', 'that', 'these', 'those'
}
word_freq = Counter(words)
keywords = [word for word, freq in word_freq.most_common(20)
if word not in stop_words and len(word) > 3 and freq > 1]
return keywords[:15]
def _extract_important_points(self, text: str) -> List[str]:
"""Extract important points from text."""
import re
sentences = re.split(r'[.!?]+', text)
important_sentences = []
for sentence in sentences:
sentence = sentence.strip()
if len(sentence) < 20:
continue
importance_score = 0
importance_indicators = [
'important', 'key', 'critical', 'essential', 'significant', 'major',
'primary', 'main', 'core', 'fundamental', 'crucial', 'vital',
'definition', 'concept', 'principle', 'method', 'technique', 'approach'
]
sentence_lower = sentence.lower()
for indicator in importance_indicators:
if indicator in sentence_lower:
importance_score += 1
tech_terms = re.findall(r'\b[A-Z][a-zA-Z]*\b', sentence)
importance_score += len(tech_terms) * 0.5
numbers = re.findall(r'\d+', sentence)
importance_score += len(numbers) * 0.3
if importance_score >= 1:
clean_sentence = re.sub(r'\s+', ' ', sentence).strip()
if len(clean_sentence) > 100:
clean_sentence = clean_sentence[:100] + "..."
important_sentences.append(clean_sentence)
return important_sentences[:5]
def process_all(self, folder_path: str = "raw_data", force_reprocess: bool = False) -> Dict[str, Any]:
"""Process all files using hybrid approach."""
print(f"📂 Processing: {folder_path}")
if not os.path.exists(folder_path):
print(f"❌ Folder not found")
return {}
# Collect all documents
all_documents = []
file_stats = {}
for filename in os.listdir(folder_path):
if filename.startswith('.') or filename == '.gitkeep':
continue
file_path = os.path.join(folder_path, filename)
if os.path.isfile(file_path):
print(f"📄 {filename}")
# Check if file needs processing
if not force_reprocess and not self._is_file_changed(file_path):
print(f" ⏭️ Skipping unchanged file")
continue
try:
documents, method = self._process_document(file_path, force_semantic=force_reprocess)
all_documents.extend(documents)
file_stats[filename] = {
"chunks": len(documents),
"method": method
}
print(f" 📦 {len(documents)} chunks prepared ({method})")
# Update file tracker
self._update_file_tracker(file_path, len(documents), method)
except Exception as e:
print(f" ❌ Error: {e}")
if not all_documents:
print("❌ No documents to process")
return {}
print(f"\n📚 Indexing {len(all_documents)} total chunks...")
# Use LangChain indexing API
try:
result = index(
all_documents,
self.record_manager,
self.vectorstore,
cleanup="incremental",
source_id_key="source",
key_encoder="blake2b",
batch_size=100
)
print(f"\n📊 Indexing Results:")
print(f" ✅ Added: {result['num_added']} chunks")
print(f" 🔄 Updated: {result['num_updated']} chunks")
print(f" ⏭️ Skipped: {result['num_skipped']} chunks")
print(f" 🗑️ Deleted: {result['num_deleted']} chunks")
print(f" 📊 Database: {self.vectorstore._collection.count()} total documents")
# Show file processing summary
print(f"\n📋 File Processing Summary:")
for filename, stats in file_stats.items():
print(f" 📄 {filename}: {stats['chunks']} chunks ({stats['method']})")
return result
except Exception as e:
print(f"❌ Indexing error: {e}")
return {"status": "error", "error": str(e)}
def hybrid_search(self, query: str, k: int = 10) -> List[Dict[str, Any]]:
"""Perform hybrid search with reranking."""
print(f"🔍 Hybrid search: '{query}'")
# Vector search
vector_results = self.vectorstore.similarity_search_with_score(query, k=k*2)
# Combine results
all_results = []
for doc, score in vector_results:
all_results.append({
"document": doc,
"score": score,
"type": "vector"
})
# Rerank with cross-encoder
if all_results:
try:
pairs = [(query, result["document"].page_content) for result in all_results]
rerank_scores = self.reranker.predict(pairs)
for i, result in enumerate(all_results):
result["rerank_score"] = rerank_scores[i]
result["final_score"] = (result["score"] + result["rerank_score"]) / 2
all_results.sort(key=lambda x: x["final_score"], reverse=True)
print(f" ✅ Found {len(all_results)} results, reranked with cross-encoder")
except Exception as e:
print(f" ⚠️ Reranking failed: {e}")
all_results.sort(key=lambda x: x["score"], reverse=True)
return all_results[:k]
def main():
"""Main function."""
print("🚀 Hybrid Semantic Processor")
print("="*50)
processor = HybridSemanticProcessor()
# Process files
result = processor.process_all()
# Test search
print(f"\n🔍 Testing enhanced search...")
test_query = "MCP transport types"
search_results = processor.hybrid_search(test_query, k=3)
if search_results:
print(f"\n📋 Search Results for '{test_query}':")
for i, result in enumerate(search_results):
doc = result["document"]
print(f"\n{i+1}. Score: {result['final_score']:.3f} ({result['type']})")
print(f" 📄 {doc.metadata.get('file_name', 'Unknown')} - Page {doc.metadata.get('page_number', 'N/A')}")
print(f" 🔑 Keywords: {doc.metadata.get('keywords', 'N/A')[:80]}...")
print(f" 💡 Content: {doc.page_content[:200]}...")
print(f"\n🎉 Complete!")
if __name__ == "__main__":
main()
2. Document Loader document_loaders.py
"""
Document loaders for different file types.
Supports text files, PDF files, Word documents, Excel files, PowerPoint, HTML, and Markdown.
"""
import os
from typing import List, Dict, Any
from langchain_core.documents import Document
# Core loaders
from langchain_community.document_loaders import TextLoader, PyPDFLoader
# New loaders for additional file types
from langchain_community.document_loaders import (
UnstructuredWordDocumentLoader,
UnstructuredPowerPointLoader,
UnstructuredHTMLLoader,
UnstructuredMarkdownLoader
)
# Custom Excel loader using openpyxl
import openpyxl
class CustomExcelLoader:
"""Custom Excel loader using openpyxl for better compatibility."""
def __init__(self, file_path: str):
self.file_path = file_path
def load(self) -> List[Document]:
"""Load Excel file and convert to documents."""
try:
workbook = openpyxl.load_workbook(self.file_path, data_only=True)
documents = []
for sheet_name in workbook.sheetnames:
worksheet = workbook[sheet_name]
# Extract text content from the sheet
content_parts = []
for row in worksheet.iter_rows(values_only=True):
row_content = []
for cell_value in row:
if cell_value is not None:
row_content.append(str(cell_value))
if row_content:
content_parts.append(" | ".join(row_content))
if content_parts:
# Create document for this sheet
content = "\n".join(content_parts)
metadata = {
'source': self.file_path,
'file_type': 'excel',
'document_type': 'xlsx',
'sheet_name': sheet_name,
'total_rows': len(content_parts)
}
documents.append(Document(
page_content=content,
metadata=metadata
))
return documents
except Exception as e:
raise RuntimeError(f"Failed to load Excel file {self.file_path}: {str(e)}")
class UniversalDocumentLoader:
"""
A universal document loader that can handle multiple file formats.
Automatically detects file type and uses appropriate loader.
"""
@staticmethod
def load_document(file_path: str) -> List[Document]:
"""
Load a document from file path, automatically detecting the format.
Args:
file_path: Path to the document file
Returns:
List of Document objects (usually 1 for most files, multiple for PDFs with pages)
"""
if not os.path.exists(file_path):
raise FileNotFoundError(f"File not found: {file_path}")
file_ext = os.path.splitext(file_path)[1].lower()
if file_ext == '.pdf':
return UniversalDocumentLoader._load_pdf(file_path)
elif file_ext in ['.txt', '.md']:
return UniversalDocumentLoader._load_text(file_path)
elif file_ext == '.docx':
return UniversalDocumentLoader._load_word(file_path)
elif file_ext == '.xlsx':
return UniversalDocumentLoader._load_excel(file_path)
elif file_ext == '.pptx':
return UniversalDocumentLoader._load_powerpoint(file_path)
elif file_ext == '.html':
return UniversalDocumentLoader._load_html(file_path)
else:
raise ValueError(f"Unsupported file type: {file_ext}")
@staticmethod
def _load_pdf(file_path: str) -> List[Document]:
"""Load PDF file using PyPDFLoader."""
try:
loader = PyPDFLoader(file_path)
documents = loader.load()
# Add source metadata to each document
for doc in documents:
doc.metadata['source'] = file_path
doc.metadata['file_type'] = 'pdf'
doc.metadata['page_number'] = doc.metadata.get('page', 0)
return documents
except Exception as e:
raise RuntimeError(f"Failed to load PDF {file_path}: {str(e)}")
@staticmethod
def _load_text(file_path: str) -> List[Document]:
"""Load text file using TextLoader."""
try:
loader = TextLoader(file_path)
documents = loader.load()
# Add source metadata to each document
for doc in documents:
doc.metadata['source'] = file_path
doc.metadata['file_type'] = 'text'
return documents
except Exception as e:
raise RuntimeError(f"Failed to load text file {file_path}: {str(e)}")
@staticmethod
def _load_word(file_path: str) -> List[Document]:
"""Load Word document using UnstructuredWordDocumentLoader."""
try:
loader = UnstructuredWordDocumentLoader(file_path)
documents = loader.load()
# Add source metadata to each document
for doc in documents:
doc.metadata['source'] = file_path
doc.metadata['file_type'] = 'word'
doc.metadata['document_type'] = 'docx'
return documents
except Exception as e:
raise RuntimeError(f"Failed to load Word document {file_path}: {str(e)}")
@staticmethod
def _load_excel(file_path: str) -> List[Document]:
"""Load Excel file using custom openpyxl-based loader."""
try:
loader = CustomExcelLoader(file_path)
documents = loader.load()
# Add source metadata to each document
for doc in documents:
doc.metadata['source'] = file_path
doc.metadata['file_type'] = 'excel'
doc.metadata['document_type'] = 'xlsx'
return documents
except Exception as e:
raise RuntimeError(f"Failed to load Excel file {file_path}: {str(e)}")
@staticmethod
def _load_powerpoint(file_path: str) -> List[Document]:
"""Load PowerPoint file using UnstructuredPowerPointLoader."""
try:
loader = UnstructuredPowerPointLoader(file_path)
documents = loader.load()
# Add source metadata to each document
for doc in documents:
doc.metadata['source'] = file_path
doc.metadata['file_type'] = 'powerpoint'
doc.metadata['document_type'] = 'pptx'
# PowerPoint-specific metadata
if 'slide_number' in doc.metadata:
doc.metadata['slide_number'] = doc.metadata['slide_number']
return documents
except Exception as e:
raise RuntimeError(f"Failed to load PowerPoint file {file_path}: {str(e)}")
@staticmethod
def _load_html(file_path: str) -> List[Document]:
"""Load HTML file using UnstructuredHTMLLoader."""
try:
loader = UnstructuredHTMLLoader(file_path)
documents = loader.load()
# Add source metadata to each document
for doc in documents:
doc.metadata['source'] = file_path
doc.metadata['file_type'] = 'html'
doc.metadata['document_type'] = 'html'
return documents
except Exception as e:
raise RuntimeError(f"Failed to load HTML file {file_path}: {str(e)}")
@staticmethod
def get_supported_extensions() -> List[str]:
"""Get list of supported file extensions."""
return ['.txt', '.md', '.pdf', '.docx', '.xlsx', '.pptx', '.html']
def load_documents_from_directory(directory_path: str) -> List[Document]:
"""
Load all supported documents from a directory.
Args:
directory_path: Path to directory containing documents
Returns:
List of all Document objects from the directory
"""
if not os.path.exists(directory_path):
raise FileNotFoundError(f"Directory not found: {directory_path}")
all_documents = []
supported_extensions = UniversalDocumentLoader.get_supported_extensions()
for filename in os.listdir(directory_path):
file_path = os.path.join(directory_path, filename)
if os.path.isfile(file_path):
file_ext = os.path.splitext(filename)[1].lower()
if file_ext in supported_extensions:
try:
documents = UniversalDocumentLoader.load_document(file_path)
all_documents.extend(documents)
except Exception as e:
print(f"Warning: Failed to load {filename}: {str(e)}")
continue
return all_documents
3. RAG Metadata Enhancer rag_metadata_enhancer.py
#!/usr/bin/env python3
"""
RAG Metadata Enhancer - Key fields to increase search accuracy
"""
import os
import re
import hashlib
from typing import List, Dict, Any
from datetime import datetime
from collections import Counter
class RAGMetadataEnhancer:
def __init__(self):
self.technical_terms = {
'ai', 'ml', 'machine learning', 'deep learning', 'neural network', 'algorithm',
'data science', 'python', 'javascript', 'api', 'database', 'cloud', 'aws',
'docker', 'kubernetes', 'microservices', 'rest', 'graphql', 'sql', 'nosql'
}
self.business_terms = {
'strategy', 'business', 'market', 'customer', 'revenue', 'profit', 'growth',
'investment', 'finance', 'management', 'leadership', 'team', 'project'
}
def extract_keywords(self, text: str) -> List[str]:
"""Extract relevant keywords."""
words = re.findall(r'\b\w+\b', text.lower())
words = [w for w in words if len(w) > 2]
# Get frequency
word_freq = Counter(words)
# Add technical/business terms
keywords = []
text_lower = text.lower()
for term_set in [self.technical_terms, self.business_terms]:
for term in term_set:
if term in text_lower:
keywords.append(term)
# Add high-frequency words
for word, freq in word_freq.most_common(10):
if freq > 1 and word not in keywords:
keywords.append(word)
return keywords[:15]
def analyze_content(self, text: str, file_path: str) -> Dict[str, Any]:
"""Analyze content for metadata."""
# Basic stats
words = text.split()
sentences = text.split('.')
# Content type
content_type = self._detect_content_type(text, file_path)
# Keywords
keywords = self.extract_keywords(text)
# Features
has_code = bool(re.search(r'```|def |class |import ', text))
has_numbers = bool(re.search(r'\d+', text))
has_urls = bool(re.search(r'http[s]?://', text))
# Sentiment (simple)
positive_words = {'good', 'great', 'excellent', 'amazing', 'best'}
negative_words = {'bad', 'terrible', 'awful', 'worst', 'problem'}
text_lower = text.lower()
positive_count = sum(1 for word in positive_words if word in text_lower)
negative_count = sum(1 for word in negative_words if word in text_lower)
sentiment = 'positive' if positive_count > negative_count else 'negative' if negative_count > positive_count else 'neutral'
return {
"word_count": len(words),
"sentence_count": len(sentences),
"content_type": content_type,
"keywords": keywords,
"has_code": has_code,
"has_numbers": has_numbers,
"has_urls": has_urls,
"sentiment": sentiment,
"text_hash": hashlib.sha256(text.encode()).hexdigest()
}
def _detect_content_type(self, text: str, file_path: str) -> str:
"""Detect content type."""
text_lower = text.lower()
file_ext = os.path.splitext(file_path)[1].lower()
if file_ext in ['.py', '.js', '.java']:
return 'code'
elif file_ext in ['.md', '.txt']:
return 'documentation'
elif re.search(r'```|def |class |import ', text):
return 'code'
elif re.search(r'# |## |### ', text):
return 'documentation'
elif re.search(r'tutorial|guide|how to', text_lower):
return 'tutorial'
return 'general'
def create_enhanced_metadata(self, doc_id: str, text: str, file_path: str,
chunk_index: int, total_chunks: int,
chunking_method: str) -> Dict[str, Any]:
"""Create enhanced metadata for RAG accuracy."""
analysis = self.analyze_content(text, file_path)
file_info = self._get_file_info(file_path)
return {
# Core chunk info
"source": doc_id,
"chunk_index": chunk_index,
"total_chunks": total_chunks,
"chunking_method": chunking_method,
# File info
"file_name": file_info["file_name"],
"file_extension": file_info["file_extension"],
"file_size_mb": file_info["file_size_mb"],
"file_type": file_info["file_type"],
# Content analysis
"word_count": analysis["word_count"],
"sentence_count": analysis["sentence_count"],
"content_type": analysis["content_type"],
"keywords": ", ".join(analysis["keywords"]),
"has_code": analysis["has_code"],
"has_numbers": analysis["has_numbers"],
"has_urls": analysis["has_urls"],
"sentiment": analysis["sentiment"],
"text_hash": analysis["text_hash"],
# Processing info
"processed_at": datetime.now().isoformat()
}
def _get_file_info(self, file_path: str) -> Dict[str, Any]:
"""Get file information."""
try:
stat = os.stat(file_path)
return {
"file_name": os.path.basename(file_path),
"file_extension": os.path.splitext(file_path)[1],
"file_size_mb": round(stat.st_size / (1024 * 1024), 2),
"file_type": self._get_file_type(file_path)
}
except:
return {
"file_name": os.path.basename(file_path),
"file_extension": os.path.splitext(file_path)[1],
"file_size_mb": 0,
"file_type": "unknown"
}
def _get_file_type(self, file_path: str) -> str:
"""Get file type category."""
ext = os.path.splitext(file_path)[1].lower()
if ext in ['.pdf', '.docx', '.doc', '.txt', '.md']:
return 'document'
elif ext in ['.xlsx', '.xls', '.csv']:
return 'spreadsheet'
elif ext in ['.py', '.js', '.java', '.html']:
return 'code'
elif ext in ['.jpg', '.png', '.gif']:
return 'image'
return 'other'
4. Q&A Chatbot client
#!/usr/bin/env python3
"""
Hybrid QA Chatbot - Integrates with Hybrid Semantic Processor
"""
import os
from typing import List, Dict, Any
from dotenv import load_dotenv
from hybrid_semantic_processor import HybridSemanticProcessor
# Load environment variables
load_dotenv()
class HybridQAChatbot:
def __init__(self):
"""Initialize the hybrid QA chatbot."""
print("🤖 Hybrid QA Chatbot")
print("="*40)
# Initialize the hybrid processor
self.processor = HybridSemanticProcessor()
# Check if OpenAI API key is available
self.openai_api_key = os.getenv('OPENAI_API_KEY')
if self.openai_api_key:
try:
from openai import OpenAI
self.openai_client = OpenAI(api_key=self.openai_api_key)
print(" ✅ OpenAI integration enabled")
except ImportError:
print(" ⚠️ OpenAI package not installed")
self.openai_api_key = None
else:
print(" ⚠️ OpenAI API key not found - using basic search only")
self.openai_api_key = None
print(" 🧠 Semantic processor ready")
print(" 🔍 Enhanced search enabled")
print("✅ Chatbot ready!")
def search_documents(self, query: str, k: int = 8) -> List[Dict[str, Any]]:
"""Search documents using hybrid search."""
try:
results = self.processor.hybrid_search(query, k=k)
return results
except Exception as e:
print(f" ❌ Search error: {e}")
return []
def format_context(self, search_results: List[Dict[str, Any]]) -> str:
"""Format search results into context for AI."""
if not search_results:
return "No relevant documents found."
context_parts = []
for i, result in enumerate(search_results[:5]): # Use top 5 results
doc = result["document"]
metadata = doc.metadata
# Format source information
file_name = metadata.get('file_name', 'Unknown')
page_num = metadata.get('page_number', 'N/A')
method = metadata.get('processing_method', 'unknown')
# Format content
content = doc.page_content.strip()
if len(content) > 300:
content = content[:300] + "..."
# Add to context
context_parts.append(f"""
=== Source {i+1}: {file_name} (Page {page_num}) ===
Processing Method: {method}
Content: {content}
""")
return "\n".join(context_parts)
def ask_openai(self, query: str, context: str) -> str:
"""Ask OpenAI for an answer based on context."""
if not self.openai_api_key:
return "OpenAI integration not available."
try:
prompt = f"""You are a helpful AI assistant. Answer the user's question based on the provided context from documents.
Context from documents:
{context}
User Question: {query}
Instructions:
1. Answer the question based ONLY on the provided context
2. Be concise but comprehensive
3. If the context doesn't contain enough information, say so
4. Cite specific sources when possible (e.g., "According to Source 1...")
5. If you're unsure, acknowledge the limitations
Answer:"""
response = self.openai_client.chat.completions.create(
model="gpt-4",
messages=[
{"role": "system", "content": "You are a helpful AI assistant that answers questions based on provided document context."},
{"role": "user", "content": prompt}
],
temperature=0.1,
max_tokens=500
)
return response.choices[0].message.content.strip()
except Exception as e:
return f"Error getting AI response: {e}"
def chat(self):
"""Main chat loop."""
print(f"\n🤖 Welcome to the Hybrid QA Chatbot!")
print(f"💡 Ask me anything about your documents")
print(f"🔍 I'll search through your knowledge base and provide intelligent answers")
print(f"📝 Type 'quit' or 'exit' to end the chat")
print(f"📊 Type 'stats' to see database statistics")
print(f"🔎 Type 'search <query>' for direct search results")
print("-" * 60)
while True:
try:
# Get user input
user_input = input("\n👤 You: ").strip()
if user_input.lower() in ['quit', 'exit', 'bye']:
print("👋 Goodbye! Thanks for using the Hybrid QA Chatbot!")
break
if user_input.lower() == 'stats':
self.show_stats()
continue
if user_input.lower().startswith('search '):
query = user_input[7:].strip()
self.show_search_results(query)
continue
if not user_input:
continue
print(f"\n🔍 Searching for: '{user_input}'")
# Search documents
search_results = self.search_documents(user_input, k=8)
if not search_results:
print("❌ No relevant documents found.")
continue
# Format context
context = self.format_context(search_results)
# Get AI answer
print(f"\n🤖 AI Answer:")
ai_answer = self.ask_openai(user_input, context)
print(ai_answer)
# Show source summary
print(f"\n📚 Sources ({len(search_results)} found):")
for i, result in enumerate(search_results[:3]):
doc = result["document"]
metadata = doc.metadata
file_name = metadata.get('file_name', 'Unknown')
page_num = metadata.get('page_number', 'N/A')
score = result.get('final_score', 0)
print(f" {i+1}. {file_name} (Page {page_num}) - Score: {score:.3f}")
if len(search_results) > 3:
print(f" ... and {len(search_results) - 3} more results")
except KeyboardInterrupt:
print("\n\n👋 Chat interrupted. Goodbye!")
break
except Exception as e:
print(f"❌ Error: {e}")
def show_stats(self):
"""Show database statistics."""
print(f"\n📊 Database Statistics:")
print("-" * 30)
collection = self.processor.vectorstore._collection
count = collection.count()
print(f" 📄 Total Documents: {count}")
# Show file tracker info
try:
import sqlite3
conn = sqlite3.connect(self.processor.file_tracker_db)
cursor = conn.cursor()
cursor.execute("SELECT COUNT(*) FROM file_tracker")
file_count = cursor.fetchone()[0]
# The tracker table stores file_path (not file_name), so select that and display the basename
cursor.execute("SELECT file_path, chunk_count, processing_method FROM file_tracker")
files = cursor.fetchall()
print(f" 📁 Tracked Files: {file_count}")
print(f"\n📋 File Details:")
for file_path, chunk_count, method in files:
print(f" 📄 {os.path.basename(file_path)}: {chunk_count} chunks ({method})")
conn.close()
except Exception as e:
print(f" ⚠️ Could not retrieve file details: {e}")
def show_search_results(self, query: str):
"""Show detailed search results."""
print(f"\n🔍 Detailed Search Results for: '{query}'")
print("-" * 50)
search_results = self.search_documents(query, k=5)
if not search_results:
print("❌ No results found.")
return
for i, result in enumerate(search_results):
doc = result["document"]
metadata = doc.metadata
print(f"\n📄 Result {i+1}:")
print(f" 📁 File: {metadata.get('file_name', 'Unknown')}")
print(f" 📃 Page: {metadata.get('page_number', 'N/A')}")
print(f" 🏷️ Method: {metadata.get('processing_method', 'Unknown')}")
print(f" 📊 Score: {result.get('final_score', 0):.3f}")
print(f" 🔑 Keywords: {metadata.get('keywords', 'N/A')[:100]}...")
print(f" 💡 Content: {doc.page_content[:200]}...")
def main():
"""Main function."""
print("🤖 Hybrid QA Chatbot")
print("="*50)
chatbot = HybridQAChatbot()
chatbot.chat()
if __name__ == "__main__":
main()