Part 3: A theoretical implementation of LLM selection and orchestration through a UI
Introduction: Bringing the Logic to Life by Integrating Our Custom Routing Engine
In the first installment of this series, we explored the core philosophy of intelligent, multi-LLM orchestration — unpacking the “T-Shirt size” optimization strategy that keeps cost-per-token at an absolute minimum without sacrificing performance. In Episode 2, we moved from theory to architecture, building a custom, plug-and-play routing engine from scratch using LiteLLM to establish a unified abstraction layer.
Now, it’s time to bring these pieces together in our own workspace.
In this third episode, we are diving into the practical integration phase. We will walk through how to plug this self-implemented routing layer directly into a custom orchestration workflow — simulating how a system like Bob evaluates live tasks, handles model failovers, and automatically routes every code generation or documentation request to its most cost-effective algorithmic match in real time.
> 🚨 Disclaimer: This article is for educational purposes only. It provides a conceptual, hands-on look at how LLM routing logic operates and should not be mistaken for the official enterprise implementation or architecture of the IBM Bob product.
Multi-LLM Routing — Part 3: (Almost) Final Implementation
Before diving into code, let’s look at how a raw, compound prompt travels through our custom system. Rather than passing the entire prompt blindly to a single heavy LLM, our integration pipeline enforces a strict four-stage flow:
┌─────────────────────────┬────────────────────────────────────────┐
│ Sidebar                 │ Main                                   │
│ ─────────────────────   │ ────────────────────────────────────   │
│ Model Registry          │ Top bar (title + dry-run toggle)       │
│   T1 IBM Granite        │                                        │
│   T2 Meta LLaMA         │ Chat Thread                            │
│   T3 Google Gemma       │ ┌─ User bubble ──────────────────┐     │
│   T4 Mistral AI         │ │ "Write API docs AND OAuth2"    │     │
│                         │ └────────────────────────────────┘     │
│ Formula                 │ ┌─ Routing Card ─────────────────┐     │
│   U = 0.60·Q − 0.40·C   │ │ 🔀 2 sub-tasks detected        │     │
│                         │ │ ┌ documentation   T1 Granite ┐ │     │
│ Session Stats           │ │ │ complexity 26% Q 64% U+0.36│ │     │
│   Total calls           │ │ └────────────────────────────┘ │     │
│   Avg quality           │ │ ┌ code_generation   T3 Gemma ┐ │     │
│   Total cost            │ │ │ complexity 61% Q 85% U+0.37│ │     │
│                         │ │ └────────────────────────────┘ │     │
│                         │ └────────────────────────────────┘     │
│                         │ ┌─ Response Card ────────────────┐     │
│                         │ │ [T1 documentation][T3 code][⊞] │     │
│                         │ │ 📡 IBM Granite ⏱ 312ms 📝 214  │     │
│                         │ │ <rendered markdown response>   │     │
│                         │ └────────────────────────────────┘     │
│                         │                                        │
│                         │ Input area (textarea + send button)    │
└─────────────────────────┴────────────────────────────────────────┘
- Split & Classify (splitter.py + task_classifier.py): The prompt is tokenized and dissected into atomic sub-tasks, each stamped with a specific type and evaluated for complexity (0.0 to 1.0).
- Dispatch & Route (router.py): For each sub-task, the router matches capabilities and applies the utility function U = β·Q − α·C to pick the optimal tier.
- Parallel Execution & Abstraction (orchestrator.py + llm_client.py): Sub-tasks are dispatched simultaneously using asynchronous concurrency, communicating via LiteLLM to local Ollama nodes.
- Aggregation & Feedback Storage (feedback.py): The final answers are merged into a unified markdown response, and execution metadata (latency, tokens, costs) is captured in an append-only analytics ledger.
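To make the utility function from the routing stage concrete, here is a small worked sketch. The weights β = 0.60 and α = 0.40 come from the sidebar formula in the layout above; the candidate names and their quality and normalised cost figures are illustrative assumptions, not measurements from the project.

```python
# Worked example of U = β·Q − α·C with illustrative numbers.
BETA = 0.60   # quality weight (from the sidebar formula)
ALPHA = 0.40  # cost weight (from the sidebar formula)


def utility(quality: float, cost: float) -> float:
    """Return U = β·Q − α·C for a quality and a normalised cost in [0, 1]."""
    return BETA * quality - ALPHA * cost


# Hypothetical candidates for one documentation sub-task: (name, Q, C).
candidates = [
    ("light", 0.64, 0.06),
    ("medium", 0.75, 0.30),
    ("heavy", 0.90, 0.80),
]

scores = {name: round(utility(q, c), 2) for name, q, c in candidates}
best = max(scores, key=scores.get)
print(scores, best)
```

With these assumed figures the light model wins: the heavier models score higher on quality, but the cost penalty outweighs the gain for a low-complexity task.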
Technical Synthesis: Key Code Implementations
Let’s look at the actual code mechanisms making this cross-model coordination possible.
The Evaluation Core: Splitting & Routing
When a compound request comes in (e.g., “Write a detailed API documentation section and code a secure OAuth2 decorator”), each part must be handled independently. The Orchestrator runs the split-and-classify pipeline and builds a routing table before making any model calls.
"""
orchestrator.py - enriched for the Chat UI
Ties together splitting, routing, parallel execution, and aggregation.
Returns full routing metadata (utility score, quality score, ollama model)
so the UI can display the decision rationale alongside each response.
"""
from __future__ import annotations

import asyncio
import time
from dataclasses import dataclass, field

from src.splitter import split_and_classify
from src.router import dispatch, _utility, TASK_QUALITY_THRESHOLDS, QUALITY_THRESHOLD
from src.llm_client import call_model, TIER_TO_MODEL
from src.feedback import FeedbackStore
from src.task_classifier import TaskProfile
from src.cost_registry import ModelSpec


@dataclass
class SubTaskResult:
    profile: TaskProfile
    model: ModelSpec
    response: str
    latency_ms: int
    out_tokens: int
    # ── routing metadata exposed to the UI ──────────────────────────────────
    utility_score: float  # U = β·Q − α·C
    quality_score: float  # model.quality_score(complexity)
    threshold: float      # per-task quality threshold applied
    ollama_model: str     # concrete ollama model name used
    cost_units: float     # normalised cost for this call


@dataclass
class OrchestratorResult:
    prompt: str
    sub_results: list[SubTaskResult]
    merged: str
    total_ms: int

    def routing_table(self) -> list[dict]:
        return [
            {
                "task_type": r.profile.task_type.value,
                "complexity": r.profile.complexity,
                "tier": r.profile.tier.name,
                "model": r.model.label,
                "provider": r.model.provider,
                "ollama_model": r.ollama_model,
                "utility": round(r.utility_score, 4),
                "quality": round(r.quality_score, 4),
                "threshold": r.threshold,
                "latency_ms": r.latency_ms,
                "out_tokens": r.out_tokens,
                "cost_units": r.cost_units,
                "response": r.response,
            }
            for r in self.sub_results
        ]

    def to_ui_payload(self) -> dict:
        """Full payload for the chat UI, including per-subtask routing details."""
        return {
            "prompt": self.prompt,
            "total_ms": self.total_ms,
            "merged": self.merged,
            "sub_tasks": self.routing_table(),
        }


class Orchestrator:
    def __init__(self, feedback_store: FeedbackStore | None = None) -> None:
        self._store = feedback_store or FeedbackStore()

    async def handle(self, prompt: str) -> OrchestratorResult:
        """
        Full pipeline:
        1. Split + classify
        2. Route via utility function U = β·Q − α·C
        3. Execute all sub-tasks in parallel via LiteLLM Router
        4. Aggregate + record
        """
        t_start = time.perf_counter()
        profiles = split_and_classify(prompt)
        routing = dispatch(profiles)

        async def _exec(key: str, profile: TaskProfile, model: ModelSpec) -> SubTaskResult:
            response, latency, tokens = await call_model(
                model_label=model.label,
                prompt=profile.raw_text,
                max_tokens=min(2048, profile.token_estimate * 3),
            )
            lat_ms = int(latency * 1000)
            u = _utility(model, profile)
            q = model.quality_score(profile.complexity)
            thr = TASK_QUALITY_THRESHOLDS.get(profile.task_type.value, QUALITY_THRESHOLD)
            ollama_name = TIER_TO_MODEL.get(model.label, model.label)
            cost = round(model.cost_per_1k * tokens / 1000, 6)
            self._store.record(
                task_type=profile.task_type.value,
                complexity=profile.complexity,
                tier=model.tier,
                model_used=model.label,
                latency_ms=lat_ms,
                output_tokens=tokens,
                cost_per_1k=model.cost_per_1k,
                prompt_len=len(profile.raw_text),
            )
            return SubTaskResult(
                profile=profile,
                model=model,
                response=response,
                latency_ms=lat_ms,
                out_tokens=tokens,
                utility_score=round(u, 4),
                quality_score=round(q, 4),
                threshold=thr,
                ollama_model=ollama_name,
                cost_units=cost,
            )

        sub_results = list(await asyncio.gather(
            *[_exec(k, p, m) for k, (p, m) in routing.items()]
        ))
        merged = self._merge(sub_results)
        total_ms = int((time.perf_counter() - t_start) * 1000)
        return OrchestratorResult(
            prompt=prompt,
            sub_results=sub_results,
            merged=merged,
            total_ms=total_ms,
        )

    def handle_sync(self, prompt: str) -> OrchestratorResult:
        return asyncio.run(self.handle(prompt))

    @staticmethod
    def _merge(results: list[SubTaskResult]) -> str:
        sections = []
        for r in results:
            label = r.profile.task_type.value.replace("_", " ").title()
            header = (
                f"## {label} "
                f"_(via {r.model.label} [{r.model.provider}], {r.latency_ms} ms)_"
            )
            sections.append(f"{header}\n\n{r.response.strip()}")
        return "\n\n---\n\n".join(sections)
The dispatch routine in router.py picks the model using our utility metric. For example, a code task requires a higher threshold (0.72) than prose documentation (0.60), forcing a higher tier if complexity spikes:
"""
router.py
Core routing logic: selects the cheapest model that meets the quality threshold
for each TaskProfile, using a utility function U = β·Q − α·C.
"""
from __future__ import annotations

from src.cost_registry import MODEL_REGISTRY, ModelSpec
from src.task_classifier import TaskProfile

# ── Tunable parameters ────────────────────────────────────────────────────────
QUALITY_THRESHOLD = 0.72  # default minimum acceptable quality score
COST_WEIGHT = 0.40        # α: penalise cost
QUALITY_WEIGHT = 0.60     # β: reward quality

# Per-task-type quality thresholds.
# Documentation / summaries tolerate lower quality from a cheap model;
# code and reasoning tasks require a higher bar.
TASK_QUALITY_THRESHOLDS: dict[str, float] = {
    "documentation": 0.60,  # light model acceptable for prose
    "summarisation": 0.55,  # even more lenient for summaries
    "qa_simple": 0.55,
    "code_generation": 0.72,
    "code_review": 0.85,    # security: high bar
    "reasoning": 0.80,
}

# Task types that require the heaviest tier (Tier 4) regardless of complexity score.
# code_review always goes to Mistral (security-aware, largest context)
SECURITY_OVERRIDES: set[str] = {"code_review"}


def _utility(model: ModelSpec, profile: TaskProfile) -> float:
    """
    Utility U = β·Q − α·C
    Q = model.quality_score(complexity) ∈ [0, 1]
    C = model.cost_per_1k (normalised) ∈ [0, 1]
    A higher U means the model is preferred.
    """
    q = model.quality_score(profile.complexity)
    c = model.cost_per_1k
    return QUALITY_WEIGHT * q - COST_WEIGHT * c


def route(profile: TaskProfile) -> ModelSpec:
    """
    Return the optimal ModelSpec for the given TaskProfile.
    Selection rules (in order):
    1. Model must support the task type.
    2. Model context window must fit the estimated token budget.
    3. Model quality_score must meet the per-task threshold
       (unless it's a security override, in which case tier 4 is forced).
    4. Among qualifying candidates, the one with the highest utility wins.
    5. If no candidate passes the threshold, fall back to the highest-quality model.
    """
    ttype = profile.task_type.value

    # Force tier-4 (heaviest) for security-sensitive task types
    if ttype in SECURITY_OVERRIDES:
        heavy = next(m for m in MODEL_REGISTRY if m.tier == 4)
        return heavy

    threshold = TASK_QUALITY_THRESHOLDS.get(ttype, QUALITY_THRESHOLD)
    candidates = [
        m for m in MODEL_REGISTRY
        if ttype in m.capable_types
        and m.max_tokens >= profile.token_estimate
        and m.quality_score(profile.complexity) >= threshold
    ]
    if not candidates:
        # Safety net (rule 5): return the highest-quality model regardless of
        # cost. Ranking the full registry by utility here would let a cheap
        # model win again, which contradicts the documented fallback.
        return max(MODEL_REGISTRY, key=lambda m: m.quality_score(profile.complexity))
    return max(candidates, key=lambda m: _utility(m, profile))


def dispatch(profiles: list[TaskProfile]) -> dict[str, tuple[TaskProfile, ModelSpec]]:
    """
    Route each sub-task independently.
    Returns a mapping of task_type → (profile, selected_model).
    When multiple sub-tasks share the same type, a suffix is appended.
    """
    result: dict[str, tuple[TaskProfile, ModelSpec]] = {}
    type_counts: dict[str, int] = {}
    for profile in profiles:
        key = profile.task_type.value
        count = type_counts.get(key, 0)
        type_counts[key] = count + 1
        unique_key = key if count == 0 else f"{key}_{count}"
        result[unique_key] = (profile, route(profile))
    return result
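The interaction between the per-task threshold and the utility score is easier to see on a toy registry. The sketch below is not the project's cost_registry: `ToyModel`, its linear quality decay, and every number in `REGISTRY` are assumptions made up for illustration. Only the two thresholds and the 0.60/0.40 weights are taken from router.py.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ToyModel:
    """Hypothetical stand-in for ModelSpec; the real class is not shown here."""
    label: str
    base_quality: float  # assumed quality on trivial tasks
    decay: float         # assumed quality drop as complexity rises
    cost: float          # normalised cost in [0, 1]

    def quality_score(self, complexity: float) -> float:
        return max(0.0, self.base_quality - self.decay * complexity)


REGISTRY = [
    ToyModel("tier1", 0.80, 0.50, 0.05),
    ToyModel("tier2", 0.88, 0.35, 0.30),
    ToyModel("tier3", 0.95, 0.20, 0.60),
]
THRESHOLDS = {"documentation": 0.60, "code_generation": 0.72}


def pick(task_type: str, complexity: float) -> str:
    """Filter by the per-task threshold, then maximise U = 0.60·Q − 0.40·C."""
    threshold = THRESHOLDS[task_type]
    ok = [m for m in REGISTRY if m.quality_score(complexity) >= threshold]
    if not ok:  # nothing clears the bar: fall back to the best quality
        return max(REGISTRY, key=lambda m: m.quality_score(complexity)).label
    best = max(ok, key=lambda m: 0.60 * m.quality_score(complexity) - 0.40 * m.cost)
    return best.label


print(pick("documentation", 0.3))    # same complexity, lower bar
print(pick("code_generation", 0.3))  # same complexity, higher bar
print(pick("code_generation", 0.6))  # complexity spike
```

Under these assumed figures, the same complexity score lands on a cheaper tier for documentation than for code generation, and raising the complexity pushes code generation up another tier, which is the behaviour the thresholds are meant to produce.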
Concurrent Multi-LLM Execution via LiteLLM
Executing multiple sub-tasks sequentially kills the user experience. By wrapping our Router inside an asynchronous execution map using Python's asyncio.gather, we fire concurrent requests safely.
Here is how the asynchronous executor loops through the resolved routing map, hits llm_client.py (powered by litellm.Router), and tracks precise metrics:
# From src/orchestrator.py
async def handle(self, prompt: str) -> OrchestratorResult:
    t_start = time.perf_counter()
    profiles = split_and_classify(prompt)
    routing = dispatch(profiles)

    async def _exec(key: str, profile: TaskProfile, model: ModelSpec) -> SubTaskResult:
        # Resolve concrete Ollama mapping (e.g., model::light -> granite4.1:3b)
        ollama_name = TIER_TO_MODEL.get(model.label, model.label)
        thr = TASK_QUALITY_THRESHOLDS.get(profile.task_type.value, QUALITY_THRESHOLD)
        q = model.quality_score(profile.complexity)
        u = _utility(model, profile)

        # Execute call via LiteLLM gateway wrapper
        response, lat_sec, tokens = await call_model(
            model_label=model.label,
            prompt=profile.raw_text,
            max_tokens=model.max_tokens,
        )
        lat_ms = int(lat_sec * 1000)
        cost = model.cost_per_1k * tokens / 1000.0

        # Persistent audit feedback trail. The store is held in self._store,
        # and record() takes cost_per_1k (it derives cost_units itself).
        self._store.record(
            task_type=profile.task_type.value,
            complexity=profile.complexity,
            tier=model.tier,
            model_used=ollama_name,
            latency_ms=lat_ms,
            output_tokens=tokens,
            cost_per_1k=model.cost_per_1k,
            prompt_len=len(profile.raw_text),
        )
        return SubTaskResult(
            profile=profile, model=model, response=response,
            latency_ms=lat_ms, out_tokens=tokens, utility_score=round(u, 4),
            quality_score=round(q, 4), threshold=thr, ollama_model=ollama_name,
            cost_units=cost,
        )

    # Fire concurrent async execution workers across all models
    sub_results = list(await asyncio.gather(
        *[_exec(k, p, m) for k, (p, m) in routing.items()]
    ))
    merged = self._merge(sub_results)
    total_ms = int((time.perf_counter() - t_start) * 1000)
    return OrchestratorResult(prompt=prompt, sub_results=sub_results, merged=merged, total_ms=total_ms)
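The effect of asyncio.gather can be checked in isolation, without any model backend. In this minimal sketch `fake_call` is a hypothetical stand-in for call_model that only sleeps, and the delays are arbitrary; the point is that the wall-clock time tracks the slowest call rather than the sum, and that results come back in submission order.

```python
import asyncio
import time


async def fake_call(label: str, delay_s: float) -> str:
    """Stand-in for a model call; sleeps instead of hitting a real backend."""
    await asyncio.sleep(delay_s)
    return f"{label} done"


async def run_all() -> tuple[list[str], float]:
    start = time.perf_counter()
    # gather() returns results in argument order, not completion order.
    results = await asyncio.gather(
        fake_call("documentation", 0.30),
        fake_call("code_generation", 0.20),
    )
    return list(results), time.perf_counter() - start


results, elapsed = asyncio.run(run_all())
print(results, f"{elapsed:.2f}s")
```

One design point to keep in mind: by default gather propagates the first exception raised by any awaited call. Passing `return_exceptions=True` returns exceptions as ordinary results instead, which may be a better fit when one failed sub-task should not discard the others.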
Persistent Telemetry and Analytics Tracking
To check whether the routing layer actually saves money and routes accurately over time, every call appends one record to an append-only JSONL feedback log.
"""
feedback.py
Records routing decisions and outcome signals.
Used to surface per-tier quality statistics and to guide future tuning.
"""
from __future__ import annotations

import json
import os
from dataclasses import asdict, dataclass
from datetime import datetime, timezone
from pathlib import Path


@dataclass
class RoutingRecord:
    timestamp: str       # ISO-8601
    task_type: str
    complexity: float
    tier: int
    model_used: str
    latency_ms: int
    cost_units: float    # cost_per_1k × output_tokens / 1000
    quality_eval: float  # auto-eval heuristic: 0.0 – 1.0
    prompt_len: int      # character count of original prompt


class FeedbackStore:
    """Append-only JSONL store for routing records."""

    def __init__(self, path: str = "output/feedback.jsonl") -> None:
        self._path = Path(path)
        self._path.parent.mkdir(parents=True, exist_ok=True)
        self._records: list[RoutingRecord] = self._load()

    # ── Persistence ───────────────────────────────────────────────────────────
    def _load(self) -> list[RoutingRecord]:
        if not self._path.exists():
            return []
        records = []
        with self._path.open() as fh:
            for line in fh:
                line = line.strip()
                if line:
                    records.append(RoutingRecord(**json.loads(line)))
        return records

    def record(
        self,
        task_type: str,
        complexity: float,
        tier: int,
        model_used: str,
        latency_ms: int,
        output_tokens: int,
        cost_per_1k: float,
        prompt_len: int,
    ) -> None:
        quality_eval = self._heuristic_quality(latency_ms, output_tokens)
        cost_units = cost_per_1k * output_tokens / 1000.0
        rec = RoutingRecord(
            timestamp=datetime.now(timezone.utc).isoformat(),
            task_type=task_type,
            complexity=round(complexity, 3),
            tier=tier,
            model_used=model_used,
            latency_ms=latency_ms,
            cost_units=round(cost_units, 6),
            quality_eval=round(quality_eval, 3),
            prompt_len=prompt_len,
        )
        self._records.append(rec)
        with self._path.open("a") as fh:
            fh.write(json.dumps(asdict(rec)) + "\n")

    # ── Analytics ─────────────────────────────────────────────────────────────
    @staticmethod
    def _heuristic_quality(latency_ms: int, output_tokens: int) -> float:
        """
        Rough auto-eval: reward high token output at low latency.
        Real systems replace this with a judge-LLM or human rating.
        """
        if output_tokens == 0:
            return 0.0
        # Clamp to 1 ms so the division is safe without rounding every
        # sub-second latency up to a full second.
        tokens_per_second = output_tokens / (max(latency_ms, 1) / 1000)
        return min(1.0, 0.5 + tokens_per_second / 60)

    def summary(self) -> dict:
        if not self._records:
            return {"total_calls": 0}
        total_cost = sum(r.cost_units for r in self._records)
        avg_quality = sum(r.quality_eval for r in self._records) / len(self._records)
        by_tier: dict[int, int] = {}
        for r in self._records:
            by_tier[r.tier] = by_tier.get(r.tier, 0) + 1
        return {
            "total_calls": len(self._records),
            "total_cost": round(total_cost, 6),
            "avg_quality": round(avg_quality, 3),
            "calls_by_tier": by_tier,
        }
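Because the log is plain JSONL, it can be analysed offline with a few lines of standard-library code. The sketch below sums cost per tier; `cost_by_tier` is a hypothetical helper, not part of feedback.py, and the sample records are invented and carry only the two fields the helper reads (`tier` and `cost_units`, named as in RoutingRecord above).

```python
import json
import tempfile
from collections import defaultdict
from pathlib import Path


def cost_by_tier(path: Path) -> dict[int, float]:
    """Sum cost_units per tier from an append-only JSONL log."""
    totals: dict[int, float] = defaultdict(float)
    with path.open(encoding="utf-8") as fh:
        for line in fh:
            if line.strip():  # skip blank lines
                rec = json.loads(line)
                totals[rec["tier"]] += rec["cost_units"]
    return {tier: round(total, 6) for tier, total in sorted(totals.items())}


# Illustrative records only, written to a throwaway file.
sample = [
    {"tier": 1, "cost_units": 0.002},
    {"tier": 3, "cost_units": 0.040},
    {"tier": 1, "cost_units": 0.003},
]
with tempfile.TemporaryDirectory() as tmp:
    log = Path(tmp) / "feedback.jsonl"
    with log.open("a", encoding="utf-8") as fh:
        for rec in sample:
            fh.write(json.dumps(rec) + "\n")
    totals = cost_by_tier(log)

print(totals)
```

Comparing these per-tier totals against what the same traffic would have cost on the heaviest tier alone is one way to estimate how much the router is saving.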
Bringing It All Together: Exposing the Pipeline to an Interface
With our orchestration core wrapped up, exposing it requires a lightweight API layer. Using FastAPI, we can expose the entire orchestration run payload directly to a client web interface.
#!/usr/bin/env python3
"""
app.py - LLM Routing Chat UI
FastAPI backend that serves the single-page chat interface and exposes:
    GET  /            → chat UI (static/index.html)
    POST /api/chat    → route prompt, call LLMs, return enriched JSON
    GET  /api/models  → model registry (tiers, providers, ollama names)
    GET  /api/stats   → routing feedback statistics
    GET  /health      → liveness probe
"""
from __future__ import annotations

import os
import sys
from pathlib import Path

from dotenv import load_dotenv
load_dotenv()

import uvicorn
from fastapi import FastAPI, HTTPException
from fastapi.responses import HTMLResponse, JSONResponse
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel

sys.path.insert(0, str(Path(__file__).parent))
from src.orchestrator import Orchestrator
from src.feedback import FeedbackStore
from src.cost_registry import MODEL_REGISTRY
from src.llm_client import TIER_TO_MODEL

# ── App ───────────────────────────────────────────────────────────────────────
app = FastAPI(
    title="LLM Routing Chat",
    description="Chat UI that demonstrates intelligent LLM routing via LiteLLM → Ollama",
    version="1.0.0",
    docs_url="/api/docs",
)
feedback_store = FeedbackStore(path="output/feedback.jsonl")
orchestrator = Orchestrator(feedback_store=feedback_store)

# ── Serve the UI ──────────────────────────────────────────────────────────────
STATIC_DIR = Path(__file__).parent / "static"


@app.get("/", response_class=HTMLResponse, include_in_schema=False)
async def serve_ui():
    html = (STATIC_DIR / "index.html").read_text()
    return HTMLResponse(content=html)


# ── Request / Response models ─────────────────────────────────────────────────
class ChatRequest(BaseModel):
    prompt: str
    dry_run: bool = False


# ── API endpoints ─────────────────────────────────────────────────────────────
@app.post("/api/chat")
async def chat(req: ChatRequest):
    """
    Route a prompt through the LLM pipeline and return the full response
    with routing metadata for the UI.
    """
    if not req.prompt.strip():
        raise HTTPException(status_code=400, detail="Prompt must not be empty.")

    if req.dry_run:
        # Dry run: compute the routing table only, without calling any model.
        from src.splitter import split_and_classify
        from src.router import dispatch, _utility, TASK_QUALITY_THRESHOLDS, QUALITY_THRESHOLD

        profiles = split_and_classify(req.prompt)
        routing = dispatch(profiles)
        sub_tasks = []
        for key, (p, m) in routing.items():
            sub_tasks.append({
                "task_type": p.task_type.value,
                "complexity": p.complexity,
                "tier": p.tier.name,
                "model": m.label,
                "provider": m.provider,
                "ollama_model": TIER_TO_MODEL.get(m.label, m.label),
                "utility": round(_utility(m, p), 4),
                "quality": round(m.quality_score(p.complexity), 4),
                "threshold": TASK_QUALITY_THRESHOLDS.get(p.task_type.value, QUALITY_THRESHOLD),
                "latency_ms": None,
                "out_tokens": None,
                "cost_units": None,
                "response": None,
            })
        return JSONResponse(content={
            "prompt": req.prompt,
            "total_ms": None,
            "merged": None,
            "sub_tasks": sub_tasks,
            "dry_run": True,
        })

    try:
        result = await orchestrator.handle(req.prompt)
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc)) from exc
    payload = result.to_ui_payload()
    payload["dry_run"] = False
    return JSONResponse(content=payload)


@app.get("/api/models")
async def get_models():
    """Return the full model registry for the UI sidebar."""
    return JSONResponse(content=[
        {
            "label": m.label,
            "tier": m.tier,
            "provider": m.provider,
            "ollama_model": TIER_TO_MODEL.get(m.label, "?"),
            "cost_per_1k": m.cost_per_1k,
            "max_tokens": m.max_tokens,
            "capable_types": list(m.capable_types),
            "quality_at_low": round(m.quality_score(0.1), 3),
            "quality_at_high": round(m.quality_score(0.9), 3),
        }
        for m in MODEL_REGISTRY
    ])


@app.get("/api/stats")
async def get_stats():
    """Return aggregated routing statistics."""
    return JSONResponse(content=feedback_store.summary())


@app.get("/health")
async def health():
    return {"status": "ok", "gateway": "LiteLLM Router → Ollama"}


# ── Entry point ───────────────────────────────────────────────────────────────
if __name__ == "__main__":
    import argparse

    p = argparse.ArgumentParser(description="LLM Routing Chat UI")
    p.add_argument("--host", default="0.0.0.0")
    p.add_argument("--port", type=int, default=8080)
    args = p.parse_args()
    uvicorn.run("app:app", host=args.host, port=args.port, reload=False)
The resulting UI payload returns not only the merged markdown string, but also the sub_tasks list, which shows the complexity score, threshold, and utility score behind each routing decision.
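A client can consume this payload with the standard library alone. The following is a minimal sketch rather than the project's front end: `post_chat` and `describe_routing` are hypothetical helpers, and the example payload is hand-written in the dry-run shape with illustrative values (including the "light" model label), so nothing here depends on a running server.

```python
import json
import urllib.request


def post_chat(base_url: str, prompt: str, dry_run: bool = True) -> dict:
    """POST to /api/chat and return the decoded JSON payload."""
    body = json.dumps({"prompt": prompt, "dry_run": dry_run}).encode("utf-8")
    req = urllib.request.Request(
        f"{base_url}/api/chat",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(req, timeout=120) as resp:
        return json.load(resp)


def describe_routing(payload: dict) -> list[str]:
    """Render one summary line per entry in the sub_tasks list."""
    return [
        f"{t['task_type']} -> {t['model']} "
        f"(Q={t['quality']:.2f}, threshold={t['threshold']:.2f}, U={t['utility']:+.2f})"
        for t in payload["sub_tasks"]
    ]


# Hand-written payload in the dry-run shape; the values are illustrative.
example = {
    "dry_run": True,
    "sub_tasks": [
        {"task_type": "documentation", "model": "light", "quality": 0.64,
         "threshold": 0.60, "utility": 0.36},
    ],
}
lines = describe_routing(example)
print(lines[0])
```

With the server started on the default port from app.py, calling `post_chat("http://localhost:8080", "Write API docs")` with `dry_run=True` should return the routing table without invoking any model, which makes it a cheap way to inspect routing decisions.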
Conclusion & What’s Next
By anchoring our tasks within an automated routing structure, we get a system that behaves like an intelligent scheduler. Code reviews are always sent to the large-context Tier 4 model through the security override, while routine explanations and documentation go to the fast, lightweight Tier 1 model whenever it clears the quality threshold.
In this demo, cost is optimized per request at routing time, while code-oriented tasks are still sent only to models that meet their quality bar.
Coming up next in Part 4: We will explore how to configure advanced model failovers, optimize dynamic thresholds based on live telemetry, and build a front-end interface that renders these multi-LLM decisions visually in real time!
🎯 That’s a wrap 💯 for part 3 and thanks for reading!
Stay tuned for the next episode… 📺
Links
- Code repository for this post: https://github.com/aairom/multillm-taskrouting/tree/main/routing-implementation
- IBM Bob: https://bob.ibm.com/



