A practical guide to reducing LLM inference costs by 40-60% without sacrificing quality — using semantic caching, request batching and intelligent model routing. Includes full Python implementations, architecture diagrams and real pricing breakdowns.
The moment an LLM-powered product gains traction, the invoices start arriving. A pipeline processing 500K requests per day at GPT-4o pricing can easily run north of $30,000/month — and that number only climbs as usage grows. The reflex is to switch to a cheaper model, but that trades cost for quality in ways that surface as user complaints weeks later.
There's a better path. Three techniques — semantic caching, request batching and model routing — can cut inference costs by 40-60% while maintaining (and sometimes improving) output quality. These aren't theoretical ideas. They're production patterns used in high-volume LLM systems across industries.
This guide walks through each technique with full implementations, then shows how combining all three creates compounding savings.
The Cost Problem at Scale
Here's what typical LLM costs look like before any optimization:
┌─────────────────────────────────────────────────────────────────────┐
│ MONTHLY LLM COST AT DIFFERENT VOLUMES │
│ (GPT-4o pricing, ~300 input / 150 output tokens per request) │
├─────────────────┬──────────────┬──────────────┬─────────────────────┤
│ Daily Requests │ Input Cost │ Output Cost │ Monthly Total │
├─────────────────┼──────────────┼──────────────┼─────────────────────┤
│ 10,000 │ $225 │ $450 │ $675 │
│ 50,000 │ $1,125 │ $2,250 │ $3,375 │
│ 200,000 │ $4,500 │ $9,000 │ $13,500 │
│ 500,000 │ $11,250 │ $22,500 │ $33,750 │
│ 1,000,000 │ $22,500 │ $45,000 │ $67,500 │
└─────────────────┴──────────────┴──────────────┴─────────────────────┘
GPT-4o: $2.50/1M input tokens, $10.00/1M output tokens (2025 pricing)
At 500K daily requests, that's $33,750/month — over $400K/year. Even modest percentage reductions translate into meaningful savings. The three techniques covered here attack different parts of this cost surface and they stack.
Section 1: Semantic Caching
The Core Idea
Most LLM applications have significant request overlap. Users ask similar (not identical) questions. Product descriptions get summarized repeatedly. Classification prompts recur with minor variations. Semantic caching exploits this by storing previous responses and returning them for semantically similar new requests — skipping the LLM call entirely.
Unlike exact-match caching, semantic caching uses embedding similarity to match requests that mean the same thing but are phrased differently. "What's the return policy?" and "How do I return an item?" should hit the same cache entry.
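The mechanics are easy to see with toy vectors. The sketch below uses made-up 3-dimensional vectors in place of real embedding output (text-embedding-3-small returns 1536 dimensions), but the cosine math is identical:

```python
import numpy as np

def cosine_similarity(a: np.ndarray, b: np.ndarray) -> float:
    """Cosine similarity: 1.0 = same direction, ~0.0 = unrelated."""
    return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))

# Toy vectors standing in for real embeddings of three queries
returns_q1 = np.array([0.82, 0.41, 0.12])  # "What's the return policy?"
returns_q2 = np.array([0.79, 0.45, 0.15])  # "How do I return an item?"
pricing_q  = np.array([0.10, 0.20, 0.95])  # "How much does shipping cost?"

print(cosine_similarity(returns_q1, returns_q2))  # high: cache hit
print(cosine_similarity(returns_q1, pricing_q))   # low: cache miss
```

Two paraphrases of the same question land close together in embedding space and clear the similarity threshold; an unrelated question does not.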
┌──────────────────────────────────────────────────────────────────────┐
│ SEMANTIC CACHING ARCHITECTURE │
│ │
│ Incoming Request │
│ │ │
│ ▼ │
│ ┌──────────┐ ┌──────────────┐ ┌─────────────────────┐ │
│ │ Embedding│────►│ Vector Search│────►│ Cache Hit? │ │
│ │ Model │ │ (Redis/FAISS)│ │ similarity > 0.95 │ │
│ └──────────┘ └──────────────┘ └──────┬──────────────┘ │
│ │ │
│ ┌───────────┴───────────┐ │
│ │ │ │
│ YES NO │
│ │ │ │
│ ▼ ▼ │
│ ┌──────────────┐ ┌──────────────┐ │
│ │ Return Cached│ │ Call LLM │ │
│ │ Response │ │ API │ │
│ └──────────────┘ └──────┬───────┘ │
│ │ │
│ ▼ │
│ ┌──────────────┐ │
│ │ Store in │ │
│ │ Cache + TTL │ │
│ └──────────────┘ │
└──────────────────────────────────────────────────────────────────────┘
Full Implementation
import hashlib
import json
import time
import numpy as np
import redis
import openai
from typing import Optional
class SemanticCache:
"""
Semantic cache for LLM responses using Redis and embedding similarity.
Stores embeddings alongside cached responses. On each request,
embeds the query and checks for high-similarity matches before
calling the LLM.
"""
def __init__(
self,
redis_url: str = "redis://localhost:6379",
similarity_threshold: float = 0.95,
default_ttl: int = 3600,
embedding_model: str = "text-embedding-3-small",
namespace: str = "llm_cache"
):
self.redis_client = redis.from_url(redis_url)
self.similarity_threshold = similarity_threshold
self.default_ttl = default_ttl
self.embedding_model = embedding_model
self.namespace = namespace
self.oai_client = openai.OpenAI()
# Metrics
self._hits = 0
self._misses = 0
def _get_embedding(self, text: str) -> list[float]:
"""Generate embedding for a text string."""
response = self.oai_client.embeddings.create(
model=self.embedding_model,
input=text
)
return response.data[0].embedding
def _cosine_similarity(self, a: list[float], b: list[float]) -> float:
"""Compute cosine similarity between two vectors."""
a_arr, b_arr = np.array(a), np.array(b)
return float(np.dot(a_arr, b_arr) / (
np.linalg.norm(a_arr) * np.linalg.norm(b_arr)
))
def _cache_key(self, prompt_hash: str) -> str:
return f"{self.namespace}:{prompt_hash}"
    def _get_all_cache_entries(self) -> list[dict]:
        """Retrieve all cached entries for similarity comparison.

        Note: KEYS is a blocking O(N) scan; fine for small caches,
        but swap in a vector index at production scale (see the
        production note below).
        """
        keys = self.redis_client.keys(f"{self.namespace}:*")
entries = []
for key in keys:
data = self.redis_client.get(key)
if data:
entries.append(json.loads(data))
return entries
def lookup(self, prompt: str, system_prompt: str = "") -> Optional[str]:
"""
Check cache for a semantically similar prompt.
Returns cached response or None.
"""
combined = f"{system_prompt}\n---\n{prompt}" if system_prompt else prompt
query_embedding = self._get_embedding(combined)
# Search existing cache entries for semantic match
best_match = None
best_similarity = 0.0
for entry in self._get_all_cache_entries():
similarity = self._cosine_similarity(
query_embedding, entry["embedding"]
)
if similarity > best_similarity:
best_similarity = similarity
best_match = entry
if best_match and best_similarity >= self.similarity_threshold:
self._hits += 1
return best_match["response"]
self._misses += 1
return None
def store(
self,
prompt: str,
response: str,
system_prompt: str = "",
ttl: Optional[int] = None
):
"""Store a prompt-response pair in the cache."""
combined = f"{system_prompt}\n---\n{prompt}" if system_prompt else prompt
embedding = self._get_embedding(combined)
        # Hash the prompt text for a stable cache key
        prompt_hash = hashlib.sha256(combined.encode()).hexdigest()[:16]
entry = {
"prompt": prompt,
"system_prompt": system_prompt,
"response": response,
"embedding": embedding,
"created_at": time.time()
}
self.redis_client.setex(
self._cache_key(prompt_hash),
ttl or self.default_ttl,
json.dumps(entry)
)
def cached_completion(
self,
prompt: str,
system_prompt: str = "",
model: str = "gpt-4o",
ttl: Optional[int] = None,
**kwargs
) -> dict:
"""
Drop-in replacement for LLM completion that checks cache first.
Returns dict with response and metadata.
"""
# Step 1: Check cache
cached = self.lookup(prompt, system_prompt)
if cached is not None:
return {
"response": cached,
"source": "cache",
"cost": 0.0002 # Embedding cost only
}
# Step 2: Call LLM
messages = []
if system_prompt:
messages.append({"role": "system", "content": system_prompt})
messages.append({"role": "user", "content": prompt})
completion = self.oai_client.chat.completions.create(
model=model,
messages=messages,
**kwargs
)
response_text = completion.choices[0].message.content
# Step 3: Store in cache
self.store(prompt, response_text, system_prompt, ttl)
return {
"response": response_text,
"source": "llm",
"cost": self._estimate_cost(completion, model)
}
def _estimate_cost(self, completion, model: str) -> float:
"""Estimate cost based on token usage."""
pricing = {
"gpt-4o": {"input": 2.50, "output": 10.00},
"gpt-4o-mini": {"input": 0.15, "output": 0.60},
}
rates = pricing.get(model, pricing["gpt-4o"])
usage = completion.usage
return (
(usage.prompt_tokens / 1_000_000) * rates["input"] +
(usage.completion_tokens / 1_000_000) * rates["output"]
)
@property
def hit_rate(self) -> float:
total = self._hits + self._misses
return self._hits / total if total > 0 else 0.0
@property
def stats(self) -> dict:
return {
"hits": self._hits,
"misses": self._misses,
"hit_rate": f"{self.hit_rate:.1%}",
"cached_entries": len(self._get_all_cache_entries())
}
TTL Strategies
Not all cache entries should live equally long. A smart TTL strategy accounts for how quickly the underlying data changes:
| Content Type | Recommended TTL | Reasoning |
|---|---|---|
| Static knowledge (FAQ, docs) | 24-72 hours | Rarely changes |
| Summarization tasks | 6-12 hours | Same input = same summary |
| Classification/labeling | 12-24 hours | Deterministic task |
| Real-time data (prices, news) | 5-15 minutes | Stale quickly |
| Creative generation | No caching | Users expect variety |
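One way to wire the table into code is a small lookup helper. The category names and exact TTL values here are illustrative assumptions; adapt them to the actual endpoints:

```python
from typing import Optional

# TTLs in seconds, following the table above; None = don't cache.
TTL_POLICY: dict[str, Optional[int]] = {
    "static_knowledge": 48 * 3600,  # FAQ, docs: 24-72h
    "summarization":    9 * 3600,   # 6-12h
    "classification":   18 * 3600,  # 12-24h
    "realtime_data":    10 * 60,    # prices, news: 5-15 min
    "creative":         None,       # users expect variety
}

def ttl_for(content_type: str) -> Optional[int]:
    """Look up the cache TTL for a content category; None disables caching."""
    return TTL_POLICY.get(content_type, 3600)  # conservative 1h default

print(ttl_for("realtime_data"))  # 600
```

The `ttl` parameter on `SemanticCache.store` accepts exactly this kind of per-endpoint value.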
When Caching Helps vs. Doesn't
Semantic caching delivers the best ROI when request diversity is low to moderate. The key metric is cache hit rate, which depends on the distribution of incoming queries.
┌─────────────────────────────────────────────────────────────────────┐
│ CACHE HIT RATE vs. REQUEST DIVERSITY │
│ │
│ Hit Rate │
│ 80% ┤ ████ │
│ 70% ┤ ████ ████ │
│ 60% ┤ ████ ████ │
│ 50% ┤ ████ ████ ████ │
│ 40% ┤ ████ ████ ████ │
│ 30% ┤ ████ ████ ████ ████ │
│ 20% ┤ ████ ████ ████ ████ ████ │
│ 10% ┤ ████ ████ ████ ████ ████ ████ │
│ 0% ┤ ████ ████ ████ ████ ████ ████ ████ │
│ └─────────────────────────────────── │
│ FAQ Support Search Coding Creative Open-ended │
│ Bot Chat Assist Assist Gen Research │
│ │
│ Rule of thumb: if hit rate < 15%, caching adds latency │
│ without meaningful savings. Disable it for that endpoint. │
└─────────────────────────────────────────────────────────────────────┘
Customer support bots and FAQ systems routinely see 50-75% cache hit rates. Open-ended research or creative tasks sit below 10%. Measure first, then decide.
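The break-even logic behind that rule of thumb is simple arithmetic: every request pays for the embedding lookup, and only misses pay for the LLM call. The per-request figures below are illustrative (roughly 300 input / 150 output tokens at GPT-4o rates):

```python
def effective_cost_per_request(
    hit_rate: float,
    llm_cost: float = 0.00225,   # ~300 in / 150 out tokens at GPT-4o rates
    embed_cost: float = 0.0002,  # embedding lookup, paid on every request
) -> float:
    """Expected per-request cost with a semantic cache in front."""
    return embed_cost + (1 - hit_rate) * llm_cost

baseline = effective_cost_per_request(0.0)
for rate in (0.05, 0.15, 0.50):
    cost = effective_cost_per_request(rate)
    print(f"hit rate {rate:.0%}: {(1 - cost / baseline):.1%} net savings")
```

At a 5% hit rate the embedding overhead eats most of the benefit; at 50% the cache roughly halves the bill.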
Production note: For large-scale deployments, replace the brute-force similarity scan with a proper vector index — Redis Stack's FT.SEARCH with HNSW indexing, or a dedicated vector store like Qdrant. The brute-force approach works fine up to ~50K cached entries; beyond that, search latency dominates.
Section 2: Request Batching
The Core Idea
Most LLM providers offer batch APIs at a significant discount — OpenAI's Batch API costs 50% less than the real-time API. Even without a batch discount, batching requests reduces overhead from connection setup, rate limiting and retry logic.
The trick is building a batching layer that's transparent to the application. Requests accumulate in a queue, get flushed as a batch when a size threshold or time interval is reached and individual callers get their results asynchronously.
┌─────────────────────────────────────────────────────────────────────┐
│ REQUEST BATCHING FLOW │
│ │
│ Request A ──┐ │
│ Request B ──┤ ┌──────────────┐ │
│ Request C ──┼────►│ Batch Queue │ │
│ Request D ──┤ │ │ │
│ Request E ──┘ │ Flush when: │ │
│ │ • size >= 20 │ ┌─────────────────────┐ │
│ │ • age > 500ms│────►│ Batch API Call │ │
│ └──────────────┘ │ (50% cheaper) │ │
│ └──────────┬──────────┘ │
│ │ │
│ ┌──────────┴──────────┐ │
│ ▼ ▼ ▼ ▼ ▼ │
│ Res Res Res Res Res │
│ A B C D E │
│ │ │ │ │ │ │
│ ▼ ▼ ▼ ▼ ▼ │
│ Futures resolved, callers unblock │
└─────────────────────────────────────────────────────────────────────┘
Batch API vs. Real-Time Pricing
┌─────────────────────────────────────────────────────────────────────┐
│ BATCH vs REAL-TIME PRICING (per 1M tokens) │
├──────────────────┬────────────────────┬─────────────────────────────┤
│ Model │ Real-Time │ Batch API Savings │
├──────────────────┼────────────────────┼─────────────────────────────┤
│ GPT-4o Input │ $2.50 │ $1.25 50% │
│ GPT-4o Output │ $10.00 │ $5.00 50% │
│ GPT-4o-mini In │ $0.15 │ $0.075 50% │
│ GPT-4o-mini Out │ $0.60 │ $0.30 50% │
│ Claude Sonnet In│ $3.00 │ $1.50* 50% │
│ Claude Sonnet O │ $15.00 │ $7.50* 50% │
└──────────────────┴────────────────────┴─────────────────────────────┘
* Anthropic batch pricing via Message Batches API
The caveat: batch APIs are asynchronous. Results arrive in minutes to hours, not milliseconds. This pattern works for offline processing, background enrichment and any workload that doesn't need real-time responses.
For latency-sensitive workloads, client-side micro-batching provides a middle ground — accumulate requests over a short window (200-500ms) and send them as concurrent real-time calls, reducing per-request overhead.
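The blended savings from a partial migration are straightforward to model. A sketch, assuming a flat 50% batch discount on whatever share of traffic can tolerate asynchronous delivery:

```python
def blended_cost_factor(batch_share: float, batch_discount: float = 0.50) -> float:
    """Fraction of the real-time bill that remains after moving a
    share of traffic to the discounted Batch API."""
    return (1 - batch_share) + batch_share * (1 - batch_discount)

# Moving 60% of traffic to the Batch API at a 50% discount:
print(f"{1 - blended_cost_factor(0.60):.0%} overall savings")
```

Sixty percent of traffic at half price works out to a 30% cut on the total bill, which is the figure the combined-savings table later in this guide uses.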
Full Implementation
import asyncio
import uuid
import time
from dataclasses import dataclass, field
from typing import Optional
import openai
@dataclass
class BatchRequest:
"""A single request waiting in the batch queue."""
request_id: str
messages: list[dict]
model: str
future: asyncio.Future
created_at: float = field(default_factory=time.time)
kwargs: dict = field(default_factory=dict)
class AsyncBatchQueue:
"""
Accumulates LLM requests and flushes them in batches.
Supports two modes:
- micro_batch: Groups requests into concurrent real-time calls
(reduces overhead, keeps low latency)
- batch_api: Submits to OpenAI's Batch API for 50% discount
(higher latency, significant savings)
"""
def __init__(
self,
max_batch_size: int = 20,
flush_interval_ms: int = 500,
mode: str = "micro_batch", # "micro_batch" or "batch_api"
max_concurrent: int = 10
):
self.max_batch_size = max_batch_size
self.flush_interval = flush_interval_ms / 1000.0
self.mode = mode
self.max_concurrent = max_concurrent
self.queue: list[BatchRequest] = []
self.oai_client = openai.AsyncOpenAI()
self._lock = asyncio.Lock()
self._flush_task: Optional[asyncio.Task] = None
self._running = False
# Metrics
self.total_requests = 0
self.total_batches = 0
self.total_cost = 0.0
async def start(self):
"""Start the background flush loop."""
self._running = True
self._flush_task = asyncio.create_task(self._flush_loop())
async def stop(self):
"""Flush remaining requests and stop."""
self._running = False
if self.queue:
await self._flush_batch()
if self._flush_task:
self._flush_task.cancel()
async def submit(
self,
messages: list[dict],
model: str = "gpt-4o",
**kwargs
) -> str:
"""
Submit a request to the batch queue.
Returns the LLM response text when the batch completes.
"""
        loop = asyncio.get_running_loop()
        future = loop.create_future()
request = BatchRequest(
request_id=str(uuid.uuid4()),
messages=messages,
model=model,
future=future,
kwargs=kwargs
)
async with self._lock:
self.queue.append(request)
self.total_requests += 1
# Flush immediately if batch is full
if len(self.queue) >= self.max_batch_size:
await self._flush_batch()
return await future
async def _flush_loop(self):
"""Periodically flush the queue based on time interval."""
while self._running:
await asyncio.sleep(self.flush_interval)
async with self._lock:
if self.queue:
await self._flush_batch()
    async def _flush_batch(self):
        """Process up to max_batch_size queued requests as a batch.

        Note: callers hold self._lock while this awaits, so new
        submissions block until the batch finishes. For higher
        throughput, dispatch processing with asyncio.create_task
        and have stop() await the outstanding tasks.
        """
        if not self.queue:
            return
        batch = self.queue[:self.max_batch_size]
        self.queue = self.queue[self.max_batch_size:]
        self.total_batches += 1
        if self.mode == "micro_batch":
            await self._process_micro_batch(batch)
        else:
            await self._process_batch_api(batch)
async def _process_micro_batch(self, batch: list[BatchRequest]):
"""Send requests concurrently with controlled parallelism."""
semaphore = asyncio.Semaphore(self.max_concurrent)
async def process_single(req: BatchRequest):
async with semaphore:
try:
completion = await self.oai_client.chat.completions.create(
model=req.model,
messages=req.messages,
**req.kwargs
)
response = completion.choices[0].message.content
self.total_cost += self._estimate_cost(
completion.usage, req.model
)
req.future.set_result(response)
except Exception as e:
req.future.set_exception(e)
await asyncio.gather(*[process_single(r) for r in batch])
async def _process_batch_api(self, batch: list[BatchRequest]):
"""
Submit to OpenAI's Batch API for 50% cost reduction.
Polls for completion and resolves futures when done.
"""
        import json
        import os
        import tempfile
        # Build JSONL for batch submission
        lines = []
        for req in batch:
            lines.append(json.dumps({
                "custom_id": req.request_id,
                "method": "POST",
                "url": "/v1/chat/completions",
                "body": {
                    "model": req.model,
                    "messages": req.messages,
                    **req.kwargs
                }
            }))
        # Write the JSONL to a temp file, upload it, then clean up
        jsonl_content = "\n".join(lines)
        with tempfile.NamedTemporaryFile(
            mode='w', suffix='.jsonl', delete=False
        ) as f:
            f.write(jsonl_content)
        try:
            with open(f.name, 'rb') as fh:
                upload = await self.oai_client.files.create(
                    file=fh, purpose="batch"
                )
        finally:
            os.unlink(f.name)
# Create batch
batch_obj = await self.oai_client.batches.create(
input_file_id=upload.id,
endpoint="/v1/chat/completions",
completion_window="24h"
)
        # Poll for completion
        request_map = {r.request_id: r for r in batch}
        while True:
            status = await self.oai_client.batches.retrieve(batch_obj.id)
            if status.status == "completed":
                # Retrieve results and resolve each caller's future
                content = await self.oai_client.files.content(
                    status.output_file_id
                )
                for line in content.text.strip().split("\n"):
                    result = json.loads(line)
                    req = request_map.get(result["custom_id"])
                    if req is None:
                        continue
                    response = result["response"]["body"]["choices"][0][
                        "message"
                    ]["content"]
                    req.future.set_result(response)
                # Requests that errored individually have no output line
                for req in batch:
                    if not req.future.done():
                        req.future.set_exception(
                            RuntimeError("No result returned for request")
                        )
                break
            elif status.status in ("failed", "expired", "cancelled"):
                for req in batch:
                    if not req.future.done():
                        req.future.set_exception(
                            RuntimeError(
                                f"Batch {status.status}: {status.errors}"
                            )
                        )
                break
            await asyncio.sleep(10)
def _estimate_cost(self, usage, model: str) -> float:
pricing = {
"gpt-4o": {"input": 2.50, "output": 10.00},
"gpt-4o-mini": {"input": 0.15, "output": 0.60},
}
rates = pricing.get(model, pricing["gpt-4o"])
return (
(usage.prompt_tokens / 1_000_000) * rates["input"] +
(usage.completion_tokens / 1_000_000) * rates["output"]
)
@property
def stats(self) -> dict:
avg_batch = (
self.total_requests / self.total_batches
if self.total_batches > 0 else 0
)
return {
"total_requests": self.total_requests,
"total_batches": self.total_batches,
"avg_batch_size": f"{avg_batch:.1f}",
"total_cost": f"${self.total_cost:.4f}"
}
# Usage example
async def main():
queue = AsyncBatchQueue(
max_batch_size=15,
flush_interval_ms=300,
mode="micro_batch"
)
await queue.start()
# Submit many requests — they'll be batched automatically
tasks = []
prompts = [
"Summarize the key points of GDPR compliance.",
"What are the main risks of cloud migration?",
"Explain SOC 2 Type II certification requirements.",
# ... hundreds more
]
for prompt in prompts:
task = queue.submit(
messages=[{"role": "user", "content": prompt}],
model="gpt-4o-mini"
)
tasks.append(task)
responses = await asyncio.gather(*tasks)
await queue.stop()
    print(queue.stats)

if __name__ == "__main__":
    asyncio.run(main())
Section 3: Model Routing
The Core Idea
Not every request needs the most expensive model. A simple classification task doesn't require GPT-4o. A complex multi-step reasoning problem shouldn't be sent to GPT-4o-mini. Model routing dynamically selects the cheapest model capable of handling each request at the required quality level.
The savings are dramatic because pricing differences between model tiers are 10-30x:
┌─────────────────────────────────────────────────────────────────────┐
│ MODEL PRICING COMPARISON (per 1M tokens) │
├──────────────────┬──────────┬───────────┬───────────────────────────┤
│ Model │ Input │ Output │ Relative Cost (vs 4o) │
├──────────────────┼──────────┼───────────┼───────────────────────────┤
│ GPT-4o │ $2.50 │ $10.00 │ 1.00x (baseline) │
│ GPT-4o-mini │ $0.15 │ $0.60 │ 0.06x (16x cheaper) │
│ Claude Sonnet │ $3.00 │ $15.00 │ 1.38x │
│ Claude Haiku │ $0.25 │ $1.25 │ 0.11x (9x cheaper) │
│ Gemini Flash │ $0.075 │ $0.30 │ 0.03x (33x cheaper) │
│ Gemini Pro │ $1.25 │ $5.00 │ 0.48x │
└──────────────────┴──────────┴───────────┴───────────────────────────┘
If 60% of requests can be handled by a mini/flash-tier model, routing alone cuts the bill by 50%+.
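That claim checks out against the blended relative cost from the pricing table (GPT-4o-mini at roughly 0.06x GPT-4o):

```python
def routing_cost_factor(cheap_share: float, cheap_relative_cost: float = 0.06) -> float:
    """Remaining cost fraction when a share of requests moves to a
    model priced at cheap_relative_cost of the baseline model."""
    return (1 - cheap_share) + cheap_share * cheap_relative_cost

factor = routing_cost_factor(0.60)
print(f"{1 - factor:.1%} savings")  # ~56% with 60% routed cheap
```

Because the tier gap is so large, the savings curve is nearly linear in the share of traffic routed cheap: every additional 10% routed to the mini tier removes roughly 9% of the bill.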
Routing Decision Tree
┌─────────────────────────────────────────────────────────────────────┐
│ MODEL ROUTING DECISION TREE │
│ │
│ Incoming Request │
│ │ │
│ ▼ │
│ ┌───────────────────┐ │
│ │ Complexity Router │ │
│ │ (classify prompt) │ │
│ └────────┬──────────┘ │
│ │ │
│ ┌───────────┼───────────┐ │
│ ▼ ▼ ▼ │
│ ┌────────┐ ┌────────┐ ┌────────┐ │
│ │ SIMPLE │ │ MEDIUM │ │ COMPLEX│ │
│ └───┬────┘ └───┬────┘ └───┬────┘ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ GPT-4o │ │ GPT-4o │ │ GPT-4o │ │
│ │ -mini │ │ -mini │ │ (full) │ │
│ │ $0.15/M │ │ $0.15/M │ │ $2.50/M │ │
│ └────┬─────┘ └────┬─────┘ └──────────┘ │
│ │ │ │
│ ▼ ▼ │
│ ┌──────────┐ ┌──────────┐ │
│ │ Return │ │Confidence│ │
│ │ directly │ │ check │ │
│ └──────────┘ └────┬─────┘ │
│ │ │
│ ┌────────┴────────┐ │
│ ▼ ▼ │
│ HIGH conf. LOW conf. │
│ ┌────────┐ ┌──────────┐ │
│ │ Return │ │ Escalate │ │
│ │ result │ │ to GPT-4o│ │
│ └────────┘ └──────────┘ │
│ │
│ "Cascading" pattern: try cheap first, escalate only when needed │
└─────────────────────────────────────────────────────────────────────┘
Full Implementation
import re
import openai
from enum import Enum
from dataclasses import dataclass
class Complexity(Enum):
SIMPLE = "simple"
MEDIUM = "medium"
COMPLEX = "complex"
@dataclass
class RoutingDecision:
model: str
complexity: Complexity
confidence: float
response: str
escalated: bool
cost: float
class ModelRouter:
"""
Routes LLM requests to the cheapest capable model based on
request complexity. Supports cascading: tries the cheap model
first and escalates to the expensive model if confidence is low.
"""
# Model configs: (model_name, input_price_per_M, output_price_per_M)
MODELS = {
"cheap": ("gpt-4o-mini", 0.15, 0.60),
"expensive": ("gpt-4o", 2.50, 10.00),
}
# Complexity classification signals
SIMPLE_PATTERNS = [
r"\b(classify|categorize|label|tag)\b",
r"\b(yes or no|true or false)\b",
r"\b(extract|pull out|find the)\b",
r"\b(translate|convert)\b",
r"\b(summarize in one sentence)\b",
]
COMPLEX_PATTERNS = [
r"\b(analyze|compare and contrast|evaluate)\b",
r"\b(step.by.step|reasoning|chain of thought)\b",
r"\b(write a detailed|comprehensive|in-depth)\b",
r"\b(code review|debug|architect)\b",
r"\b(multi.step|considering all|trade.?offs)\b",
]
def __init__(
self,
cascade_enabled: bool = True,
confidence_threshold: float = 0.7,
max_token_complexity: int = 200
):
self.cascade_enabled = cascade_enabled
self.confidence_threshold = confidence_threshold
self.max_token_complexity = max_token_complexity
self.oai_client = openai.OpenAI()
# Metrics
self.routing_log: list[dict] = []
self.total_cost = 0.0
self._model_counts = {"cheap": 0, "expensive": 0, "escalated": 0}
def classify_complexity(self, prompt: str) -> Complexity:
"""
Rule-based complexity classifier.
Fast and free — no LLM call needed for routing itself.
"""
prompt_lower = prompt.lower()
simple_score = sum(
1 for p in self.SIMPLE_PATTERNS
if re.search(p, prompt_lower)
)
complex_score = sum(
1 for p in self.COMPLEX_PATTERNS
if re.search(p, prompt_lower)
)
# Token count heuristic
token_estimate = len(prompt.split())
if token_estimate > self.max_token_complexity:
complex_score += 2
# Check for structured output requirements
if any(kw in prompt_lower for kw in ["json", "schema", "format:"]):
complex_score += 1
if complex_score >= 2:
return Complexity.COMPLEX
elif simple_score >= 1 and complex_score == 0:
return Complexity.SIMPLE
else:
return Complexity.MEDIUM
def _call_model(
self, tier: str, messages: list[dict], **kwargs
) -> tuple:
"""Call a model and return (response_text, usage, model_name)."""
model_name, _, _ = self.MODELS[tier]
completion = self.oai_client.chat.completions.create(
model=model_name,
messages=messages,
**kwargs
)
return (
completion.choices[0].message.content,
completion.usage,
model_name
)
def _estimate_confidence(self, response: str) -> float:
"""
Heuristic confidence scoring for cascading decisions.
Checks for hedging language that indicates uncertainty.
"""
hedging_phrases = [
"i'm not sure", "it's unclear", "i don't know",
"it depends", "hard to say", "uncertain",
"might be", "possibly", "i think", "not confident",
"this is a guess", "approximately"
]
response_lower = response.lower()
hedge_count = sum(
1 for phrase in hedging_phrases
if phrase in response_lower
)
# High hedge count = low confidence
if hedge_count >= 3:
return 0.3
elif hedge_count >= 1:
return 0.6
# Very short responses for complex prompts may indicate confusion
if len(response.split()) < 10:
return 0.5
return 0.9
def _compute_cost(self, usage, tier: str) -> float:
_, input_price, output_price = self.MODELS[tier]
return (
(usage.prompt_tokens / 1_000_000) * input_price +
(usage.completion_tokens / 1_000_000) * output_price
)
def route(
self,
prompt: str,
system_prompt: str = "",
**kwargs
) -> RoutingDecision:
"""
Route a request to the appropriate model.
Uses cascading for MEDIUM complexity when enabled.
"""
complexity = self.classify_complexity(prompt)
messages = []
if system_prompt:
messages.append({"role": "system", "content": system_prompt})
messages.append({"role": "user", "content": prompt})
# SIMPLE: always use cheap model
if complexity == Complexity.SIMPLE:
response, usage, model = self._call_model(
"cheap", messages, **kwargs
)
cost = self._compute_cost(usage, "cheap")
self._model_counts["cheap"] += 1
self.total_cost += cost
return RoutingDecision(
model=model, complexity=complexity,
confidence=0.95, response=response,
escalated=False, cost=cost
)
# COMPLEX: always use expensive model
if complexity == Complexity.COMPLEX:
response, usage, model = self._call_model(
"expensive", messages, **kwargs
)
cost = self._compute_cost(usage, "expensive")
self._model_counts["expensive"] += 1
self.total_cost += cost
return RoutingDecision(
model=model, complexity=complexity,
confidence=0.95, response=response,
escalated=False, cost=cost
)
# MEDIUM: cascade — try cheap first, escalate if needed
response, usage, model = self._call_model(
"cheap", messages, **kwargs
)
cost = self._compute_cost(usage, "cheap")
if not self.cascade_enabled:
self._model_counts["cheap"] += 1
self.total_cost += cost
return RoutingDecision(
model=model, complexity=complexity,
confidence=0.8, response=response,
escalated=False, cost=cost
)
confidence = self._estimate_confidence(response)
if confidence >= self.confidence_threshold:
self._model_counts["cheap"] += 1
self.total_cost += cost
return RoutingDecision(
model=model, complexity=complexity,
confidence=confidence, response=response,
escalated=False, cost=cost
)
# Escalate to expensive model
response_2, usage_2, model_2 = self._call_model(
"expensive", messages, **kwargs
)
escalation_cost = self._compute_cost(usage_2, "expensive")
total_cost = cost + escalation_cost # Paid for both calls
self._model_counts["escalated"] += 1
self.total_cost += total_cost
return RoutingDecision(
model=model_2, complexity=complexity,
confidence=0.95, response=response_2,
escalated=True, cost=total_cost
)
@property
def stats(self) -> dict:
total = sum(self._model_counts.values())
return {
"total_requests": total,
"cheap_model_pct": (
f"{(self._model_counts['cheap'] / total * 100):.1f}%"
if total > 0 else "0%"
),
"expensive_model_pct": (
f"{(self._model_counts['expensive'] / total * 100):.1f}%"
if total > 0 else "0%"
),
"escalation_rate": (
f"{(self._model_counts['escalated'] / total * 100):.1f}%"
if total > 0 else "0%"
),
"total_cost": f"${self.total_cost:.4f}"
}
Routing Distribution in Practice
Across typical production workloads, a well-tuned router produces a distribution like this:
┌─────────────────────────────────────────────────────────────────────┐
│ TYPICAL ROUTING DISTRIBUTION BY WORKLOAD │
│ │
│ Workload Type Cheap Escalated Expensive Cost Savings │
│ ────────────── ───── ───────── ───────── ──────────── │
│ Customer Support 75% 10% 15% ~65% │
│ Content Moderation 85% 5% 10% ~75% │
│ Data Extraction 70% 12% 18% ~55% │
│ Code Generation 30% 20% 50% ~25% │
│ Research/Analysis 20% 15% 65% ~15% │
│ │
│ Blended average across workloads: ~45% │
└─────────────────────────────────────────────────────────────────────┘
Classification-heavy workloads see the biggest wins. Complex generation tasks benefit less — but even routing 20-30% of requests to a cheaper model adds up at scale.
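The blended figure depends entirely on the traffic mix. A sketch using the per-workload savings from the table, weighted by a hypothetical mix (the traffic shares are assumptions; substitute measured ones):

```python
# name: (savings from table above, hypothetical share of total traffic)
workloads = {
    "customer_support":   (0.65, 0.25),
    "content_moderation": (0.75, 0.10),
    "data_extraction":    (0.55, 0.20),
    "code_generation":    (0.25, 0.25),
    "research_analysis":  (0.15, 0.20),
}

blended = sum(savings * share for savings, share in workloads.values())
print(f"blended savings: {blended:.1%}")
```

With this mix the blended savings land near the ~45% the table quotes; a mix heavier in classification-style work pushes it higher.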
Combining All Three: Compounding Savings
Each technique attacks a different part of the cost surface. When combined, the savings compound:
┌──────────────────────────────────────────────────────────────────────┐
│ COMBINED OPTIMIZATION PIPELINE │
│ │
│ Request ──► [Semantic Cache] ──hit──► Return (cost: ~$0) │
│ │ │
│ miss │
│ │ │
│ ▼ │
│ [Model Router] ──► Select cheapest capable model │
│ │ │
│ ▼ │
│ [Batch Queue] ──► Accumulate, flush as batch │
│ │ │
│ ▼ │
│ [LLM API Call] ──► Store result in cache │
└──────────────────────────────────────────────────────────────────────┘
Realistic Monthly Savings at Scale
The following table models a pipeline handling 200K requests/day with a moderate workload mix (40% simple, 35% medium, 25% complex):
┌─────────────────────────────────────────────────────────────────────┐
│ MONTHLY COST COMPARISON: BEFORE vs AFTER OPTIMIZATION │
│ (200K requests/day, ~300 input / 150 output tokens per request) │
├──────────────────────────┬──────────────┬───────────────────────────┤
│ Configuration │ Monthly Cost │ Savings vs Baseline │
├──────────────────────────┼──────────────┼───────────────────────────┤
│ Baseline (all GPT-4o) │ $13,500 │ — │
│ │ │ │
│ + Semantic Caching │ $9,450 │ $4,050 (30%) │
│ (30% hit rate) │ │ │
│ │ │ │
│ + Model Routing │ $5,670 │ $7,830 (58%) │
│ (60% routed cheap) │ │ │
│ │ │ │
│ + Batch API │ $3,969 │ $9,531 (71%) │
│ (50% batch discount │ │ │
│ on 60% of traffic) │ │
│ │ │ │
│ ALL THREE COMBINED │ $3,969/mo │ $9,531/mo saved │
│ │ │ $114,372/year saved │
├──────────────────────────┴──────────────┴───────────────────────────┤
│ │
│ BEFORE ████████████████████████████████████████████ $13,500/mo │
│ AFTER ████████████████ $3,969/mo │
│ ▲ │
│ └── 71% reduction │
│ │
└─────────────────────────────────────────────────────────────────────┘
The cumulative percentages can't be multiplied directly: (1 - 0.30) × (1 - 0.58) × (1 - 0.50) would imply an 85% reduction, which overstates the savings. What compounds is the marginal reduction each stage applies to the spend the previous stage leaves behind: 0.70 × 0.60 × 0.70 ≈ 0.29 of baseline, a 71% cut. The techniques also interact — cached requests never reach the router, and routing decisions determine which requests enter the batch queue. The table above models those interactions.
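Under the table's assumptions, each stage's marginal reduction applies to whatever spend the previous stage left behind, so the factors multiply:

```python
baseline = 13_500  # monthly baseline from the table above

cache_factor   = 1 - 0.30  # 30% cache hit rate
routing_factor = 1 - 0.40  # routing removes ~40% of what remains
batch_factor   = 1 - 0.30  # 50% discount on ~60% of remaining traffic

after = baseline * cache_factor * routing_factor * batch_factor
print(f"${after:,.0f}/month")  # → $3,969/month
```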
Production Monitoring
Optimization without observability is guesswork. Every production deployment should track these metrics:
import time
from dataclasses import dataclass, field
from collections import defaultdict
@dataclass
class CostMonitor:
"""
Tracks per-request costs, cache performance and routing decisions.
Export to Prometheus, Datadog, or any metrics backend.
"""
request_log: list[dict] = field(default_factory=list)
_cost_by_model: dict = field(
default_factory=lambda: defaultdict(float)
)
_cache_hits: int = 0
_cache_misses: int = 0
_routing_decisions: dict = field(
default_factory=lambda: defaultdict(int)
)
def record_request(
self,
model: str,
cost: float,
cache_hit: bool,
complexity: str,
escalated: bool,
latency_ms: float
):
entry = {
"timestamp": time.time(),
"model": model,
"cost": cost,
"cache_hit": cache_hit,
"complexity": complexity,
"escalated": escalated,
"latency_ms": latency_ms
}
self.request_log.append(entry)
self._cost_by_model[model] += cost
if cache_hit:
self._cache_hits += 1
else:
self._cache_misses += 1
self._routing_decisions[complexity] += 1
def summary(self, last_n_hours: int = 24) -> dict:
cutoff = time.time() - (last_n_hours * 3600)
recent = [r for r in self.request_log if r["timestamp"] > cutoff]
if not recent:
return {"message": "No requests in time window"}
total_cost = sum(r["cost"] for r in recent)
cache_hits = sum(1 for r in recent if r["cache_hit"])
escalations = sum(1 for r in recent if r["escalated"])
avg_latency = sum(r["latency_ms"] for r in recent) / len(recent)
return {
"period_hours": last_n_hours,
"total_requests": len(recent),
"total_cost": f"${total_cost:.2f}",
"avg_cost_per_request": f"${total_cost / len(recent):.6f}",
"cache_hit_rate": f"{cache_hits / len(recent):.1%}",
"escalation_rate": f"{escalations / len(recent):.1%}",
"avg_latency_ms": f"{avg_latency:.0f}",
"cost_by_model": {
k: f"${v:.2f}" for k, v in self._cost_by_model.items()
}
}
Key alerts to configure:
- Cache hit rate drops below 20% — indicates a shift in request distribution; review cache TTLs and similarity threshold
- Escalation rate exceeds 30% — the complexity classifier needs retuning; too many medium requests are failing at the cheap tier
- Cost per request spikes above 2x rolling average — something in the routing logic may be broken, sending everything to the expensive model
- Batch queue depth exceeds 5 minutes — flush interval or batch size needs adjustment; requests are waiting too long
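Those four alerts reduce to a small threshold check. The metric field names here are assumptions; map them onto whatever the monitor actually exports:

```python
def check_alerts(metrics: dict) -> list[str]:
    """Evaluate the four alert conditions against a metrics snapshot."""
    alerts = []
    if metrics["cache_hit_rate"] < 0.20:
        alerts.append("cache hit rate below 20%: review TTLs and threshold")
    if metrics["escalation_rate"] > 0.30:
        alerts.append("escalation rate above 30%: retune complexity classifier")
    if metrics["cost_per_request"] > 2 * metrics["rolling_avg_cost"]:
        alerts.append("cost/request above 2x rolling average: check routing")
    if metrics["batch_queue_age_s"] > 300:
        alerts.append("batch queue older than 5 min: adjust flush settings")
    return alerts

snapshot = {
    "cache_hit_rate": 0.12,
    "escalation_rate": 0.18,
    "cost_per_request": 0.0031,
    "rolling_avg_cost": 0.0029,
    "batch_queue_age_s": 45,
}
print(check_alerts(snapshot))  # only the cache-hit-rate alert fires
```

In production this would run on each `CostMonitor.summary()` snapshot and page on any non-empty result.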
Implementation Checklist
Rolling this out incrementally avoids the risk of a big-bang deployment:
Week 1: Semantic Caching
- Deploy the cache layer with a conservative similarity threshold (0.97)
- Monitor hit rates for one week before lowering the threshold
- Set TTLs per endpoint based on content freshness requirements
Week 2: Model Routing
- Start with a simple two-tier router (cheap vs. expensive)
- Route only SIMPLE classified requests to the cheap model
- Enable cascading for MEDIUM requests once baseline metrics are stable
Week 3: Batching
- Enable micro-batching for latency-tolerant endpoints
- Migrate offline/batch workloads to the Batch API
- Monitor queue depth and flush intervals
Week 4: Monitoring and Tuning
- Deploy the cost monitor with dashboards
- Set up alerts for the four key metrics above
- A/B test quality on routed vs. non-routed traffic to validate no degradation
Key Takeaways
The economics of LLM-powered applications favor the teams that optimize inference costs early. The three techniques here — semantic caching, request batching and model routing — are not exotic. They're standard patterns in mature ML infrastructure, adapted for the specific cost structure of LLM APIs.
The compounding effect is what makes the approach powerful. Any single technique might save 20-30%. Combined, they routinely achieve 50-70% reductions, turning a $150K/year inference bill into a $50K/year one — with no degradation in output quality for the vast majority of requests.
The code in this guide is production-ready scaffolding. Adapt the complexity classifier to the specific request patterns in the target workload. Tune the cache similarity threshold based on measured false-positive rates. And always measure: the monitoring layer is not optional. Without it, there's no way to know if routing decisions are correct or if the cache is actually helping.
Cost efficiency isn't a one-time optimization. It's an ongoing practice — and these three techniques form the foundation.
If this was useful, follow for more deep dives on production ML infrastructure, LLM systems design and applied AI engineering.