Xidao
I Cut My LLM API Bill by 38% With a Caching Layer — Here's the Complete Implementation

The Problem Nobody Talks About

Last month I was building a content generation pipeline that needed to produce product descriptions for about 2,000 SKUs. Straightforward task — feed the product attributes into GPT-5.5, get back a polished description. I expected the bill to land around $15-20 based on token estimates.

The actual bill: $47.

After digging through the logs, I found the culprit. My retry logic was re-calling the API every time a downstream service timed out (which happened a lot during peak hours). Some prompts were hitting the API 4-5 times before the pipeline completed. Add in the development iterations — where I was tweaking the same prompt slightly and re-running it — and I was burning tokens on near-identical requests constantly.

That's when I decided to build a proper caching layer. Not a naive "cache everything" approach, but a smart system that understands when LLM responses can be safely reused and when they genuinely need a fresh call.

What You'll Build

By the end of this tutorial, you'll have a caching middleware that:

  • Caches exact-match prompts with configurable TTL
  • Detects semantically similar prompts using embedding distance
  • Supports cache invalidation by model, temperature threshold, and time
  • Tracks cache hit rates and estimated savings
  • Works with any OpenAI-compatible API endpoint

Here's the architecture:

[Client Request] → [Cache Layer] → {cache hit?} → Return cached response
                                → {cache miss?} → [LLM API] → Store + Return

Part 1: Exact-Match Caching with Content Hashing

The simplest approach that already saves a surprising amount of money: hash the request parameters and cache the response.

import hashlib
import json
import time
from dataclasses import dataclass
from typing import Optional

@dataclass
class CacheEntry:
    response: dict
    created_at: float
    model: str
    temperature: float
    prompt_hash: str
    hit_count: int = 0

class LLMCache:
    def __init__(self, ttl_seconds: int = 3600, max_temperature: float = 0.3):
        self.ttl_seconds = ttl_seconds
        self.max_temperature = max_temperature
        self._cache: dict[str, CacheEntry] = {}
        self.stats = {"hits": 0, "misses": 0, "saved_tokens": 0}

    def _hash_request(self, model: str, messages: list, temperature: float,
                       **kwargs) -> str:
        """Create a deterministic hash from request parameters."""
        key_data = {
            "model": model,
            "messages": messages,
            "temperature": round(temperature, 2),
            # Include response format if specified
            "response_format": kwargs.get("response_format"),
        }
        # sort_keys makes the JSON (and so the hash) independent of dict key order.
        # Other kwargs that change the output (max_tokens, tools, ...) are not part
        # of the key; add them here if your requests vary on them.
        raw = json.dumps(key_data, sort_keys=True)
        # 16 hex characters keep 64 bits of the SHA-256 digest
        return hashlib.sha256(raw.encode()).hexdigest()[:16]

    def _is_cacheable(self, temperature: float) -> bool:
        """High-temperature requests are too random to cache."""
        return temperature <= self.max_temperature

    def get(self, model: str, messages: list, temperature: float,
            **kwargs) -> Optional[dict]:
        if not self._is_cacheable(temperature):
            self.stats["misses"] += 1
            return None

        key = self._hash_request(model, messages, temperature, **kwargs)
        entry = self._cache.get(key)

        if entry is None:
            self.stats["misses"] += 1
            return None

        # Check TTL
        if time.time() - entry.created_at > self.ttl_seconds:
            del self._cache[key]
            self.stats["misses"] += 1
            return None

        entry.hit_count += 1
        self.stats["hits"] += 1
        usage = entry.response.get("usage", {})
        self.stats["saved_tokens"] += usage.get("total_tokens", 0)
        return entry.response

    def set(self, model: str, messages: list, temperature: float,
            response: dict, **kwargs):
        if not self._is_cacheable(temperature):
            return

        key = self._hash_request(model, messages, temperature, **kwargs)
        self._cache[key] = CacheEntry(
            response=response,
            created_at=time.time(),
            model=model,
            temperature=temperature,
            prompt_hash=key,
        )

    def invalidate_model(self, model: str):
        """Remove all cached entries for a specific model."""
        to_delete = [k for k, v in self._cache.items() if v.model == model]
        for k in to_delete:
            del self._cache[k]

This alone saved me about 35% on that content pipeline, because the retry logic was hitting cached responses instead of making fresh API calls.
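To make the key behavior concrete, here is a minimal standalone sketch of the same hashing idea. The `request_key` function is a hypothetical helper written for this illustration (it is not part of the class above), and the prompt text is made up.

```python
import hashlib
import json

def request_key(model: str, messages: list, temperature: float) -> str:
    """Hash request parameters into a short, stable cache key."""
    payload = {
        "model": model,
        "messages": messages,
        "temperature": round(temperature, 2),
    }
    # sort_keys applies to nested dicts too, so key order never changes the hash
    raw = json.dumps(payload, sort_keys=True)
    return hashlib.sha256(raw.encode()).hexdigest()[:16]

# Same content, different dict key order: the keys match
a = request_key("gpt-5.5", [{"role": "user", "content": "Summarize X"}], 0.0)
b = request_key("gpt-5.5", [{"content": "Summarize X", "role": "user"}], 0.0)

# One trailing space in the prompt: a different key, so a cache miss
c = request_key("gpt-5.5", [{"role": "user", "content": "Summarize X "}], 0.0)

print(a == b, a == c)
```

A retry sends byte-identical parameters, so it always lands on the same key. Anything that changes the text, even whitespace, does not.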

But there's a problem: this only works for exact prompt matches. In practice, I found that many of my "duplicate" prompts were almost identical but not quite.

Part 2: Semantic Similarity Caching

This is where it gets interesting. Consider these two prompts:

"Write a product description for a wireless bluetooth headphone with 40hr battery life, noise cancellation, priced at $79"
"Generate a product description: wireless bluetooth headphones, 40 hour battery, ANC, $79"

These will produce nearly identical outputs, but exact-match caching won't catch the second one. For that, we need embedding-based similarity.
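As a toy illustration of the comparison involved, the sketch below computes cosine similarity in plain Python. The three-dimensional vectors are hand-made stand-ins, not real embeddings, and the 0.92 threshold is just the default used later in this post.

```python
import math

def cosine_similarity(a: list, b: list) -> float:
    """Cosine of the angle between two vectors: 1.0 means same direction."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)

# Hand-made vectors standing in for embeddings of three prompts
headphones_v1 = [0.90, 0.10, 0.20]   # "Write a product description for ..."
headphones_v2 = [0.85, 0.15, 0.25]   # "Generate a product description: ..."
blender = [0.10, 0.90, 0.30]         # an unrelated product

THRESHOLD = 0.92

same_product = cosine_similarity(headphones_v1, headphones_v2)
other_product = cosine_similarity(headphones_v1, blender)

print(f"rephrased prompt: {same_product:.3f}, reuse={same_product >= THRESHOLD}")
print(f"different prompt: {other_product:.3f}, reuse={other_product >= THRESHOLD}")
```

Real embeddings have hundreds or thousands of dimensions, but the decision is the same: reuse the cached response only when the similarity clears the threshold.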

import numpy as np
from openai import OpenAI

class SemanticCache(LLMCache):
    def __init__(self, similarity_threshold: float = 0.92, **kwargs):
        super().__init__(**kwargs)
        self.similarity_threshold = similarity_threshold
        self._embeddings: dict[str, np.ndarray] = {}
        # Use a cheaper embedding model
        self._embed_client = OpenAI(
            base_url="https://api.xidaoapi.com/v1",
            api_key="your-api-key"
        )

    def _get_embedding(self, text: str) -> np.ndarray:
        """Get embedding for cache key comparison."""
        resp = self._embed_client.embeddings.create(
            model="text-embedding-3-small",
            input=text
        )
        return np.array(resp.data[0].embedding)

    def _extract_text(self, messages: list) -> str:
        """Extract the core prompt text for embedding."""
        # Concatenate all user messages
        parts = []
        for msg in messages:
            if msg.get("role") == "user":
                parts.append(msg.get("content", ""))
        return " ".join(parts)

    def _cosine_similarity(self, a: np.ndarray, b: np.ndarray) -> float:
        return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))

    def get(self, model: str, messages: list, temperature: float,
            **kwargs) -> Optional[dict]:
        # Try exact match first (fast path)
        exact = super().get(model, messages, temperature, **kwargs)
        if exact is not None:
            return exact

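        if not self._is_cacheable(temperature):
            return None

        # super().get() already counted this lookup as a miss; undo that so the
        # semantic path below records exactly one hit or miss per request
        self.stats["misses"] -= 1

        # Try semantic match (slow path)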
        query_text = self._extract_text(messages)
        query_embedding = self._get_embedding(query_text)

        best_similarity = 0.0
        best_entry = None

        for key, entry in self._cache.items():
            if entry.model != model:
                continue
            if time.time() - entry.created_at > self.ttl_seconds:
                continue

            cached_embedding = self._embeddings.get(key)
            if cached_embedding is None:
                continue

            sim = self._cosine_similarity(query_embedding, cached_embedding)
            if sim > best_similarity:
                best_similarity = sim
                best_entry = entry

        if best_entry and best_similarity >= self.similarity_threshold:
            best_entry.hit_count += 1
            self.stats["hits"] += 1
            usage = best_entry.response.get("usage", {})
            self.stats["saved_tokens"] += usage.get("total_tokens", 0)
            return best_entry.response

        self.stats["misses"] += 1
        return None

    def set(self, model: str, messages: list, temperature: float,
            response: dict, **kwargs):
        # Skip uncacheable requests before paying for an embedding call
        if not self._is_cacheable(temperature):
            return
        super().set(model, messages, temperature, response, **kwargs)
        # Store the embedding for semantic matching.
        # On a miss, get() has already embedded this same text once.
        key = self._hash_request(model, messages, temperature, **kwargs)
        query_text = self._extract_text(messages)
        self._embeddings[key] = self._get_embedding(query_text)

Important caveat: the embedding API call adds latency and cost. In my benchmarks, the embedding call costs about $0.00002 per query (using text-embedding-3-small). If your average LLM call costs $0.005, the embedding overhead is negligible. But if you're making tons of very cheap calls, the math might not work out. Note also that the class as written embeds the prompt twice on a miss: once in get() for the lookup and once in set() when storing.
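The trade-off can be put into numbers with a back-of-the-envelope calculation. This sketch assumes one embedding call per semantic lookup and uses the two prices quoted above; the $0.0001 "cheap call" figure is a made-up example, and both function names are hypothetical.

```python
def breakeven_hit_rate(embed_cost: float, llm_cost: float) -> float:
    """Semantic hit rate (among exact-match misses) at which embeddings pay for themselves."""
    return embed_cost / llm_cost

def net_saving_per_lookup(hit_rate: float, embed_cost: float, llm_cost: float) -> float:
    """Expected dollars saved per semantic lookup; negative means the cache loses money."""
    return hit_rate * llm_cost - embed_cost

EMBED_COST = 0.00002  # per query, from the benchmark above

# A $0.005 LLM call: break-even at a very low semantic hit rate
print(f"{breakeven_hit_rate(EMBED_COST, 0.005):.1%}")

# A hypothetical $0.0001 LLM call: the required hit rate is 50x higher
print(f"{breakeven_hit_rate(EMBED_COST, 0.0001):.1%}")

# Expected saving per lookup at a 10% semantic hit rate, for each call price
print(net_saving_per_lookup(0.10, EMBED_COST, 0.005))
print(net_saving_per_lookup(0.10, EMBED_COST, 0.0001))
```

With the $0.005 call the break-even hit rate is about 0.4%, so almost any repetition pays off. With very cheap calls the same 10% hit rate yields a negative saving.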

Here's what I found in production:

| Scenario | Exact cache hit rate | Semantic cache hit rate | Net savings |
|---|---|---|---|
| Content generation (2K SKUs) | 23% | 41% | ~38% |
| Customer support bot | 12% | 31% | ~26% |
| Code review automation | 8% | 19% | ~15% |
| Data extraction pipeline | 45% | 62% | ~55% |

The data extraction pipeline had the highest hit rate because the prompts were very structured. The code review pipeline had the lowest because each PR is genuinely unique.

Part 3: The Tricky Parts Nobody Tells You

Temperature Threshold

If you cache responses from requests with temperature: 0.8, every caller gets the same sample back from a request that was meant to vary, and the cached response often won't match what the user expected. My rule of thumb:

  • temperature <= 0.2: safe to cache, responses are nearly deterministic
  • 0.2 < temperature <= 0.5: cache with caution, shorter TTL (5-10 min)
  • temperature > 0.5: don't cache at all
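The rule of thumb above can be sketched as a small function. The Part 1 class has a single TTL and one cutoff, so this is an illustration rather than what that class does: `ttl_for_temperature` is a hypothetical helper, 3600 seconds mirrors the Part 1 default, and 600 seconds is the upper end of the 5-10 minute range.

```python
from typing import Optional

def ttl_for_temperature(temperature: float,
                        base_ttl: int = 3600,
                        cautious_ttl: int = 600) -> Optional[int]:
    """Return the TTL in seconds for a request, or None if it should not be cached."""
    if temperature <= 0.2:
        return base_ttl      # nearly deterministic: full TTL
    if temperature <= 0.5:
        return cautious_ttl  # some variation: keep entries only briefly
    return None              # too random to reuse

for t in (0.0, 0.2, 0.35, 0.5, 0.8):
    print(t, ttl_for_temperature(t))
```

To use something like this, each cache entry would need to store its own TTL instead of relying on the cache-wide `ttl_seconds`.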

Model Versioning

This bit me hard. I was caching responses from gpt-5.5 and everything was fine for weeks. Then OpenAI silently updated the model behind the same endpoint name. Suddenly my cached responses were subtly different from fresh ones — same model name, different behavior.

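Solution: track a model version identifier if the API provides one. Some APIs return it in the response body or headers. Because you only learn it after a fresh call, store it alongside each cache entry and invalidate that model's entries when it changes. If the API provides nothing, you can send a short "probe" prompt daily and hash the response as a version fingerprint.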
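One way to act on a version change is sketched below. `ModelVersionTracker`, `fingerprint_of`, and `probe_fingerprint` are hypothetical names for this illustration. It assumes the provider returns a `system_fingerprint` field in the response body, as OpenAI's Chat Completions API does for some models; other providers may expose nothing comparable. The probe fallback is only meaningful if the probe response is stable at temperature 0, which is not guaranteed.

```python
import hashlib
from typing import Optional

class ModelVersionTracker:
    """Remembers the last fingerprint seen per model and reports changes."""

    def __init__(self) -> None:
        self._seen: dict[str, str] = {}

    def observe(self, model: str, fingerprint: Optional[str]) -> bool:
        """Record a fingerprint; return True if it differs from the previous one."""
        if not fingerprint:
            return False  # provider sent nothing to compare
        previous = self._seen.get(model)
        self._seen[model] = fingerprint
        return previous is not None and previous != fingerprint

def fingerprint_of(response: dict) -> Optional[str]:
    """Read the fingerprint from a response body (assumed field name)."""
    return response.get("system_fingerprint")

def probe_fingerprint(probe_response_text: str) -> str:
    """Fallback: hash the text returned for a fixed probe prompt."""
    return hashlib.sha256(probe_response_text.encode()).hexdigest()[:16]

tracker = ModelVersionTracker()
first = tracker.observe("gpt-5.5", fingerprint_of({"system_fingerprint": "fp_aaa"}))
second = tracker.observe("gpt-5.5", fingerprint_of({"system_fingerprint": "fp_bbb"}))
print(first, second)
```

When `observe` returns True, the caller would run `cache.invalidate_model(model)` from Part 1 before storing the fresh response.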

System Prompt Drift

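If your system prompt evolves (and it will), your cached responses become stale. The Part 1 key already changes with the system prompt because it hashes the full messages list. I split the system prompt into its own hash so it is explicit and easy to log or compare. (The semantic path in Part 2 embeds only user messages, so it needs its own system-prompt check before accepting a match.) The key function: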

def _hash_request(self, model, messages, temperature, **kwargs):
    # Split the system prompt from the rest of the conversation
    system_msgs = [m for m in messages if m.get("role") == "system"]
    other_msgs = [m for m in messages if m.get("role") != "system"]

    key_data = {
        "model": model,
        # MD5 only fingerprints the prompt here; it is not a security boundary
        "system_hash": hashlib.md5(
            json.dumps(system_msgs, sort_keys=True).encode()
        ).hexdigest(),
        "messages": other_msgs,
        "temperature": round(temperature, 2),
        # Keep response_format in the key, as in Part 1
        "response_format": kwargs.get("response_format"),
    }
    raw = json.dumps(key_data, sort_keys=True)
    return hashlib.sha256(raw.encode()).hexdigest()[:16]

Streaming Responses

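Caching streaming responses is a pain. You can't easily cache the stream itself. What I do instead is collect the chunks as they arrive, cache the assembled text, and replay it in slices on a hit: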

async def cached_stream(self, model, messages, temperature, **kwargs):
    # Check cache first
    cached = self.get(model, messages, temperature, **kwargs)
    if cached is not None:
        # Simulate streaming by replaying the cached text in 20-character slices
        content = cached["choices"][0]["message"]["content"]
        for i in range(0, len(content), 20):
            yield content[i:i+20]
        return

    # Cache miss: stream from the API and collect the full response.
    # _stream_from_api is assumed to be an async generator yielding text chunks.
    full_content = ""
    async for chunk in self._stream_from_api(model, messages, temperature):
        full_content += chunk
        yield chunk

    # Store the collected response in the same shape as a non-streaming one
    synthetic_response = {
        "choices": [{"message": {"content": full_content}}],
        "usage": {"total_tokens": len(full_content) // 4}  # rough estimate
    }
    # Pass kwargs through so set() builds the same key that get() looked up
    self.set(model, messages, temperature, synthetic_response, **kwargs)

Part 4: Production-Ready Middleware

Here's how to wire this into your actual application:

import httpx
from typing import Optional

class CachedLLMClient:
    def __init__(self, base_url: str, api_key: str,
                 cache_config: Optional[dict] = None):
        self.client = httpx.AsyncClient(
            base_url=base_url,
            headers={"Authorization": f"Bearer {api_key}"},
            timeout=60.0,  # httpx defaults to 5 s, too short for many completions
        )
        config = cache_config or {}
        # Note: SemanticCache builds its own embedding client (see Part 2),
        # so its credentials are configured there, not here.
        self.cache = SemanticCache(
            ttl_seconds=config.get("ttl", 3600),
            max_temperature=config.get("max_temp", 0.3),
            similarity_threshold=config.get("similarity", 0.92),
        )

    async def chat_completion(self, model: str, messages: list,
                               temperature: float = 0.0, **kwargs):
        # Check cache. The semantic lookup makes a blocking embedding call.
        cached = self.cache.get(model, messages, temperature, **kwargs)
        if cached is not None:
            # Return a copy so the flag never leaks into the stored entry
            return {**cached, "_cache_hit": True}

        # Call API
        payload = {
            "model": model,
            "messages": messages,
            "temperature": temperature,
            **kwargs,
        }
        resp = await self.client.post("/chat/completions", json=payload)
        # Raise on 4xx/5xx so error bodies are never cached
        resp.raise_for_status()
        result = resp.json()

        # Cache the response, then return a flagged copy
        self.cache.set(model, messages, temperature, result, **kwargs)
        return {**result, "_cache_hit": False}

    def get_cache_stats(self) -> dict:
        total = self.cache.stats["hits"] + self.cache.stats["misses"]
        return {
            "hit_rate": self.cache.stats["hits"] / total if total else 0,
            "total_requests": total,
            "estimated_saved_tokens": self.cache.stats["saved_tokens"],
            # Flat $3 per million tokens is a placeholder; Part 5 tracks per-model rates
            "estimated_saved_usd": self.cache.stats["saved_tokens"] * 0.000003,
        }

Usage:

import asyncio

async def main():
    client = CachedLLMClient(
        base_url="https://api.xidaoapi.com/v1",
        api_key="your-key",
        cache_config={"ttl": 7200, "max_temp": 0.2, "similarity": 0.90}
    )
    messages = [{"role": "user", "content": "Summarize: [product data]"}]

    # First call: cache miss, goes to the API
    result = await client.chat_completion(
        model="gpt-5.5", messages=messages, temperature=0.0
    )

    # Same call: cache hit, no API request
    result2 = await client.chat_completion(
        model="gpt-5.5", messages=messages, temperature=0.0
    )
    # result2["_cache_hit"] == True

# await only works inside a coroutine, so run the example through asyncio
asyncio.run(main())

Part 5: Monitoring What You're Saving

Cache hit rates are nice, but what really matters is money saved. Here's a simple tracking approach:

class CacheMetrics:
    def __init__(self):
        self.daily_savings: dict[str, float] = {}

    def record_hit(self, date: str, tokens_saved: int, model: str):
        cost_per_token = {
            "gpt-5.5": 0.000015,
            "claude-opus-4.7": 0.000075,
            "deepseek-v4": 0.000001,
            "gemini-3.0-pro": 0.000005,
        }.get(model, 0.00001)

        saved = tokens_saved * cost_per_token
        self.daily_savings[date] = self.daily_savings.get(date, 0) + saved

    def report(self):
        total = sum(self.daily_savings.values())
        print(f"Total cached savings: ${total:.2f}")
        for date, amount in sorted(self.daily_savings.items()):
            print(f"  {date}: ${amount:.4f}")

When NOT to Cache

Caching isn't always the right move. Skip it when:

  1. Real-time data queries: "What's the current stock price of X?" — caching defeats the purpose
  2. Personalized responses: When the prompt carries user-specific context, a cached response can leak one user's data to another
  3. Creative generation: If you want variety, caching produces the same output every time
  4. Safety-critical outputs: Medical, legal, or financial advice should always be freshly generated
  5. A/B testing: You need different responses to compare model performance

What I'd Do Differently

If I were starting over:

  1. Start with exact-match only — it's simpler and catches more than you'd expect. Add semantic caching later when you have data showing you need it.
  2. Log everything from day one — I didn't add metrics until week 3 and missed valuable data about which prompts were being repeated.
  3. Use Redis, not in-memory — My initial prototype used a dict (like the examples above). Moving to Redis meant the cache survived restarts and could be shared across workers.
  4. Set up cache warming — Pre-populate the cache with common prompts during off-peak hours. This cut my cold-start latency by 60%.
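For the Redis point, here is a rough sketch of what a shared store might look like. It is not the code I run in production. `SharedCacheStore` and `InMemoryClient` are hypothetical names; the stand-in client only mimics the `get(name)` and `set(name, value, ex=seconds)` calls that a redis-py client offers, so the example runs without a server.

```python
import json
from typing import Optional

class InMemoryClient:
    """Stand-in with the same get/set(ex=...) shape as a redis-py client (TTL ignored)."""

    def __init__(self) -> None:
        self._data: dict[str, bytes] = {}

    def get(self, name: str) -> Optional[bytes]:
        return self._data.get(name)

    def set(self, name: str, value: str, ex: Optional[int] = None) -> None:
        self._data[name] = value.encode()

class SharedCacheStore:
    """Stores JSON-serialized responses under a key prefix with a TTL."""

    def __init__(self, client, ttl_seconds: int = 3600,
                 prefix: str = "llmcache:") -> None:
        self._client = client
        self._ttl = ttl_seconds
        self._prefix = prefix

    def get(self, key: str) -> Optional[dict]:
        raw = self._client.get(self._prefix + key)
        return json.loads(raw) if raw is not None else None

    def set(self, key: str, response: dict) -> None:
        # ex= asks the server to expire the key, replacing the manual TTL check
        self._client.set(self._prefix + key, json.dumps(response), ex=self._ttl)

store = SharedCacheStore(InMemoryClient(), ttl_seconds=7200)
store.set("abc123", {"choices": [{"message": {"content": "hello"}}]})
print(store.get("abc123"))
print(store.get("missing"))
```

In a real deployment the client would be a `redis.Redis(...)` instance, and the request hash from Part 1 would serve as the key. The embeddings from Part 2 would need their own storage, which this sketch leaves out.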

Wrapping Up

A smart caching layer isn't going to solve all your AI cost problems, but in my experience, it's the lowest-effort, highest-impact optimization you can add to an existing pipeline. The exact-match approach alone typically saves 15-30% with about 50 lines of code.

The semantic similarity layer is worth it when you have structured, repetitive prompts — content generation, data extraction, classification tasks. Skip it for creative or highly variable workloads.

What about you? Have you implemented caching for LLM calls in production? What hit rates are you seeing? I'm especially curious whether anyone has tried caching across different models (e.g., using a GPT-5.5 response as a fallback cache for Claude requests) — that's my next experiment.


If you need a gateway that handles caching, routing, and fallback across providers, I've been using XiDao API as my OpenAI-compatible endpoint for the examples above.
