Building a resilient microservice cache with optimistic locking and event-driven invalidation
Caching is essential for performance in distributed systems, but it becomes tricky when you have multiple services mutating shared data. This tutorial walks you through designing and implementing a resilient, scalable cache layer for a microservice architecture. You’ll learn how to combine optimistic locking, versioned cache entries, and event-driven invalidation to keep cache correctness high without sacrificing throughput.
What you’ll build
- A simple user profile service with a read-heavy cache path
- A cache store that supports optimistic locking using version stamps
- An event bus (simulated) to propagate invalidation events
- A small service that consumes events to invalidate or refresh cached data
- A tested, runnable example in Python, with optional TypeScript typings for a frontend consumer
Prerequisites
- Basic familiarity with HTTP APIs, RESTful design, and eventual consistency concepts
- Understanding of caching (TTL, eviction) and cache coherence challenges
- Python 3.8+ or your preferred modern runtime
- Optional: Node.js for a frontend example
High-level design
- Read path: check cache for a recent version tag. If present and fresh, return cached data. If not, fetch from the source, validate that the data hasn’t changed (via a version field), and update the cache with a new version.
- Write path: when mutating data, perform an atomic update with a version increment. If the version the writer expects no longer matches the stored version, reject the write or retry with fresh data.
- Invalidation path: when a write occurs, publish an invalidation event with the affected keys and new versions. Listeners refresh or clear stale cache entries.
- Concurrency control: use optimistic locking via a version field in the data model. This minimizes lock contention while preserving correctness.
- Consistency model: read-through cache with versioned entries; eventual consistency via invalidation events, with the TTL as a backstop if an event is missed.
Code overview
- cache.py: a minimal in-memory cache with version stamping and optimistic checks
- store.py: the canonical data store (simulated in-memory) with versioned records
- event_bus.py: a simple publish/subscribe mechanism to simulate events
- service.py: the user profile service exposing get and update endpoints with cache integration
- test_cli.py: simple tests to exercise read/write paths and invalidation
Step 1: The data model and store
- Data model includes id, name, email, and version (integer)
- The store supports get_by_id, update_with_optimistic_lock, and create
- Version increments on each successful update
Python implementation (store.py)
- A thread-safe in-memory store using a dictionary and a Lock
- get_by_id returns a snapshot (a shallow copy) of the record, including its version
- update_with_optimistic_lock(id, expected_version, new_data) verifies version before applying
Code (store.py)
from threading import Lock
from typing import Any, Dict, Optional


class DataStore:
    def __init__(self):
        self._lock = Lock()
        self._store: Dict[str, Dict[str, Any]] = {}

    def create(self, id: str, data: Dict[str, Any]) -> Dict[str, Any]:
        with self._lock:
            if id in self._store:
                raise ValueError("Item already exists")
            item = data.copy()
            item["version"] = 1  # every record starts at version 1
            self._store[id] = item
            return item.copy()

    def get_by_id(self, id: str) -> Optional[Dict[str, Any]]:
        with self._lock:
            item = self._store.get(id)
            # Return a copy so callers cannot mutate the stored record
            return item.copy() if item else None

    def update_with_optimistic_lock(self, id: str, expected_version: int, new_data: Dict[str, Any]) -> Dict[str, Any]:
        with self._lock:
            if id not in self._store:
                raise KeyError("Item not found")
            current = self._store[id]
            # Reject the write if someone else updated the record first
            if current["version"] != expected_version:
                raise ValueError("Version mismatch")
            # Apply updates and bump the version
            updated = current.copy()
            updated.update(new_data)
            updated["version"] = current["version"] + 1
            self._store[id] = updated
            return updated.copy()
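To make the version check concrete, here is a minimal sketch of two writers racing on the same record. `MiniStore` and `VersionConflict` are hypothetical names introduced only for this illustration. The dedicated exception type is an assumption layered on top of the tutorial's `ValueError`, chosen so callers could tell a conflict apart from other errors.

```python
from threading import Lock
from typing import Any, Dict


class VersionConflict(Exception):
    """Hypothetical exception: the caller's expected version is out of date."""


class MiniStore:
    """Compact stand-in for a versioned store (illustration only)."""

    def __init__(self) -> None:
        self._lock = Lock()
        self._rows: Dict[str, Dict[str, Any]] = {}

    def create(self, key: str, data: Dict[str, Any]) -> Dict[str, Any]:
        with self._lock:
            self._rows[key] = {**data, "version": 1}
            return dict(self._rows[key])

    def get(self, key: str) -> Dict[str, Any]:
        with self._lock:
            return dict(self._rows[key])

    def update(self, key: str, expected_version: int, changes: Dict[str, Any]) -> Dict[str, Any]:
        with self._lock:
            current = self._rows[key]
            if current["version"] != expected_version:
                raise VersionConflict(
                    f"expected v{expected_version}, store has v{current['version']}"
                )
            self._rows[key] = {**current, **changes, "version": current["version"] + 1}
            return dict(self._rows[key])


store = MiniStore()
store.create("user-1", {"name": "Alex"})

# Both writers read the record while it is still at version 1.
seen_by_a = store.get("user-1")
seen_by_b = store.get("user-1")

# Writer A commits first and moves the record to version 2.
after_a = store.update("user-1", seen_by_a["version"], {"name": "Alexandra"})

# Writer B still holds version 1, so its write is rejected instead of
# silently overwriting A's change.
try:
    store.update("user-1", seen_by_b["version"], {"name": "Al"})
    b_rejected = False
except VersionConflict:
    b_rejected = True
```

Writer B's options at this point are to surface the conflict to the user or to re-read the record and try again.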
Step 2: The cache with optimistic locking
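- Cache entries store the record (which carries its version field) and an expiry time
- get: if an entry is present and has not expired, return a copy of its data; otherwise return None so the caller can fetch from the store
- set: store a copy of the data, including its version, with a fresh expiry time
- invalidate: remove the entry when an invalidation event arrives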
Code (cache.py)
from time import time
from typing import Any, Dict, Optional


class CacheEntry:
    def __init__(self, value: Dict[str, Any], expires_at: float):
        self.value = value
        self.expires_at = expires_at


class VersionedCache:
    # Not thread-safe; guard with a Lock if the cache is shared across threads
    def __init__(self, ttl_seconds: float = 60.0):
        self._ttl = ttl_seconds
        self._store: Dict[str, CacheEntry] = {}

    def get(self, key: str) -> Optional[Dict[str, Any]]:
        ent = self._store.get(key)
        if ent is None:
            return None
        if ent.expires_at < time():
            # Entry has outlived its TTL: evict it and report a miss
            del self._store[key]
            return None
        return ent.value.copy()

    def set(self, key: str, value: Dict[str, Any]) -> None:
        expires_at = time() + self._ttl
        self._store[key] = CacheEntry(value.copy(), expires_at)

    def invalidate(self, key: str) -> None:
        # pop with a default is a no-op when the key is absent
        self._store.pop(key, None)
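The cache above relies on TTL and explicit invalidation only; it never compares versions. One way to bring the version stamp into the cache itself is a guarded write that refuses to replace a newer entry with an older one. The sketch below is an assumption about how that could look, not part of cache.py: `NewerWinsCache` and `set_if_newer` are hypothetical names, and the monotonic clock is swapped in for `time()` so expiry is unaffected by wall-clock adjustments.

```python
from time import monotonic
from typing import Any, Dict, Optional, Tuple


class NewerWinsCache:
    """Hypothetical TTL cache that never replaces an entry with an older version."""

    def __init__(self, ttl_seconds: float = 60.0) -> None:
        self._ttl = ttl_seconds
        # key -> (record, expiry timestamp on the monotonic clock)
        self._entries: Dict[str, Tuple[Dict[str, Any], float]] = {}

    def get(self, key: str) -> Optional[Dict[str, Any]]:
        entry = self._entries.get(key)
        if entry is None:
            return None
        record, expires_at = entry
        if expires_at <= monotonic():
            del self._entries[key]
            return None
        return dict(record)

    def set_if_newer(self, key: str, record: Dict[str, Any]) -> bool:
        """Store the record unless a live entry with a higher version exists."""
        current = self.get(key)
        if current is not None and current["version"] > record["version"]:
            return False  # a slow reader arrived late with stale data
        self._entries[key] = (dict(record), monotonic() + self._ttl)
        return True


cache = NewerWinsCache(ttl_seconds=30)
stored_v2 = cache.set_if_newer("user-1", {"name": "Alexandra", "version": 2})
stored_v1 = cache.set_if_newer("user-1", {"name": "Alex", "version": 1})
```

The guard only helps while a newer entry is actually present. If an invalidation has already removed the entry, a late write of stale data still lands and lives until its TTL expires, so the TTL remains the backstop.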
Step 3: The event bus
- Lightweight pub/sub to propagate invalidation events
- Each event has a type (here "invalidate_profile") and a payload dict carrying the affected id
Code (event_bus.py)
from typing import Any, Callable, Dict, List


class EventBus:
    def __init__(self):
        self._subscribers: Dict[str, List[Callable[[Dict[str, Any]], None]]] = {}

    def publish(self, event_type: str, payload: Dict[str, Any]) -> None:
        # Synchronous, in-process delivery: each callback runs before publish returns
        for cb in self._subscribers.get(event_type, []):
            cb(payload)

    def subscribe(self, event_type: str, callback: Callable[[Dict[str, Any]], None]) -> None:
        self._subscribers.setdefault(event_type, []).append(callback)
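The high-level design mentions publishing the new version along with the affected key. A minimal sketch of a subscriber that uses that version follows. The `version` payload field, the `on_invalidate` function, and the plain-dict cache are assumptions for illustration and are not part of event_bus.py. Comparing versions makes the handler tolerate duplicate or late-arriving events, which become likely once a real broker replaces the in-process bus.

```python
from typing import Any, Dict

# Plain dict standing in for the cache: key -> cached record (with "version").
cache: Dict[str, Dict[str, Any]] = {
    "user-1": {"name": "Alexandra", "version": 3},
}


def on_invalidate(payload: Dict[str, Any]) -> bool:
    """Drop the cached record only if the event describes a newer version.

    Returns True when an entry was evicted.
    """
    cached = cache.get(payload["id"])
    if cached is None:
        return False  # nothing cached, nothing to do
    if cached["version"] >= payload["version"]:
        return False  # duplicate or late event: the cache is already current
    del cache[payload["id"]]
    return True


late_event = on_invalidate({"id": "user-1", "version": 2})   # older than the cache
fresh_event = on_invalidate({"id": "user-1", "version": 4})  # newer than the cache
```

With a handler like this, the publisher would send something like `{"id": id, "version": updated["version"]}` instead of the id alone.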
Step 4: The service layer
- get_profile(id): check cache; if miss, fetch from store, cache it
- update_profile(id, partial): perform optimistic update on store; on success, publish invalidation event for that id
- An HTTP interface is optional; here we call the service methods directly to keep the example focused
Code (service.py)
from typing import Any, Dict

from store import DataStore
from cache import VersionedCache
from event_bus import EventBus


class UserService:
    def __init__(self, store: DataStore, cache: VersionedCache, bus: EventBus, ttl: float = 60.0):
        self.store = store
        self.cache = cache
        self.bus = bus
        self.ttl = ttl
        # Clear our own cache entry whenever any writer announces a change
        self.bus.subscribe("invalidate_profile", self._handle_invalidation)

    def _handle_invalidation(self, payload: Dict[str, Any]) -> None:
        key = payload["id"]
        self.cache.invalidate(key)

    def get_profile(self, id: str) -> Dict[str, Any]:
        cached = self.cache.get(id)
        if cached is not None:
            return cached
        # Cache miss: fetch from the store and populate the cache
        item = self.store.get_by_id(id)
        if item is None:
            raise KeyError("Profile not found")
        self.cache.set(id, item)
        return item

    def update_profile(self, id: str, updates: Dict[str, Any]) -> Dict[str, Any]:
        # First, fetch the current version to use as the optimistic lock
        current = self.store.get_by_id(id)
        if current is None:
            raise KeyError("Profile not found")
        expected_version = current["version"]
        # Raises ValueError if another writer commits between the read and this call
        updated = self.store.update_with_optimistic_lock(id, expected_version, updates)
        # Publish invalidation so other caches refresh
        self.bus.publish("invalidate_profile", {"id": id})
        return updated
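update_profile makes a single attempt: if another writer commits between its read and its write, the store raises and the caller sees the error. A common follow-up is a bounded retry loop that re-reads and tries again. The sketch below assumes the store's read and compare-and-set operations are passed in as callables and that a conflict surfaces as `ValueError`, as in store.py. `update_with_retry` and its parameters are hypothetical names, and the demo store is a stand-in that simulates one competing write.

```python
from typing import Any, Callable, Dict

Record = Dict[str, Any]


def update_with_retry(
    read: Callable[[str], Record],
    compare_and_set: Callable[[str, int, Record], Record],
    key: str,
    changes: Record,
    max_attempts: int = 3,
) -> Record:
    """Re-read and retry when the optimistic version check fails."""
    last_error = None
    for _ in range(max_attempts):
        current = read(key)
        try:
            return compare_and_set(key, current["version"], changes)
        except ValueError as exc:
            last_error = exc  # someone else won the race; re-read and retry
    raise RuntimeError(f"gave up after {max_attempts} attempts") from last_error


# Demo: an in-memory record plus a one-off competing write that lands
# between our first read and our first compare-and-set.
row: Record = {"name": "Alex", "version": 1}
interference = [True]


def read(key: str) -> Record:
    return dict(row)


def compare_and_set(key: str, expected_version: int, changes: Record) -> Record:
    if interference:
        interference.pop()
        row["version"] += 1  # simulated competing writer
    if row["version"] != expected_version:
        raise ValueError("Version mismatch")
    row.update(changes)
    row["version"] += 1
    return dict(row)


result = update_with_retry(read, compare_and_set, "user-1", {"name": "Alexandra"})
```

Blindly reapplying the same changes is only safe when they do not depend on the values that were read. For read-modify-write updates such as incrementing a counter, the new value should be recomputed from the fresh record on each attempt.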
Step 5: Wiring it together (example usage)
- Initialize store, cache, and event bus
- Create a sample profile
- Retrieve to populate cache
- Update profile, observe cache invalidation and refresh on next read
Example runner (main.py)
from store import DataStore
from cache import VersionedCache
from event_bus import EventBus
from service import UserService


def main():
    store = DataStore()
    bus = EventBus()
    cache = VersionedCache(ttl_seconds=30)
    svc = UserService(store, cache, bus)
    # Create initial data
    store.create("user-123", {"id": "user-123", "name": "Alex Doe", "email": "alex@example.com"})
    # Read (populates cache)
    print("Initial read:", svc.get_profile("user-123"))
    # Read again (from cache)
    print("Cached read:", svc.get_profile("user-123"))
    # Update
    updated = svc.update_profile("user-123", {"name": "Alexandra Doe"})
    print("Updated:", updated)
    # Read after update (cache should have been invalidated by event)
    print("After invalidation read:", svc.get_profile("user-123"))


if __name__ == "__main__":
    main()
Step 6: Testing the flow
- Test scenarios:
  - Cache miss followed by successful store fetch
  - Concurrent-style update with version mismatch (simulate by mutating store outside cache)
  - Invalidation propagation triggers a refresh on next read
- Simple pytest-style tests can be added to verify:
  - Version increments on update
  - Cache invalidation occurs after update
  - Retrieval path prefers cache when valid
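The checks listed above can be sketched as pytest-style test functions. To keep the snippet runnable on its own, it uses `MiniService`, a hypothetical stand-in that mirrors the read, write, and invalidate flow with plain dicts. In the tutorial's layout you would build a `UserService` from store.py, cache.py, and event_bus.py instead. Note that this stand-in takes the expected version as an argument so a stale writer is easy to simulate, which differs from the tutorial's update_profile signature.

```python
from typing import Any, Dict, List


class MiniService:
    """Hypothetical stand-in mirroring the tutorial's read/write/invalidate flow."""

    def __init__(self) -> None:
        self.store: Dict[str, Dict[str, Any]] = {}
        self.cache: Dict[str, Dict[str, Any]] = {}
        self.store_reads = 0
        self.events: List[str] = []

    def create(self, key: str, data: Dict[str, Any]) -> None:
        self.store[key] = {**data, "version": 1}

    def get_profile(self, key: str) -> Dict[str, Any]:
        if key in self.cache:
            return dict(self.cache[key])
        self.store_reads += 1
        self.cache[key] = dict(self.store[key])
        return dict(self.cache[key])

    def update_profile(self, key: str, expected_version: int, changes: Dict[str, Any]) -> Dict[str, Any]:
        current = self.store[key]
        if current["version"] != expected_version:
            raise ValueError("Version mismatch")
        self.store[key] = {**current, **changes, "version": current["version"] + 1}
        self.events.append(key)    # "publish" the invalidation
        self.cache.pop(key, None)  # "subscriber" clears the entry
        return dict(self.store[key])


def make_service() -> MiniService:
    svc = MiniService()
    svc.create("user-1", {"name": "Alex"})
    return svc


def test_version_increments_on_update() -> None:
    svc = make_service()
    assert svc.update_profile("user-1", 1, {"name": "Alexandra"})["version"] == 2


def test_read_prefers_cache_when_valid() -> None:
    svc = make_service()
    svc.get_profile("user-1")
    svc.get_profile("user-1")
    assert svc.store_reads == 1  # second read never touched the store


def test_update_invalidates_cache() -> None:
    svc = make_service()
    svc.get_profile("user-1")
    svc.update_profile("user-1", 1, {"name": "Alexandra"})
    assert "user-1" not in svc.cache
    assert svc.get_profile("user-1")["name"] == "Alexandra"


def test_stale_version_is_rejected() -> None:
    svc = make_service()
    svc.update_profile("user-1", 1, {"name": "Alexandra"})
    try:
        svc.update_profile("user-1", 1, {"name": "Al"})
    except ValueError:
        return
    raise AssertionError("expected a version mismatch")
```

Saved as a file whose name starts with `test_`, these functions are picked up by pytest without further setup; they can also be called directly.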
Illustration: a quick mental model
- Think of each record as a library book with an edition stamp. The cache holds a stamped copy. When someone updates the master copy, its stamp changes, so any cached copy with an older stamp is stale and must be replaced with a fresh one. Invalidation events are like a librarian calling out “Quick, replace this copy!” to every cache.
Practical tips and gotchas
- Use a reliable store versioning scheme: ensure every write increments a monotonically increasing version so that stale writes and stale cache entries can be detected.
- Prefer read-through with explicit invalidation: it reduces the chance of stale reads while keeping write throughput high.
- Tune TTL: a longer TTL reduces cache churn but widens the window for stale reads if an invalidation is missed; a shorter TTL improves freshness but puts more load on the store.
- If you have multiple cache layers (edge, service, database), propagate invalidation across layers to minimize stale data windows.
- Monitor cache miss rate and invalidation events to detect synchronization problems early.
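For the monitoring tip, a minimal sketch of the counters involved is shown below. `CacheStats` and its method names are hypothetical; in a real service these numbers would more likely be exported through whatever metrics library you already use.

```python
from dataclasses import dataclass


@dataclass
class CacheStats:
    """Hypothetical counters for cache lookups and invalidation events."""

    hits: int = 0
    misses: int = 0
    invalidations: int = 0

    def record_lookup(self, hit: bool) -> None:
        if hit:
            self.hits += 1
        else:
            self.misses += 1

    def record_invalidation(self) -> None:
        self.invalidations += 1

    @property
    def miss_rate(self) -> float:
        lookups = self.hits + self.misses
        return self.misses / lookups if lookups else 0.0


stats = CacheStats()
for hit in (False, True, True, True):  # one cold miss, then three hits
    stats.record_lookup(hit)
stats.record_invalidation()
```

In the tutorial's service, `record_lookup` would be called from get_profile and `record_invalidation` from _handle_invalidation. A miss rate that climbs while the invalidation count stays flat would suggest TTL expiry rather than writes as the cause.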
Extensions you can add
- Distributed cache backend such as Redis (Lua scripts for atomic operations and version checks) or Memcached (its compare-and-swap commands)
- Event-driven architecture using a real message broker (Kafka, NATS) for cross-service invalidation
- Frontend integration: a small TypeScript client that uses the same version semantics to decide when to refetch data
Sample TypeScript snippet for a frontend consumer (optional)
- It calls an API to get profile data, which returns { id, name, email, version }
- On cache miss, it fetches from API and caches with the provided version
- On receiving a cache invalidation event via WebSocket or SSE, it invalidates local cache and fetches fresh data
TypeScript sketch (frontend-cache.ts)
type Profile = { id: string; name: string; email: string; version: number }

class FrontendCache {
  private store: Map<string, Profile> = new Map()
  private apiBase = "/api/profile"

  async getProfile(id: string): Promise<Profile> {
    const cached = this.store.get(id)
    if (cached) return cached
    const fresh = await fetch(`${this.apiBase}/${id}`).then(r => r.json())
    this.store.set(id, fresh)
    return fresh
  }

  invalidate(id: string): void {
    this.store.delete(id)
  }
}
// WebSocket or SSE listener would call invalidate on relevant IDs
Conclusion
This tutorial presented a practical pattern for building a resilient microservice cache using optimistic locking and event-driven invalidation. The core ideas (versioned cache entries, a simple pub/sub invalidation mechanism, and a read-through cache path) help maintain correctness without sacrificing performance in a distributed setting. The provided Python scaffolding is a solid starting point; you can swap in real-world components (Redis, Kafka, gRPC) as needed to scale to production workloads.
Rizwan Saleem | https://rizwansaleem.co