Using one LLM for everything is like using a chainsaw to cut butter. It works, but you're overpaying massively.
Model routing is the practice of automatically directing each AI request to the most cost-effective model that can handle it. Complex reasoning goes to Claude Opus. Simple edits go to DeepSeek. Structured extraction goes to GPT.
Here's how to build it.
The Cost Problem
A typical AI coding pipeline without routing:
All requests → Claude Sonnet 4.6 → $3/$15 per 1M tokens
200 requests/day × 75K tokens/request = 15M tokens/day
Daily cost: ~$135
Monthly cost: ~$4,050
The same pipeline with routing:
Complex (20%) → Claude Opus: $5/$25 per 1M tokens
Standard (30%) → Claude Sonnet: $3/$15 per 1M tokens
Structured (15%) → GPT-5.5: $3/$12 per 1M tokens
Bulk (35%) → DeepSeek V3: $0.27/$1.10 per 1M tokens
Monthly cost: ~$1,200
Savings: 70%
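These figures assume a roughly even split between input and output tokens; your actual savings depend on how your token volume distributes across tiers. A quick back-of-envelope calculator (the prices below are the illustrative per-million-token rates from this post) makes it easy to plug in your own traffic:

# Rough monthly cost estimator. Prices are USD per 1M tokens (input, output)
# and match the illustrative figures above; swap in your own numbers.
PRICES = {
    "claude-opus": (5.00, 25.00),
    "claude-sonnet": (3.00, 15.00),
    "gpt": (3.00, 12.00),
    "deepseek": (0.27, 1.10),
}

def monthly_cost(tokens_per_day, tier_share, output_ratio=0.5, days=30):
    """Estimate monthly spend from a daily token volume and a tier mix (shares of token volume)."""
    total = 0.0
    for model, share in tier_share.items():
        in_price, out_price = PRICES[model]
        tokens = tokens_per_day * share
        total += (tokens * (1 - output_ratio) * in_price
                  + tokens * output_ratio * out_price) / 1_000_000
    return total * days

# Single-model baseline from above: 15M tokens/day, everything on Sonnet
print(monthly_cost(15_000_000, {"claude-sonnet": 1.0}))  # → 4050.0
# Routed: plug in your own tier mix, e.g. {"claude-opus": 0.2, "claude-sonnet": 0.3, ...}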
Architecture
┌─────────────┐
│   Request   │
└──────┬──────┘
       │
┌──────▼──────┐
│  Classifier │  ← Rule-based or ML
└──────┬──────┘
       │
   ┌───┼────┬─────────┐
   │   │    │         │
   ▼   ▼    ▼         ▼
 Opus Son.  GPT     DeepSeek
   │   │    │         │
   └───┼────┴─────────┘
       │
┌──────▼──────┐
│  Fallback   │  ← Retry on failure
└──────┬──────┘
       │
┌──────▼──────┐
│  Response   │
└─────────────┘
Implementation
The Router
from openai import OpenAI
from enum import Enum


class ModelTier(Enum):
    COMPLEX = "complex"
    STANDARD = "standard"
    STRUCTURED = "structured"
    BULK = "bulk"


MODEL_MAP = {
    ModelTier.COMPLEX: "claude-opus-4-7",
    ModelTier.STANDARD: "claude-sonnet-4-6",
    ModelTier.STRUCTURED: "gpt-5.5",
    ModelTier.BULK: "deepseek-chat",
}

FALLBACK_MAP = {
    "claude-opus-4-7": "claude-sonnet-4-6",
    "claude-sonnet-4-6": "gpt-5.5",
    "gpt-5.5": "claude-sonnet-4-6",
    "deepseek-chat": "gpt-5.5",
}

client = OpenAI(
    base_url="https://api.futurmix.ai/v1",
    api_key="your-key"
)
The Classifier
def classify_request(prompt: str, metadata: dict | None = None) -> ModelTier:
    """Classify a request to determine the optimal model tier."""
    prompt_lower = prompt.lower()
    word_count = len(prompt.split())

    # Check metadata hints first
    if metadata:
        if metadata.get("tier"):
            return ModelTier(metadata["tier"])
        if metadata.get("json_output"):
            return ModelTier.STRUCTURED

    # Structured output detection
    structured_signals = [
        "json", "csv", "xml", "schema", "extract",
        "parse", "format as", "return as", "structured"
    ]
    if any(s in prompt_lower for s in structured_signals):
        return ModelTier.STRUCTURED

    # Complex task detection
    complex_signals = [
        "refactor", "architect", "design system", "debug",
        "race condition", "security audit", "performance optimize",
        "explain the trade-offs", "compare approaches",
        "root cause", "memory leak", "deadlock"
    ]
    if any(s in prompt_lower for s in complex_signals):
        return ModelTier.COMPLEX

    # Also complex: very long prompts with code context
    if word_count > 1000:
        return ModelTier.COMPLEX

    # Bulk task detection
    bulk_signals = [
        "generate tests", "add docstrings", "translate all",
        "add comments", "rename variable", "format code",
        "boilerplate", "template", "placeholder", "stub"
    ]
    if any(s in prompt_lower for s in bulk_signals):
        return ModelTier.BULK

    # Default: standard
    return ModelTier.STANDARD
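A few quick spot checks of the keyword rules (the prompts are made up; the expected tiers are shown as comments):

print(classify_request("Parse this log file and return as JSON"))         # ModelTier.STRUCTURED
print(classify_request("Debug the race condition in the worker pool"))    # ModelTier.COMPLEX
print(classify_request("Add docstrings to every function in this file"))  # ModelTier.BULK
print(classify_request("Explain what this function does"))                # ModelTier.STANDARD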
The Execution Layer
def route_and_execute(
    prompt: str,
    system_prompt: str | None = None,
    metadata: dict | None = None,
    max_retries: int = 2
) -> dict:
    """Route request to optimal model and execute with fallback."""
    tier = classify_request(prompt, metadata)
    model = MODEL_MAP[tier]

    messages = []
    if system_prompt:
        messages.append({"role": "system", "content": system_prompt})
    messages.append({"role": "user", "content": prompt})

    current_model = model
    for attempt in range(max_retries + 1):
        try:
            response = client.chat.completions.create(
                model=current_model,
                messages=messages,
                max_tokens=4096
            )
            return {
                "content": response.choices[0].message.content,
                "model_used": current_model,
                "tier": tier.value,
                "attempt": attempt + 1,
                "usage": {
                    "input": response.usage.prompt_tokens,
                    "output": response.usage.completion_tokens
                }
            }
        except Exception:
            fallback = FALLBACK_MAP.get(current_model)
            if fallback and attempt < max_retries:
                current_model = fallback
                continue
            raise
Usage
# Automatically routed to Claude Opus (complex)
result = route_and_execute(
    "Design a distributed caching system that handles partition tolerance "
    "and maintains consistency across 3 regions. Explain the trade-offs under the CAP theorem."
)
print(f"Tier: {result['tier']}, Model: {result['model_used']}")
# → Tier: complex, Model: claude-opus-4-7
# Automatically routed to GPT (structured output)
result = route_and_execute(
    "Extract all API endpoints from this codebase and return as JSON with "
    "method, path, description, and parameters for each.",
    metadata={"json_output": True}
)
# → Tier: structured, Model: gpt-5.5
# Automatically routed to DeepSeek (bulk)
result = route_and_execute(
    "Generate tests for all 15 functions in this utils module."
)
# → Tier: bulk, Model: deepseek-chat
Advanced: Quality Verification
For critical tasks, add a verification step. Route to a cheap model first, have a mid-tier model score the output, and re-run on the complex tier only when the score falls below a threshold:
def verified_execution(prompt: str, quality_threshold: float = 0.8) -> dict:
    """Execute with cheap model, verify with a more capable model if needed."""
    # First pass: cheap model
    result = route_and_execute(prompt)

    # If already using complex tier, no verification needed
    if result["tier"] == "complex":
        return result

    # Quick quality check with a more capable model
    verification = route_and_execute(
        f"Rate the quality of this response on a scale of 0-1. "
        f"Just return the number.\n\nOriginal prompt: {prompt}\n\n"
        f"Response: {result['content']}",
        metadata={"tier": "standard"}
    )

    try:
        score = float(verification["content"].strip())
        if score < quality_threshold:
            # Re-execute with higher tier
            return route_and_execute(prompt, metadata={"tier": "complex"})
    except ValueError:
        pass

    return result
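Usage looks the same as before; the prompt and threshold here are purely illustrative:

# A standard-tier request gets a spot check; if the score comes back below
# the threshold, the same prompt is re-run on the complex tier.
result = verified_execution(
    "Summarize the error handling strategy used in this module.",
    quality_threshold=0.8,
)
print(result["model_used"])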
Advanced: Request Batching
When processing many similar items, fan them out concurrently so each one is routed to the cheapest suitable model:
import asyncio

async def batch_route(items: list, prompt_template: str):
    """Process items concurrently, each routed to the cheapest suitable model."""
    async def process_one(item):
        prompt = prompt_template.format(item=item)
        # route_and_execute is synchronous, so run it in a worker thread
        # to keep the event loop free while requests are in flight
        return await asyncio.to_thread(route_and_execute, prompt)

    tasks = [process_one(item) for item in items]
    return await asyncio.gather(*tasks)
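For example, adding docstrings to a list of functions (the function names and template are placeholders):

functions = ["load_cache", "flush_queue", "start_worker"]
results = asyncio.run(batch_route(
    functions,
    prompt_template="Add docstrings to this function: {item}",
))
print([r["model_used"] for r in results])  # each prompt routes to the bulk tier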
Monitoring Dashboard
Track routing decisions, token usage, and fallback rates:
class RoutingMetrics:
    def __init__(self):
        self.decisions = []

    def record(self, result):
        self.decisions.append({
            "tier": result["tier"],
            "model": result["model_used"],
            "tokens": result["usage"],
            "fallback": result["attempt"] > 1
        })

    def summary(self):
        total = len(self.decisions)
        by_tier = {}
        for d in self.decisions:
            tier = d["tier"]
            by_tier.setdefault(tier, 0)
            by_tier[tier] += 1
        fallback_rate = sum(1 for d in self.decisions if d["fallback"]) / total
        return {
            "total_requests": total,
            "tier_distribution": {k: v / total for k, v in by_tier.items()},
            "fallback_rate": fallback_rate
        }
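Wiring it into the execution path is one call per request; the prompts and output below are illustrative:

metrics = RoutingMetrics()

for prompt in ["Generate tests for the utils module", "Debug this deadlock"]:
    result = route_and_execute(prompt)
    metrics.record(result)

print(metrics.summary())
# e.g. {'total_requests': 2,
#       'tier_distribution': {'bulk': 0.5, 'complex': 0.5},
#       'fallback_rate': 0.0}   (assuming both calls succeed on the first attempt)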
Key Takeaways
- Classification doesn't need to be perfect — even a simple keyword-based classifier saves 50%+ over using one model
- Fallback chains are essential — providers have downtime, your pipeline shouldn't
- Monitor and tune — track which tier each request hits and adjust thresholds
- Use a gateway — one endpoint that supports all models makes routing trivial
Get Started
FuturMix provides all 22+ models through one OpenAI-compatible API at 10-30% off. Perfect for building routing pipelines.
client = OpenAI(
    base_url="https://api.futurmix.ai/v1",
    api_key="your-key"
)
Start with the simple keyword classifier, monitor for a week, then optimize.
How do you handle model routing in your AI pipelines? Share your approach in the comments.