
Alain Airom (Ayrom)


The ROI of SDLC: How Bob Orchestration of LLMs Makes Financial Sense (Part 3) - Showtime

Part 3: A theoretical implementation of LLM selection and orchestration through a UI

Introduction: Bringing the Logic to Life by Integrating Our Custom Routing Engine

In the first installment of this series, we explored the core philosophy of intelligent, multi-LLM orchestration — unpacking the “T-Shirt size” optimization strategy that keeps cost-per-token at an absolute minimum without sacrificing performance. In Episode 2, we moved from theory to architecture, building a custom, plug-and-play routing engine from scratch using LiteLLM to establish a unified abstraction layer.

Now, it’s time to bring these pieces together in our own workspace.

In this third episode, we are diving into the practical integration phase. We will walk through how to plug this self-implemented routing layer directly into a custom orchestration workflow — simulating how a system like Bob evaluates live tasks, handles model failovers, and automatically routes every code generation or documentation request to its most cost-effective algorithmic match in real time.

> 🚨 Disclaimer: This article is for educational purposes only. It provides a conceptual, hands-on look at how LLM routing logic operates and should not be mistaken for the official enterprise implementation or architecture of the IBM Bob product.


Multi-LLM Routing — Part 3: (Almost) Final Implementation

Before diving into code, let’s look at how a raw, compound prompt travels through our custom system. Rather than passing the entire prompt blindly to a single heavy LLM, our integration pipeline enforces a strict four-stage flow:

┌─────────────────────────────────────────────────────────────────┐
│  Sidebar                │  Main                                  │
│  ─────────────────────  │  ────────────────────────────────────  │
│  Model Registry         │  Top bar (title + dry-run toggle)      │
│   T1 IBM Granite        │                                        │
│   T2 Meta LLaMA         │  Chat Thread                           │
│   T3 Google Gemma       │   ┌─ User bubble ──────────────────┐   │
│   T4 Mistral AI         │   │  "Write API docs AND OAuth2"   │   │
│                         │   └────────────────────────────────┘   │
│  Formula                │   ┌─ Routing Card ─────────────────┐   │
│  U = 0.60·Q − 0.40·C   │   │  🔀 2 sub-tasks detected       │    │
│                         │   │  ┌ documentation  T1 Granite ┐ │   │
│  Session Stats          │   │  │ complexity 26% Q 64% U+0.36│ │  │
│   Total calls           │   │  └────────────────────────────┘ │  │
│   Avg quality           │   │  ┌ code_generation T3 Gemma  ┐ │   │
│   Total cost            │   │  │ complexity 61% Q 85% U+0.37│ │  │
│                         │   │  └────────────────────────────┘ │  │
│                         │   └────────────────────────────────┘   │
│                         │   ┌─ Response Card ─────────────────┐  │
│                         │   │ [T1 documentation][T3 code][⊞]  │  │
│                         │   │  📡 IBM Granite  ⏱ 312ms  📝 214│ │
│                         │   │  <rendered markdown response>   │  │
│                         │   └────────────────────────────────┘   │
│                         │                                        │
│                         │  Input area (textarea + send button)   │
└─────────────────────────┴────────────────────────────────────────┘

The layout above is the chat UI that sits in front of the pipeline. Behind it, the four stages are:

  1. Split & Classify (splitter.py + task_classifier.py): The prompt is tokenized and dissected into atomic sub-tasks, each stamped with a specific type and evaluated for complexity (0.0 to 1.0).
  2. Dispatch & Route (router.py): For each sub-task, the router matches capabilities and applies the utility function U=β⋅Q−α⋅C to pick the optimal tier.
  3. Parallel Execution & Abstraction (orchestrator.py + llm_client.py): Sub-tasks are dispatched simultaneously using asynchronous concurrency, communicating via LiteLLM to local Ollama nodes.
  4. Aggregation & Feedback Storage (feedback.py): The final answers are merged into a unified markdown response, and execution metadata (latency, tokens, costs) is captured in an append-only analytics ledger.
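
To make stage 2 concrete, here is a minimal sketch of the utility arithmetic, using the weights shown in the UI sidebar (β = 0.60, α = 0.40). The Q values are the ones on the routing card in the layout; the normalised cost values (0.06 and 0.35) are assumptions picked for illustration so that the results line up with the U values on the card. They are not figures taken from the model registry.

```python
# Weights as shown in the UI sidebar: U = 0.60·Q − 0.40·C
QUALITY_WEIGHT = 0.60  # β, rewards quality
COST_WEIGHT = 0.40     # α, penalises cost


def utility(quality: float, cost: float) -> float:
    """Utility of one model for one sub-task; higher is better."""
    return QUALITY_WEIGHT * quality - COST_WEIGHT * cost


# Q comes from the routing card; the cost values are ASSUMED for illustration.
doc_u = utility(quality=0.64, cost=0.06)    # 0.384 - 0.024
code_u = utility(quality=0.85, cost=0.35)   # 0.510 - 0.140

print(f"documentation   U = {doc_u:+.2f}")
print(f"code_generation U = {code_u:+.2f}")
```

Because cost enters with a negative sign, a more capable model only wins when its quality gain outweighs its extra cost at the chosen weights.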

Technical Synthesis: Key Code Implementations

Let’s look at the actual code mechanisms making this cross-model coordination possible.

The Evaluation Core: Splitting & Routing

When a compound request comes in (e.g., “Write a detailed API documentation section and code a secure OAuth2 decorator”), each part must be handled independently. The Orchestrator first runs the split-and-classify pipeline and builds a routing table before executing any model calls.
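
As a rough illustration of what splitting and classifying such a prompt could look like, here is a deliberately naive sketch. It is not the code in splitter.py or task_classifier.py, which are not shown in this part: the `SketchProfile` class, the keyword table, the split on "and", and the word-count complexity heuristic are all hypothetical stand-ins.

```python
import re
from dataclasses import dataclass


@dataclass
class SketchProfile:
    """Hypothetical, reduced stand-in for TaskProfile."""
    task_type: str
    raw_text: str
    complexity: float  # 0.0 to 1.0


# ASSUMED keyword table; a real classifier would be far more robust.
KEYWORDS = {
    "documentation": ("documentation", "docs", "readme"),
    "code_generation": ("code", "implement", "decorator", "function"),
}


def split_prompt(prompt: str) -> list[str]:
    """Naive split on the word 'and'; breaks on phrases like 'read and write'."""
    parts = re.split(r"\s+and\s+", prompt, flags=re.IGNORECASE)
    return [p.strip() for p in parts if p.strip()]


def classify(text: str) -> SketchProfile:
    lowered = text.lower()
    task_type = next(
        (t for t, words in KEYWORDS.items() if any(w in lowered for w in words)),
        "qa_simple",  # fallback type when no keyword matches
    )
    # ASSUMED heuristic: longer requests count as more complex, capped at 1.0.
    complexity = min(1.0, len(text.split()) / 20)
    return SketchProfile(task_type, text, round(complexity, 2))


prompt = "Write a detailed API documentation section and code a secure OAuth2 decorator"
profiles = [classify(part) for part in split_prompt(prompt)]
for p in profiles:
    print(p.task_type, p.complexity, repr(p.raw_text))
```

Each resulting profile is what the router then scores on its own, which is why the two halves of the prompt can end up on different tiers.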

"""
orchestrator.py  —  enriched for the Chat UI
Ties together splitting, routing, parallel execution, and aggregation.
Returns full routing metadata (utility score, quality score, ollama model)
so the UI can display the decision rationale alongside each response.
"""

from __future__ import annotations
import asyncio
import time
from dataclasses import dataclass, field

from src.splitter import split_and_classify
from src.router import dispatch, _utility, TASK_QUALITY_THRESHOLDS, QUALITY_THRESHOLD
from src.llm_client import call_model, TIER_TO_MODEL
from src.feedback import FeedbackStore
from src.task_classifier import TaskProfile
from src.cost_registry import ModelSpec


@dataclass
class SubTaskResult:
    profile:       TaskProfile
    model:         ModelSpec
    response:      str
    latency_ms:    int
    out_tokens:    int
    # ── routing metadata exposed to the UI ──────────────────────────────────
    utility_score: float        # U = β·Q − α·C
    quality_score: float        # model.quality_score(complexity)
    threshold:     float        # per-task quality threshold applied
    ollama_model:  str          # concrete ollama model name used
    cost_units:    float        # normalised cost for this call


@dataclass
class OrchestratorResult:
    prompt:       str
    sub_results:  list[SubTaskResult]
    merged:       str
    total_ms:     int

    def routing_table(self) -> list[dict]:
        return [
            {
                "task_type":    r.profile.task_type.value,
                "complexity":   r.profile.complexity,
                "tier":         r.profile.tier.name,
                "model":        r.model.label,
                "provider":     r.model.provider,
                "ollama_model": r.ollama_model,
                "utility":      round(r.utility_score, 4),
                "quality":      round(r.quality_score, 4),
                "threshold":    r.threshold,
                "latency_ms":   r.latency_ms,
                "out_tokens":   r.out_tokens,
                "cost_units":   r.cost_units,
                "response":     r.response,
            }
            for r in self.sub_results
        ]

    def to_ui_payload(self) -> dict:
        """Full payload for the chat UI — includes per-subtask routing details."""
        return {
            "prompt":      self.prompt,
            "total_ms":    self.total_ms,
            "merged":      self.merged,
            "sub_tasks":   self.routing_table(),
        }


class Orchestrator:
    def __init__(self, feedback_store: FeedbackStore | None = None) -> None:
        self._store = feedback_store or FeedbackStore()

    async def handle(self, prompt: str) -> OrchestratorResult:
        """
        Full pipeline:
          1. Split + classify
          2. Route via utility function U = β·Q − α·C
          3. Execute all sub-tasks in parallel via LiteLLM Router
          4. Aggregate + record
        """
        t_start = time.perf_counter()
        profiles = split_and_classify(prompt)
        routing  = dispatch(profiles)

        async def _exec(key: str, profile: TaskProfile, model: ModelSpec) -> SubTaskResult:
            response, latency, tokens = await call_model(
                model_label=model.label,
                prompt=profile.raw_text,
                max_tokens=min(2048, profile.token_estimate * 3),
            )
            lat_ms      = int(latency * 1000)
            u           = _utility(model, profile)
            q           = model.quality_score(profile.complexity)
            thr         = TASK_QUALITY_THRESHOLDS.get(profile.task_type.value, QUALITY_THRESHOLD)
            ollama_name = TIER_TO_MODEL.get(model.label, model.label)
            cost        = round(model.cost_per_1k * tokens / 1000, 6)

            self._store.record(
                task_type=profile.task_type.value,
                complexity=profile.complexity,
                tier=model.tier,
                model_used=model.label,
                latency_ms=lat_ms,
                output_tokens=tokens,
                cost_per_1k=model.cost_per_1k,
                prompt_len=len(profile.raw_text),
            )
            return SubTaskResult(
                profile=profile,
                model=model,
                response=response,
                latency_ms=lat_ms,
                out_tokens=tokens,
                utility_score=round(u, 4),
                quality_score=round(q, 4),
                threshold=thr,
                ollama_model=ollama_name,
                cost_units=cost,
            )

        sub_results = list(await asyncio.gather(
            *[_exec(k, p, m) for k, (p, m) in routing.items()]
        ))

        merged   = self._merge(sub_results)
        total_ms = int((time.perf_counter() - t_start) * 1000)

        return OrchestratorResult(
            prompt=prompt,
            sub_results=sub_results,
            merged=merged,
            total_ms=total_ms,
        )

    def handle_sync(self, prompt: str) -> OrchestratorResult:
        return asyncio.run(self.handle(prompt))

    @staticmethod
    def _merge(results: list[SubTaskResult]) -> str:
        sections = []
        for r in results:
            label  = r.profile.task_type.value.replace("_", " ").title()
            header = (
                f"## {label}  "
                f"_(via {r.model.label} [{r.model.provider}], {r.latency_ms} ms)_"
            )
            sections.append(f"{header}\n\n{r.response.strip()}")
        return "\n\n---\n\n".join(sections)
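
One detail of the listing above is worth a short sketch: `handle_sync` relies on `asyncio.run`, which starts its own event loop and therefore raises `RuntimeError` when called from code that is already running inside one (a FastAPI endpoint, for example, should await `handle` directly). The `handle` coroutine below is a hypothetical stand-in that makes no model calls.

```python
import asyncio


async def handle(prompt: str) -> str:
    """Stand-in for Orchestrator.handle (hypothetical, no model calls)."""
    await asyncio.sleep(0)
    return f"handled: {prompt}"


def handle_sync(prompt: str) -> str:
    # Same pattern as the listing: start a fresh event loop and run to completion.
    return asyncio.run(handle(prompt))


async def call_sync_wrapper_inside_loop() -> bool:
    """Return True if asyncio.run refuses to start inside a running loop."""
    coro = handle("nested")
    try:
        asyncio.run(coro)
    except RuntimeError:
        coro.close()  # avoid a "never awaited" warning for the unused coroutine
        return True
    return False


print(handle_sync("from a script"))                   # fine from synchronous code
print(asyncio.run(call_sync_wrapper_inside_loop()))   # nested call is rejected
```

In practice this means the synchronous wrapper suits scripts and CLI tools, while async frameworks should call the coroutine itself.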

The dispatch routine in router.py picks the model using our utility metric. For example, a code task requires a higher threshold (0.72) than prose documentation (0.60), forcing a higher tier if complexity spikes:

"""
router.py
Core routing logic: selects the cheapest model that meets the quality threshold
for each TaskProfile, using a utility function U = β·Q − α·C.
"""

from __future__ import annotations
from src.cost_registry import MODEL_REGISTRY, ModelSpec
from src.task_classifier import TaskProfile

# ── Tunable parameters ────────────────────────────────────────────────────────
QUALITY_THRESHOLD = 0.72   # default minimum acceptable quality score
COST_WEIGHT       = 0.40   # α — penalise cost
QUALITY_WEIGHT    = 0.60   # β — reward quality

# Per-task-type quality thresholds.
# Documentation / summaries tolerate lower quality from a cheap model;
# code and reasoning tasks require a higher bar.
TASK_QUALITY_THRESHOLDS: dict[str, float] = {
    "documentation":   0.60,   # light model acceptable for prose
    "summarisation":   0.55,   # even more lenient for summaries
    "qa_simple":       0.55,
    "code_generation": 0.72,
    "code_review":     0.85,   # security: high bar
    "reasoning":       0.80,
}

# Task types that require the heaviest tier (Tier 4) regardless of complexity score.
# code_review always goes to Mistral (security-aware, largest context)
SECURITY_OVERRIDES: set[str] = {"code_review"}


def _utility(model: ModelSpec, profile: TaskProfile) -> float:
    """
    Utility  U = β·Q − α·C
      Q = model.quality_score(complexity)   ∈ [0, 1]
      C = model.cost_per_1k (normalised)    ∈ [0, 1]
    A higher U means the model is preferred.
    """
    q = model.quality_score(profile.complexity)
    c = model.cost_per_1k
    return QUALITY_WEIGHT * q - COST_WEIGHT * c


def route(profile: TaskProfile) -> ModelSpec:
    """
    Return the optimal ModelSpec for the given TaskProfile.

    Selection rules (in order):
      1. Model must support the task type.
      2. Model context window must fit the estimated token budget.
      3. Model quality_score must meet the per-task threshold from
         TASK_QUALITY_THRESHOLDS (QUALITY_THRESHOLD when the type is unlisted).
         Security overrides skip rules 1-3 and force tier 4.
      4. Among qualifying candidates, the one with the highest utility wins.
      5. If no candidate passes the threshold, fall back to the highest-quality model.
    """
    ttype = profile.task_type.value

    # Force tier-4 (heaviest) for security-sensitive task types
    if ttype in SECURITY_OVERRIDES:
        heavy = next(m for m in MODEL_REGISTRY if m.tier == 4)
        return heavy

    threshold = TASK_QUALITY_THRESHOLDS.get(ttype, QUALITY_THRESHOLD)

    candidates = [
        m for m in MODEL_REGISTRY
        if ttype in m.capable_types
        and m.max_tokens >= profile.token_estimate
        and m.quality_score(profile.complexity) >= threshold
    ]

    if not candidates:
        # Safety net: no model clears the threshold, so return the
        # highest-quality model regardless of cost (rule 5 in the docstring).
        return max(MODEL_REGISTRY, key=lambda m: m.quality_score(profile.complexity))

    return max(candidates, key=lambda m: _utility(m, profile))


def dispatch(profiles: list[TaskProfile]) -> dict[str, tuple[TaskProfile, ModelSpec]]:
    """
    Route each sub-task independently.

    Returns a mapping of task_type → (profile, selected_model).
    When multiple sub-tasks share the same type, a suffix is appended.
    """
    result: dict[str, tuple[TaskProfile, ModelSpec]] = {}
    type_counts: dict[str, int] = {}

    for profile in profiles:
        key = profile.task_type.value
        count = type_counts.get(key, 0)
        type_counts[key] = count + 1
        unique_key = key if count == 0 else f"{key}_{count}"
        result[unique_key] = (profile, route(profile))

    return result
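
To see the selection rules at work without the real registry, here is a small sketch with three toy models. The `ToyModel` class, its linear quality curve, and every number in `REGISTRY` are assumptions made up for illustration; only the thresholds (0.60 for documentation, 0.72 for code generation) and the weights come from the listing above.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ToyModel:
    """Hypothetical stand-in for ModelSpec with a linear quality curve."""
    label: str
    base_quality: float  # quality at complexity 0
    decay: float         # quality lost per unit of complexity
    cost: float          # normalised cost in [0, 1]

    def quality_score(self, complexity: float) -> float:
        return max(0.0, self.base_quality - self.decay * complexity)


# ASSUMED figures, chosen only to make the behaviour visible.
REGISTRY = [
    ToyModel("light", 0.75, 0.40, 0.05),
    ToyModel("medium", 0.85, 0.25, 0.30),
    ToyModel("heavy", 0.95, 0.10, 0.70),
]


def pick(complexity: float, threshold: float) -> ToyModel:
    """Filter by quality threshold, then take the highest utility."""
    candidates = [m for m in REGISTRY if m.quality_score(complexity) >= threshold]
    if not candidates:
        # Nothing qualifies: fall back to the highest-quality model.
        return max(REGISTRY, key=lambda m: m.quality_score(complexity))
    return max(
        candidates,
        key=lambda m: 0.60 * m.quality_score(complexity) - 0.40 * m.cost,
    )


print(pick(complexity=0.2, threshold=0.60).label)  # documentation, low complexity
print(pick(complexity=0.2, threshold=0.72).label)  # code, low complexity
print(pick(complexity=0.6, threshold=0.72).label)  # code, higher complexity
```

With these toy numbers the same code task moves up a tier as complexity rises, because the cheaper models drop below the 0.72 bar before cost is even considered.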

Concurrent Multi-LLM Execution via LiteLLM

Running sub-tasks one after another would make the user wait for the sum of every model's latency. Instead, the orchestrator wraps each routed call in a coroutine and awaits them together with Python's asyncio.gather, so the requests run concurrently.

Here is how the asynchronous executor loops through the resolved routing map, hits llm_client.py (powered by litellm.Router), and tracks precise metrics:

# From src/orchestrator.py
async def handle(self, prompt: str) -> OrchestratorResult:
    t_start = time.perf_counter()
    profiles = split_and_classify(prompt)
    routing = dispatch(profiles)

    async def _exec(key: str, profile: TaskProfile, model: ModelSpec) -> SubTaskResult:
        # Resolve concrete Ollama mapping (e.g., model::light -> granite4.1:3b)
        ollama_name = TIER_TO_MODEL.get(model.label, model.label)
        thr = TASK_QUALITY_THRESHOLDS.get(profile.task_type.value, QUALITY_THRESHOLD)
        q = model.quality_score(profile.complexity)
        u = _utility(model, profile)

        # Execute call via LiteLLM gateway wrapper
        response, lat_sec, tokens = await call_model(
            model_label=model.label,
            prompt=profile.raw_text,
            max_tokens=model.max_tokens
        )

        lat_ms = int(lat_sec * 1000)
        cost = model.cost_per_1k * tokens / 1000.0

        # Persistent audit feedback trail. FeedbackStore.record() takes
        # cost_per_1k and derives cost_units itself.
        self._store.record(
            task_type=profile.task_type.value,
            complexity=profile.complexity,
            tier=model.tier,
            model_used=ollama_name,
            latency_ms=lat_ms,
            output_tokens=tokens,
            cost_per_1k=model.cost_per_1k,
            prompt_len=len(profile.raw_text),
        )

        return SubTaskResult(
            profile=profile, model=model, response=response,
            latency_ms=lat_ms, out_tokens=tokens, utility_score=round(u, 4),
            quality_score=round(q, 4), threshold=thr, ollama_model=ollama_name, cost_units=cost
        )

    # Fire concurrent async execution workers across all models
    sub_results = list(await asyncio.gather(
        *[_exec(k, p, m) for k, (p, m) in routing.items()]
    ))

    merged = self._merge(sub_results)
    total_ms = int((time.perf_counter() - t_start) * 1000)

    return OrchestratorResult(prompt=prompt, sub_results=sub_results, merged=merged, total_ms=total_ms)
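
The effect of `asyncio.gather` can be checked in isolation with a small sketch. The `fake_call` coroutine is a hypothetical stand-in for `call_model` that simply sleeps; the delays are arbitrary.

```python
import asyncio
import time


async def fake_call(task_type: str, delay_s: float) -> str:
    """Hypothetical stand-in for call_model: waits, then returns a label."""
    await asyncio.sleep(delay_s)
    return task_type


async def run_concurrently() -> tuple[list[str], float]:
    t_start = time.perf_counter()
    # Both coroutines are scheduled at once; gather returns results
    # in the order the awaitables were passed, not in completion order.
    results = await asyncio.gather(
        fake_call("documentation", 0.3),
        fake_call("code_generation", 0.2),
    )
    return list(results), time.perf_counter() - t_start


results, elapsed = asyncio.run(run_concurrently())
print(results)
print(f"elapsed: {elapsed:.2f}s (sequential would be about 0.50s)")
```

Total wall time tracks the slowest call instead of the sum. By default the first exception raised by any sub-task propagates to the caller; passing `return_exceptions=True` to `asyncio.gather` is one option if a single failed sub-task should not discard the others.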

Persistent Telemetry and Analytics Tracking

To verify over time that the routing actually saves money and picks suitable models, every call is appended as one JSON line to an append-only JSONL feedback log.

"""
feedback.py
Records routing decisions and outcome signals.
Used to surface per-tier quality statistics and to guide future tuning.
"""

from __future__ import annotations
import json
import os
from dataclasses import asdict, dataclass
from datetime import datetime, timezone
from pathlib import Path


@dataclass
class RoutingRecord:
    timestamp:    str    # ISO-8601
    task_type:    str
    complexity:   float
    tier:         int
    model_used:   str
    latency_ms:   int
    cost_units:   float   # cost_per_1k × output_tokens / 1000
    quality_eval: float   # auto-eval heuristic: 0.0 – 1.0
    prompt_len:   int     # character count of original prompt


class FeedbackStore:
    """Append-only JSONL store for routing records."""

    def __init__(self, path: str = "output/feedback.jsonl") -> None:
        self._path = Path(path)
        self._path.parent.mkdir(parents=True, exist_ok=True)
        self._records: list[RoutingRecord] = self._load()

    # ── Persistence ───────────────────────────────────────────────────────────

    def _load(self) -> list[RoutingRecord]:
        if not self._path.exists():
            return []
        records = []
        with self._path.open() as fh:
            for line in fh:
                line = line.strip()
                if line:
                    records.append(RoutingRecord(**json.loads(line)))
        return records

    def record(
        self,
        task_type:    str,
        complexity:   float,
        tier:         int,
        model_used:   str,
        latency_ms:   int,
        output_tokens: int,
        cost_per_1k:  float,
        prompt_len:   int,
    ) -> None:
        quality_eval = self._heuristic_quality(latency_ms, output_tokens)
        cost_units   = cost_per_1k * output_tokens / 1000.0

        rec = RoutingRecord(
            timestamp=datetime.now(timezone.utc).isoformat(),
            task_type=task_type,
            complexity=round(complexity, 3),
            tier=tier,
            model_used=model_used,
            latency_ms=latency_ms,
            cost_units=round(cost_units, 6),
            quality_eval=round(quality_eval, 3),
            prompt_len=prompt_len,
        )
        self._records.append(rec)
        with self._path.open("a") as fh:
            fh.write(json.dumps(asdict(rec)) + "\n")

    # ── Analytics ─────────────────────────────────────────────────────────────

    @staticmethod
    def _heuristic_quality(latency_ms: int, output_tokens: int) -> float:
        """
        Rough auto-eval: reward high token output at low latency.
        Real systems replace this with a judge-LLM or human rating.
        """
        if output_tokens == 0:
            return 0.0
        tokens_per_second = output_tokens / max(1, latency_ms / 1000)
        return min(1.0, 0.5 + tokens_per_second / 60)

    def summary(self) -> dict:
        if not self._records:
            return {"total_calls": 0}
        total_cost = sum(r.cost_units for r in self._records)
        avg_quality = sum(r.quality_eval for r in self._records) / len(self._records)
        by_tier: dict[int, int] = {}
        for r in self._records:
            by_tier[r.tier] = by_tier.get(r.tier, 0) + 1
        return {
            "total_calls":  len(self._records),
            "total_cost":   round(total_cost, 6),
            "avg_quality":  round(avg_quality, 3),
            "calls_by_tier": by_tier,
        }
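
It helps to run a few numbers through the auto-eval heuristic to see how it behaves. The function below is copied from the listing as a standalone function; the sample latencies and token counts are assumptions for illustration (312 ms and 214 are the figures shown on the response card in the UI layout, read here as latency and output tokens).

```python
def heuristic_quality(latency_ms: int, output_tokens: int) -> float:
    """Standalone copy of FeedbackStore._heuristic_quality."""
    if output_tokens == 0:
        return 0.0
    # Latency is clamped to at least one second before computing throughput.
    tokens_per_second = output_tokens / max(1, latency_ms / 1000)
    return min(1.0, 0.5 + tokens_per_second / 60)


samples = [
    (312, 214),   # fast call: throughput term saturates the score
    (6000, 120),  # 20 tokens/s over six seconds
    (500, 0),     # empty response
]
for latency_ms, tokens in samples:
    score = heuristic_quality(latency_ms, tokens)
    print(f"{latency_ms:>5} ms, {tokens:>3} tokens -> {score:.3f}")
```

Any non-empty response scores at least 0.5, and the score reaches 1.0 at 30 tokens per second, so this signal mostly measures throughput. That is consistent with the docstring's note that a real system would replace it with a judge-LLM or human rating.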

Bringing It All Together: Exposing the Pipeline to an Interface

With the orchestration core in place, exposing it requires only a lightweight API layer. Using FastAPI, we can return the full orchestration payload to a web client.

#!/usr/bin/env python3
"""
app.py  —  LLM Routing Chat UI
FastAPI backend that serves the single-page chat interface and exposes:
  GET  /              → chat UI (static/index.html)
  POST /api/chat      → route prompt, call LLMs, return enriched JSON
  GET  /api/models    → model registry (tiers, providers, ollama names)
  GET  /api/stats     → routing feedback statistics
  GET  /health        → liveness probe
"""
from __future__ import annotations
import os
import sys
from pathlib import Path

from dotenv import load_dotenv
load_dotenv()

import uvicorn
from fastapi import FastAPI, HTTPException
from fastapi.responses import HTMLResponse, JSONResponse
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel

sys.path.insert(0, str(Path(__file__).parent))

from src.orchestrator import Orchestrator
from src.feedback import FeedbackStore
from src.cost_registry import MODEL_REGISTRY
from src.llm_client import TIER_TO_MODEL

# ── App ───────────────────────────────────────────────────────────────────────
app = FastAPI(
    title="LLM Routing Chat",
    description="Chat UI that demonstrates intelligent LLM routing via LiteLLM → Ollama",
    version="1.0.0",
    docs_url="/api/docs",
)

feedback_store = FeedbackStore(path="output/feedback.jsonl")
orchestrator   = Orchestrator(feedback_store=feedback_store)

# ── Serve the UI ──────────────────────────────────────────────────────────────
STATIC_DIR = Path(__file__).parent / "static"

@app.get("/", response_class=HTMLResponse, include_in_schema=False)
async def serve_ui():
    html = (STATIC_DIR / "index.html").read_text()
    return HTMLResponse(content=html)


# ── Request / Response models ─────────────────────────────────────────────────
class ChatRequest(BaseModel):
    prompt: str
    dry_run: bool = False


# ── API endpoints ─────────────────────────────────────────────────────────────

@app.post("/api/chat")
async def chat(req: ChatRequest):
    """
    Route a prompt through the LLM pipeline and return the full response
    with routing metadata for the UI.
    """
    if not req.prompt.strip():
        raise HTTPException(status_code=400, detail="Prompt must not be empty.")

    if req.dry_run:
        from src.splitter import split_and_classify
        from src.router import dispatch, _utility, TASK_QUALITY_THRESHOLDS, QUALITY_THRESHOLD
        profiles = split_and_classify(req.prompt)
        routing  = dispatch(profiles)
        sub_tasks = []
        for key, (p, m) in routing.items():
            sub_tasks.append({
                "task_type":    p.task_type.value,
                "complexity":   p.complexity,
                "tier":         p.tier.name,
                "model":        m.label,
                "provider":     m.provider,
                "ollama_model": TIER_TO_MODEL.get(m.label, m.label),
                "utility":      round(_utility(m, p), 4),
                "quality":      round(m.quality_score(p.complexity), 4),
                "threshold":    TASK_QUALITY_THRESHOLDS.get(p.task_type.value, QUALITY_THRESHOLD),
                "latency_ms":   None,
                "out_tokens":   None,
                "cost_units":   None,
                "response":     None,
            })
        return JSONResponse(content={
            "prompt":    req.prompt,
            "total_ms":  None,
            "merged":    None,
            "sub_tasks": sub_tasks,
            "dry_run":   True,
        })

    try:
        result = await orchestrator.handle(req.prompt)
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc)) from exc

    payload = result.to_ui_payload()
    payload["dry_run"] = False
    return JSONResponse(content=payload)


@app.get("/api/models")
async def get_models():
    """Return the full model registry for the UI sidebar."""
    return JSONResponse(content=[
        {
            "label":        m.label,
            "tier":         m.tier,
            "provider":     m.provider,
            "ollama_model": TIER_TO_MODEL.get(m.label, "?"),
            "cost_per_1k":  m.cost_per_1k,
            "max_tokens":   m.max_tokens,
            "capable_types": list(m.capable_types),
            "quality_at_low":  round(m.quality_score(0.1), 3),
            "quality_at_high": round(m.quality_score(0.9), 3),
        }
        for m in MODEL_REGISTRY
    ])


@app.get("/api/stats")
async def get_stats():
    """Return aggregated routing statistics."""
    return JSONResponse(content=feedback_store.summary())


@app.get("/health")
async def health():
    return {"status": "ok", "gateway": "LiteLLM Router → Ollama"}


# ── Entry point ───────────────────────────────────────────────────────────────
if __name__ == "__main__":
    import argparse
    p = argparse.ArgumentParser(description="LLM Routing Chat UI")
    p.add_argument("--host", default="0.0.0.0")
    p.add_argument("--port", type=int, default=8080)
    args = p.parse_args()
    uvicorn.run("app:app", host=args.host, port=args.port, reload=False)

The resulting UI payload contains not only the merged markdown response but also the sub_tasks list, which shows the complexity score, quality threshold, and utility score behind each routing decision.
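
As a sketch of how a client might consume that payload, the snippet below formats one line per sub-task. The keys match the dry-run response built in app.py; the sample values are assumptions taken from the routing card in the UI layout, and `summarise` is a hypothetical helper, not part of the project.

```python
# Hand-written sample in the shape of a dry-run /api/chat response.
# Values are ASSUMED (copied from the routing card), not real output.
sample_payload = {
    "prompt": "Write API docs AND OAuth2",
    "total_ms": None,
    "merged": None,
    "dry_run": True,
    "sub_tasks": [
        {"task_type": "documentation", "complexity": 0.26,
         "quality": 0.64, "threshold": 0.60, "utility": 0.36},
        {"task_type": "code_generation", "complexity": 0.61,
         "quality": 0.85, "threshold": 0.72, "utility": 0.37},
    ],
}


def summarise(payload: dict) -> list[str]:
    """Render one human-readable line per routed sub-task."""
    return [
        f"{t['task_type']}: complexity {t['complexity']:.0%}, "
        f"Q {t['quality']:.0%}, U {t['utility']:+.2f}"
        for t in payload["sub_tasks"]
    ]


lines = summarise(sample_payload)
for line in lines:
    print(line)
```

In a dry run the latency, token, cost, and response fields are `None`, so a client should check `dry_run` before rendering them.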

Conclusion & What’s Next

By anchoring our tasks within an automated routing structure, we get a system that behaves like an intelligent scheduler. Code reviews always go to the large-context Tier 4 model through the security override, whatever their complexity score, while routine documentation and simple questions map to the fast, lightweight Tier 1 model whenever it clears their quality threshold.

The result is per-request cost optimization: each sub-task runs on the model with the best quality-to-cost utility among those that clear its quality threshold, rather than on a single heavy model.

Coming up next in Part 4: We will explore how to configure advanced model failovers, optimize dynamic thresholds based on live telemetry, and build a front-end interface that renders these multi-LLM decisions visually in real time!

🎯 That’s a wrap 💯 for part 3 and thanks for reading!

Stay tuned for the next episode… 📺

