This is Part 3 of a 6-part series. Part 2 covers the foundation setup.
Claude Orchestrator & Multi-Source Search
This is where Harper Eye becomes genuinely powerful. By the end of this part, you'll have an AI assistant that searches all your internal data sources in parallel, synthesizes the results with Claude, and returns structured, cited responses like this:
About 400 lines of code, and every one of them earns its keep.
The Architecture of a Single Query
When someone asks Harper Eye a question, here's exactly what happens:
The key insight: everything that can run in parallel, does. The embedding generation happens once at the start, then gets reused for KB search, negative feedback search, expertise lookup, and code knowledge search. The six data sources all fire simultaneously. The total wall-clock time is dominated by the slowest source (usually 2-4 seconds) plus Claude's synthesis time, not the sum of all sources.
Here's the actual pipeline timing from a production query, you can see the parallelism in action:
Step 1: Build Your First Data Source Wrapper
Every external data source follows the same pattern: define the tools Claude could use, then implement simple fetch() calls to the source's REST API. No MCP subprocess servers, no complex client libraries, just HTTP.
Let's build the Confluence wrapper as our template. Create mcp/confluence.js:
import { config } from '../lib/config.js';
/**
* Confluence search tool — direct HTTP calls to the REST API.
* Provides tool definitions (Claude format) and execution functions.
*/
const tools = [
{
name: 'search_confluence',
description: 'Full-text search across Confluence spaces for runbooks, postmortems, and architecture docs.',
input_schema: {
type: 'object',
properties: {
query: { type: 'string', description: 'Search query (CQL-compatible)' },
space: { type: 'string', description: 'Optional space key to restrict search' },
limit: { type: 'number', description: 'Max results (default 5)' },
},
required: ['query'],
},
},
{
name: 'get_confluence_page',
description: 'Retrieve the body content of a specific Confluence page by ID.',
input_schema: {
type: 'object',
properties: {
pageId: { type: 'string', description: 'Confluence page ID' },
},
required: ['pageId'],
},
},
];
export function getConfluenceTools() {
return tools;
}
export async function executeConfluenceTool(name, input) {
const baseUrl = config.confluence.baseUrl();
const auth = Buffer.from(
`${config.confluence.email()}:${config.confluence.apiToken()}`
).toString('base64');
const headers = {
Authorization: `Basic ${auth}`,
Accept: 'application/json',
};
switch (name) {
case 'search_confluence':
return searchConfluence(baseUrl, headers, input);
case 'get_confluence_page':
return getPageContent(baseUrl, headers, input);
default:
throw new Error(`Unknown Confluence tool: ${name}`);
}
}
async function searchConfluence(baseUrl, headers, { query, space, limit = 5 }) {
// Extract meaningful keywords for broader matching
const keywords = query
.toLowerCase()
.replace(/[^a-z0-9\s\-_]/g, ' ')
.split(/\s+/)
.filter((w) => w.length >= 3 && !stopWords.has(w));
let cql = keywords.length > 0
? keywords.map((kw) => `text ~ "${kw}"`).join(' AND ')
: `text ~ "${query}"`;
if (space) cql += ` AND space = "${space}"`;
const url = `${baseUrl}/wiki/rest/api/search?cql=${encodeURIComponent(cql)}&limit=${limit}`;
const res = await fetch(url, { headers });
if (!res.ok) throw new Error(`Confluence search failed: ${res.status}`);
const data = await res.json();
return data.results.map((r) => ({
title: r.content?.title ?? r.title,
id: r.content?.id ?? r.id,
url: `${baseUrl}/wiki${r.url ?? r.content?._links?.webui ?? ''}`,
excerpt: stripHtml(r.excerpt ?? ''),
lastModified: r.lastModified,
}));
}
async function getPageContent(baseUrl, headers, { pageId }) {
const url = `${baseUrl}/wiki/rest/api/content/${pageId}?expand=body.storage,version,space`;
const res = await fetch(url, { headers });
if (!res.ok) throw new Error(`Confluence get page failed: ${res.status}`);
const data = await res.json();
const body = stripHtml(data.body?.storage?.value ?? '');
// Truncate to ~2000 tokens worth of text (roughly 8000 chars)
const truncated = body.length > 8000
? body.slice(0, 8000) + '\n\n[Content truncated]'
: body;
return {
title: data.title,
id: data.id,
space: data.space?.key,
url: `${baseUrl}/wiki${data._links?.webui ?? ''}`,
content: truncated,
};
}
const stopWords = new Set([
'the', 'what', 'how', 'when', 'where', 'does', 'this', 'that',
'with', 'for', 'and', 'but', 'not', 'are', 'was', 'can', 'has',
'have', 'from',
]);
function stripHtml(html) {
return html
.replace(/<[^>]*>/g, '')
.replace(/ /g, ' ')
.replace(/&/g, '&')
.replace(/</g, '<')
.replace(/>/g, '>')
.replace(/"/g, '"')
.replace(/\s+/g, ' ')
.trim();
}
The pattern is always the same for every data source:
- Define the tool schema (what Claude sees)
- Export
getXTools()for tool definitions - Export
executeXTool(name, input)as the dispatcher - Implement each tool as a simple
fetch()call - Return clean, structured data
Here are the other wrappers you'll build using this same pattern, I'll show the Zendesk one as another example since each API is slightly different:
Create mcp/zendesk.js:
import { config } from '../lib/config.js';
export async function executeZendeskTool(name, input) {
const subdomain = config.zendesk.subdomain();
const auth = Buffer.from(
`${config.zendesk.email()}/token:${config.zendesk.apiToken()}`
).toString('base64');
const headers = {
Authorization: `Basic ${auth}`,
Accept: 'application/json',
};
if (name === 'search_tickets') {
return searchTickets(subdomain, headers, input);
}
throw new Error(`Unknown Zendesk tool: ${name}`);
}
async function searchTickets(subdomain, headers, { query, limit = 5 }) {
const url = `https://${subdomain}.zendesk.com/api/v2/search.json?query=${encodeURIComponent(query)}&per_page=${limit}`;
const res = await fetch(url, { headers });
if (!res.ok) throw new Error(`Zendesk search failed: ${res.status}`);
const data = await res.json();
return (data.results ?? []).map((t) => ({
id: t.id,
subject: t.subject,
status: t.status,
priority: t.priority,
description: (t.description ?? '').slice(0, 500),
url: `https://${subdomain}.zendesk.com/agent/tickets/${t.id}`,
createdAt: t.created_at,
updatedAt: t.updated_at,
}));
}
You'll follow the same pattern for Datadog (mcp/datadog.js), GitHub (mcp/github.js), and your documentation site (mcp/harper-docs.js). Each one is just fetch() calls to their respective REST APIs, returning clean JSON.
Step 2: Build the Embedding Pipeline
Embeddings are how Harper Eye understands meaning, not just keywords. When someone asks "why is my cluster slow?" and a knowledge base entry is titled "investigating replication latency," vector similarity catches the semantic match that keyword search would miss.
Create lib/embeddings.js:
import { config } from './config.js';
/**
* Generate an embedding vector using Google Gemini.
* Returns an array of 768 floats (cosine-similarity compatible).
*/
export async function generateEmbedding(text) {
const apiKey = config.gemini.apiKey();
const model = config.gemini.embeddingModel();
const url = `https://generativelanguage.googleapis.com/v1beta/models/${model}:embedContent?key=${apiKey}`;
const res = await fetch(url, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
model: `models/${model}`,
content: { parts: [{ text }] },
outputDimensionality: 768,
}),
});
if (!res.ok) {
const body = await res.text().catch(() => '');
throw new Error(`Gemini embeddings failed: ${res.status} — ${body}`);
}
const data = await res.json();
return data.embedding.values;
}
That's 25 lines. A single API call. And it gives you 768-dimensional vectors that work beautifully with Harper's HNSW indexes.
We use Gemini for embeddings because it's fast, the free tier is generous (1,500 requests/min), and the text-embedding-004 model produces high-quality vectors. You could swap this for OpenAI's embeddings or any other provider; the orchestrator doesn't care.
Step 3: Build the Orchestrator
This is the heart of Harper Eye. The orchestrator coordinates everything: checks the knowledge base, fetches all data sources in parallel, constructs the Claude prompt, parses the response, and records source relevance for future optimization.
Create lib/orchestrator.js:
import Anthropic from '@anthropic-ai/sdk';
import { config } from './config.js';
import { executeConfluenceTool } from '../mcp/confluence.js';
import { executeZendeskTool } from '../mcp/zendesk.js';
import { executeDatadogTool } from '../mcp/datadog.js';
import { executeGitHubTool } from '../mcp/github.js';
import { executeHarperDocsTool } from '../mcp/harper-docs.js';
import { searchKnowledgeBase, searchNegativeFeedback } from './knowledge-base.js';
import { generateEmbedding } from './embeddings.js';
Now for the system prompt. This is the most important prompt you'll ever write. It defines the personality, the constraints, and the output format of every response your team sees:
const SYSTEM_PROMPT = `You are Harper Eye, the internal assistant for [YOUR COMPANY].
You help engineers resolve incidents and answer questions about our platform.
CONTEXT RULES — CRITICAL:
- ALL answers must be specific to [YOUR COMPANY] and our platform.
Never give generic advice.
- Use our internal terminology (define your key terms here).
- If you cannot find specific information in the provided data,
say so — do not fill in with generic knowledge.
- Do NOT pad with generic steps like "check logs" or "restart the service"
unless you can specify EXACTLY which logs, which service.
You will receive data pre-fetched from these sources:
- DOCUMENTATION: Official product docs (most authoritative)
- CONFLUENCE: Internal runbooks, postmortems, architecture docs
- DATADOG MONITORS: Currently alerting monitors and recent events
- DATADOG LOGS: Recent log entries matching the query
- ZENDESK: Customer-reported tickets
- GITHUB: Code search results
- SLACK HISTORY: Recent relevant messages from team channels
- TEAM EXPERTISE MAP: Who to contact for help on these topics
Analyze ALL provided data and respond with a JSON object:
{
"summary": "What is happening, grounded in the data provided",
"customerImpact": "Impact from Zendesk data, or null",
"steps": ["Step 1 as a string", "Step 2", "Step 3"],
"sources": [{"title": "...", "url": "...", "sourceType": "confluence|zendesk|datadog|github|docs"}],
"escalation": "Who to loop in based on expertise data, or null"
}
CRITICAL FORMAT RULES:
- "steps" MUST be an array of plain strings. NOT objects.
- Every step should be specific and actionable, citing which source.
- Respond ONLY with the JSON object.`;
You'll want to customize this heavily for your company. The more specific your system prompt is about your architecture, your terminology, and your deprecated vs. current systems, the better Claude's responses will be.
Now the main orchestrate() function:
/**
* Main orchestration loop.
* @param {string} query - The user's question
* @param {Object} options
* @param {string} [options.channelId] - Slack channel for history search
* @param {'incident'|'ask'} [options.mode] - Query mode
* @param {Array} [options.conversationHistory] - Previous Q&A turns
*/
export async function orchestrate(query, {
channelId,
mode = 'incident',
conversationHistory = [],
} = {}) {
// Step 0: Generate embedding once, reuse everywhere
let embedding;
try {
embedding = await generateEmbedding(query);
} catch (err) {
console.error('Embedding generation failed:', err.message);
embedding = null;
}
// Step 1: Check knowledge base + negative feedback in parallel
const [kbResult, negativeFeedback] = await Promise.all([
embedding
? searchKnowledgeBase(query, embedding)
: Promise.resolve({ match: 'none' }),
embedding
? searchNegativeFeedback(query, embedding)
: Promise.resolve([]),
]);
// Step 1a: If KB has a high-confidence cached answer, return immediately
if (kbResult.match === 'exact' && conversationHistory.length === 0) {
try {
const cachedAnswer = JSON.parse(kbResult.entry.answer);
return {
...cachedAnswer,
fromKnowledgeBase: true,
knowledgeBaseScore: kbResult.score,
knowledgeBaseEntryId: kbResult.entry.id,
};
} catch {
// Parse failed — fall through to normal flow
}
}
// Step 2: Build system prompt with KB context + negative feedback warnings
let systemPrompt = SYSTEM_PROMPT;
if (kbResult.match === 'partial') {
// Inject partial match as reference context for Claude
try {
const pastAnswer = JSON.parse(kbResult.entry.answer);
systemPrompt += `\n\nADDITIONAL CONTEXT — A similar past question was answered (similarity: ${kbResult.score.toFixed(2)}). Use as reference but verify:\nPast query: "${kbResult.entry.query}"\nPast answer: ${JSON.stringify(pastAnswer)}`;
} catch { /* ignore parse errors */ }
}
if (negativeFeedback.length > 0) {
// Warn Claude about past failures so it doesn't repeat them
const warnings = negativeFeedback
.map((fb) => `- "${fb.originalQuery}" was marked "${fb.category}"${fb.details ? `: "${fb.details}"` : ''}`)
.join('\n');
systemPrompt += `\n\nIMPORTANT — PAST NEGATIVE FEEDBACK:\n${warnings}\nAvoid repeating these mistakes.`;
}
// Step 3: Fetch ALL data sources in parallel
const searchResults = await fetchAllSources(query, embedding, { channelId });
// Step 4: Build context block from all results
const contextParts = [];
if (searchResults.harperDocs?.length)
contextParts.push(`DOCUMENTATION:\n${JSON.stringify(searchResults.harperDocs)}`);
if (searchResults.confluence?.length)
contextParts.push(`CONFLUENCE:\n${JSON.stringify(searchResults.confluence)}`);
if (searchResults.datadog?.monitors?.length)
contextParts.push(`DATADOG MONITORS:\n${JSON.stringify(searchResults.datadog.monitors)}`);
if (searchResults.datadog?.logs?.length)
contextParts.push(`DATADOG LOGS:\n${JSON.stringify(searchResults.datadog.logs)}`);
if (searchResults.zendesk?.length)
contextParts.push(`ZENDESK TICKETS:\n${JSON.stringify(searchResults.zendesk)}`);
if (searchResults.githubCode?.length)
contextParts.push(`GITHUB CODE:\n${JSON.stringify(searchResults.githubCode)}`);
if (searchResults.slackHistory?.length)
contextParts.push(`SLACK HISTORY:\n${JSON.stringify(searchResults.slackHistory)}`);
if (searchResults.expertise?.experts?.length)
contextParts.push(`TEAM EXPERTISE MAP:\n${JSON.stringify(searchResults.expertise)}`);
const contextBlock = contextParts.length > 0
? `\n\nData from all sources:\n\n${contextParts.join('\n\n')}`
: '\n\nNo relevant data found in any source.';
const userMessage = `Incident report: ${query}${contextBlock}\n\nAnalyze and respond with JSON.`;
// Step 5: Call Claude
const client = new Anthropic({ apiKey: config.claude.apiKey() });
let response;
for (let attempt = 1; attempt <= 3; attempt++) {
try {
response = await client.messages.create({
model: config.claude.model(),
max_tokens: 4096,
system: systemPrompt,
messages: [{ role: 'user', content: userMessage }],
});
break;
} catch (apiErr) {
if (apiErr.status === 429 && attempt < 3) {
const retryAfter = Number(apiErr.headers?.['retry-after']) || (attempt * 15);
await new Promise((r) => setTimeout(r, retryAfter * 1000));
continue;
}
throw apiErr;
}
}
// Step 6: Parse response
const textBlock = response.content.find((b) => b.type === 'text');
const rawResponse = textBlock?.text ?? '{}';
return parseResponse(rawResponse);
}
Step 4: The Parallel Source Fetcher
This is where the "fast" in "12-second response" comes from. Every data source fires at the same time. If Confluence takes 3 seconds and Datadog takes 1 second, the total is 3 seconds, not 4.
/**
* Fetch data from all configured sources in parallel.
* Each source is wrapped in try/catch so failures don't block others.
*/
async function fetchAllSources(query, embedding, { channelId } = {}) {
const results = {
harperDocs: [],
confluence: [],
datadog: { monitors: [], events: [], hosts: [], logs: [] },
zendesk: [],
githubCode: [],
slackHistory: [],
expertise: { experts: [], channels: [] },
};
const fetches = [];
// Documentation
fetches.push(
executeHarperDocsTool('search_harper_docs', { query, limit: 5 })
.then((r) => { results.harperDocs = r; })
.catch((err) => console.error('Docs search failed:', err.message))
);
// Confluence — wrapped in config check so it's skipped if not configured
try {
config.confluence.baseUrl(); // throws if not configured
fetches.push(
executeConfluenceTool('search_confluence', { query, limit: 5 })
.then((r) => { results.confluence = r; })
.catch((err) => console.error('Confluence failed:', err.message))
);
} catch { /* not configured — skip */ }
// Datadog
try {
config.datadog.apiKey();
fetches.push(
executeDatadogTool('get_alerting_monitors', {})
.then((r) => { results.datadog.monitors = r; })
.catch((err) => console.error('Datadog monitors failed:', err.message))
);
fetches.push(
executeDatadogTool('search_datadog_logs', {
query: `status:(error OR warn) ${query}`,
hours: 2,
limit: 15,
})
.then((r) => { results.datadog.logs = r; })
.catch((err) => console.error('Datadog logs failed:', err.message))
);
} catch { /* not configured */ }
// Zendesk
try {
config.zendesk.subdomain();
fetches.push(
executeZendeskTool('search_tickets', { query, limit: 5 })
.then((r) => { results.zendesk = r; })
.catch((err) => console.error('Zendesk failed:', err.message))
);
} catch { /* not configured */ }
// GitHub
try {
config.github.token();
fetches.push(
executeGitHubTool('search_github_code', { query, limit: 3 })
.then((r) => { results.githubCode = r; })
.catch((err) => console.error('GitHub failed:', err.message))
);
} catch { /* not configured */ }
await Promise.all(fetches);
return results;
}
The try { config.x.y() } catch {} pattern is intentional. If Datadog isn't configured, the config getter throws, and we skip that source entirely. Zero if statements, zero feature flags. Just configure an env var and the source lights up.
Step 5: The Response Parser
Claude outputs JSON, but LLMs are messy. Sometimes they wrap it in markdown code blocks. Sometimes the JSON has weird types. You need a parser that handles all the edge cases gracefully:
function parseResponse(text) {
const jsonStr = extractJson(text);
try {
const parsed = JSON.parse(jsonStr);
return normalizeResult(parsed);
} catch (err) {
// Last resort: regex out the summary field
const match = text.match(/"(?:answer|summary)"\s*:\s*"((?:[^"\\]|\\.)*)"/);
if (match) {
return {
summary: match[1].replace(/\\n/g, '\n').replace(/\\"/g, '"'),
customerImpact: null,
steps: [],
sources: [],
escalation: null,
};
}
// If all parsing fails, return the raw text
return { summary: text, steps: [], sources: [], escalation: null };
}
}
function extractJson(text) {
// Strategy 1: markdown code block
const codeBlock = text.match(/```
{% endraw %}
(?:json)?\s*([\s\S]*?)
{% raw %}
```/);
if (codeBlock) return codeBlock[1].trim();
// Strategy 2: outermost braces
const first = text.indexOf('{');
const last = text.lastIndexOf('}');
if (first !== -1 && last > first) return text.slice(first, last + 1);
// Strategy 3: raw text
return text.trim();
}
function normalizeResult(parsed) {
// Normalize steps to plain strings (Claude sometimes returns objects)
let steps = [];
if (Array.isArray(parsed.steps)) {
steps = parsed.steps.map((s) => {
if (typeof s === 'string') return s;
if (typeof s === 'object' && s !== null) {
return s.step ?? s.action ?? s.description ?? JSON.stringify(s);
}
return String(s);
});
}
// Normalize escalation to string
let escalation = parsed.escalation ?? null;
if (escalation && typeof escalation === 'object') {
escalation = escalation.message ?? escalation.text ?? JSON.stringify(escalation);
}
return {
summary: parsed.summary ?? parsed.answer ?? null,
customerImpact: parsed.customerImpact ?? null,
steps,
sources: Array.isArray(parsed.sources) ? parsed.sources : [],
escalation,
relatedTopics: Array.isArray(parsed.relatedTopics) ? parsed.relatedTopics : [],
};
}
The normalization code may look defensive, but it's the difference between a demo that works sometimes and a production system that works always. Claude will occasionally return steps as objects instead of strings, or wrap escalation in a nested object. This parser handles all of it.
Step 6: Wire It Into the API
Now update resources/Api.js to use the real orchestrator:
import { Resource, tables } from 'harperdb';
import crypto from 'crypto';
import { config } from '../lib/config.js';
import { orchestrate } from '../lib/orchestrator.js';
export class Api extends Resource {
static loadAsInstance = false;
async post(target, data) {
const context = this.getContext();
const authHeader = context.headers.get('authorization');
if (!authHeader) {
const err = new Error('Authorization required');
err.statusCode = 401;
throw err;
}
const { query, mode = 'ask' } = data ?? {};
if (!query?.trim()) {
const err = new Error('Missing "query" in request body');
err.statusCode = 400;
throw err;
}
// Run the full orchestration pipeline
const result = await orchestrate(query.trim(), { mode });
// Save to audit trail
const queryId = crypto.randomUUID();
await tables.IncidentQuery.put({
id: queryId,
query: query.trim(),
response: JSON.stringify(result),
sourcesUsed: result.sources?.map((s) => s.url ?? s.title) ?? [],
createdAt: new Date().toISOString(),
});
result.queryId = queryId;
return result;
}
}
Test it:
curl -X POST http://localhost:9926/Api \
-H "Content-Type: application/json" \
-H "Authorization: Basic $(echo -n 'admin:password' | base64)" \
-d '{"query": "How does replication work in our platform?", "mode": "ask"}'
You should get back a structured JSON response with a summary, sources from your actual data systems, and actionable steps — all grounded in your real documentation and internal knowledge.
What Just Happened
Let's appreciate what you built. In about 400 lines of code:
- Six data sources searched in parallel in 2-4 seconds
- Claude synthesizes all results into a structured, cited response
- Knowledge base is checked first for instant cache hits
- Negative feedback is checked to avoid repeating past mistakes
- Unconfigured sources are skipped gracefully, no code changes needed
- Rate limiting is handled with exponential backoff
- Response parsing handles every edge case Claude might throw at you
The orchestrator is the most valuable part of the entire system. Everything else, Slack, the web UI, the feedback loop, is just plumbing to get queries in and results out. This is the brain.
What's Next
In Part 4, we wire this into Slack, slash commands, @-mention handling, threaded conversations, and rich Block Kit responses with feedback buttons. Your team will be able to ask Harper Eye questions without leaving the tool they already live in.



Top comments (0)