Building an AI application with a single model is straightforward. Building one that uses the right model for each task — that's where the real engineering happens.
This tutorial walks through building a multi-model AI pipeline in Python that automatically routes requests to Claude, GPT, or DeepSeek based on task complexity, tracks costs, and handles failures gracefully.
Why Multi-Model?
Single-model architectures have a fundamental problem: you're either overpaying for simple tasks or underperforming on complex ones.
- Claude Opus 4.7 ($5/$25 per 1M tokens) — Best reasoning, but expensive for simple tasks
- Claude Sonnet 4.6 ($3/$15) — Great balance for most coding tasks
- GPT-5.5 ($3/$12) — Excellent structured output
- DeepSeek V3 ($0.27/$1.10) — 10x cheaper, good enough for bulk work
A smart pipeline uses all of them.
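To make the gap concrete, here's a quick back-of-the-envelope comparison using the list prices above and an assumed call size of 2,000 input / 500 output tokens (the PRICES name and the token counts are illustrative, not measured from real traffic):
# Rough per-call cost at list prices, assuming a 2,000-input / 500-output-token call
PRICES = {  # (input, output) in USD per 1M tokens
    "claude-opus-4-7": (5.00, 25.00),
    "claude-sonnet-4-6": (3.00, 15.00),
    "gpt-5.5": (3.00, 12.00),
    "deepseek-chat": (0.27, 1.10),
}

for model, (inp, out) in PRICES.items():
    cost = 2_000 * inp / 1_000_000 + 500 * out / 1_000_000
    print(f"{model}: ${cost:.4f} per call")
# At these token counts, Opus works out to roughly 20x the per-call cost of DeepSeek.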
Architecture Overview
User Request
      │
      ▼
┌──────────────┐
│ Task Router  │ ← Classifies complexity
└──────────────┘
      │
      ├── Complex     → Claude Opus 4.7
      ├── Standard    → Claude Sonnet 4.6
      ├── Structured  → GPT-5.5
      └── Bulk/Simple → DeepSeek V3
      │
      ▼
┌──────────────┐
│   Fallback   │ ← Auto-retry with backup model
└──────────────┘
      │
      ▼
┌──────────────┐
│ Cost Tracker │ ← Log usage per model
└──────────────┘
The Code
Step 1: Unified Client Setup
Since all models are accessible through one OpenAI-compatible endpoint, the client setup is simple:
from openai import OpenAI
import time
import json
# One client for all models
client = OpenAI(
base_url="https://api.futurmix.ai/v1",
api_key="your-api-key"
)
# Model configs with pricing (per 1M tokens)
MODELS = {
"complex": {
"name": "claude-opus-4-7",
"input_cost": 4.50, # gateway price
"output_cost": 22.50,
"max_tokens": 4096
},
"standard": {
"name": "claude-sonnet-4-6",
"input_cost": 2.70,
"output_cost": 13.50,
"max_tokens": 4096
},
"structured": {
"name": "gpt-5.5",
"input_cost": 2.10,
"output_cost": 8.40,
"max_tokens": 4096
},
"bulk": {
"name": "deepseek-chat",
"input_cost": 0.19,
"output_cost": 0.77,
"max_tokens": 4096
}
}
# Fallback chain: if primary fails, try next
FALLBACK_CHAIN = {
"claude-opus-4-7": "claude-sonnet-4-6",
"claude-sonnet-4-6": "gpt-5.5",
"gpt-5.5": "claude-sonnet-4-6",
"deepseek-chat": "gpt-5.5"
}
Step 2: Task Router
The router classifies tasks and picks the right model:
def classify_task(prompt: str) -> str:
"""Classify task complexity based on prompt analysis."""
prompt_lower = prompt.lower()
word_count = len(prompt.split())
# Structured output indicators
structured_keywords = ["json", "extract", "parse", "schema", "csv",
"structured", "format as", "return as"]
if any(kw in prompt_lower for kw in structured_keywords):
return "structured"
# Complex task indicators
complex_keywords = ["refactor", "architect", "design", "debug",
"race condition", "optimize", "security audit",
"explain why", "trade-offs", "compare approaches"]
if any(kw in prompt_lower for kw in complex_keywords):
return "complex"
# Bulk/simple task indicators
bulk_keywords = ["generate tests", "add docstrings", "translate",
"boilerplate", "template", "lint", "format",
"add comments", "rename"]
if any(kw in prompt_lower for kw in bulk_keywords):
return "bulk"
# Default: standard
# Long prompts (likely complex context) → upgrade
if word_count > 500:
return "complex"
return "standard"
Step 3: Cost Tracker
class CostTracker:
def __init__(self):
self.usage_log = []
self.total_cost = 0.0
def log(self, model: str, input_tokens: int, output_tokens: int):
# Find model config
config = None
for tier in MODELS.values():
if tier["name"] == model:
config = tier
break
if not config:
return
cost = (input_tokens * config["input_cost"] / 1_000_000 +
output_tokens * config["output_cost"] / 1_000_000)
entry = {
"model": model,
"input_tokens": input_tokens,
"output_tokens": output_tokens,
"cost": round(cost, 6),
"timestamp": time.time()
}
self.usage_log.append(entry)
self.total_cost += cost
def summary(self):
by_model = {}
for entry in self.usage_log:
model = entry["model"]
if model not in by_model:
by_model[model] = {"calls": 0, "cost": 0, "tokens": 0}
by_model[model]["calls"] += 1
by_model[model]["cost"] += entry["cost"]
by_model[model]["tokens"] += entry["input_tokens"] + entry["output_tokens"]
return {
"total_cost": round(self.total_cost, 4),
"total_calls": len(self.usage_log),
"by_model": by_model
}
tracker = CostTracker()
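The tracker has no dependency on the API client, so you can exercise it on its own (the demo instance and token counts below are invented for illustration):
# Standalone check of the tracker with invented token counts
demo = CostTracker()
demo.log("claude-sonnet-4-6", input_tokens=1_200, output_tokens=350)
demo.log("deepseek-chat", input_tokens=4_000, output_tokens=900)
print(json.dumps(demo.summary(), indent=2))
# total_cost comes out to roughly $0.0094 across the two calls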
Step 4: The Pipeline
def call_model(prompt: str, model_name: str, system_prompt: str = None,
max_retries: int = 2) -> dict:
"""Call a model with automatic fallback."""
messages = []
if system_prompt:
messages.append({"role": "system", "content": system_prompt})
messages.append({"role": "user", "content": prompt})
current_model = model_name
for attempt in range(max_retries + 1):
try:
response = client.chat.completions.create(
model=current_model,
messages=messages,
max_tokens=4096,
temperature=0.7
)
# Track usage
usage = response.usage
tracker.log(current_model, usage.prompt_tokens,
usage.completion_tokens)
return {
"content": response.choices[0].message.content,
"model": current_model,
"tokens": {
"input": usage.prompt_tokens,
"output": usage.completion_tokens
}
}
except Exception as e:
print(f"[{current_model}] Error: {e}")
# Try fallback model
if current_model in FALLBACK_CHAIN:
current_model = FALLBACK_CHAIN[current_model]
print(f" → Falling back to {current_model}")
else:
raise
raise Exception(f"All models failed for this request")
def pipeline(prompt: str, system_prompt: str = None) -> dict:
"""Main pipeline: classify task → route to model → return result."""
task_type = classify_task(prompt)
model_config = MODELS[task_type]
print(f"Task classified as: {task_type} → {model_config['name']}")
result = call_model(prompt, model_config["name"], system_prompt)
result["task_type"] = task_type
return result
Step 5: Usage Examples
# Complex architecture task → Claude Opus
result = pipeline(
"Design a caching strategy for a multi-tenant SaaS application "
"that handles 10K requests/second. Consider trade-offs between "
"Redis cluster, local cache, and CDN caching."
)
print(f"Model used: {result['model']}")
# → claude-opus-4-7
# Structured extraction → GPT-5.5
result = pipeline(
"Extract all function names, parameters, and return types from "
"this Python file and return as JSON schema."
)
print(f"Model used: {result['model']}")
# → gpt-5.5
# Bulk generation → DeepSeek
result = pipeline(
"Generate unit tests for all public methods in this class. "
"Use pytest with parametrize for edge cases."
)
print(f"Model used: {result['model']}")
# → deepseek-chat
# Check costs
print(json.dumps(tracker.summary(), indent=2))
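If you want the usage data outside the process, you can dump the per-call log to a JSONL file at the end of a run (the filename here is arbitrary):
# Persist the usage log for later analysis
with open("usage_log.jsonl", "w") as f:
    for entry in tracker.usage_log:
        f.write(json.dumps(entry) + "\n")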
Advanced: Streaming Support
For interactive applications, add streaming:
def stream_pipeline(prompt: str, system_prompt: str = None):
"""Streaming version of the pipeline."""
task_type = classify_task(prompt)
model_config = MODELS[task_type]
messages = []
if system_prompt:
messages.append({"role": "system", "content": system_prompt})
messages.append({"role": "user", "content": prompt})
stream = client.chat.completions.create(
model=model_config["name"],
messages=messages,
max_tokens=4096,
stream=True
)
for chunk in stream:
if chunk.choices[0].delta.content:
yield chunk.choices[0].delta.content
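Consuming it is a standard generator loop (the prompt is just an example):
# Print tokens as they arrive
for token in stream_pipeline(
    "Compare approaches for rate limiting a public API and explain the trade-offs."
):
    print(token, end="", flush=True)
print()
Note that this version skips the cost tracker: streamed responses don't include usage data by default, though many OpenAI-compatible backends will return it if you pass stream_options={"include_usage": True}.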
Advanced: Parallel Processing for Batch Tasks
When processing many items, run them in parallel with the cheap model:
import concurrent.futures
def batch_process(items: list, prompt_template: str,
max_workers: int = 5) -> list:
"""Process a batch of items with DeepSeek (cheapest model)."""
def process_one(item):
prompt = prompt_template.format(item=item)
return call_model(prompt, "deepseek-chat")
results = []
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = {executor.submit(process_one, item): item
for item in items}
for future in concurrent.futures.as_completed(futures):
item = futures[future]
try:
result = future.result()
results.append({"item": item, "result": result})
except Exception as e:
results.append({"item": item, "error": str(e)})
return results
# Example: generate docstrings for 50 functions
functions = ["def calculate_tax(income, rate):", "def validate_email(email):", ...]
results = batch_process(
functions,
"Write a clear docstring for this Python function:\n{item}"
)
Cost Impact
Here's what this architecture saves over a typical month of development (8-hour days, ~200 API calls per day):
| Approach | Monthly Cost |
|---|---|
| Claude Opus for everything | ~$450 |
| Claude Sonnet for everything | ~$270 |
| Smart routing (this pipeline) | ~$85 |
That's roughly an 80% reduction compared with using Opus for everything, because most tasks don't need the most expensive model.
Production Considerations
- Add logging — Track which models handle which tasks so you can tune the classifier
- Set timeouts — Some models are slower; add a `timeout` parameter to client calls
- Rate limiting — Implement a token bucket per model to stay under API limits
- Caching — Cache responses for identical prompts (hash the prompt + model); a minimal sketch follows this list
- Monitoring — Alert when fallback rate exceeds 5% (indicates provider issues)
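Here's a minimal sketch of the caching point above, using an in-memory dict keyed by a hash of the model, system prompt, and prompt (a production setup would more likely use Redis with a TTL; the cached_call helper name is my own):
import hashlib

_response_cache: dict[str, dict] = {}

def cached_call(prompt: str, model_name: str, system_prompt: str = None) -> dict:
    """Wrapper around call_model that reuses responses for identical prompt + model pairs."""
    key = hashlib.sha256(f"{model_name}|{system_prompt}|{prompt}".encode()).hexdigest()
    if key not in _response_cache:
        _response_cache[key] = call_model(prompt, model_name, system_prompt)
    return _response_cache[key]
For timeouts, the OpenAI Python client accepts a per-request timeout argument on create calls (and client.with_options(timeout=...)), which is worth setting explicitly for the slower models.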
Get Started
This pipeline works with any OpenAI-compatible API. FuturMix gives you one API key for 22+ models at 10-30% off — Claude, GPT, DeepSeek, Gemini, and more.
client = OpenAI(
base_url="https://api.futurmix.ai/v1",
api_key="your-key"
)
The full code from this tutorial is ready to copy-paste and customize for your use case.
What's your multi-model setup? Share your routing strategies in the comments.