DEV Community

Alain Airom (Ayrom)


The ROI of SDLC: How Bob's Orchestration of LLMs Makes Financial Sense (Part 2) Using LiteLLM

Part 2: A theoretical implementation of LLM selection and orchestration using the open-source LiteLLM

Introduction

In the first part of this series, we explored the theoretical foundation of intelligent orchestration (implemented by Bob :) ), demonstrating how to dynamically route tasks to the most performant and cost-effective Large Language Model (LLM) based on context and complexity, and why this capability stands out as one of the definitive differentiators of IBM Bob. Now it's time to move from theory toward a production-grade architecture. In this second installment, we implement the same cost-routing logic on top of a production-ready open-source solution: LiteLLM, whose unified abstraction layer makes the routing engine plug-and-play.

> 🚨 Disclaimer: This article is for educational purposes only. It provides a conceptual, hands-on look at how LLM routing logic (here with LiteLLM) operates and should not be mistaken for the official enterprise implementation or architecture of the IBM Bob product.


TL;DR: What is LiteLLM?

Image from LiteLLM Site


Excerpt of LiteLLM GitHub Repository

LiteLLM is an open source AI Gateway that gives you a single, unified interface to call 100+ LLM providers (OpenAI, Anthropic, Gemini, Bedrock, Azure, and more) using the OpenAI format.
Use it as a Python SDK for direct library integration, or deploy the AI Gateway (Proxy Server) as a centralized service for your team or organization.

Why LiteLLM

Managing LLM calls across providers gets complicated fast: different SDKs, auth patterns, request formats, and error types for every model. LiteLLM removes that friction:

  • Unified API: one interface for 100+ LLMs, no provider-specific SDK juggling
  • Drop-in OpenAI compatibility: swap providers without rewriting your code
  • Production-ready gateway: virtual keys, spend tracking, guardrails, load balancing, and an admin dashboard out of the box
  • 8ms P95 latency at 1k RPS (benchmarks)
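
To make the "single interface" idea concrete, here is a minimal sketch of what a provider-agnostic call can look like with the LiteLLM Python SDK. The helper names `build_request` and `ask` are hypothetical, the model names are only illustrative, and the local call assumes an Ollama server on its default port. The litellm import is deferred so the request-building part runs even without the package installed.

```python
from typing import Any


def build_request(model: str, prompt: str, **extra: Any) -> dict[str, Any]:
    """Build OpenAI-format keyword arguments for litellm.completion()."""
    return {
        "model": model,
        "messages": [{"role": "user", "content": prompt}],
        **extra,
    }


def ask(model: str, prompt: str, **extra: Any) -> str:
    """Send the request through LiteLLM and return the text of the first choice."""
    # Imported lazily: only needed when a real call is made.
    import litellm

    response = litellm.completion(**build_request(model, prompt, **extra))
    return response.choices[0].message.content or ""


# Same request shape for a local and a hosted model (names are illustrative).
local_req = build_request(
    "ollama/llama3.2", "Say hello", api_base="http://localhost:11434"
)
cloud_req = build_request("gpt-4o-mini", "Say hello")
```

Only the `model` string (and, for a local server, `api_base`) changes between providers; the message payload stays identical.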


Multi-LLM Routing, Part 2: Implementation with LiteLLM

In the first part of this series, we saw the theoretical foundation of intelligent orchestration: how to dynamically decompose a complex user prompt and route each sub-task to the most performant and cost-effective model tier, using ad-hoc code.

In this post, we turn that cost-routing pipeline into a scalable solution by integrating LiteLLM. By using LiteLLM as a unified transport layer over our local Ollama instances, we decouple which tier is chosen from how the call is executed.

The Production-Grade Gateway Architecture

By integrating LiteLLM, we keep the custom mathematical utility scoring while completely abstracting the underlying infrastructure transport. The orchestrator no longer addresses specific container endpoints directly; instead, it interfaces with a standardized, OpenAI-compatible abstraction layer.

Under this architecture, responsibility is split into two cleanly isolated layers:

  • The Custom Routing Pipeline (router.py): Handles the business intelligence. It scores task complexity and calculates which symbolic tier (model::light, model::medium, etc.) maximizes performance while minimizing token cost using our standard utility function.

  • The LiteLLM AI Gateway (llm_client.py): Handles transport resilience. It maps those symbolic tier aliases to concrete models and manages networking features such as automatic retries, back-off parameters, fallback chains, and cooldowns after transient errors.
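
The article does not reproduce the utility function itself, so the following is only a sketch of the general idea behind utility-based tier selection, not the project's router.py. Everything in it is an assumption made for illustration: the `TierSpec` class, the capability ceilings, the penalty slope, the cost weight, and the cost figures for the medium and heavy tiers. The U values it produces will therefore not match the ones printed by the demo.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class TierSpec:
    """Hypothetical stand-in for the project's ModelSpec."""
    label: str
    capability: float   # assumed: highest complexity (0..1) the tier handles well
    cost_per_1k: float  # assumed: relative cost units per 1k output tokens


# Illustrative values only.
TIERS = [
    TierSpec("model::light", 0.35, 0.05),
    TierSpec("model::medium", 0.55, 0.15),
    TierSpec("model::balanced", 0.70, 0.35),
    TierSpec("model::heavy", 1.00, 0.80),
]


def utility(tier: TierSpec, complexity: float, cost_weight: float = 0.5) -> float:
    """Assumed form: expected quality minus a weighted cost term."""
    shortfall = max(0.0, complexity - tier.capability)
    quality = 1.0 - 4.0 * shortfall  # penalise tiers that are too weak for the task
    return quality - cost_weight * tier.cost_per_1k


def pick_tier(complexity: float) -> TierSpec:
    """Return the tier with the highest utility for this complexity score."""
    return max(TIERS, key=lambda t: utility(t, complexity))


for score in (0.26, 0.609, 0.9):
    print(score, "->", pick_tier(score).label)
```

The design point is the separation of concerns: this layer only returns a symbolic label, and nothing in it knows which concrete model or endpoint sits behind that label.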

Let's dig into that!

Core Technical Implementation

Granular Task Decomposition

This part remains unchanged from the first version: to avoid processing unnecessary tokens through high-tier engines, the architecture relies on a specialized TaskSplitter. This component evaluates the raw user payload and breaks it down into discrete components so that documentation and code can travel along separate routing paths.

"""
splitter.py
Splits a compound prompt into independent clauses, then classifies each one.
"""

from __future__ import annotations
import re
from src.task_classifier import classify, TaskProfile


# Conjunctions that signal independent sub-tasks within a single prompt
_SPLIT_PATTERN = re.compile(
    r"(?i)\s*\b(and also|and then|additionally|plus|as well as|AND|also)\b\s*",
)


def split_and_classify(prompt: str) -> list[TaskProfile]:
    """
    1. Split the prompt on conjunction markers.
    2. Discard separator tokens.
    3. Classify each remaining clause.
    """
    raw_clauses = _SPLIT_PATTERN.split(prompt)

    # Keep only non-separator, non-empty parts
    clauses = [
        c.strip()
        for c in raw_clauses
        if c and not _SPLIT_PATTERN.match(c.strip()) and len(c.strip()) > 8
    ]

    if not clauses:
        # Fallback: treat the whole prompt as one task
        clauses = [prompt.strip()]

    return [classify(c) for c in clauses]
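
To see how the split behaves in isolation, the sketch below reuses the same regular expression without the classifier (the `split_clauses` helper is hypothetical and exists only for this illustration). It also highlights a side effect worth knowing: because the pattern is case-insensitive, the `AND` alternative also matches a lowercase "and", so an ordinary phrase such as "pros and cons" is split too.

```python
import re

# Same pattern as in splitter.py
SPLIT = re.compile(
    r"(?i)\s*\b(and also|and then|additionally|plus|as well as|AND|also)\b\s*",
)


def split_clauses(prompt: str) -> list[str]:
    """Split on conjunction markers and drop the separators and tiny fragments."""
    parts = SPLIT.split(prompt)
    return [
        p.strip()
        for p in parts
        if p and not SPLIT.match(p.strip()) and len(p.strip()) > 8
    ]


demo = split_clauses(
    "Write the API reference documentation for our REST /users endpoints "
    "AND implement the OAuth2 Bearer-token middleware in Python FastAPI."
)
caveat = split_clauses("Compare the pros and cons of REST versus GraphQL")

print(demo)    # two clauses: the documentation task and the middleware task
print(caveat)  # also two clauses, although the prompt is a single task
```

If that over-splitting matters for your prompts, one option is to drop the `(?i)` flag for the bare `AND` marker so that only the uppercase form acts as a separator.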

T-Shirt Sizing and Feature Classification

This logic is also unchanged: once separated, each sub-task passes through the TaskClassifier (the classify function in task_classifier.py). The classifier looks for domain markers (e.g., security policies, coding constraints, or structural documentation formatting) and assigns a complexity tier: LOW, MEDIUM, or HIGH.

"""
task_classifier.py
Classifies a text prompt into a TaskProfile: type, complexity score, and token estimate.
"""

from __future__ import annotations
import math
import re
from dataclasses import dataclass, field
from enum import Enum


class TaskType(Enum):
    DOCUMENTATION   = "documentation"    # READMEs, API docs, inline comments
    CODE_GENERATION = "code_generation"  # new functions / modules / classes
    CODE_REVIEW     = "code_review"      # diff analysis, security audit
    REASONING       = "reasoning"        # architecture, design decisions
    QA_SIMPLE       = "qa_simple"        # factual lookups, definitions
    SUMMARISATION   = "summarisation"    # condense / shorten text


class ComplexityTier(Enum):
    LOW    = 1   # score 0.00 – 0.34
    MEDIUM = 2   # score 0.35 – 0.64
    HIGH   = 3   # score 0.65 – 1.00


@dataclass
class TaskProfile:
    raw_text:       str
    task_type:      TaskType
    complexity:     float          # 0.0 → 1.0
    tier:           ComplexityTier
    token_estimate: int
    sub_tasks:      list["TaskProfile"] = field(default_factory=list)

    def __repr__(self) -> str:
        return (
            f"TaskProfile(type={self.task_type.value!r}, "
            f"complexity={self.complexity}, tier={self.tier.name}, "
            f"tokens≈{self.token_estimate})"
        )


# ── Keyword signal table: (keywords, base_complexity_score) ───────────────────
_SIGNALS: dict[TaskType, tuple[list[str], float]] = {
    TaskType.DOCUMENTATION:   (
        ["write the api", "api reference", "write docs", "write documentation",
         "document", "readme", "docstring", "comment",
         "explain", "describe"], 0.20),
    TaskType.CODE_GENERATION: (
        ["implement", "create function", "write code", "build",
         "develop", "middleware", "endpoint", "class", "module"], 0.55),
    TaskType.CODE_REVIEW:     (
        ["review", "audit", "security", "vulnerability",
         "diff", "check", "analyse code", "analyze code"], 0.60),
    TaskType.REASONING:       (
        ["architecture", "design", "trade-off", "compare",
         "evaluate", "pros and cons", "should i", "best approach"], 0.80),
    TaskType.QA_SIMPLE:       (
        ["what is", "how does", "define", "tell me", "when was"], 0.10),
    TaskType.SUMMARISATION:   (
        ["summarise", "summarize", "tldr", "shorten",
         "brief", "condense", "overview"], 0.15),
}

# Multi-word phrases that force a specific TaskType regardless of keyword scoring.
_PHRASE_OVERRIDES: list[tuple[str, TaskType]] = [
    ("api reference",       TaskType.DOCUMENTATION),
    ("write the api",       TaskType.DOCUMENTATION),
    ("write docs",          TaskType.DOCUMENTATION),
    ("write documentation", TaskType.DOCUMENTATION),
    ("code review",         TaskType.CODE_REVIEW),
    ("security audit",      TaskType.CODE_REVIEW),
]


def _token_estimate(text: str) -> int:
    """Rough GPT-style tokenisation: ~4 chars per token."""
    return max(1, len(text) // 4)


def _depth_penalty(text: str) -> float:
    """
    Extra score for multi-step / conditional reasoning markers.
    Each hit adds 0.04, capped at 0.20.
    """
    markers = [
        "step 1", "first,", "then,", "finally,",
        "however,", "alternatively,", "trade-off",
        "on the other hand", "in addition", "furthermore",
    ]
    lower = text.lower()
    hits = sum(1 for m in markers if m in lower)
    return min(0.20, hits * 0.04)


def classify(text: str) -> TaskProfile:
    """Return a TaskProfile for the given text snippet."""
    best_type, base_score = TaskType.QA_SIMPLE, 0.10
    lower = text.lower()

    # Phase 1: phrase-level overrides (highest priority)
    for phrase, forced_type in _PHRASE_OVERRIDES:
        if phrase in lower:
            best_type = forced_type
            base_score = _SIGNALS[forced_type][1]
            break
    else:
        # Phase 2: single-keyword scoring (only if no phrase matched)
        for ttype, (keywords, score) in _SIGNALS.items():
            if any(k in lower for k in keywords):
                if score > base_score:
                    best_type, base_score = ttype, score

    tokens       = _token_estimate(text)
    token_factor = min(0.15, math.log10(max(1, tokens)) * 0.05)
    depth        = _depth_penalty(text)
    final_score  = min(1.0, base_score + token_factor + depth)

    tier = (ComplexityTier.LOW    if final_score < 0.35 else
            ComplexityTier.MEDIUM if final_score < 0.65 else
            ComplexityTier.HIGH)

    return TaskProfile(
        raw_text=text,
        task_type=best_type,
        complexity=round(final_score, 3),
        tier=tier,
        token_estimate=tokens,
    )
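
As a worked example of the scoring arithmetic, the sketch below recomputes the final score for the two clauses of the demo prompt used later in this post. The helper names `final_score` and `tier_name` are hypothetical; the formula, base scores (0.20 for documentation, 0.55 for code generation), and thresholds are the ones from classify() above. Neither clause contains a depth marker, so the depth term is zero.

```python
import math


def final_score(base: float, text: str, depth: float = 0.0) -> float:
    """base score + length term (capped at 0.15) + depth term, capped at 1.0."""
    tokens = max(1, len(text) // 4)                      # ~4 chars per token
    token_factor = min(0.15, math.log10(max(1, tokens)) * 0.05)
    return round(min(1.0, base + token_factor + depth), 3)


def tier_name(score: float) -> str:
    """Same thresholds as ComplexityTier."""
    return "LOW" if score < 0.35 else "MEDIUM" if score < 0.65 else "HIGH"


doc = final_score(
    0.20, "Write the API reference documentation for our REST /users endpoints"
)
code = final_score(
    0.55, "implement the OAuth2 Bearer-token middleware in Python FastAPI."
)

print(doc, tier_name(doc))    # 67 chars -> 16 tokens -> 0.20 + 0.060 = 0.26
print(code, tier_name(code))  # 63 chars -> 15 tokens -> 0.55 + 0.059 = 0.609
```

These are the same complexity values (0.26 and 0.609) and tiers (LOW and MEDIUM) reported in the demo run further down.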

Unified Client Abstraction via litellm.Router

Rather than spawning raw, unmonitored HTTP connections to different endpoints, all asynchronous I/O travels through a centralized litellm.Router instance. The router is initialized by passing a structured configuration list that maps our custom symbolic tiers directly to our local Ollama models.


High-Resilience Execution Block

When a task is scheduled for execution, the pipeline invokes a uniform acompletion() call. LiteLLM dynamically normalizes response parsing across different endpoints, allowing us to safely extract generation data using a single, unified interface regardless of the model provider.

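# From src/llm_client.py
# (excerpt: relies on `import time` and on the module-level `_router` and
#  `_TIER_MODELS` objects defined elsewhere in that module)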
async def call_model(
    model_label: str,
    prompt: str,
    max_tokens: int = 1024,
) -> tuple[str, float, int]:
    """
    Call the LiteLLM AI Gateway Router for the given symbolic model label.
    Normalizes response formats, handles retries, and tracks generation token metrics.
    """
    messages = [{"role": "user", "content": prompt}]

    t0 = time.perf_counter()
    try:
        response = await _router.acompletion(
            model=model_label,
            messages=messages,
            max_tokens=max_tokens,
            temperature=0.3,
        )
        latency = time.perf_counter() - t0

        text   = response.choices[0].message.content or ""
        tokens = (
            response.usage.completion_tokens
            if response.usage
            else max(1, len(text) // 4)
        )
        return text, latency, tokens

    except Exception as exc:
        latency = time.perf_counter() - t0
        real_model = _TIER_MODELS.get(model_label, model_label)
        return (
            f"[LiteLLM ERROR - {real_model}: {exc}]",
            latency,
            0,
        )

Parallel Workload Orchestration

To prevent independent model generations from bottlenecking each other, the pipeline schedules execution asynchronously. The orchestrator uses asyncio.gather to trigger all routed sub-tasks concurrently across the LiteLLM transport layer, making the overall latency bound only by the slowest single engine in the pool.

"""
orchestrator.py
Ties together splitting, routing, parallel execution, and aggregation.
Uses the LiteLLM Router (via llm_client.py) for all LLM calls.
"""

from __future__ import annotations
import asyncio
import time
from dataclasses import dataclass

from src.splitter import split_and_classify
from src.router import dispatch
from src.llm_client import call_model
from src.feedback import FeedbackStore
from src.task_classifier import TaskProfile
from src.cost_registry import ModelSpec


@dataclass
class SubTaskResult:
    profile:     TaskProfile
    model:       ModelSpec
    response:    str
    latency_ms:  int
    out_tokens:  int


@dataclass
class OrchestratorResult:
    prompt:       str
    sub_results:  list[SubTaskResult]
    merged:       str
    total_ms:     int

    def routing_table(self) -> list[dict]:
        return [
            {
                "task_type":   r.profile.task_type.value,
                "complexity":  r.profile.complexity,
                "tier":        r.profile.tier.name,
                "model":       r.model.label,
                "latency_ms":  r.latency_ms,
                "out_tokens":  r.out_tokens,
                "cost_units":  round(r.model.cost_per_1k * r.out_tokens / 1000, 6),
            }
            for r in self.sub_results
        ]


class Orchestrator:
    def __init__(self, feedback_store: FeedbackStore | None = None) -> None:
        self._store = feedback_store or FeedbackStore()

    # ── Core ─────────────────────────────────────────────────────────────────

    async def handle(self, prompt: str) -> OrchestratorResult:
        """
        Full pipeline:
          1. Split prompt into independent sub-tasks.
          2. Classify each sub-task (type + complexity).
          3. Route each sub-task to the optimal model tier via utility function.
          4. Execute all sub-tasks in parallel via LiteLLM Router.
          5. Aggregate responses.
          6. Record to feedback store.
        """
        t_start = time.perf_counter()

        # Step 1 + 2: split & classify
        profiles = split_and_classify(prompt)

        # Step 3: route (utility-based selection)
        routing = dispatch(profiles)   # key → (profile, model)

        # Step 4: parallel execution through LiteLLM Router
        async def _exec(
            key: str, profile: TaskProfile, model: ModelSpec
        ) -> SubTaskResult:
            response, latency, tokens = await call_model(
                model_label=model.label,
                prompt=profile.raw_text,
                # Output budget scales with prompt length, so short prompts get short completions
                max_tokens=min(2048, profile.token_estimate * 3),
            )
            lat_ms = int(latency * 1000)
            self._store.record(
                task_type=profile.task_type.value,
                complexity=profile.complexity,
                tier=model.tier,
                model_used=model.label,
                latency_ms=lat_ms,
                output_tokens=tokens,
                cost_per_1k=model.cost_per_1k,
                prompt_len=len(profile.raw_text),
            )
            return SubTaskResult(profile, model, response, lat_ms, tokens)

        sub_results = await asyncio.gather(
            *[_exec(k, p, m) for k, (p, m) in routing.items()]
        )

        # Step 5: aggregate
        merged   = self._merge(list(sub_results))
        total_ms = int((time.perf_counter() - t_start) * 1000)

        return OrchestratorResult(
            prompt=prompt,
            sub_results=list(sub_results),
            merged=merged,
            total_ms=total_ms,
        )

    def handle_sync(self, prompt: str) -> OrchestratorResult:
        return asyncio.run(self.handle(prompt))

    # ── Merge ────────────────────────────────────────────────────────────────

    @staticmethod
    def _merge(results: list[SubTaskResult]) -> str:
        sections = []
        for r in results:
            label = r.profile.task_type.value.replace("_", " ").title()
            header = (
                f"## {label}  "
                f"_(via {r.model.label} [{r.model.provider}], {r.latency_ms} ms)_"
            )
            sections.append(f"{header}\n\n{r.response.strip()}")
        return "\n\n---\n\n".join(sections)
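
The effect of asyncio.gather on total latency can be checked with a small stand-alone sketch. The `fake_call` coroutine is hypothetical and simply sleeps in place of a model call, so the durations are made up; the point is only that the wall-clock time tracks the slowest call rather than the sum of all calls.

```python
import asyncio
import time


async def fake_call(label: str, seconds: float) -> tuple[str, float]:
    """Stand-in for call_model(): waits instead of calling an LLM."""
    t0 = time.perf_counter()
    await asyncio.sleep(seconds)
    return label, time.perf_counter() - t0


async def run_all() -> tuple[list[tuple[str, float]], float]:
    t0 = time.perf_counter()
    # Both coroutines are scheduled together; results come back in argument order.
    results = await asyncio.gather(
        fake_call("model::light", 0.10),
        fake_call("model::balanced", 0.20),
    )
    return list(results), time.perf_counter() - t0


results, wall = asyncio.run(run_all())
for label, latency in results:
    print(f"{label:<16} {latency * 1000:6.0f} ms")
print(f"wall-clock       {wall * 1000:6.0f} ms")
```

Note that asyncio.gather preserves the order of its arguments, which is why the merged response keeps sub-tasks in their original order even when a later one finishes first.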

Enhancing Enterprise Resilience

Moving execution mechanics to LiteLLM provides several critical operational advantages out of the box:

  • Transparent Fallback Chains: If a local Ollama container gets saturated or crashes, LiteLLM can immediately catch the exception and redirect the payload to a backup cloud endpoint (such as IBM watsonx or an external API provider) without changing a line of your application code.
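  • Built-in Error Handling: It absorbs transient issues such as HTTP timeouts by retrying with back-off rather than failing the client request immediately. Non-transient failures, such as a prompt that exceeds the model's context window, are not helped by retrying and are better served by falling back to a model with a larger context.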
  • Uniform Observability Footprint: Because every sub-task traverses a single router gateway, logging input/output token usage, collecting latency metrics, and auditing security telemetry becomes completely centralized inside our FeedbackStore.
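
To illustrate the first two points conceptually, here is a small stdlib-only sketch of "retry with exponential back-off, then fall back". It is not LiteLLM's internal implementation; the function `call_with_resilience` and the simulated backends are hypothetical, and the delays are kept tiny so the example runs quickly.

```python
import time
from typing import Callable


def call_with_resilience(
    primary: Callable[[], str],
    fallback: Callable[[], str],
    retries: int = 2,
    base_delay: float = 0.01,
) -> str:
    """Try the primary backend with exponential back-off, then use the fallback."""
    for attempt in range(retries + 1):
        try:
            return primary()
        except TimeoutError:
            if attempt < retries:
                time.sleep(base_delay * 2 ** attempt)  # 0.01 s, 0.02 s, ...
    return fallback()


attempts = {"n": 0}


def flaky() -> str:
    """Simulated backend that times out twice, then recovers."""
    attempts["n"] += 1
    if attempts["n"] < 3:
        raise TimeoutError("simulated timeout")
    return "primary ok"


def always_down() -> str:
    raise TimeoutError("simulated outage")


def backup() -> str:
    return "backup ok"


recovered = call_with_resilience(flaky, backup)
failed_over = call_with_resilience(always_down, backup)
print(recovered, "|", failed_over)
```

In the LiteLLM setup, this behaviour is configured on the router rather than written by hand, which is exactly the code the application no longer has to carry.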

Compared to the plain Ollama version:


Running and benchmarking the solution

To evaluate the routing engine in real time, run the provided demo script:

./multi-llm-router-LiteLLM/scripts/demo.sh
[demo] Verifying dependencies (litellm, fastapi, uvicorn)…



┌──────────────────────────────────────────────────────────────┐
│  Multi-LLM Task Router - LiteLLM Edition  demo               │
│  Gateway: LiteLLM AI Router → Ollama (localhost:11434)       │
│                                                              │
│  Tip: set DRY_RUN=1 to inspect routing without LLM calls     │
└──────────────────────────────────────────────────────────────┘

================================================================================
  Multi-LLM Task Router  [LiteLLM Edition]  -  Demonstration Scenario
================================================================================

Prompt:
  Write the API reference documentation for our REST /users endpoints AND implement the OAuth2 Bearer-token middleware in Python FastAPI.

LiteLLM Gateway → Ollama at: http://localhost:11434

Model tier mapping (LiteLLM alias → Ollama model):
  model::light          (IBM Granite   )  →  ollama/granite4.1:3b
  model::medium         (Meta LLaMA    )  →  ollama/llama3.2:latest
  model::balanced       (Google Gemma  )  →  ollama/gemma3:4b
  model::heavy          (Mistral AI    )  →  ollama/mistral-small3.2:latest

Detected 2 sub-task(s):
  [1] TaskProfile(type='documentation', complexity=0.26, tier=LOW, tokens≈16)
  [2] TaskProfile(type='code_generation', complexity=0.609, tier=MEDIUM, tokens≈15)

Routing decisions:
  Task Type              Complexity  Tier       Label                Provider              U
  ------------------------------------------------------------------------------
  documentation               0.260  LOW        model::light         IBM Granite     +0.3616
  code_generation             0.609  MEDIUM     model::balanced      Google Gemma    +0.3718

Executing sub-tasks via LiteLLM → Ollama…  (set DRY_RUN=1 to skip)


================================================================================
  ROUTING TABLE  (via LiteLLM AI Gateway)
================================================================================
  Task Type              Label                Provider         Latency   Tokens        Cost
  ------------------------------------------------------------------------------
  documentation          model::light         IBM Granite       2720 ms       48    0.002400
  code_generation        model::balanced      Google Gemma      5595 ms       45    0.015750

  Total wall-clock: 5601 ms

======================================================================
  MERGED RESPONSE
======================================================================
## Documentation  _(via model::light [IBM Granite], 2720 ms)_

# API Reference: `/users` Endpoints

Our RESTful API provides a set of endpoints under the `/users` resource to manage user-related operations. Below is the detailed documentation for each endpoint, including request methods, expected input parameters,

---

## Code Generation  _(via model::balanced [Google Gemma], 5595 ms)_

`python
from fastapi import Depends, HTTPException, status
from fastapi.security import CredentialsTokenAuthentication
import jwt  # You'll need to install this: pip install PyJWT


async def authenticate(token: str

======================================================================
  FEEDBACK SUMMARY
======================================================================
{
  "total_calls": 2,
  "total_cost": 0.01815,
  "avg_quality": 0.714,
  "calls_by_tier": {
    "1": 1,
    "3": 1
  }
}
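
The cost figures in this run can be cross-checked with the formula used in routing_table(), cost_per_1k * out_tokens / 1000. The per-1k rates below (0.05 for model::light and 0.35 for model::balanced) are not printed by the demo; they are inferred here from the Cost and Tokens columns and should be read as an assumption about the project's cost registry.

```python
def cost_units(cost_per_1k: float, out_tokens: int) -> float:
    """Same arithmetic as OrchestratorResult.routing_table()."""
    return round(cost_per_1k * out_tokens / 1000, 6)


# (label, inferred cost per 1k tokens, output tokens from the routing table)
rows = [
    ("model::light", 0.05, 48),
    ("model::balanced", 0.35, 45),
]

costs = {label: cost_units(rate, tokens) for label, rate, tokens in rows}
total = round(sum(costs.values()), 6)

for label, cost in costs.items():
    print(f"{label:<16} {cost:.6f}")
print(f"{'total':<16} {total:.6f}")
```

The two rows come to 0.002400 and 0.015750, which add up to the total_cost of 0.01815 shown in the feedback summary.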

Conclusion

By wrapping our budget-conscious routing logic in LiteLLM, we have constructed an AI orchestration layer that balances economic efficiency with enterprise reliability. We no longer write custom code to handle network hiccups or provider differences; instead, we let our core pipeline focus entirely on making optimal routing decisions.

🎯 Et voilà, that's a wrap 💯 for Part 2, and thanks for reading!

Stay tuned for the next episode 📺
