Part 2: A theoretical implementation of LLM selection and orchestration using the open-source LiteLLM
Introduction
In the first part of this series, we explored the theoretical foundation of intelligent orchestration (implemented by Bob :) ): how to dynamically route tasks to the most performant and cost-effective Large Language Model (LLM) based on context and complexity, and why this capability is one of the main differentiators of IBM Bob. Now it's time to move from theory to a production-grade architecture. In this second installment, we implement the same cost-routing logic on top of a production-ready open-source solution: LiteLLM, whose unified abstraction layer makes the routing engine plug-and-play.
> Disclaimer: This article is for educational purposes only. It provides a conceptual, hands-on look at how LLM routing logic (here with LiteLLM) operates and should not be mistaken for the official enterprise implementation or architecture of the IBM Bob product.
TL;DR: What is LiteLLM?
Excerpt of LiteLLM Github Repository
LiteLLM is an open source AI Gateway that gives you a single, unified interface to call 100+ LLM providers (OpenAI, Anthropic, Gemini, Bedrock, Azure, and more) using the OpenAI format.
Use it as a Python SDK for direct library integration, or deploy the AI Gateway (Proxy Server) as a centralized service for your team or organization.
Why LiteLLM
Managing LLM calls across providers gets complicated fast: different SDKs, auth patterns, request formats, and error types for every model. LiteLLM removes that friction:
- Unified API: one interface for 100+ LLMs, no provider-specific SDK juggling
- Drop-in OpenAI compatibility: swap providers without rewriting your code
- Production-ready gateway: virtual keys, spend tracking, guardrails, load balancing, and an admin dashboard out of the box
- 8ms P95 latency at 1k RPS (benchmarks)
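As a rough illustration of that unified interface, the sketch below builds the same OpenAI-style request for two different backends and only changes the model string. It assumes the `litellm` package is installed and, for the local request, an Ollama server on `http://localhost:11434`. The helper names `build_request` and `ask`, as well as the model names, are illustrative choices and not something prescribed by LiteLLM.

```python
from typing import Any


def build_request(model: str, prompt: str, **extra: Any) -> dict[str, Any]:
    """Build OpenAI-format keyword arguments for litellm.completion()."""
    return {
        "model": model,
        "messages": [{"role": "user", "content": prompt}],
        **extra,
    }


def ask(model: str, prompt: str, **extra: Any) -> str:
    """Send the request through LiteLLM and return the reply text."""
    import litellm  # imported lazily so the helpers above work without it

    response = litellm.completion(**build_request(model, prompt, **extra))
    return response.choices[0].message.content or ""


# Same call shape, different providers (model names are examples only).
local_req = build_request(
    "ollama/llama3.2", "Define a JWT.", api_base="http://localhost:11434"
)
cloud_req = build_request("gpt-4o-mini", "Define a JWT.")
```

Only the `model` string (and, for a local server, `api_base`) differs between the two requests; the message payload is identical.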
Multi-LLM Routing, Part 2: Implementation with LiteLLM
In Part 1, the orchestration logic, which decomposes a complex user prompt and routes each sub-task to the most performant and cost-effective model tier, was implemented with ad-hoc code.
In this post, we turn that cost-routing pipeline into a scalable solution by integrating LiteLLM. By using LiteLLM as a unified transport layer over our local Ollama instances, we decouple which tier is chosen from how the call is executed.
The Production-Grade Gateway Architecture
By integrating LiteLLM, we keep the custom mathematical utility scoring while completely abstracting the underlying infrastructure transport. The orchestrator no longer addresses specific container endpoints directly; instead, it interfaces with a standardized, OpenAI-compatible abstraction layer.
Under this architecture, responsibility is split into two cleanly isolated layers:
- The Custom Routing Pipeline (router.py): Handles the business intelligence. It scores task complexity and calculates which symbolic tier (model::light, model::medium, etc.) maximizes performance while minimizing token cost, using our standard utility function.
- The LiteLLM AI Gateway (llm_client.py): Handles transport resilience. It maps those symbolic tier aliases to concrete models and manages enterprise-grade networking features such as automatic retries, back-off parameters, fallback chains, and cooldowns after transient errors.
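To make the routing layer's job more concrete, here is a minimal sketch of utility-based tier selection. The utility formula, the `Tier` fields, and every number below are assumptions made up for illustration; they are not the article's actual utility function or cost registry, and the sketch is not meant to reproduce the U values printed by the demo.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Tier:
    label: str          # symbolic alias handed to the gateway
    capability: float   # hypothetical 0..1 quality score
    cost_per_1k: float  # hypothetical cost units per 1k output tokens


# Illustrative numbers only; they are not taken from the article's registry.
TIERS = [
    Tier("model::light", 0.40, 0.05),
    Tier("model::medium", 0.60, 0.15),
    Tier("model::heavy", 0.90, 0.60),
]


def utility(tier: Tier, complexity: float, cost_weight: float = 0.5) -> float:
    """Hypothetical utility: reward covering the task, penalise cost."""
    # Capability beyond what the task needs earns nothing extra;
    # any shortfall is subtracted with a factor of 2.
    shortfall = max(0.0, complexity - tier.capability)
    fit = min(tier.capability, complexity) - 2 * shortfall
    return fit - cost_weight * tier.cost_per_1k


def pick_tier(complexity: float) -> Tier:
    """Return the tier with the highest utility for this complexity."""
    return max(TIERS, key=lambda t: utility(t, complexity))
```

With these made-up numbers, a low-complexity task stays on the cheapest tier because extra capability adds cost without adding utility, while a high-complexity task justifies the expensive tier. The router only returns the symbolic label; which concrete model sits behind it is the gateway's concern.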
Let's dig into that.
Core Technical Implementation
Granular Task Decomposition
This part remains unchanged from the first version: to avoid processing unnecessary tokens through high-tier engines, the architecture relies on a specialized TaskSplitter. This component evaluates the raw user payload and breaks it down into discrete components so that documentation and code can travel along separate routing paths.
"""
splitter.py
Splits a compound prompt into independent clauses, then classifies each one.
"""
from __future__ import annotations

import re

from src.task_classifier import classify, TaskProfile

# Conjunctions that signal independent sub-tasks within a single prompt.
# Note: the inline (?i) flag makes the whole pattern case-insensitive, so the
# "AND" alternative also matches a lowercase "and".
_SPLIT_PATTERN = re.compile(
    r"(?i)\s*\b(and also|and then|additionally|plus|as well as|AND|also)\b\s*",
)


def split_and_classify(prompt: str) -> list[TaskProfile]:
    """
    1. Split the prompt on conjunction markers.
    2. Discard separator tokens.
    3. Classify each remaining clause.
    """
    # The pattern has a capturing group, so re.split() also returns the
    # matched separators; they are filtered out below.
    raw_clauses = _SPLIT_PATTERN.split(prompt)

    # Keep only non-separator, non-empty parts longer than 8 characters
    clauses = [
        c.strip()
        for c in raw_clauses
        if c and not _SPLIT_PATTERN.match(c.strip()) and len(c.strip()) > 8
    ]

    if not clauses:
        # Fallback: treat the whole prompt as one task
        clauses = [prompt.strip()]

    return [classify(c) for c in clauses]
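To see why the separator filter in the splitter is needed, the short example below runs the same pattern on its own, without the classifier. The two sample prompts are made up for illustration.

```python
import re

pattern = re.compile(
    r"(?i)\s*\b(and also|and then|additionally|plus|as well as|AND|also)\b\s*"
)

prompt = "Write the API reference documentation AND implement the middleware"

# Because the pattern contains a capturing group, re.split() keeps the
# matched conjunction as its own list element between the clauses.
parts = pattern.split(prompt)

# Same filter as in split_and_classify(): drop separators and short fragments.
clauses = [
    p.strip()
    for p in parts
    if p and not pattern.match(p.strip()) and len(p.strip()) > 8
]

# The (?i) flag means a lowercase "and" splits as well.
lower_parts = pattern.split("fix the bug and add tests")
```

Here `parts` contains three elements (clause, `"AND"`, clause), while `clauses` keeps only the two real sub-tasks.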
T-Shirt Sizing and Feature Classification
This logic is also unchanged: once separated, each sub-task passes through the task classifier (task_classifier.py). The classifier looks for domain markers (e.g., security policies, coding constraints, or structural documentation formatting) and assigns a complexity tier: LOW, MEDIUM, or HIGH.
"""
task_classifier.py
Classifies a text prompt into a TaskProfile: type, complexity score, and token estimate.
"""
from __future__ import annotations

import math
import re
from dataclasses import dataclass, field
from enum import Enum


class TaskType(Enum):
    DOCUMENTATION = "documentation"      # READMEs, API docs, inline comments
    CODE_GENERATION = "code_generation"  # new functions / modules / classes
    CODE_REVIEW = "code_review"          # diff analysis, security audit
    REASONING = "reasoning"              # architecture, design decisions
    QA_SIMPLE = "qa_simple"              # factual lookups, definitions
    SUMMARISATION = "summarisation"      # condense / shorten text


class ComplexityTier(Enum):
    LOW = 1     # score 0.00 - 0.34
    MEDIUM = 2  # score 0.35 - 0.64
    HIGH = 3    # score 0.65 - 1.00


@dataclass
class TaskProfile:
    raw_text: str
    task_type: TaskType
    complexity: float  # 0.0 - 1.0
    tier: ComplexityTier
    token_estimate: int
    sub_tasks: list["TaskProfile"] = field(default_factory=list)

    def __repr__(self) -> str:
        return (
            f"TaskProfile(type={self.task_type.value!r}, "
            f"complexity={self.complexity}, tier={self.tier.name}, "
            f"tokens≈{self.token_estimate})"
        )


# -- Keyword signal table: (keywords, base_complexity_score) --
_SIGNALS: dict[TaskType, tuple[list[str], float]] = {
    TaskType.DOCUMENTATION: (
        ["write the api", "api reference", "write docs", "write documentation",
         "document", "readme", "docstring", "comment",
         "explain", "describe"], 0.20),
    TaskType.CODE_GENERATION: (
        ["implement", "create function", "write code", "build",
         "develop", "middleware", "endpoint", "class", "module"], 0.55),
    TaskType.CODE_REVIEW: (
        ["review", "audit", "security", "vulnerability",
         "diff", "check", "analyse code", "analyze code"], 0.60),
    TaskType.REASONING: (
        ["architecture", "design", "trade-off", "compare",
         "evaluate", "pros and cons", "should i", "best approach"], 0.80),
    TaskType.QA_SIMPLE: (
        ["what is", "how does", "define", "tell me", "when was"], 0.10),
    TaskType.SUMMARISATION: (
        ["summarise", "summarize", "tldr", "shorten",
         "brief", "condense", "overview"], 0.15),
}

# Multi-word phrases that force a specific TaskType regardless of keyword scoring.
_PHRASE_OVERRIDES: list[tuple[str, TaskType]] = [
    ("api reference", TaskType.DOCUMENTATION),
    ("write the api", TaskType.DOCUMENTATION),
    ("write docs", TaskType.DOCUMENTATION),
    ("write documentation", TaskType.DOCUMENTATION),
    ("code review", TaskType.CODE_REVIEW),
    ("security audit", TaskType.CODE_REVIEW),
]


def _token_estimate(text: str) -> int:
    """Rough GPT-style tokenisation: ~4 chars per token."""
    return max(1, len(text) // 4)


def _depth_penalty(text: str) -> float:
    """
    Extra score for multi-step / conditional reasoning markers.
    Each hit adds 0.04, capped at 0.20.
    """
    markers = [
        "step 1", "first,", "then,", "finally,",
        "however,", "alternatively,", "trade-off",
        "on the other hand", "in addition", "furthermore",
    ]
    lower = text.lower()
    hits = sum(1 for m in markers if m in lower)
    return min(0.20, hits * 0.04)


def classify(text: str) -> TaskProfile:
    """Return a TaskProfile for the given text snippet."""
    best_type, base_score = TaskType.QA_SIMPLE, 0.10
    lower = text.lower()

    # Phase 1: phrase-level overrides (highest priority)
    for phrase, forced_type in _PHRASE_OVERRIDES:
        if phrase in lower:
            best_type = forced_type
            base_score = _SIGNALS[forced_type][1]
            break
    else:
        # Phase 2: single-keyword scoring (only if no phrase matched);
        # the matching type with the highest base score wins.
        for ttype, (keywords, score) in _SIGNALS.items():
            if any(k in lower for k in keywords):
                if score > base_score:
                    best_type, base_score = ttype, score

    tokens = _token_estimate(text)
    token_factor = min(0.15, math.log10(max(1, tokens)) * 0.05)
    depth = _depth_penalty(text)
    final_score = min(1.0, base_score + token_factor + depth)

    tier = (ComplexityTier.LOW if final_score < 0.35 else
            ComplexityTier.MEDIUM if final_score < 0.65 else
            ComplexityTier.HIGH)

    return TaskProfile(
        raw_text=text,
        task_type=best_type,
        complexity=round(final_score, 3),
        tier=tier,
        token_estimate=tokens,
    )
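As a worked check of the scoring arithmetic, the snippet below recomputes the token estimate and final score for the two clauses of the demo prompt used later in this article. It re-implements only the formula (base score plus logarithmic token factor); neither clause contains a depth marker, so that term is zero. The helper name `score` is introduced here for illustration.

```python
import math


def score(text: str, base: float) -> tuple[int, float]:
    """Recompute token estimate and final score for a clause without depth markers."""
    tokens = max(1, len(text) // 4)
    token_factor = min(0.15, math.log10(max(1, tokens)) * 0.05)
    return tokens, round(min(1.0, base + token_factor), 3)


doc = "Write the API reference documentation for our REST /users endpoints"
code = "implement the OAuth2 Bearer-token middleware in Python FastAPI."

doc_tokens, doc_score = score(doc, base=0.20)     # DOCUMENTATION base score
code_tokens, code_score = score(code, base=0.55)  # CODE_GENERATION base score
```

The documentation clause lands at 0.26 (below 0.35, so LOW) and the code clause at 0.609 (below 0.65, so MEDIUM), which agrees with the TaskProfile lines printed by the demo run.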
Unified Client Abstraction via litellm.Router
Rather than spawning raw, unmonitored HTTP connections to different endpoints, all asynchronous I/O travels through a centralized litellm.Router instance. The router is initialized with a structured configuration list that maps our custom symbolic tiers to our local Ollama models.
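A minimal sketch of what such an initialization could look like is shown below. The alias-to-model mapping is the one printed by the demo run; the retry count, timeout, and fallback chain are assumptions chosen for illustration and may differ from the values in the repository. The names `TIER_MODELS`, `build_model_list`, and `build_router` are hypothetical.

```python
from typing import Any

OLLAMA_BASE = "http://localhost:11434"

# Symbolic tier alias -> concrete Ollama model (mapping as printed by the demo).
TIER_MODELS = {
    "model::light": "ollama/granite4.1:3b",
    "model::medium": "ollama/llama3.2:latest",
    "model::balanced": "ollama/gemma3:4b",
    "model::heavy": "ollama/mistral-small3.2:latest",
}


def build_model_list() -> list[dict[str, Any]]:
    """Translate the tier mapping into LiteLLM's model_list format."""
    return [
        {
            "model_name": alias,
            "litellm_params": {"model": model, "api_base": OLLAMA_BASE},
        }
        for alias, model in TIER_MODELS.items()
    ]


def build_router():
    """Create the Router; retry, timeout, and fallback values are assumptions."""
    from litellm import Router  # lazy import: requires `pip install litellm`

    return Router(
        model_list=build_model_list(),
        num_retries=2,
        timeout=120,
        fallbacks=[{"model::light": ["model::medium"]}],
    )
```

Because callers only ever pass an alias such as `model::light`, swapping the model behind a tier is a one-line change in the mapping and does not touch the routing code.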
High-Resilience Execution Block
When a task is scheduled for execution, the pipeline invokes a uniform acompletion() call. LiteLLM dynamically normalizes response parsing across different endpoints, allowing us to safely extract generation data using a single, unified interface regardless of the model provider.
# From src/llm_client.py
import time

# _router (the shared litellm.Router instance) and _TIER_MODELS (alias -> model
# name mapping) are module-level objects defined elsewhere in llm_client.py.


async def call_model(
    model_label: str,
    prompt: str,
    max_tokens: int = 1024,
) -> tuple[str, float, int]:
    """
    Call the LiteLLM AI Gateway Router for the given symbolic model label.
    Normalizes response formats, handles retries, and tracks generation token metrics.
    Returns (response text, latency in seconds, output tokens).
    """
    messages = [{"role": "user", "content": prompt}]
    t0 = time.perf_counter()
    try:
        response = await _router.acompletion(
            model=model_label,
            messages=messages,
            max_tokens=max_tokens,
            temperature=0.3,
        )
        latency = time.perf_counter() - t0
        text = response.choices[0].message.content or ""
        # Prefer the provider-reported count; otherwise estimate ~4 chars/token
        tokens = (
            response.usage.completion_tokens
            if response.usage
            else max(1, len(text) // 4)
        )
        return text, latency, tokens
    except Exception as exc:
        # Errors are returned as the response text instead of being raised
        latency = time.perf_counter() - t0
        real_model = _TIER_MODELS.get(model_label, model_label)
        return (
            f"[LiteLLM ERROR - {real_model}: {exc}]",
            latency,
            0,
        )
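The extraction logic in `call_model` can be exercised without a running gateway. The sketch below mirrors it against stand-in objects that have the same attribute shape as an OpenAI-format response; `extract` and `fake_response` are hypothetical helpers introduced only for this illustration.

```python
from types import SimpleNamespace


def extract(response) -> tuple[str, int]:
    """Mirror the text/token extraction used in call_model()."""
    text = response.choices[0].message.content or ""
    tokens = (
        response.usage.completion_tokens
        if response.usage
        else max(1, len(text) // 4)
    )
    return text, tokens


def fake_response(content, completion_tokens=None):
    """Build a stand-in object shaped like an OpenAI-format response."""
    usage = (
        SimpleNamespace(completion_tokens=completion_tokens)
        if completion_tokens is not None
        else None
    )
    message = SimpleNamespace(content=content)
    return SimpleNamespace(choices=[SimpleNamespace(message=message)], usage=usage)


with_usage = extract(fake_response("Hello there, world!", completion_tokens=5))
without_usage = extract(fake_response("Hello there, world!"))
empty = extract(fake_response(None))
```

When usage data is present, the reported count is used as-is; when it is missing, the 4-characters-per-token estimate takes over, and a `None` content is normalized to an empty string.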
Parallel Workload Orchestration
To prevent independent model generations from bottlenecking each other, the pipeline schedules execution asynchronously. The orchestrator uses asyncio.gather to run all routed sub-tasks concurrently across the LiteLLM transport layer, so overall latency is bounded by the slowest sub-task rather than by the sum of all calls.
"""
orchestrator.py
Ties together splitting, routing, parallel execution, and aggregation.
Uses the LiteLLM Router (via llm_client.py) for all LLM calls.
"""
from __future__ import annotations

import asyncio
import time
from dataclasses import dataclass

from src.splitter import split_and_classify
from src.router import dispatch
from src.llm_client import call_model
from src.feedback import FeedbackStore
from src.task_classifier import TaskProfile
from src.cost_registry import ModelSpec


@dataclass
class SubTaskResult:
    profile: TaskProfile
    model: ModelSpec
    response: str
    latency_ms: int
    out_tokens: int


@dataclass
class OrchestratorResult:
    prompt: str
    sub_results: list[SubTaskResult]
    merged: str
    total_ms: int

    def routing_table(self) -> list[dict]:
        return [
            {
                "task_type": r.profile.task_type.value,
                "complexity": r.profile.complexity,
                "tier": r.profile.tier.name,
                "model": r.model.label,
                "latency_ms": r.latency_ms,
                "out_tokens": r.out_tokens,
                "cost_units": round(r.model.cost_per_1k * r.out_tokens / 1000, 6),
            }
            for r in self.sub_results
        ]


class Orchestrator:
    def __init__(self, feedback_store: FeedbackStore | None = None) -> None:
        self._store = feedback_store or FeedbackStore()

    # -- Core --
    async def handle(self, prompt: str) -> OrchestratorResult:
        """
        Full pipeline:
        1. Split prompt into independent sub-tasks.
        2. Classify each sub-task (type + complexity).
        3. Route each sub-task to the optimal model tier via utility function.
        4. Execute all sub-tasks in parallel via LiteLLM Router.
        5. Aggregate responses.
        6. Record to feedback store.
        """
        t_start = time.perf_counter()

        # Step 1 + 2: split & classify
        profiles = split_and_classify(prompt)

        # Step 3: route (utility-based selection)
        routing = dispatch(profiles)  # key -> (profile, model)

        # Step 4: parallel execution through LiteLLM Router
        async def _exec(
            key: str, profile: TaskProfile, model: ModelSpec
        ) -> SubTaskResult:
            # The output budget is 3x the prompt's token estimate (capped at
            # 2048), so short prompts get short, possibly truncated, completions.
            response, latency, tokens = await call_model(
                model_label=model.label,
                prompt=profile.raw_text,
                max_tokens=min(2048, profile.token_estimate * 3),
            )
            lat_ms = int(latency * 1000)
            # Step 6: record the call in the feedback store
            self._store.record(
                task_type=profile.task_type.value,
                complexity=profile.complexity,
                tier=model.tier,
                model_used=model.label,
                latency_ms=lat_ms,
                output_tokens=tokens,
                cost_per_1k=model.cost_per_1k,
                prompt_len=len(profile.raw_text),
            )
            return SubTaskResult(profile, model, response, lat_ms, tokens)

        sub_results = await asyncio.gather(
            *[_exec(k, p, m) for k, (p, m) in routing.items()]
        )

        # Step 5: aggregate
        merged = self._merge(list(sub_results))
        total_ms = int((time.perf_counter() - t_start) * 1000)
        return OrchestratorResult(
            prompt=prompt,
            sub_results=list(sub_results),
            merged=merged,
            total_ms=total_ms,
        )

    def handle_sync(self, prompt: str) -> OrchestratorResult:
        return asyncio.run(self.handle(prompt))

    # -- Merge --
    @staticmethod
    def _merge(results: list[SubTaskResult]) -> str:
        sections = []
        for r in results:
            label = r.profile.task_type.value.replace("_", " ").title()
            header = (
                f"## {label} "
                f"_(via {r.model.label} [{r.model.provider}], {r.latency_ms} ms)_"
            )
            sections.append(f"{header}\n\n{r.response.strip()}")
        return "\n\n---\n\n".join(sections)
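The latency claim behind `asyncio.gather` is easy to check in isolation. In the sketch below, `fake_call` is a hypothetical stand-in for `call_model` that simply sleeps; the durations are arbitrary.

```python
import asyncio
import time


async def fake_call(label: str, seconds: float) -> str:
    """Stand-in for call_model(): just waits, then returns its label."""
    await asyncio.sleep(seconds)
    return label


async def run_all() -> tuple[list[str], float]:
    t0 = time.perf_counter()
    # gather() runs both coroutines concurrently and returns results
    # in the order the awaitables were passed in.
    results = await asyncio.gather(
        fake_call("model::light", 0.1),
        fake_call("model::balanced", 0.3),
    )
    return list(results), time.perf_counter() - t0


results, elapsed = asyncio.run(run_all())
```

The elapsed time stays close to the longer of the two waits (about 0.3 s) instead of their sum (0.4 s), which is the same pattern as the demo run, where the total wall-clock time is only slightly above the slowest sub-task.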
Enhancing Enterprise Resilience
Moving execution mechanics to LiteLLM provides several critical operational advantages out of the box:
- Transparent Fallback Chains: If a local Ollama container gets saturated or crashes, LiteLLM can catch the exception and redirect the payload to a backup cloud endpoint (such as IBM watsonx or an external API provider) without changing a line of application code.
- Built-in Error Handling: Transient issues such as HTTP timeouts or context-window overruns are handled inside the gateway, with retries following an exponential back-off strategy, rather than failing the client request immediately.
- Uniform Observability Footprint: Because every sub-task traverses a single router gateway, logging of input/output token usage, latency metrics, and security telemetry is centralized in our FeedbackStore.
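Conceptually, the first two points combine into "retry with back-off, then move down the fallback chain". The hand-rolled sketch below illustrates that idea only; it is not LiteLLM's implementation, and `call_with_fallback`, `flaky_send`, and the `model::backup` alias are hypothetical names.

```python
import asyncio
from typing import Awaitable, Callable, Optional


async def call_with_fallback(
    chain: list[str],
    send: Callable[[str], Awaitable[str]],
    retries: int = 2,
    base_delay: float = 0.01,
) -> tuple[str, str]:
    """Try each model in `chain`; retry with exponential back-off before moving on."""
    last_error: Optional[Exception] = None
    for model in chain:
        for attempt in range(retries + 1):
            try:
                return model, await send(model)
            except ConnectionError as exc:  # treated as transient here
                last_error = exc
                if attempt < retries:
                    # Delay doubles on every retry: base, 2*base, 4*base, ...
                    await asyncio.sleep(base_delay * 2**attempt)
    raise RuntimeError(f"all models failed: {last_error}")


attempts: list[str] = []


async def flaky_send(model: str) -> str:
    """Hypothetical transport: the local model is down, the backup answers."""
    attempts.append(model)
    if model == "model::light":
        raise ConnectionError("ollama unreachable")
    return "ok"


used, reply = asyncio.run(
    call_with_fallback(["model::light", "model::backup"], flaky_send)
)
```

The caller asks for `model::light`, the helper tries it three times, and the answer finally comes from the backup. With a gateway in place, this loop lives in configuration rather than in application code.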
Compared to the plain Ollama version:
Running and benchmarking the solution
To evaluate the routing engine in real time, run the provided demo script:
./multi-llm-router-LiteLLM/scripts/demo.sh
[demo] Verifying dependencies (litellm, fastapi, uvicorn)…
╔══════════════════════════════════════════════════════════════╗
║   Multi-LLM Task Router - LiteLLM Edition demo               ║
║   Gateway: LiteLLM AI Router → Ollama (localhost:11434)      ║
║                                                              ║
║   Tip: set DRY_RUN=1 to inspect routing without LLM calls    ║
╚══════════════════════════════════════════════════════════════╝
================================================================================
Multi-LLM Task Router [LiteLLM Edition] - Demonstration Scenario
================================================================================
Prompt:
Write the API reference documentation for our REST /users endpoints AND implement the OAuth2 Bearer-token middleware in Python FastAPI.
LiteLLM Gateway → Ollama at: http://localhost:11434
Model tier mapping (LiteLLM alias → Ollama model):
  model::light     (IBM Granite)   → ollama/granite4.1:3b
  model::medium    (Meta LLaMA)    → ollama/llama3.2:latest
  model::balanced  (Google Gemma)  → ollama/gemma3:4b
  model::heavy     (Mistral AI)    → ollama/mistral-small3.2:latest
Detected 2 sub-task(s):
  [1] TaskProfile(type='documentation', complexity=0.26, tier=LOW, tokens≈16)
  [2] TaskProfile(type='code_generation', complexity=0.609, tier=MEDIUM, tokens≈15)
Routing decisions:
Task Type          Complexity  Tier    Label            Provider      U
------------------------------------------------------------------------------
documentation      0.260       LOW     model::light     IBM Granite   +0.3616
code_generation    0.609       MEDIUM  model::balanced  Google Gemma  +0.3718
Executing sub-tasks via LiteLLM → Ollama… (set DRY_RUN=1 to skip)
================================================================================
ROUTING TABLE (via LiteLLM AI Gateway)
================================================================================
Task Type          Label            Provider      Latency   Tokens  Cost
------------------------------------------------------------------------------
documentation      model::light     IBM Granite   2720 ms   48      0.002400
code_generation    model::balanced  Google Gemma  5595 ms   45      0.015750
Total wall-clock: 5601 ms
======================================================================
MERGED RESPONSE
======================================================================
## Documentation _(via model::light [IBM Granite], 2720 ms)_
# API Reference: `/users` Endpoints
Our RESTful API provides a set of endpoints under the `/users` resource to manage user-related operations. Below is the detailed documentation for each endpoint, including request methods, expected input parameters,
---
## Code Generation _(via model::balanced [Google Gemma], 5595 ms)_
`python
from fastapi import Depends, HTTPException, status
from fastapi.security import CredentialsTokenAuthentication
import jwt # You'll need to install this: pip install PyJWT
async def authenticate(token: str
======================================================================
FEEDBACK SUMMARY
======================================================================
{
"total_calls": 2,
"total_cost": 0.01815,
"avg_quality": 0.714,
"calls_by_tier": {
"1": 1,
"3": 1
}
}
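The cost figures in this run can be cross-checked with a few lines of arithmetic. The snippet below takes the token counts and costs from the routing table and back-derives the per-1k rate each row implies, using the same formula as `routing_table()` (cost = rate × tokens / 1000). The rates are inferred from the output, not read from the cost registry.

```python
# Rows from the routing table: (label, output tokens, reported cost).
rows = [
    ("model::light", 48, 0.002400),
    ("model::balanced", 45, 0.015750),
]

# Back-derive the per-1k rate each row implies: rate = cost * 1000 / tokens.
implied_rate = {
    label: round(cost * 1000 / tokens, 6) for label, tokens, cost in rows
}

# The sum of both rows should match "total_cost" in the feedback summary.
total_cost = round(sum(cost for _, _, cost in rows), 6)
```

The implied rates are 0.05 and 0.35 cost units per 1k output tokens, and the two rows add up to the 0.01815 reported as `total_cost`.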
Conclusion
By wrapping our budget-conscious routing logic in LiteLLM, we have constructed an AI orchestration layer that balances economic efficiency with enterprise reliability. We no longer write custom code to handle network hiccups or provider differences; instead, we let our core pipeline focus entirely on making optimal routing decisions.
Et voilà, that's a wrap for Part 2, and thanks for reading!
Stay tuned for the next episode.
Links
- LiteLLM: https://www.litellm.ai/
- LiteLLM Github repository: https://github.com/BerriAI/litellm
- Github repository for this post: https://github.com/aairom/multillm-taskrouting/tree/main/multi-llm-router-LiteLLM
- IBM Bob: https://bob.ibm.com/





