Technical Acronyms:
- FAISS: Facebook AI Similarity Search—optimized vector search library
- HNSW: Hierarchical Navigable Small World—graph-based approximate nearest neighbor algorithm
- BM25: Best Matching 25—probabilistic ranking function for text retrieval
- TF-IDF: Term Frequency-Inverse Document Frequency—statistical text importance measure
- ANN: Approximate Nearest Neighbor—fast similarity search with accuracy trade-off
- IVF: Inverted File Index—clustering-based search acceleration
Statistical & Mathematical Terms:
- Cosine Similarity: Angle-based similarity between vectors (ranges -1 to 1; 0 to 1 for non-negative vectors such as TF-IDF)
- L2 Distance: Euclidean distance between vectors (lower = more similar)
- Recall@K: Proportion of relevant items found in top K results
- Precision@K: Proportion of top K results that are relevant
- RRF: Reciprocal Rank Fusion—method to combine ranked lists
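To make Recall@K and Precision@K concrete, here is a minimal sketch using hypothetical document IDs and relevance labels (the IDs and the relevant set are made up for illustration):

from typing import List, Set

def precision_at_k(retrieved: List[int], relevant: Set[int], k: int) -> float:
    """Fraction of the top-k retrieved items that are relevant."""
    return sum(1 for doc_id in retrieved[:k] if doc_id in relevant) / k

def recall_at_k(retrieved: List[int], relevant: Set[int], k: int) -> float:
    """Fraction of all relevant items that appear in the top-k."""
    return sum(1 for doc_id in retrieved[:k] if doc_id in relevant) / len(relevant)

# Hypothetical example: the system returns doc IDs [3, 1, 7, 2, 9]; docs {1, 2, 4} are relevant
retrieved, relevant = [3, 1, 7, 2, 9], {1, 2, 4}
print(precision_at_k(retrieved, relevant, 5))  # 2/5 = 0.4
print(recall_at_k(retrieved, relevant, 5))     # 2/3 ≈ 0.667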
Introduction: Two Philosophies of Finding Information
Imagine you're searching a massive library for books about "machine learning for fraud detection."
The librarian approach (Sparse Retrieval): She checks the card catalog for exact keyword matches. Books with "machine learning" AND "fraud" AND "detection" in the title or index cards get pulled. Fast, predictable, but misses the book titled "AI-Powered Financial Crime Prevention" that's exactly what you need.
The scholar approach (Dense Retrieval): He's read summaries of every book and understands their meaning. He retrieves books about the concept you're after, even if they use different words. Finds that "Financial Crime Prevention" book, but might also grab some tangentially related texts.
Here's another way to think about it: Sparse retrieval is like SQL WHERE clauses—exact matching on indexed terms. Dense retrieval is like asking a knowledgeable colleague—they understand what you mean, not just what you said.
A third analogy: Sparse is a phone book lookup; dense is asking a local for directions. The phone book is precise but only works if you know the exact name. The local understands "that Italian place near the park" even though it's officially called "Giuseppe's Ristorante."
The best production systems use both. This article shows you how.
Sparse Retrieval: The Speed of Keywords
Sparse retrieval represents documents as high-dimensional vectors where most values are zero—only terms present in the document have non-zero values.
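As a toy illustration (the sentence below is made up), a sparse representation only needs to store a document's handful of non-zero entries; over a real vocabulary of tens of thousands of terms, everything else is implicitly zero:

from collections import Counter

# Toy sketch: store only the non-zero term counts for one document.
doc = "machine learning detects fraud in machine generated transactions"
sparse_vector = Counter(doc.lower().split())
print(sparse_vector)
# Counter({'machine': 2, 'learning': 1, 'detects': 1, 'fraud': 1, 'in': 1, 'generated': 1, 'transactions': 1})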
TF-IDF: The Foundation
import numpy as np
from typing import List, Dict, Tuple
from collections import Counter
import math
import re
class TFIDFRetriever:
"""
TF-IDF retrieval from scratch.
TF (Term Frequency): How often a term appears in a document
IDF (Inverse Document Frequency): How rare a term is across all documents
Score = TF × IDF (terms frequent in doc but rare overall score highest)
"""
def __init__(self):
self.vocabulary: Dict[str, int] = {}
self.idf: Dict[str, float] = {}
self.doc_vectors: List[Dict[str, float]] = []
self.documents: List[str] = []
def _tokenize(self, text: str) -> List[str]:
"""Simple tokenization—lowercase and split on non-alphanumeric."""
text = text.lower()
tokens = re.findall(r'\b[a-z0-9]+\b', text)
return tokens
def _compute_tf(self, tokens: List[str]) -> Dict[str, float]:
"""Compute term frequency with log normalization."""
counts = Counter(tokens)
tf = {}
for term, count in counts.items():
# Log normalization prevents long documents from dominating
tf[term] = 1 + math.log(count) if count > 0 else 0
return tf
def fit(self, documents: List[str]):
"""Build vocabulary and compute IDF scores."""
self.documents = documents
doc_count = len(documents)
# Count document frequency for each term
doc_freq: Dict[str, int] = Counter()
all_tokens = []
for doc in documents:
tokens = self._tokenize(doc)
all_tokens.append(tokens)
unique_terms = set(tokens)
for term in unique_terms:
doc_freq[term] += 1
# Build vocabulary
self.vocabulary = {term: idx for idx, term in enumerate(doc_freq.keys())}
# Compute IDF: log(N / df) with smoothing
self.idf = {
term: math.log((doc_count + 1) / (df + 1)) + 1
for term, df in doc_freq.items()
}
# Compute document vectors
self.doc_vectors = []
for tokens in all_tokens:
tf = self._compute_tf(tokens)
tfidf = {term: tf_val * self.idf.get(term, 0)
for term, tf_val in tf.items()}
self.doc_vectors.append(tfidf)
return self
def _cosine_similarity(self, vec1: Dict[str, float], vec2: Dict[str, float]) -> float:
"""Compute cosine similarity between sparse vectors."""
common_terms = set(vec1.keys()) & set(vec2.keys())
if not common_terms:
return 0.0
dot_product = sum(vec1[t] * vec2[t] for t in common_terms)
norm1 = math.sqrt(sum(v**2 for v in vec1.values()))
norm2 = math.sqrt(sum(v**2 for v in vec2.values()))
if norm1 == 0 or norm2 == 0:
return 0.0
return dot_product / (norm1 * norm2)
def search(self, query: str, top_k: int = 5) -> List[Tuple[int, float, str]]:
"""Search for most similar documents."""
query_tokens = self._tokenize(query)
query_tf = self._compute_tf(query_tokens)
query_vec = {term: tf_val * self.idf.get(term, 0)
for term, tf_val in query_tf.items()}
scores = []
for idx, doc_vec in enumerate(self.doc_vectors):
score = self._cosine_similarity(query_vec, doc_vec)
scores.append((idx, score, self.documents[idx]))
scores.sort(key=lambda x: x[1], reverse=True)
return scores[:top_k]
# Example usage
if __name__ == "__main__":
documents = [
"Machine learning models detect fraudulent transactions in real-time",
"Deep neural networks process natural language for chatbots",
"Fraud prevention systems use AI to identify suspicious patterns",
"Natural language processing enables semantic search capabilities",
"Financial crime detection leverages machine learning algorithms",
]
retriever = TFIDFRetriever()
retriever.fit(documents)
results = retriever.search("AI fraud detection", top_k=3)
print("TF-IDF Results for 'AI fraud detection':")
for idx, score, doc in results:
print(f" [{score:.3f}] {doc[:60]}...")
BM25: The Industry Standard
BM25 improves on TF-IDF with document length normalization and saturation—diminishing returns for term frequency.
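Before the full implementation, a quick numeric sketch shows what saturation means in practice; the values assume the default k1 = 1.5 and ignore length normalization:

# Saturation sketch: the BM25 tf component approaches a ceiling of k1 + 1
# as raw term frequency grows, unlike raw TF which grows without bound.
k1 = 1.5
for tf in [1, 2, 5, 10, 50]:
    tf_component = (tf * (k1 + 1)) / (tf + k1)  # length normalization omitted here
    print(f"tf={tf:>2} -> {tf_component:.2f}")
# tf= 1 -> 1.00
# tf= 2 -> 1.43
# tf= 5 -> 1.92
# tf=10 -> 2.17
# tf=50 -> 2.43   (ceiling is k1 + 1 = 2.5)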
import math
from typing import List, Dict, Tuple
from collections import Counter
class BM25Retriever:
"""
BM25 (Best Matching 25) retriever.
Improvements over TF-IDF:
1. Term frequency saturation (k1 parameter)
2. Document length normalization (b parameter)
3. Better theoretical foundation (probabilistic model)
Industry standard for keyword search (Elasticsearch, Solr default).
"""
def __init__(self, k1: float = 1.5, b: float = 0.75):
"""
Args:
k1: Term frequency saturation. Higher = more weight to frequency (1.2-2.0 typical)
b: Length normalization. 0 = no normalization, 1 = full normalization
"""
self.k1 = k1
self.b = b
self.doc_freqs: Dict[str, int] = {}
self.doc_lengths: List[int] = []
self.avg_doc_length: float = 0
self.doc_tokens: List[List[str]] = []
self.documents: List[str] = []
self.N: int = 0
def _tokenize(self, text: str) -> List[str]:
import re
return re.findall(r'\b[a-z0-9]+\b', text.lower())
def fit(self, documents: List[str]):
"""Index documents for BM25 search."""
self.documents = documents
self.N = len(documents)
self.doc_tokens = []
self.doc_lengths = []
self.doc_freqs = Counter()
for doc in documents:
tokens = self._tokenize(doc)
self.doc_tokens.append(tokens)
self.doc_lengths.append(len(tokens))
# Count document frequency (unique terms per doc)
for term in set(tokens):
self.doc_freqs[term] += 1
self.avg_doc_length = sum(self.doc_lengths) / self.N if self.N > 0 else 0
return self
def _idf(self, term: str) -> float:
"""Compute IDF with Robertson-Sparck Jones formula."""
df = self.doc_freqs.get(term, 0)
# Smooth to avoid negative IDF for very common terms
return math.log((self.N - df + 0.5) / (df + 0.5) + 1)
def _score_document(self, query_tokens: List[str], doc_idx: int) -> float:
"""Compute BM25 score for a single document."""
doc_tokens = self.doc_tokens[doc_idx]
doc_length = self.doc_lengths[doc_idx]
term_freqs = Counter(doc_tokens)
score = 0.0
for term in query_tokens:
if term not in term_freqs:
continue
tf = term_freqs[term]
idf = self._idf(term)
# BM25 formula
length_norm = 1 - self.b + self.b * (doc_length / self.avg_doc_length)
tf_component = (tf * (self.k1 + 1)) / (tf + self.k1 * length_norm)
score += idf * tf_component
return score
def search(self, query: str, top_k: int = 5) -> List[Tuple[int, float, str]]:
"""Search for most relevant documents."""
query_tokens = self._tokenize(query)
scores = []
for idx in range(self.N):
score = self._score_document(query_tokens, idx)
if score > 0:
scores.append((idx, score, self.documents[idx]))
scores.sort(key=lambda x: x[1], reverse=True)
return scores[:top_k]
# Comparison with TF-IDF
if __name__ == "__main__":
documents = [
"Machine learning models detect fraudulent transactions in real-time",
"Deep neural networks process natural language for chatbots",
"Fraud prevention systems use AI to identify suspicious patterns",
"Natural language processing enables semantic search capabilities",
"Financial crime detection leverages machine learning algorithms",
# Longer document to test length normalization
"Machine learning and artificial intelligence are transforming fraud detection. "
"Banks use these technologies to analyze transactions. ML models learn patterns. "
"AI systems flag suspicious activity. Fraud prevention is critical for financial security. "
"Real-time detection saves millions of dollars annually."
]
bm25 = BM25Retriever()
bm25.fit(documents)
tfidf = TFIDFRetriever()
tfidf.fit(documents)
query = "machine learning fraud"
print(f"Query: '{query}'\n")
print("BM25 Results:")
for idx, score, doc in bm25.search(query, top_k=3):
print(f" [{score:.3f}] {doc[:50]}...")
print("\nTF-IDF Results:")
for idx, score, doc in tfidf.search(query, top_k=3):
print(f" [{score:.3f}] {doc[:50]}...")
Dense Retrieval: The Power of Meaning
Dense retrieval represents documents as fixed-size vectors where every dimension carries semantic meaning. Similar meanings cluster in vector space.
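As a small illustration of that claim (assuming sentence-transformers and the all-MiniLM-L6-v2 model used later in this article), two sentences with almost no shared keywords still land close together in embedding space:

# Sketch: semantically similar sentences score high on cosine similarity
# even with little keyword overlap. Requires: pip install sentence-transformers
from sentence_transformers import SentenceTransformer, util

model = SentenceTransformer("all-MiniLM-L6-v2")
sentences = [
    "Machine learning models detect fraudulent transactions",
    "AI systems catch financial criminals",                   # same meaning, different words
    "The quarterly earnings report exceeded expectations",    # unrelated
]
embeddings = model.encode(sentences, normalize_embeddings=True)
print(util.cos_sim(embeddings[0], embeddings[1]))  # relatively high
print(util.cos_sim(embeddings[0], embeddings[2]))  # relatively low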
FAISS: Production Vector Search
import numpy as np
from typing import List, Tuple, Callable, Optional
import time
class FAISSRetriever:
"""
FAISS-based dense retriever with multiple index types.
Index types by use case:
- Flat: Exact search, small datasets (<100K vectors)
- IVF: Clustered search, medium datasets (100K-10M)
- HNSW: Graph-based, best recall/speed trade-off
- IVF-PQ: Compressed, large datasets (10M+)
"""
def __init__(
self,
embedding_fn: Callable[[List[str]], np.ndarray],
index_type: str = "flat",
dimension: int = 384,
nlist: int = 100, # Number of clusters for IVF
m: int = 32, # HNSW connections per layer
ef_construction: int = 200, # HNSW build quality
ef_search: int = 50 # HNSW search quality
):
"""
Args:
embedding_fn: Function to convert text to vectors
index_type: "flat", "ivf", "hnsw", or "ivf_pq"
dimension: Embedding dimension
nlist: IVF cluster count (sqrt(N) is good starting point)
m: HNSW connections (16-64 typical)
ef_construction: HNSW build parameter (higher = better quality, slower build)
ef_search: HNSW search parameter (higher = better recall, slower search)
"""
import faiss
self.embedding_fn = embedding_fn
self.index_type = index_type
self.dimension = dimension
self.documents: List[str] = []
# Build appropriate index
if index_type == "flat":
self.index = faiss.IndexFlatIP(dimension) # Inner product (cosine for normalized)
elif index_type == "ivf":
quantizer = faiss.IndexFlatIP(dimension)
self.index = faiss.IndexIVFFlat(quantizer, dimension, nlist)
self._needs_training = True
        elif index_type == "hnsw":
            # Note: IndexHNSWFlat defaults to L2 distance; on L2-normalized vectors the
            # ranking matches cosine similarity, but returned scores are distances (lower = closer).
            self.index = faiss.IndexHNSWFlat(dimension, m)
            self.index.hnsw.efConstruction = ef_construction
            self.index.hnsw.efSearch = ef_search
elif index_type == "ivf_pq":
quantizer = faiss.IndexFlatIP(dimension)
# PQ with 8 sub-quantizers, 8 bits each
self.index = faiss.IndexIVFPQ(quantizer, dimension, nlist, 8, 8)
self._needs_training = True
else:
raise ValueError(f"Unknown index type: {index_type}")
self._needs_training = index_type in ["ivf", "ivf_pq"]
def _normalize(self, vectors: np.ndarray) -> np.ndarray:
"""L2 normalize vectors for cosine similarity."""
norms = np.linalg.norm(vectors, axis=1, keepdims=True)
norms[norms == 0] = 1 # Avoid division by zero
return vectors / norms
def fit(self, documents: List[str], batch_size: int = 100):
"""Index documents."""
self.documents = documents
# Embed in batches
all_embeddings = []
for i in range(0, len(documents), batch_size):
batch = documents[i:i + batch_size]
embeddings = self.embedding_fn(batch)
all_embeddings.append(embeddings)
embeddings = np.vstack(all_embeddings).astype('float32')
embeddings = self._normalize(embeddings)
# Train if needed (IVF indexes)
if self._needs_training:
print(f"Training {self.index_type} index on {len(embeddings)} vectors...")
self.index.train(embeddings)
# Add vectors
self.index.add(embeddings)
print(f"Indexed {self.index.ntotal} vectors")
return self
def search(self, query: str, top_k: int = 5) -> List[Tuple[int, float, str]]:
"""Search for similar documents."""
query_embedding = self.embedding_fn([query])
query_embedding = self._normalize(query_embedding).astype('float32')
scores, indices = self.index.search(query_embedding, top_k)
results = []
for idx, score in zip(indices[0], scores[0]):
if idx >= 0: # FAISS returns -1 for missing results
results.append((int(idx), float(score), self.documents[idx]))
return results
def benchmark(self, queries: List[str], top_k: int = 5) -> dict:
"""Benchmark search performance."""
times = []
for query in queries:
start = time.time()
self.search(query, top_k)
times.append(time.time() - start)
return {
"index_type": self.index_type,
"num_vectors": self.index.ntotal,
"avg_latency_ms": np.mean(times) * 1000,
"p99_latency_ms": np.percentile(times, 99) * 1000,
"queries_per_second": len(queries) / sum(times)
}
# Provider-agnostic embedding function
def get_embedding_fn(provider: str = "local"):
"""Get embedding function for specified provider."""
if provider == "local":
from sentence_transformers import SentenceTransformer
model = SentenceTransformer('all-MiniLM-L6-v2')
return lambda texts: model.encode(texts)
    elif provider == "openai":
        from openai import OpenAI
        client = OpenAI()
        # Note: text-embedding-3-small returns 1536-dim vectors by default;
        # pass dimension=1536 to FAISSRetriever (or use the API's dimensions parameter).
        def embed(texts):
response = client.embeddings.create(
model="text-embedding-3-small",
input=texts
)
return np.array([e.embedding for e in response.data])
return embed
    elif provider == "cohere":
        import cohere
        client = cohere.Client()
        # Note: embed-english-v3.0 returns 1024-dim vectors; set dimension=1024 in FAISSRetriever.
def embed(texts):
response = client.embed(
texts=texts,
model="embed-english-v3.0",
input_type="search_document"
)
return np.array(response.embeddings)
return embed
raise ValueError(f"Unknown provider: {provider}")
# Example usage
if __name__ == "__main__":
documents = [
"Machine learning models detect fraudulent transactions in real-time",
"Deep neural networks process natural language for chatbots",
"Fraud prevention systems use AI to identify suspicious patterns",
"Natural language processing enables semantic search capabilities",
"Financial crime detection leverages machine learning algorithms",
"Computer vision analyzes images for object detection",
"Recommendation systems personalize user experiences",
"Time series forecasting predicts future trends",
]
try:
embed_fn = get_embedding_fn("local")
retriever = FAISSRetriever(
embedding_fn=embed_fn,
index_type="flat",
dimension=384 # all-MiniLM-L6-v2 dimension
)
retriever.fit(documents)
# Semantic query—note different wording
query = "AI for catching financial criminals"
results = retriever.search(query, top_k=3)
print(f"Query: '{query}'\n")
print("Dense Retrieval Results:")
for idx, score, doc in results:
print(f" [{score:.3f}] {doc}")
    except ImportError:
        print("Install requirements: pip install faiss-cpu sentence-transformers")
HNSW Deep Dive: Understanding the Algorithm
"""
HNSW (Hierarchical Navigable Small World) explained:
Structure:
- Multiple layers of graphs, each sparser than the one below
- Top layer: very few nodes, long-range connections
- Bottom layer: all nodes, short-range connections
Search process:
1. Start at top layer entry point
2. Greedily move to nearest neighbor
3. When stuck, descend to next layer
4. Continue until bottom layer
5. Return k nearest found
Why it works:
- Top layers provide "highways" for fast traversal
- Bottom layers provide precision
- Like searching for a house: highway → city → street → address
Key parameters:
- M: connections per node (16-64). Higher = better recall, more memory
- efConstruction: build quality (100-500). Higher = better graph, slower build
- efSearch: search quality (50-200). Higher = better recall, slower search
"""
def compare_faiss_indexes(documents: List[str], queries: List[str], embed_fn):
"""Compare different FAISS index types."""
configs = [
{"index_type": "flat", "name": "Flat (Exact)"},
{"index_type": "hnsw", "name": "HNSW (Graph)"},
{"index_type": "ivf", "name": "IVF (Clustered)", "nlist": 10},
]
results = []
for config in configs:
retriever = FAISSRetriever(
embedding_fn=embed_fn,
dimension=384,
**{k: v for k, v in config.items() if k != "name"}
)
retriever.fit(documents)
benchmark = retriever.benchmark(queries, top_k=5)
benchmark["name"] = config["name"]
results.append(benchmark)
print("\n=== Index Comparison ===")
print(f"{'Index':<20} {'Latency (ms)':<15} {'QPS':<10}")
print("-" * 45)
for r in results:
print(f"{r['name']:<20} {r['avg_latency_ms']:<15.2f} {r['queries_per_second']:<10.1f}")
return results
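Latency is only half of the trade-off the HNSW parameters control. The sketch below (reusing the FAISSRetriever defined above; the helper name, efSearch values, and query list are illustrative) estimates Recall@K of the approximate index against the exact flat index as efSearch varies:

from typing import List

def measure_hnsw_recall(documents: List[str], queries: List[str], embed_fn,
                        top_k: int = 5, ef_values=(16, 50, 200)):
    """Estimate Recall@K of HNSW against exact (flat) search for several efSearch settings."""
    exact = FAISSRetriever(embedding_fn=embed_fn, index_type="flat", dimension=384)
    exact.fit(documents)
    approx = FAISSRetriever(embedding_fn=embed_fn, index_type="hnsw", dimension=384)
    approx.fit(documents)
    for ef in ef_values:
        approx.index.hnsw.efSearch = ef  # higher = better recall, slower queries
        hits, total = 0, 0
        for query in queries:
            truth = {idx for idx, _, _ in exact.search(query, top_k)}
            found = {idx for idx, _, _ in approx.search(query, top_k)}
            hits += len(truth & found)
            total += len(truth)
        print(f"efSearch={ef:<4} Recall@{top_k}: {hits / max(total, 1):.2%}")

On a corpus this small, HNSW typically matches the flat index exactly; the recall gap (and the value of tuning efSearch) only becomes visible at hundreds of thousands of vectors.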
Hybrid Search: Best of Both Worlds
Hybrid search combines sparse (keyword) and dense (semantic) retrieval, capturing both exact matches and semantic similarity.
from typing import List, Tuple, Dict
from dataclasses import dataclass
from enum import Enum
class FusionMethod(Enum):
RRF = "reciprocal_rank_fusion"
WEIGHTED = "weighted_sum"
RELATIVE = "relative_score_fusion"
@dataclass
class HybridResult:
doc_idx: int
document: str
sparse_score: float
dense_score: float
combined_score: float
sparse_rank: int
dense_rank: int
class HybridRetriever:
"""
Hybrid retrieval combining BM25 and dense embeddings.
Why hybrid works:
1. BM25 excels at exact keyword matching
2. Dense excels at semantic similarity
3. Combined catches what each misses alone
Real-world improvement: 15-30% better recall than either alone
"""
def __init__(
self,
embedding_fn,
fusion_method: FusionMethod = FusionMethod.RRF,
sparse_weight: float = 0.5,
rrf_k: int = 60
):
"""
Args:
embedding_fn: Text to vector function
fusion_method: How to combine scores
sparse_weight: Weight for sparse scores (dense = 1 - sparse)
rrf_k: RRF smoothing constant (60 is standard)
"""
self.sparse = BM25Retriever()
self.dense = FAISSRetriever(
embedding_fn=embedding_fn,
index_type="flat",
dimension=384
)
self.fusion_method = fusion_method
self.sparse_weight = sparse_weight
self.dense_weight = 1 - sparse_weight
self.rrf_k = rrf_k
self.documents: List[str] = []
def fit(self, documents: List[str]):
"""Index documents in both retrievers."""
self.documents = documents
self.sparse.fit(documents)
self.dense.fit(documents)
return self
def _reciprocal_rank_fusion(
self,
sparse_results: List[Tuple],
dense_results: List[Tuple],
top_k: int
) -> List[HybridResult]:
"""
RRF: Score = sum(1 / (k + rank)) across all rankings
Benefits:
- Rank-based, so different score scales don't matter
- Robust to outliers
- No tuning needed (k=60 works universally)
"""
scores: Dict[int, Dict] = {}
# Process sparse results
for rank, (idx, score, doc) in enumerate(sparse_results):
if idx not in scores:
scores[idx] = {"sparse_score": 0, "dense_score": 0,
"sparse_rank": -1, "dense_rank": -1}
scores[idx]["sparse_score"] = score
scores[idx]["sparse_rank"] = rank + 1
# Process dense results
for rank, (idx, score, doc) in enumerate(dense_results):
if idx not in scores:
scores[idx] = {"sparse_score": 0, "dense_score": 0,
"sparse_rank": -1, "dense_rank": -1}
scores[idx]["dense_score"] = score
scores[idx]["dense_rank"] = rank + 1
# Compute RRF scores
results = []
for idx, data in scores.items():
rrf_score = 0
if data["sparse_rank"] > 0:
rrf_score += 1 / (self.rrf_k + data["sparse_rank"])
if data["dense_rank"] > 0:
rrf_score += 1 / (self.rrf_k + data["dense_rank"])
results.append(HybridResult(
doc_idx=idx,
document=self.documents[idx],
sparse_score=data["sparse_score"],
dense_score=data["dense_score"],
combined_score=rrf_score,
sparse_rank=data["sparse_rank"],
dense_rank=data["dense_rank"]
))
results.sort(key=lambda x: x.combined_score, reverse=True)
return results[:top_k]
def _weighted_fusion(
self,
sparse_results: List[Tuple],
dense_results: List[Tuple],
top_k: int
) -> List[HybridResult]:
"""
Weighted sum of normalized scores.
Challenge: Different score scales (BM25: 0-20+, cosine: 0-1)
Solution: Min-max normalization before combining
"""
# Collect all scores for normalization
sparse_scores = {idx: score for idx, score, _ in sparse_results}
dense_scores = {idx: score for idx, score, _ in dense_results}
# Normalize to 0-1
def normalize(scores: Dict) -> Dict:
if not scores:
return {}
min_s, max_s = min(scores.values()), max(scores.values())
if max_s == min_s:
return {k: 1.0 for k in scores}
return {k: (v - min_s) / (max_s - min_s) for k, v in scores.items()}
sparse_norm = normalize(sparse_scores)
dense_norm = normalize(dense_scores)
# Combine
all_indices = set(sparse_norm.keys()) | set(dense_norm.keys())
results = []
for idx in all_indices:
sparse_s = sparse_norm.get(idx, 0)
dense_s = dense_norm.get(idx, 0)
combined = self.sparse_weight * sparse_s + self.dense_weight * dense_s
results.append(HybridResult(
doc_idx=idx,
document=self.documents[idx],
sparse_score=sparse_scores.get(idx, 0),
dense_score=dense_scores.get(idx, 0),
combined_score=combined,
sparse_rank=-1,
dense_rank=-1
))
results.sort(key=lambda x: x.combined_score, reverse=True)
return results[:top_k]
def search(self, query: str, top_k: int = 5, retrieval_k: int = 20) -> List[HybridResult]:
"""
Hybrid search combining sparse and dense results.
Args:
query: Search query
top_k: Final number of results
retrieval_k: How many to retrieve from each method before fusion
"""
sparse_results = self.sparse.search(query, top_k=retrieval_k)
dense_results = self.dense.search(query, top_k=retrieval_k)
if self.fusion_method == FusionMethod.RRF:
return self._reciprocal_rank_fusion(sparse_results, dense_results, top_k)
else:
return self._weighted_fusion(sparse_results, dense_results, top_k)
# Demonstration
if __name__ == "__main__":
documents = [
"Machine learning models detect fraudulent transactions in real-time",
"Deep neural networks process natural language for chatbots",
"Fraud prevention systems use AI to identify suspicious patterns",
"Natural language processing enables semantic search capabilities",
"Financial crime detection leverages machine learning algorithms",
"The quarterly earnings report exceeded analyst expectations",
"Database indexing improves query performance significantly",
"Cloud computing enables scalable infrastructure deployment",
]
try:
embed_fn = get_embedding_fn("local")
hybrid = HybridRetriever(
embedding_fn=embed_fn,
fusion_method=FusionMethod.RRF
)
hybrid.fit(documents)
# Query with both keyword and semantic aspects
query = "ML fraud detection"
print(f"Query: '{query}'\n")
# Compare all three methods
sparse_only = hybrid.sparse.search(query, top_k=3)
dense_only = hybrid.dense.search(query, top_k=3)
hybrid_results = hybrid.search(query, top_k=3)
print("BM25 (Sparse) Results:")
for idx, score, doc in sparse_only:
print(f" [{score:.3f}] {doc[:50]}...")
print("\nDense Results:")
for idx, score, doc in dense_only:
print(f" [{score:.3f}] {doc[:50]}...")
print("\nHybrid (RRF) Results:")
for r in hybrid_results:
print(f" [{r.combined_score:.4f}] {r.document[:50]}...")
print(f" Sparse rank: {r.sparse_rank}, Dense rank: {r.dense_rank}")
except ImportError:
print("Install: pip install faiss-cpu sentence-transformers")
When to Use What: Decision Framework
def recommend_retrieval_strategy(
corpus_size: int,
query_type: str,
latency_requirement_ms: float,
accuracy_priority: bool
) -> dict:
"""
Recommend retrieval strategy based on requirements.
Args:
corpus_size: Number of documents
query_type: "keyword", "semantic", "mixed"
latency_requirement_ms: Maximum acceptable latency
accuracy_priority: Prioritize recall over speed
"""
# Small corpus: exact search is fast enough
if corpus_size < 10000:
if query_type == "keyword":
return {"strategy": "BM25", "index": "in-memory",
"reason": "Small corpus, keyword queries—BM25 is fast and effective"}
elif query_type == "semantic":
return {"strategy": "Dense", "index": "FAISS Flat",
"reason": "Small corpus—exact search is fast enough"}
else:
return {"strategy": "Hybrid", "index": "FAISS Flat + BM25",
"reason": "Mixed queries benefit from both methods"}
# Medium corpus: need approximate search
if corpus_size < 1000000:
if latency_requirement_ms < 10:
return {"strategy": "Dense", "index": "FAISS HNSW",
"reason": "Sub-10ms requires HNSW graph index"}
if accuracy_priority:
return {"strategy": "Hybrid", "index": "FAISS HNSW + BM25",
"reason": "Accuracy priority—hybrid maximizes recall"}
return {"strategy": "Dense", "index": "FAISS IVF",
"reason": "IVF balances speed and accuracy for medium corpus"}
# Large corpus: compression needed
return {"strategy": "Hybrid", "index": "FAISS IVF-PQ + BM25",
"reason": "Large corpus needs compressed index; hybrid maintains quality"}
# Print decision matrix
if __name__ == "__main__":
scenarios = [
(5000, "keyword", 50, False),
(5000, "semantic", 50, False),
(50000, "mixed", 20, True),
(500000, "semantic", 10, False),
(5000000, "mixed", 50, True),
]
print("=== Retrieval Strategy Decision Matrix ===\n")
print(f"{'Corpus':<12} {'Query Type':<12} {'Latency':<10} {'Accuracy':<10} {'Recommendation'}")
print("-" * 75)
for size, qtype, latency, accuracy in scenarios:
rec = recommend_retrieval_strategy(size, qtype, latency, accuracy)
print(f"{size:<12} {qtype:<12} {latency}ms{'':<6} {str(accuracy):<10} {rec['strategy']}")
Data Engineer's ROI Lens: The Business Impact
def analyze_retrieval_roi(
daily_queries: int,
corpus_size: int,
embedding_price: float = 0.02, # per 1M tokens
infrastructure_cost_per_gb: float = 0.10, # monthly
engineer_hourly_rate: float = 100
) -> dict:
"""
Calculate ROI impact of retrieval strategy choices.
"""
# Baseline: Dense-only with flat index
dense_embedding_cost = (corpus_size * 500 / 1_000_000) * embedding_price # ~500 tokens/doc
dense_storage_gb = corpus_size * 384 * 4 / (1024**3) # 384-dim float32
# BM25: No embedding cost, minimal storage
    sparse_storage_gb = corpus_size * 1e-6  # ~1KB per doc for inverted index, in GB
# Hybrid: Both costs
hybrid_cost = dense_embedding_cost
hybrid_storage = dense_storage_gb + sparse_storage_gb
# Quality impact (industry benchmarks)
quality_multipliers = {
"sparse_only": 0.70,
"dense_only": 0.85,
"hybrid": 0.95
}
# Support cost reduction (better retrieval = fewer escalations)
monthly_queries = daily_queries * 30
    base_escalation_rate = 0.02  # 2% of queries need human help
escalation_cost = 25 # $ per escalation
results = {}
for strategy, quality in quality_multipliers.items():
if strategy == "sparse_only":
monthly_cost = sparse_storage_gb * infrastructure_cost_per_gb
setup_hours = 4
elif strategy == "dense_only":
monthly_cost = dense_embedding_cost + (dense_storage_gb * infrastructure_cost_per_gb)
setup_hours = 8
else:
monthly_cost = hybrid_cost + (hybrid_storage * infrastructure_cost_per_gb)
setup_hours = 16
escalation_reduction = (quality - 0.5) * base_escalation_rate * monthly_queries
monthly_savings = escalation_reduction * escalation_cost
results[strategy] = {
"monthly_infra_cost": round(monthly_cost, 2),
"quality_score": quality,
"monthly_escalation_savings": round(monthly_savings, 2),
"net_monthly_benefit": round(monthly_savings - monthly_cost, 2),
"setup_hours": setup_hours,
"setup_cost": setup_hours * engineer_hourly_rate
}
return results
if __name__ == "__main__":
roi = analyze_retrieval_roi(
daily_queries=1000,
corpus_size=100000
)
print("=== Monthly ROI by Strategy (100K docs, 1K queries/day) ===\n")
print(f"{'Strategy':<15} {'Infra Cost':<12} {'Quality':<10} {'Savings':<12} {'Net Benefit'}")
print("-" * 60)
for strategy, data in sorted(roi.items(), key=lambda x: -x[1]['net_monthly_benefit']):
print(f"{strategy:<15} ${data['monthly_infra_cost']:<11} "
f"{data['quality_score']:<10.0%} ${data['monthly_escalation_savings']:<11} "
f"${data['net_monthly_benefit']}")
print("\n=== Key Insight ===")
hybrid = roi['hybrid']
dense = roi['dense_only']
improvement = hybrid['net_monthly_benefit'] - dense['net_monthly_benefit']
print(f"Hybrid delivers ${improvement:.0f}/month more value than dense-only")
print(f"Annual advantage: ${improvement * 12:,.0f}")
Sample Output:
Strategy        Infra Cost   Quality    Savings      Net Benefit
------------------------------------------------------------
hybrid          $1.08        95%        $6750.0      $6748.92
dense_only      $1.03        85%        $5250.0      $5248.97
sparse_only     $0.01        70%        $3000.0      $2999.99
=== Key Insight ===
Hybrid delivers $1500/month more value than dense-only
Annual advantage: $18,000
Key Takeaways
- BM25 for exact matching, dense for semantic similarity—neither alone is complete
- Hybrid search improves recall 15-30% over single methods with minimal added complexity
- HNSW is the default choice for dense retrieval—best recall/speed trade-off
- RRF fusion requires no tuning—use k=60 and it just works across score scales
- Scale determines index choice: Flat (<100K), HNSW (100K-10M), IVF-PQ (10M+)
- The ROI math is clear: Hybrid's higher quality translates directly to fewer support escalations and better user satisfaction
Start with hybrid (BM25 + HNSW), measure your recall, and only simplify if latency requirements demand it.