The Problem Nobody Talks About
Last month I was building a content generation pipeline that needed to produce product descriptions for about 2,000 SKUs. Straightforward task — feed the product attributes into GPT-5.5, get back a polished description. I expected the bill to land around $15-20 based on token estimates.
The actual bill: $47.
After digging through the logs, I found the culprit. My retry logic was re-calling the API every time a downstream service timed out (which happened a lot during peak hours). Some prompts were hitting the API 4-5 times before the pipeline completed. Add in the development iterations — where I was tweaking the same prompt slightly and re-running it — and I was burning tokens on near-identical requests constantly.
That's when I decided to build a proper caching layer. Not a naive "cache everything" approach, but a smart system that understands when LLM responses can be safely reused and when they genuinely need a fresh call.
What You'll Build
By the end of this tutorial, you'll have a caching middleware that:
- Caches exact-match prompts with configurable TTL
- Detects semantically similar prompts using embedding distance
- Supports cache invalidation by model, temperature threshold, and time
- Tracks cache hit rates and estimated savings
- Works with any OpenAI-compatible API endpoint
Here's the architecture:
[Client Request] → [Cache Layer] → {cache hit?}  → Return cached response
                                 → {cache miss?} → [LLM API] → Store + Return
Part 1: Exact-Match Caching with Content Hashing
The simplest approach that already saves a surprising amount of money: hash the request parameters and cache the response.
import hashlib
import json
import time
from dataclasses import dataclass
from typing import Optional


@dataclass
class CacheEntry:
    response: dict
    created_at: float
    model: str
    temperature: float
    prompt_hash: str
    hit_count: int = 0


class LLMCache:
    def __init__(self, ttl_seconds: int = 3600, max_temperature: float = 0.3):
        self.ttl_seconds = ttl_seconds
        self.max_temperature = max_temperature
        self._cache: dict[str, CacheEntry] = {}
        self.stats = {"hits": 0, "misses": 0, "saved_tokens": 0}

    def _hash_request(self, model: str, messages: list, temperature: float,
                      **kwargs) -> str:
        """Create a deterministic hash from request parameters."""
        # Other kwargs (max_tokens, tools, ...) are not part of the key;
        # add them here if they vary between otherwise identical requests.
        key_data = {
            "model": model,
            "messages": messages,
            "temperature": round(temperature, 2),
            # Include response format if specified
            "response_format": kwargs.get("response_format"),
        }
        # default=str keeps values that are not JSON-native hashable
        raw = json.dumps(key_data, sort_keys=True, default=str)
        return hashlib.sha256(raw.encode()).hexdigest()[:16]

    def _is_cacheable(self, temperature: float) -> bool:
        """Only cache deterministic-ish requests; high temperature is too random."""
        return temperature <= self.max_temperature

    def get(self, model: str, messages: list, temperature: float,
            **kwargs) -> Optional[dict]:
        if not self._is_cacheable(temperature):
            self.stats["misses"] += 1
            return None
        key = self._hash_request(model, messages, temperature, **kwargs)
        entry = self._cache.get(key)
        if entry is None:
            self.stats["misses"] += 1
            return None
        # Check TTL
        if time.time() - entry.created_at > self.ttl_seconds:
            del self._cache[key]
            self.stats["misses"] += 1
            return None
        entry.hit_count += 1
        self.stats["hits"] += 1
        usage = entry.response.get("usage", {})
        self.stats["saved_tokens"] += usage.get("total_tokens", 0)
        return entry.response

    def set(self, model: str, messages: list, temperature: float,
            response: dict, **kwargs):
        if not self._is_cacheable(temperature):
            return
        key = self._hash_request(model, messages, temperature, **kwargs)
        self._cache[key] = CacheEntry(
            response=response,
            created_at=time.time(),
            model=model,
            temperature=temperature,
            prompt_hash=key,
        )

    def invalidate_model(self, model: str):
        """Remove all cached entries for a specific model."""
        to_delete = [k for k, v in self._cache.items() if v.model == model]
        for k in to_delete:
            del self._cache[k]
This alone saved me about 35% on that content pipeline, because the retry logic was hitting cached responses instead of making fresh API calls.
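To see why retries stop costing money, here is a minimal, self-contained sketch. `fake_llm_call`, `flaky_downstream`, and `run_with_retries` are hypothetical stand-ins I made up for illustration, not the real pipeline; the only point is that the paid call sits behind a hash-keyed cache while the flaky step is retried.

```python
import hashlib
import json

api_calls = 0                 # counts how often the "paid" call actually runs
_cache: dict[str, str] = {}


def fake_llm_call(prompt: str) -> str:
    """Hypothetical stand-in for a paid LLM API call."""
    global api_calls
    api_calls += 1
    return f"description for: {prompt}"


def cached_llm_call(prompt: str, model: str = "gpt-5.5") -> str:
    """Return the cached response when the same request was seen before."""
    raw = json.dumps({"model": model, "prompt": prompt}, sort_keys=True)
    key = hashlib.sha256(raw.encode()).hexdigest()
    if key not in _cache:
        _cache[key] = fake_llm_call(prompt)
    return _cache[key]


def flaky_downstream(text: str, attempt: int) -> str:
    """Hypothetical downstream service that times out on the first 3 attempts."""
    if attempt < 4:
        raise TimeoutError("simulated timeout")
    return text.upper()


def run_with_retries(prompt: str, downstream, max_attempts: int = 5) -> str:
    """Retry the whole step; only the first attempt pays for the LLM call."""
    for attempt in range(1, max_attempts + 1):
        text = cached_llm_call(prompt)
        try:
            return downstream(text, attempt)
        except TimeoutError:
            continue
    raise RuntimeError("downstream never succeeded")


result = run_with_retries("wireless headphones, 40hr battery", flaky_downstream)
print(api_calls)  # the step ran four times, the paid call ran once
```

Without the cache, the same four attempts would have paid for four completions.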
But there's a problem: this only works for exact prompt matches. In practice, I found that many of my "duplicate" prompts were almost identical but not quite.
Part 2: Semantic Similarity Caching
This is where it gets interesting. Consider these two prompts:
"Write a product description for a wireless bluetooth headphone with 40hr battery life, noise cancellation, priced at $79"
"Generate a product description: wireless bluetooth headphones, 40 hour battery, ANC, $79"
These will produce nearly identical outputs, but exact-match caching won't catch the second one. For that, we need embedding-based similarity.
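Before the full class, here is the matching rule in isolation, as a rough sketch: embed both prompts, compute the cosine similarity, and treat them as the same request when it clears a threshold. The three-dimensional vectors are toy values I invented for illustration; real embeddings have hundreds or thousands of dimensions.

```python
import math


def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Cosine of the angle between two vectors (1.0 means same direction)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)


def is_semantic_hit(a: list[float], b: list[float],
                    threshold: float = 0.92) -> bool:
    """Treat two prompts as equivalent when their embeddings are close enough."""
    return cosine_similarity(a, b) >= threshold


# Toy "embeddings" (invented values, not output from a real model)
headphones_v1 = [0.9, 0.1, 0.4]
headphones_v2 = [0.85, 0.15, 0.45]
blender = [0.1, 0.9, 0.2]

print(is_semantic_hit(headphones_v1, headphones_v2))  # close vectors
print(is_semantic_hit(headphones_v1, blender))        # unrelated vectors
```

The class below applies the same rule to real embeddings, with 0.92 as the default threshold.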
import time
from typing import Optional

import numpy as np
from openai import OpenAI


class SemanticCache(LLMCache):
    def __init__(self, similarity_threshold: float = 0.92, **kwargs):
        super().__init__(**kwargs)
        self.similarity_threshold = similarity_threshold
        self._embeddings: dict[str, np.ndarray] = {}
        # Use a cheaper embedding model
        self._embed_client = OpenAI(
            base_url="https://api.xidaoapi.com/v1",
            api_key="your-api-key",
        )

    def _get_embedding(self, text: str) -> np.ndarray:
        """Get embedding for cache key comparison."""
        resp = self._embed_client.embeddings.create(
            model="text-embedding-3-small",
            input=text,
        )
        return np.array(resp.data[0].embedding)

    def _extract_text(self, messages: list) -> str:
        """Extract the core prompt text for embedding.

        Only user messages are embedded, so requests that differ only in
        their system prompt can still match each other.
        """
        parts = []
        for msg in messages:
            if msg.get("role") == "user":
                parts.append(msg.get("content", ""))
        return " ".join(parts)

    def _cosine_similarity(self, a: np.ndarray, b: np.ndarray) -> float:
        return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))

    def get(self, model: str, messages: list, temperature: float,
            **kwargs) -> Optional[dict]:
        # Try exact match first (fast path). On a miss, super().get() has
        # already recorded one miss in self.stats.
        exact = super().get(model, messages, temperature, **kwargs)
        if exact is not None:
            return exact
        if not self._is_cacheable(temperature):
            return None
        # Try semantic match (slow path): linear scan over cached entries
        query_text = self._extract_text(messages)
        query_embedding = self._get_embedding(query_text)
        best_similarity = 0.0
        best_entry = None
        now = time.time()
        for key, entry in self._cache.items():
            if entry.model != model:
                continue
            if now - entry.created_at > self.ttl_seconds:
                continue
            cached_embedding = self._embeddings.get(key)
            if cached_embedding is None:
                continue
            sim = self._cosine_similarity(query_embedding, cached_embedding)
            if sim > best_similarity:
                best_similarity = sim
                best_entry = entry
        if best_entry and best_similarity >= self.similarity_threshold:
            best_entry.hit_count += 1
            # Reclassify the miss recorded by the exact lookup as a hit
            self.stats["misses"] -= 1
            self.stats["hits"] += 1
            usage = best_entry.response.get("usage", {})
            self.stats["saved_tokens"] += usage.get("total_tokens", 0)
            return best_entry.response
        # Miss was already counted by the exact lookup above
        return None

    def set(self, model: str, messages: list, temperature: float,
            response: dict, **kwargs):
        if not self._is_cacheable(temperature):
            return  # nothing is stored, so skip the embedding call too
        super().set(model, messages, temperature, response, **kwargs)
        # Store the embedding for semantic matching. This re-embeds text that
        # get() already embedded; memoize the last embedding if cost matters.
        key = self._hash_request(model, messages, temperature, **kwargs)
        query_text = self._extract_text(messages)
        self._embeddings[key] = self._get_embedding(query_text)
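Important caveat: the embedding API call adds latency and cost. In my benchmarks, the embedding call costs about $0.00002 per query (using text-embedding-3-small). If your average LLM call costs $0.005, the embedding overhead is negligible. But if you're making tons of very cheap calls, the math might not work out: every exact-match miss pays for an embedding lookup, so the semantic layer only earns its keep when the semantic hit rate times the cost of an LLM call exceeds the embedding cost.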
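The trade-off is simple arithmetic, sketched below. The $0.005 and $0.00002 figures come from the paragraph above; the $0.00005 "very cheap call" and the function names are my own assumptions for illustration, and `embed_calls` is a knob for how many embedding requests you pay for per lookup.

```python
def net_saving_per_request(semantic_hit_rate: float, llm_cost: float,
                           embed_cost: float, embed_calls: float = 1.0) -> float:
    """Expected dollars saved per request that reaches the semantic layer."""
    return semantic_hit_rate * llm_cost - embed_calls * embed_cost


def break_even_hit_rate(llm_cost: float, embed_cost: float,
                        embed_calls: float = 1.0) -> float:
    """Semantic hit rate at which the embedding overhead pays for itself."""
    return embed_calls * embed_cost / llm_cost


# Typical call from the text: $0.005 per completion, $0.00002 per embedding
print(break_even_hit_rate(0.005, 0.00002))

# Hypothetical very cheap call: $0.00005 per completion
print(break_even_hit_rate(0.00005, 0.00002))
```

With the typical figures, the semantic layer breaks even at a hit rate well under 1%. With the hypothetical cheap call, it needs roughly 40% semantic hits just to cover the embedding cost.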
Here's what I found in production:
| Scenario | Exact Cache Hit Rate | Semantic Cache Hit Rate | Net Savings |
|---|---|---|---|
| Content generation (2K SKUs) | 23% | 41% | ~38% |
| Customer support bot | 12% | 31% | ~26% |
| Code review automation | 8% | 19% | ~15% |
| Data extraction pipeline | 45% | 62% | ~55% |
The data extraction pipeline had the highest hit rate because the prompts were very structured. The code review pipeline had the lowest because each PR is genuinely unique.
Part 3: The Tricky Parts Nobody Tells You
Temperature Threshold
If you cache responses from requests with temperature: 0.8, you'll get weird results: the caller asked for varied output, and the cache hands back the same response every time. My rule of thumb:
- temperature <= 0.2: safe to cache, responses are nearly deterministic
- temperature 0.2 - 0.5: cache with caution, shorter TTL (5-10 min)
- temperature > 0.5: don't cache at all
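That rule of thumb can be written as a small helper that picks a TTL from the temperature. This is a sketch, not part of the cache classes above: `ttl_for_temperature` is a hypothetical name, the one-hour base TTL mirrors the Part 1 default, and the 10-minute cautious TTL is the upper end of the 5-10 minute range.

```python
from typing import Optional


def ttl_for_temperature(temperature: float, base_ttl: int = 3600,
                        cautious_ttl: int = 600) -> Optional[int]:
    """Return a TTL in seconds, or None when the request should not be cached."""
    if temperature <= 0.2:
        return base_ttl        # nearly deterministic: safe to cache
    if temperature <= 0.5:
        return cautious_ttl    # cache with caution: short TTL
    return None                # too random: don't cache at all


for t in (0.0, 0.35, 0.8):
    print(t, ttl_for_temperature(t))
```

A cache could call this in `set` and store the TTL on each entry instead of using one global `ttl_seconds`.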
Model Versioning
This bit me hard. I was caching responses from gpt-5.5 and everything was fine for weeks. Then OpenAI silently updated the model behind the same endpoint name. Suddenly my cached responses were subtly different from fresh ones — same model name, different behavior.
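Solution: include a model version marker in your cache key if the API provides one. Some APIs report it in the response body or headers (OpenAI-style responses may carry a system_fingerprint field). Because you only learn the marker from a response, key on the last value you saw, or call invalidate_model when it changes. If there's no marker, you can run a short fixed "probe" prompt daily at temperature 0 and hash the response as a version fingerprint.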
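Here is one way the version check could look, as a hedged sketch. It assumes the provider puts a version marker in the response body under `system_fingerprint`; other gateways may omit it or use a different field. `FingerprintTracker` is a hypothetical helper, and `on_change` would typically be the cache's `invalidate_model` method.

```python
from typing import Callable, Optional


class FingerprintTracker:
    """Remember the last version marker seen per model and flag changes."""

    def __init__(self, on_change: Callable[[str], None]):
        self._last_seen: dict[str, str] = {}
        self._on_change = on_change  # e.g. cache.invalidate_model

    def observe(self, model: str, response: dict) -> bool:
        """Return True if the marker changed and invalidation was triggered."""
        fingerprint: Optional[str] = response.get("system_fingerprint")
        if fingerprint is None:
            return False  # provider did not report a version marker
        previous = self._last_seen.get(model)
        self._last_seen[model] = fingerprint
        if previous is not None and previous != fingerprint:
            self._on_change(model)
            return True
        return False


invalidated: list[str] = []
tracker = FingerprintTracker(on_change=invalidated.append)
first = tracker.observe("gpt-5.5", {"system_fingerprint": "fp_a"})
same = tracker.observe("gpt-5.5", {"system_fingerprint": "fp_a"})
changed = tracker.observe("gpt-5.5", {"system_fingerprint": "fp_b"})
```

Calling `observe` on every fresh API response keeps the check cheap, since it never triggers an extra request.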
System Prompt Drift
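If your system prompt evolves (and it will), responses cached under the old prompt become stale. The Part 1 key already changes with the system prompt, because it hashes the full messages list, but the semantic lookup in Part 2 embeds only user messages and needs its own system-prompt check before returning a match. I make the dependency explicit by including the system prompt hash in the cache key: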
# Drop-in replacement for LLMCache._hash_request
def _hash_request(self, model, messages, temperature, **kwargs):
    system_msgs = [m for m in messages if m.get("role") == "system"]
    user_msgs = [m for m in messages if m.get("role") != "system"]
    key_data = {
        "model": model,
        "system_hash": hashlib.md5(
            json.dumps(system_msgs, sort_keys=True).encode()
        ).hexdigest(),
        "messages": user_msgs,
        "temperature": round(temperature, 2),
        # Keep the response format in the key, as in Part 1
        "response_format": kwargs.get("response_format"),
    }
    raw = json.dumps(key_data, sort_keys=True, default=str)
    return hashlib.sha256(raw.encode()).hexdigest()[:16]
Streaming Responses
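Caching streaming responses is a pain, because you can't easily cache the stream itself. What I do is replay cached content in small chunks on a hit and, on a miss, collect the chunks while streaming and store the assembled response afterwards: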
async def cached_stream(self, model, messages, temperature, **kwargs):
    # Check cache first
    cached = self.get(model, messages, temperature, **kwargs)
    if cached:
        # Simulate streaming from cached response
        content = cached["choices"][0]["message"]["content"]
        for i in range(0, len(content), 20):
            yield content[i:i+20]
        return
    # Cache miss: stream from the API and collect the full response.
    # _stream_from_api is assumed to be an async generator yielding text chunks.
    full_content = ""
    async for chunk in self._stream_from_api(model, messages, temperature):
        full_content += chunk
        yield chunk
    # Store the collected response
    synthetic_response = {
        "choices": [{"message": {"content": full_content}}],
        "usage": {"total_tokens": len(full_content) // 4},  # rough estimate
    }
    # Pass the same kwargs used for the lookup so the cache keys match
    self.set(model, messages, temperature, synthetic_response, **kwargs)
Part 4: Production-Ready Middleware
Here's how to wire this into your actual application:
from typing import Optional

import httpx


class CachedLLMClient:
    def __init__(self, base_url: str, api_key: str,
                 cache_config: Optional[dict] = None):
        self.client = httpx.AsyncClient(
            base_url=base_url,
            headers={"Authorization": f"Bearer {api_key}"},
        )
        config = cache_config or {}
        self.cache = SemanticCache(
            ttl_seconds=config.get("ttl", 3600),
            max_temperature=config.get("max_temp", 0.3),
            similarity_threshold=config.get("similarity", 0.92),
        )

    async def chat_completion(self, model: str, messages: list,
                              temperature: float = 0.0, **kwargs):
        # Check cache. Note: the semantic lookup makes a blocking embedding
        # call, which stalls the event loop while it runs.
        cached = self.cache.get(model, messages, temperature, **kwargs)
        if cached is not None:
            # Return a copy so the flag never leaks into the stored entry
            return {**cached, "_cache_hit": True}
        # Call API
        payload = {
            "model": model,
            "messages": messages,
            "temperature": temperature,
            **kwargs,
        }
        resp = await self.client.post("/chat/completions", json=payload)
        resp.raise_for_status()  # don't cache error responses
        result = resp.json()
        # Cache the response
        self.cache.set(model, messages, temperature, result, **kwargs)
        return {**result, "_cache_hit": False}

    def get_cache_stats(self) -> dict:
        total = self.cache.stats["hits"] + self.cache.stats["misses"]
        return {
            "hit_rate": self.cache.stats["hits"] / total if total else 0,
            "total_requests": total,
            "estimated_saved_tokens": self.cache.stats["saved_tokens"],
            # Flat per-token rate; adjust for the models you actually use
            "estimated_saved_usd": self.cache.stats["saved_tokens"] * 0.000003,
        }
Usage:
import asyncio


async def main():
    client = CachedLLMClient(
        base_url="https://api.xidaoapi.com/v1",
        api_key="your-key",
        cache_config={"ttl": 7200, "max_temp": 0.2, "similarity": 0.90},
    )
    # First call: cache miss
    result = await client.chat_completion(
        model="gpt-5.5",
        messages=[{"role": "user", "content": "Summarize: [product data]"}],
        temperature=0.0,
    )
    # Same call: cache hit, instant, free
    result2 = await client.chat_completion(
        model="gpt-5.5",
        messages=[{"role": "user", "content": "Summarize: [product data]"}],
        temperature=0.0,
    )
    # result2["_cache_hit"] == True


asyncio.run(main())
Part 5: Monitoring What You're Saving
Cache hit rates are nice, but what really matters is money saved. Here's a simple tracking approach:
class CacheMetrics:
    def __init__(self):
        self.daily_savings: dict[str, float] = {}

    def record_hit(self, date: str, tokens_saved: int, model: str):
        cost_per_token = {
            "gpt-5.5": 0.000015,
            "claude-opus-4.7": 0.000075,
            "deepseek-v4": 0.000001,
            "gemini-3.0-pro": 0.000005,
        }.get(model, 0.00001)
        saved = tokens_saved * cost_per_token
        self.daily_savings[date] = self.daily_savings.get(date, 0) + saved

    def report(self):
        total = sum(self.daily_savings.values())
        print(f"Total cached savings: ${total:.2f}")
        for date, amount in sorted(self.daily_savings.items()):
            print(f"  {date}: ${amount:.4f}")
When NOT to Cache
Caching isn't always the right move. Skip it when:
- Real-time data queries: "What's the current stock price of X?" — caching defeats the purpose
- Personalized responses: When the prompt carries user-specific context, a cached response can leak one user's data to another
- Creative generation: If you want variety, caching produces the same output every time
- Safety-critical outputs: Medical, legal, or financial advice should always be freshly generated
- A/B testing: You need different responses to compare model performance
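One way to enforce that list is a guard that runs before the cache is consulted. The sketch below assumes the caller can tag each request with a few flags; `RequestTraits` and `should_cache` are hypothetical names, not part of the middleware above.

```python
from dataclasses import dataclass


@dataclass
class RequestTraits:
    """Hypothetical per-request flags set by the calling code."""
    needs_live_data: bool = False   # real-time data queries
    user_specific: bool = False     # personalized responses
    wants_variety: bool = False     # creative generation
    safety_critical: bool = False   # medical, legal, or financial advice
    in_experiment: bool = False     # A/B testing


def should_cache(traits: RequestTraits) -> bool:
    """Cache only when none of the skip conditions apply."""
    return not any((
        traits.needs_live_data,
        traits.user_specific,
        traits.wants_variety,
        traits.safety_critical,
        traits.in_experiment,
    ))


print(should_cache(RequestTraits()))
print(should_cache(RequestTraits(user_specific=True)))
```

Keeping the decision in one function makes it easy to log why a request bypassed the cache.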
What I'd Do Differently
If I were starting over:
- Start with exact-match only — it's simpler and catches more than you'd expect. Add semantic caching later when you have data showing you need it.
- Log everything from day one — I didn't add metrics until week 3 and missed valuable data about which prompts were being repeated.
- Use Redis, not in-memory — My initial prototype used a dict (like the examples above). Moving to Redis meant the cache survived restarts and could be shared across workers.
- Set up cache warming — Pre-populate the cache with common prompts during off-peak hours. This cut my cold-start latency by 60%.
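For the Redis point, the swap can stay small if the cache talks to a thin store object. This is a rough sketch rather than my production code: `SharedCacheStore` is a hypothetical wrapper around any client exposing `get(name)` and `setex(name, time, value)`, which is the shape redis-py's `Redis` client has. `InMemoryClient` is a stand-in so the example runs without a server.

```python
import json
from typing import Optional, Protocol


class KeyValueClient(Protocol):
    """The two calls the store needs; redis-py's Redis client provides both."""
    def get(self, name: str): ...
    def setex(self, name: str, time: int, value: str): ...


class SharedCacheStore:
    """Serialize responses as JSON and let the backend expire them."""

    def __init__(self, client: KeyValueClient, ttl_seconds: int = 3600,
                 prefix: str = "llmcache:"):
        self._client = client
        self._ttl = ttl_seconds
        self._prefix = prefix

    def get(self, key: str) -> Optional[dict]:
        raw = self._client.get(self._prefix + key)
        return None if raw is None else json.loads(raw)

    def set(self, key: str, response: dict) -> None:
        self._client.setex(self._prefix + key, self._ttl, json.dumps(response))


class InMemoryClient:
    """Stand-in for a Redis client; ignores expiry for simplicity."""

    def __init__(self):
        self.data: dict[str, str] = {}

    def get(self, name: str):
        return self.data.get(name)

    def setex(self, name: str, time: int, value: str):
        self.data[name] = value


store = SharedCacheStore(InMemoryClient(), ttl_seconds=7200)
store.set("abc123", {"choices": [{"message": {"content": "hello"}}]})
print(store.get("abc123"))
```

With a real Redis client in place of `InMemoryClient`, the TTL check moves to the server and every worker sees the same entries.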
Wrapping Up
A smart caching layer isn't going to solve all your AI cost problems, but in my experience, it's the lowest-effort, highest-impact optimization you can add to an existing pipeline. The exact-match approach alone typically saves 15-30% with about 50 lines of code.
The semantic similarity layer is worth it when you have structured, repetitive prompts — content generation, data extraction, classification tasks. Skip it for creative or highly variable workloads.
What about you? Have you implemented caching for LLM calls in production? What hit rates are you seeing? I'm especially curious whether anyone has tried caching across different models (e.g., using a GPT-5.5 response as a fallback cache for Claude requests) — that's my next experiment.
If you need a gateway that handles caching, routing, and fallback across providers, I've been using XiDao API as my OpenAI-compatible endpoint for the examples above.