RAG chatbots fail silently.
Imagine you've hired a chatbot to handle FAQs on your behalf so you can level up your D&D character instead of replying to the same questions over and over. You gave it all the information you could think of.
Then your biggest potential client asks whether you're left- or right-handed. The chatbot doesn't know; it never occurred to you to teach it that. Who asks that? The client walks away, and you never find out, because you had no idea there was a missed question in the first place.
In this tutorial, we're building an MCP server that creates that feedback loop. When the chatbot gives a low-confidence answer, it logs the question. Over time, questions pile up. Run the pattern analysis and it clusters them by semantic similarity — "oh, 40 people asked about refund policies in different ways, and we have zero documentation on that." Then it suggests what to write, and you close the loop.
We'll build it step by step. By the end, you'll have a working MCP server with four tools:
| Tool | What it does |
|---|---|
| log_unanswered_question | Store a question + its embedding |
| get_question_patterns | Cluster unresolved questions, return topics |
| suggest_documents | AI-generated doc outline for a topic |
| mark_resolved | Close the loop after adding documentation |
Let's start with the simplest possible version and layer on complexity as we go.
Repo: github.com/hamurda/rag-insights-mcp
Setup
```bash
mkdir unanswered-questions-mcp && cd unanswered-questions-mcp
uv init && uv add "mcp[cli]" openai numpy scikit-learn aiosqlite pydantic python-dotenv
```

(Note the quotes around "mcp[cli]": unquoted square brackets trip up zsh and other shells.)
Create a .env file:
```
OPENAI_API_KEY=sk-your-key-here
```
And a config.py to load it:
```python
# config.py
import os
from pathlib import Path

from dotenv import load_dotenv

load_dotenv()


class Config:
    DATABASE_PATH: str = os.getenv("DATABASE_PATH", "unanswered_questions.db")
    OPENAI_API_KEY: str = os.getenv("OPENAI_API_KEY", "")
    EMBEDDING_MODEL: str = os.getenv("EMBEDDING_MODEL", "text-embedding-3-small")
    CHAT_MODEL: str = os.getenv("CHAT_MODEL", "gpt-5-mini")
    SIMILARITY_THRESHOLD: float = float(os.getenv("SIMILARITY_THRESHOLD", "0.50"))
    MIN_CLUSTER_SIZE: int = int(os.getenv("MIN_CLUSTER_SIZE", "3"))
    MAX_CLUSTER_SIZE: int = int(os.getenv("MAX_CLUSTER_SIZE", "20"))
    EMBEDDING_BATCH_SIZE: int = int(os.getenv("EMBEDDING_BATCH_SIZE", "100"))

    @classmethod
    def validate(cls) -> None:
        if not cls.OPENAI_API_KEY:
            raise ValueError("OPENAI_API_KEY is required. Set it in .env or environment.")
        if not 0 <= cls.SIMILARITY_THRESHOLD <= 1:
            raise ValueError(f"SIMILARITY_THRESHOLD must be 0-1, got {cls.SIMILARITY_THRESHOLD}")

    @classmethod
    def db_path(cls) -> Path:
        return Path(__file__).parent / cls.DATABASE_PATH
```
Everything is configurable via environment variables, but the defaults are fine for now. That SIMILARITY_THRESHOLD of 0.50 looks low, and there's a story behind it.
Step 1: Store a Question
First things first. We need a place to put questions. Let's start with the database.
Because we want to save the questions and group the common ones, we need two tables: questions for individual logged questions, and clusters for the patterns we'll discover later.
And because this MCP Server is built for a multi-tenant chatbot, every table needs a tenant_id column. If you're running a single chatbot, leave it None, but if you're building for multiple customers, tenant scoping is baked in from the start. Much easier than retrofitting.
```python
# database.py
import json
import pickle
import uuid
from datetime import datetime, timezone
from typing import Optional

import aiosqlite

from config import Config

SCHEMA = """
CREATE TABLE IF NOT EXISTS questions (
    id TEXT PRIMARY KEY,
    question TEXT NOT NULL,
    context TEXT,
    tenant_id TEXT,
    metadata TEXT,
    embedding BLOB,
    created_at TEXT NOT NULL,
    resolved INTEGER DEFAULT 0,
    resolved_at TEXT,
    resolved_by TEXT,
    resolution_notes TEXT
);

CREATE TABLE IF NOT EXISTS clusters (
    id TEXT PRIMARY KEY,
    tenant_id TEXT,
    topic TEXT,
    question_ids TEXT,
    representative_questions TEXT,
    created_at TEXT NOT NULL
);

CREATE INDEX IF NOT EXISTS idx_questions_tenant ON questions(tenant_id);
CREATE INDEX IF NOT EXISTS idx_questions_resolved ON questions(resolved);
CREATE INDEX IF NOT EXISTS idx_questions_created ON questions(created_at);
"""
```
Why SQLite? Because this isn't real-time retrieval. Nobody is searching the vector space at query time — we're doing batch analysis on a schedule. SQLite is zero configuration, single file, and handles 10K+ questions without breaking a sweat.
Now the Database class. We'll start with just enough to save and retrieve questions:
```python
class Database:
    def __init__(self, db_path: Optional[str] = None):
        self.db_path = db_path or str(Config.db_path())
        self._conn: Optional[aiosqlite.Connection] = None

    async def connect(self) -> None:
        self._conn = await aiosqlite.connect(self.db_path)
        await self._conn.executescript(SCHEMA)

    async def close(self) -> None:
        if self._conn:
            await self._conn.close()

    async def save_question(
        self, question: str, context: str | None = None,
        tenant_id: str | None = None, metadata: dict | None = None,
        embedding: list[float] | None = None,
    ) -> str:
        qid = str(uuid.uuid4())
        await self._conn.execute(
            """INSERT INTO questions
               (id, question, context, tenant_id, metadata, embedding, created_at)
               VALUES (?, ?, ?, ?, ?, ?, ?)""",
            (
                qid, question, context, tenant_id,
                json.dumps(metadata) if metadata else None,
                pickle.dumps(embedding) if embedding else None,
                datetime.now(timezone.utc).isoformat(),
            ),
        )
        await self._conn.commit()
        return qid

    async def get_unresolved(
        self, tenant_id: str | None = None, limit: int | None = None
    ) -> list[dict]:
        query = "SELECT * FROM questions WHERE resolved = 0"
        params: list = []
        if tenant_id:
            query += " AND tenant_id = ?"
            params.append(tenant_id)
        query += " ORDER BY created_at DESC"
        if limit:
            query += " LIMIT ?"
            params.append(limit)
        cursor = await self._conn.execute(query, params)
        return [self._to_dict(row) for row in await cursor.fetchall()]

    async def get_question(self, question_id: str) -> dict | None:
        cursor = await self._conn.execute(
            "SELECT * FROM questions WHERE id = ?", (question_id,)
        )
        row = await cursor.fetchone()
        return self._to_dict(row) if row else None

    @staticmethod
    def _to_dict(row) -> dict:
        return {
            "id": row[0], "question": row[1], "context": row[2],
            "tenant_id": row[3],
            "metadata": json.loads(row[4]) if row[4] else None,
            "embedding": pickle.loads(row[5]) if row[5] else None,
            "created_at": row[6], "resolved": bool(row[7]),
            "resolved_at": row[8], "resolved_by": row[9],
            "resolution_notes": row[10],
        }
```
Embeddings are stored as pickled BLOBs. That feels wrong, I know, but we never query by embedding. There's no WHERE embedding NEAR ... happening here: we load all unresolved questions into memory and compute pairwise similarities. Pickle is the simplest way to get a list[float] in and out of SQLite, and for this access pattern it earns its place.
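To make the round trip concrete, here it is in isolation with Python's built-in sqlite3 (a standalone sketch, separate from the server code; the toy 4-dimensional vector stands in for the real embeddings):

```python
import pickle
import sqlite3

# A toy 4-dimensional "embedding" standing in for the real 1536-dim vectors.
embedding = [0.12, -0.5, 0.33, 0.9]

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE questions (id TEXT, embedding BLOB)")
conn.execute("INSERT INTO questions VALUES (?, ?)", ("q1", pickle.dumps(embedding)))

# Reading it back returns the exact list[float] we stored.
blob = conn.execute("SELECT embedding FROM questions").fetchone()[0]
assert pickle.loads(blob) == embedding
```

One caveat: unpickling can execute arbitrary code, so only load blobs your own code wrote, which is exactly the situation here.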
Let's verify it works. Create a quick test script:
```python
# test_step1.py
import asyncio

from database import Database


async def main():
    db = Database("test.db")
    await db.connect()
    qid = await db.save_question(
        question="How do I reset my password?",
        context="Login page",
        tenant_id="acme",
    )
    print(f"Saved: {qid}")
    questions = await db.get_unresolved(tenant_id="acme")
    print(f"Retrieved: {questions[0]['question']}")
    await db.close()


asyncio.run(main())
```

```
$ python test_step1.py
Saved: a1b2c3d4-...
Retrieved: How do I reset my password?
```
Questions go in, questions come out. Now let's make them searchable.
Step 2: Add Embeddings
A question stored as text is just a string. To us, "how do I reset my password" and "I forgot my password, how do I change it" are obviously the same topic. To a database, they're two completely different rows. We need to turn them into vectors to find the pattern.
Create analyzer.py. We'll start with just the embedding logic:
```python
# analyzer.py
import openai

from config import Config
from database import Database


class QuestionAnalyzer:
    def __init__(self, database: Database):
        self.db = database
        self.client = openai.AsyncOpenAI(api_key=Config.OPENAI_API_KEY)

    async def generate_embedding(self, text: str) -> list[float]:
        response = await self.client.embeddings.create(
            model=Config.EMBEDDING_MODEL, input=text
        )
        return response.data[0].embedding
```
Every question gets embedded when it's logged. But what about questions that were logged without embeddings? Maybe the API was down, maybe they came from a bulk import.
For that, we add a batch embedding method and a safety net that checks for gaps before analysis. The batching matters; OpenAI's embedding API accepts multiple inputs per call, so we chunk at 100 instead of making one API call per question.
```python
    # Add to QuestionAnalyzer in analyzer.py
    async def generate_embeddings_batch(self, texts: list[str]) -> list[list[float]]:
        if not texts:
            return []
        embeddings: list[list[float]] = []
        for i in range(0, len(texts), Config.EMBEDDING_BATCH_SIZE):
            batch = texts[i : i + Config.EMBEDDING_BATCH_SIZE]
            response = await self.client.embeddings.create(
                model=Config.EMBEDDING_MODEL, input=batch
            )
            embeddings.extend(item.embedding for item in response.data)
        return embeddings

    async def ensure_embeddings(self, questions: list[dict]) -> list[dict]:
        needs = [q for q in questions if q["embedding"] is None]
        if not needs:
            return questions
        texts = [q["question"] for q in needs]
        embeddings = await self.generate_embeddings_batch(texts)
        for q, emb in zip(needs, embeddings):
            q["embedding"] = emb
            await self.db.update_embedding(q["id"], emb)
        return questions
```
This checks every question before analysis and fills in anything that's missing. We'll also need update_embedding on the database side; add this to database.py:
```python
    # Add to Database in database.py
    async def update_embedding(self, question_id: str, embedding: list[float]) -> bool:
        cursor = await self._conn.execute(
            "UPDATE questions SET embedding = ? WHERE id = ?",
            (pickle.dumps(embedding), question_id),
        )
        await self._conn.commit()
        return cursor.rowcount > 0
```
Now when we log a question, we can generate the embedding at the same time:
```python
# test_step2.py
import asyncio

from analyzer import QuestionAnalyzer
from database import Database


async def main():
    db = Database("test.db")
    await db.connect()
    analyzer = QuestionAnalyzer(db)
    questions = [
        "How do I reset my password?",
        "I forgot my password, how do I change it?",
        "Where can I find my invoice?",
    ]
    for q in questions:
        embedding = await analyzer.generate_embedding(q)
        await db.save_question(question=q, embedding=embedding, tenant_id="acme")
        print(f"Saved '{q[:40]}' with {len(embedding)}-dim embedding")
    await db.close()


asyncio.run(main())
```

```
$ python test_step2.py
Saved 'How do I reset my password?' with 1536-dim embedding
Saved 'I forgot my password, how do I change it' with 1536-dim embedding
Saved 'Where can I find my invoice?' with 1536-dim embedding
```
Each question is now a 1536-dimensional vector. Time to compare them.
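The comparison we'll use is cosine similarity: the cosine of the angle between two vectors. A quick numpy sketch with made-up 3-dimensional vectors (real embeddings have 1536 dimensions, but the math is identical):

```python
import numpy as np

def cosine(a: np.ndarray, b: np.ndarray) -> float:
    # cos(theta) = (a . b) / (|a| * |b|)
    return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))

# Made-up vectors standing in for question embeddings.
reset_pw = np.array([0.9, 0.1, 0.0])   # "How do I reset my password?"
forgot_pw = np.array([0.8, 0.2, 0.1])  # "I forgot my password..."
invoice = np.array([0.0, 0.1, 0.9])    # "Where can I find my invoice?"

print(round(cosine(reset_pw, forgot_pw), 2))  # 0.98, nearly parallel
print(round(cosine(reset_pw, invoice), 2))    # 0.01, nearly orthogonal
```

Near-parallel vectors score close to 1 and unrelated ones close to 0; the threshold we pick in the next step lives on this scale.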
Step 3: Cluster by Similarity
This is the core of the analysis. We have questions as vectors — now we need to find which ones are about the same topic.
The approach: compute pairwise cosine similarities, then greedily assign questions to clusters. Add this to QuestionAnalyzer:
```python
# analyzer.py: add these imports at the top of the file
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity
```

Then the method itself:

```python
    # Add to QuestionAnalyzer in analyzer.py
    def cluster_questions(
        self, questions: list[dict], threshold: float | None = None
    ) -> list[list[dict]]:
        if not questions:
            return []
        if threshold is None:  # explicit None check so a threshold of 0.0 is honored
            threshold = Config.SIMILARITY_THRESHOLD
        embeddings = np.array([q["embedding"] for q in questions])
        sim = cosine_similarity(embeddings)
        clusters: list[list[dict]] = []
        assigned: set[int] = set()
        for i, q in enumerate(questions):
            if i in assigned:
                continue
            cluster = [q]
            assigned.add(i)
            for j in range(len(questions)):
                if j not in assigned and sim[i][j] >= threshold:
                    cluster.append(questions[j])
                    assigned.add(j)
            if len(cluster) >= Config.MIN_CLUSTER_SIZE:
                clusters.append(cluster[: Config.MAX_CLUSTER_SIZE])
        return clusters
```
Walk through the questions in order. For each unassigned question, start a new cluster with it as the anchor. Then scan every other unassigned question: if its cosine similarity to the anchor meets the threshold, pull it in. Once you've scanned everything, move on to the next unassigned question and start a new cluster. If that doesn't click on the first read, explain it to your rubber duck.
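Here is the same loop in miniature, run on a hand-written 4x4 similarity matrix (a standalone sketch: threshold 0.5 and a minimum cluster size of 2, chosen just for the toy data):

```python
# Toy similarity matrix for items A, B, C, D.
# A and B are near-duplicates; C and D are loosely related to each other.
sim = [
    [1.0, 0.9, 0.1, 0.2],
    [0.9, 1.0, 0.2, 0.1],
    [0.1, 0.2, 1.0, 0.6],
    [0.2, 0.1, 0.6, 1.0],
]
items = ["A", "B", "C", "D"]
THRESHOLD, MIN_SIZE = 0.5, 2

clusters, assigned = [], set()
for i, item in enumerate(items):
    if i in assigned:
        continue
    cluster = [item]          # item i becomes the anchor
    assigned.add(i)
    for j in range(len(items)):
        if j not in assigned and sim[i][j] >= THRESHOLD:
            cluster.append(items[j])
            assigned.add(j)
    if len(cluster) >= MIN_SIZE:
        clusters.append(cluster)

print(clusters)  # [['A', 'B'], ['C', 'D']]
```

A becomes the first anchor and pulls in B (0.9); C becomes the next anchor and pulls in D (0.6). Everything is measured against the anchor, never the centroid, which is exactly the quirk the threshold discussion below turns on.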
Let's test it with real data. Here are 14 questions: three obvious clusters and two singletons:
```python
# test_step3.py
import asyncio

from analyzer import QuestionAnalyzer
from database import Database

SAMPLE_QUESTIONS = [
    # Password reset cluster
    {"question": "How do I reset my password?", "tenant_id": "acme"},
    {"question": "I need to reset my password", "tenant_id": "acme"},
    {"question": "I forgot my password, how do I change it?", "tenant_id": "acme"},
    {"question": "My password isn't working, how do I reset it?", "tenant_id": "acme"},
    # Billing cluster
    {"question": "Where can I find my invoice?", "tenant_id": "acme"},
    {"question": "How do I download my invoice?", "tenant_id": "acme"},
    {"question": "I need a copy of my invoice", "tenant_id": "acme"},
    {"question": "Can I get a receipt for my payment?", "tenant_id": "acme"},
    # API rate limits cluster
    {"question": "What are the API rate limits?", "tenant_id": "techcorp"},
    {"question": "How many API requests can I make per minute?", "tenant_id": "techcorp"},
    {"question": "I'm hitting the API rate limit, what's the cap?", "tenant_id": "techcorp"},
    {"question": "Is there a limit on API calls?", "tenant_id": "techcorp"},
    # Singletons — should NOT cluster
    {"question": "Do you support single sign-on?", "tenant_id": "techcorp"},
    {"question": "What colors can I use for my dashboard theme?", "tenant_id": "acme"},
]


async def main():
    db = Database("test_clustering.db")
    await db.connect()
    analyzer = QuestionAnalyzer(db)

    # Log all questions with embeddings
    for q in SAMPLE_QUESTIONS:
        emb = await analyzer.generate_embedding(q["question"])
        await db.save_question(question=q["question"], tenant_id=q["tenant_id"], embedding=emb)

    # Retrieve and cluster
    questions = await db.get_unresolved()
    clusters = analyzer.cluster_questions(questions)

    print(f"Found {len(clusters)} cluster(s):\n")
    for i, cluster in enumerate(clusters):
        print(f"Cluster {i + 1} ({len(cluster)} questions):")
        for q in cluster:
            print(f"  - {q['question']}")
        print()
    await db.close()


asyncio.run(main())
```
Run it and you should see three clusters. The password questions find each other. The billing questions find each other. The API rate limit questions find each other. The SSO and dashboard theme questions sit alone and get dropped because they don't meet MIN_CLUSTER_SIZE of 3.
About that threshold
That SIMILARITY_THRESHOLD of 0.50 looks low. I started at 0.70, thinking that's a reasonable bar for "these questions are about the same thing."
It wasn't. The problem is the greedy algorithm: the first question in each cluster becomes the anchor, and every other question is measured against it. Not against the cluster centroid, not against the closest member. So if the anchor is "How do I reset my password?" and another question is "What's your policy on account recovery?", text-embedding-3-small might score that at 0.55. Same topic to a human. Below threshold at 0.70.
Dropping to 0.50 fixed it. The safety net is MIN_CLUSTER_SIZE. A loose threshold won't generate noise because stray pairs get dropped. You need at least 3 questions about the same thing before it's reported as a pattern.
Could I have used HDBSCAN instead? It doesn't depend on anchor order and it's more robust. But it felt like bringing a machine learning framework to a problem I could solve with 30 lines of code and a lower threshold. If the clusters start looking noisy with real production data, HDBSCAN is the upgrade path.
Step 4: Complete the analysis pipeline
We can cluster. Now we need to turn clusters into something useful.
First, a way to pick the most representative questions from a cluster, the ones most similar to everything else:
```python
    # Add to QuestionAnalyzer in analyzer.py
    def pick_representative(self, cluster: list[dict], n: int = 3) -> list[str]:
        if len(cluster) <= n:
            return [q["question"] for q in cluster]
        embeddings = np.array([q["embedding"] for q in cluster])
        sim = cosine_similarity(embeddings)
        centrality = [
            (np.mean([sim[i][j] for j in range(len(cluster)) if j != i]), cluster[i])
            for i in range(len(cluster))
        ]
        centrality.sort(key=lambda x: x[0], reverse=True)
        return [q["question"] for _, q in centrality[:n]]
```
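A quick standalone sanity check of the centrality idea, with made-up 2-dimensional vectors: the vector that points between its neighbors should rank as most central.

```python
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity

# Toy "embeddings": b sits angularly between a and c.
vecs = np.array([
    [1.0, 0.0],  # a
    [0.8, 0.6],  # b, between a and c
    [0.0, 1.0],  # c
])
sim = cosine_similarity(vecs)

# Mean similarity to every OTHER vector, mirroring pick_representative.
centrality = [
    np.mean([sim[i][j] for j in range(len(vecs)) if j != i])
    for i in range(len(vecs))
]
print(int(np.argmax(centrality)))  # 1, i.e. vector b
```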
Next, topic labeling. This is where things get architecturally interesting. The MCP tool itself makes an LLM call:
```python
    # Add to QuestionAnalyzer in analyzer.py
    async def extract_topic(self, cluster: list[dict]) -> str:
        sample = [q["question"] for q in cluster[:10]]
        prompt = (
            "These are similar customer questions. "
            "Reply with ONLY a 2-5 word topic label.\n\n"
            + "\n".join(f"- {q}" for q in sample)
        )
        resp = await self.client.chat.completions.create(
            model=Config.CHAT_MODEL,
            messages=[{"role": "user", "content": prompt}],
        )
        return resp.choices[0].message.content.strip()
```
An LLM tool that calls another LLM. I have thoughts on this but that's a separate post. For now, it works, and the alternative (hand-labeling clusters) doesn't scale.
Now the orchestration method that ties it all together — fetch unresolved questions, ensure embeddings, cluster, and label:
```python
    # Add to QuestionAnalyzer in analyzer.py
    async def find_patterns(
        self, tenant_id: str | None = None, min_cluster_size: int | None = None,
    ) -> list[dict]:
        questions = await self.db.get_unresolved(tenant_id=tenant_id)
        if not questions:
            return []
        questions = await self.ensure_embeddings(questions)
        clusters = self.cluster_questions(questions)
        if min_cluster_size:
            clusters = [c for c in clusters if len(c) >= min_cluster_size]
        patterns: list[dict] = []
        for cluster in clusters:
            topic = await self.extract_topic(cluster)
            representative = self.pick_representative(cluster)
            qids = [q["id"] for q in cluster]
            patterns.append({
                "topic": topic,
                "count": len(cluster),
                "representative_questions": representative,
                "question_ids": qids,
                "earliest_date": min(q["created_at"] for q in cluster),
                "latest_date": max(q["created_at"] for q in cluster),
            })
            await self.db.save_cluster(
                topic=topic, question_ids=qids,
                representative_questions=representative, tenant_id=tenant_id,
            )
        patterns.sort(key=lambda p: p["count"], reverse=True)
        return patterns
```
We also need save_cluster, mark_resolved, and get_stats on the database, plus suggest_documentation on the analyzer to power the suggest_documents tool. These are straightforward; the full source is on GitHub. Here's mark_resolved, since it completes the feedback loop:
```python
    # Add to Database in database.py
    async def mark_resolved(
        self, question_id: str, resolved_by: str, notes: str | None = None
    ) -> bool:
        cursor = await self._conn.execute(
            """UPDATE questions
               SET resolved = 1, resolved_at = ?, resolved_by = ?, resolution_notes = ?
               WHERE id = ?""",
            (datetime.now(timezone.utc).isoformat(), resolved_by, notes, question_id),
        )
        await self._conn.commit()
        return cursor.rowcount > 0
```
Step 5: Wrap It in MCP
We have a working system: questions go in with embeddings, clustering finds patterns. Now let's expose it as an MCP server so any MCP client (Claude Desktop, an agent, a scheduled script) can use it.
Now server.py. FastMCP gives us a clean way to define tools with typed inputs:
```python
# server.py
import json
from contextlib import asynccontextmanager
from typing import Optional

from mcp.server.fastmcp import FastMCP
from pydantic import BaseModel, Field

from analyzer import QuestionAnalyzer
from config import Config
from database import Database


@asynccontextmanager
async def lifespan(server: FastMCP):
    Config.validate()
    db = Database()
    await db.connect()
    analyzer = QuestionAnalyzer(db)
    try:
        yield {"db": db, "analyzer": analyzer}
    finally:
        # Runs even if a handler raises, so the connection always closes.
        await db.close()


mcp = FastMCP("unanswered_questions_mcp", lifespan=lifespan)
```
The lifespan context manager creates one database connection and one analyzer for the server's lifetime. Every tool handler gets them from ctx.request_context.lifespan_context. No global singletons, clean shutdown.
Each tool gets a Pydantic input model. These descriptions aren't just for docs. They're what the MCP client sees when deciding how to call the tool:
```python
class LogQuestionInput(BaseModel):
    question: str = Field(..., description="The question the chatbot couldn't answer")
    context: Optional[str] = Field(None, description="Where/why this question was asked")
    tenant_id: Optional[str] = Field(None, description="Tenant identifier for multi-tenant systems")
    metadata: Optional[dict] = Field(None, description="Extra metadata (user_id, confidence, source, etc.)")
```
And the tool itself:
```python
@mcp.tool(
    name="log_unanswered_question",
    annotations={
        "title": "Log Unanswered Question",
        "readOnlyHint": False,
        "destructiveHint": False,
        "idempotentHint": False,
        "openWorldHint": True,
    },
)
async def log_unanswered_question(params: LogQuestionInput, ctx=None) -> str:
    db: Database = ctx.request_context.lifespan_context["db"]
    analyzer: QuestionAnalyzer = ctx.request_context.lifespan_context["analyzer"]
    embedding = await analyzer.generate_embedding(params.question)
    qid = await db.save_question(
        question=params.question,
        context=params.context,
        tenant_id=params.tenant_id,
        metadata=params.metadata,
        embedding=embedding,
    )
    return json.dumps({"success": True, "question_id": qid})
```
The annotations tell the MCP client what kind of operation this is. readOnlyHint: False because it writes data. idempotentHint: False, since calling it twice creates two records. openWorldHint: True, as it calls external services.
The other three tools follow the same pattern: get_question_patterns calls find_patterns and includes stats, suggest_documents generates doc outlines, and mark_resolved closes the loop. Full server source on GitHub.
Start the server:
```python
if __name__ == "__main__":
    mcp.run()
```
Add it to Claude Desktop:
```json
{
  "mcpServers": {
    "unanswered-questions": {
      "command": "python",
      "args": ["/path/to/unanswered-questions-mcp/server.py"]
    }
  }
}
```
Step 6: The Full Loop
Let's run through the entire lifecycle. The test suite (test_server.py) does this without starting the MCP server — same code paths, no transport layer in the way:
```bash
python test_server.py
```
Here's what happens:
1. Log 14 questions — 4 about password resets, 4 about billing, 4 about API rate limits, 2 singletons. Each gets an embedding at log time.
2. Find patterns — the analyzer loads all unresolved questions, ensures embeddings exist, builds a similarity matrix, and clusters them. You should see 3 patterns. The singletons get dropped.
3. Suggest documentation — pass "password reset" as a topic. The analyzer finds related questions by embedding similarity, then asks the LLM to generate a doc outline: title, sections, what to cover.
4. Mark resolved — mark the first two questions as resolved, verify the resolution rate updates.
5. Edge cases — resolve a nonexistent ID (returns False), run patterns on an empty database (returns []).
The output shows similarity scores between questions as they cluster, so you can see exactly which questions the algorithm considers similar and by how much. This is where you'll develop intuition for whether your threshold is right.
Things to know
The greedy algorithm is order-dependent. Shuffle the input and you might get slightly different clusters. For broad pattern detection — "people keep asking about refunds" — this is fine. For precise categorization, it's not. HDBSCAN would fix it.
text-embedding-3-large would tighten the clusters. Higher-dimensional embeddings produce sharper similarity scores, which means the threshold could go back up toward 0.65–0.70. I stuck with small because it's cheaper and the lower threshold compensates.
No auth. This server relies on the MCP client handling access control. If you're exposing it over HTTP instead of stdio, add authentication.
Wrapping Up
What we built: an MCP server that turns your RAG chatbot's silent failures into actionable insights. Log the questions it can't answer, cluster them by similarity, figure out what docs are missing, write them, close the loop.
The code is simple on purpose. SQLite, greedy clustering, 30 lines of core algorithm. The trade-offs are real, but the feedback loop works — and that's the part that matters.
Repo: github.com/hamurda/rag-insights-mcp
Fork it, plug it in, see what your users are actually asking that you haven't documented.
Have questions or feedback? Find me on LinkedIn or check out my multi-tenant AI chatbot series for the RAG chatbot this plugs into.