Most agent memory implementations have one thing in common: they don't have one. Here's what a real memory architecture looks like.
The Default (Wrong) Approach
Nine out of ten agent implementations handle memory the same way:
```python
messages = []  # The "memory system"

while True:
    messages.append({"role": "user", "content": user_input})
    response = llm.complete(messages)
    messages.append({"role": "assistant", "content": response})
```
This works fine — until it doesn't. After 20-30 turns, you hit the context limit. Or you restart the process. Or the user comes back three days later. Gone. All of it.
The context window isn't memory. It's working RAM. And you wouldn't run your OS entirely from RAM.
The Four Memory Tiers
Production agents need four kinds of memory, each with different storage backends, retrieval patterns, and lifetimes:
```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class MemoryTier:
    name: str
    storage_backend: str
    max_items: Optional[int]
    ttl_seconds: Optional[int]
    retrieval_method: str

TIERS = [
    MemoryTier("working", "context", max_items=20, ttl_seconds=None, retrieval_method="sequential"),
    MemoryTier("episodic", "redis", max_items=1000, ttl_seconds=86400, retrieval_method="recency"),
    MemoryTier("semantic", "vector_db", max_items=None, ttl_seconds=None, retrieval_method="semantic"),
    MemoryTier("procedural", "postgres", max_items=None, ttl_seconds=None, retrieval_method="key_lookup"),
]
```
Working memory is context you need right now. Current task, recent tool results, active decisions. Cap it at 20 items. When it fills up, summarize.
Episodic memory is what happened in this session (or recent sessions). Redis with a 24h TTL. Retrieved by recency, not relevance.
Semantic memory is knowledge your agent has learned or been told. Vector store. Retrieved by similarity to the current query. Never expires — you decide what's worth keeping.
Procedural memory is how to do things. Proven workflows, successful patterns, learned skills. SQLite or Postgres. Retrieved by key lookup. Changes slowly.
Working Memory: The Compression Problem
The most immediate pain point is context overflow. The fix: compress aggressively using a cheap model.
```python
class WorkingMemoryManager:
    def __init__(self, max_tokens=8000, summarize_at=0.80, keep_recent=5):
        self.max_tokens = max_tokens
        self.summarize_threshold = int(max_tokens * summarize_at)
        self.keep_recent = keep_recent
        self.items = []
        self.summaries = []

    def add(self, item: dict, tokens: int):
        self.items.append({**item, "_tokens": tokens})
        if self.current_tokens > self.summarize_threshold:
            self._compress()

    def _compress(self):
        if len(self.items) <= self.keep_recent:
            return
        to_summarize = self.items[:-self.keep_recent]
        preserved = self.items[-self.keep_recent:]
        # Use Haiku/Flash for compression — fast and cheap
        summary = llm_summarize(to_summarize, model="claude-haiku-4-5")
        self.summaries.append(summary)
        self.items = preserved

    @property
    def current_tokens(self):
        return sum(i["_tokens"] for i in self.items)
```
The key: use your cheapest model (Haiku, Flash, Mini) for compression. The compression task is simple. You don't need GPT-4 to summarize a list of tool results. This costs fractions of a cent per compression while keeping your main context lean.
The threshold: 80% is a good starting point. Too high (95%) and you're constantly scrambling. Too low (60%) and you're over-compressing and losing context.
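The trigger logic can be exercised offline by swapping the LLM call for a stub summarizer. This is a simplified sketch of the same pattern (class name and stub are mine, not from the post):

```python
class MiniWorkingMemory:
    """Compression-trigger sketch: the summarizer is an injectable stub,
    so the threshold behavior is testable without an LLM."""

    def __init__(self, max_tokens=8000, summarize_at=0.80, keep_recent=5,
                 summarize=lambda items: f"[summary of {len(items)} items]"):
        self.threshold = int(max_tokens * summarize_at)  # 6400 for the defaults
        self.keep_recent = keep_recent
        self.summarize = summarize
        self.items, self.summaries = [], []

    def add(self, text: str, tokens: int):
        self.items.append({"text": text, "_tokens": tokens})
        if sum(i["_tokens"] for i in self.items) > self.threshold:
            self._compress()

    def _compress(self):
        if len(self.items) <= self.keep_recent:
            return
        # Summarize everything except the most recent items, then drop it
        old, self.items = self.items[:-self.keep_recent], self.items[-self.keep_recent:]
        self.summaries.append(self.summarize(old))
```

With 1,000-token items and the default 6,400-token threshold, compression fires on the 7th item: the oldest two are summarized and the last five are kept verbatim.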
Episodic Memory: Session Continuity with Redis
```python
import json
from datetime import datetime

import redis

class EpisodicMemory:
    def __init__(self, redis_url: str, ttl: int = 86400):
        self.r = redis.from_url(redis_url)
        self.ttl = ttl

    def store_episode(self, session_id: str, episode: dict):
        key = f"episodes:{session_id}"
        self.r.rpush(key, json.dumps({
            **episode,
            "timestamp": datetime.utcnow().isoformat()
        }))
        self.r.expire(key, self.ttl)

    def get_recent(self, session_id: str, n: int = 10) -> list[dict]:
        return [
            json.loads(e)
            for e in self.r.lrange(f"episodes:{session_id}", -n, -1)
        ]
```
Simple, but effective. Each turn is stored as an episode. Sessions expire after 24h by default. When a user returns, you can load recent episodes to restore context.
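The restore path on reconnect might look like this. The field names inside each episode are assumptions about what you stored; `episodic` is any object with the `get_recent` method above:

```python
def restore_session_context(episodic, session_id: str, n: int = 10) -> list[dict]:
    """Turn stored episodes back into chat messages when a user returns.
    (Sketch; assumes each episode carries 'role' and 'content' fields.)"""
    episodes = episodic.get_recent(session_id, n=n)
    return [
        {"role": e.get("role", "assistant"), "content": e.get("content", "")}
        for e in episodes
    ]
```

Prepend these to the new session's working memory (or fold them into a summary) rather than replaying them verbatim through the model.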
What to store as an episode:
- Every turn (user message + agent response)
- Every tool call + result (especially failures)
- Every decision point
- Errors and their resolutions
What NOT to store:
- Verbose raw tool outputs (store the extracted insight instead)
- Intermediate reasoning steps
- Duplicate information
Semantic Memory: What Your Agent Actually Knows
For knowledge that needs to persist across sessions and be retrieved by relevance:
```python
import hashlib

import chromadb
from sentence_transformers import SentenceTransformer

class SemanticMemory:
    def __init__(self, collection_name: str = "agent_knowledge"):
        self.client = chromadb.PersistentClient(path="./chroma_db")
        self.collection = self.client.get_or_create_collection(
            name=collection_name,
            metadata={"hnsw:space": "cosine"}
        )
        self.encoder = SentenceTransformer("all-MiniLM-L6-v2")

    def store(self, content: str, metadata: dict) -> str:
        doc_id = hashlib.md5(content.encode()).hexdigest()[:12]
        embedding = self.encoder.encode(content).tolist()
        self.collection.upsert(
            ids=[doc_id],
            documents=[content],
            embeddings=[embedding],
            metadatas=[metadata]
        )
        return doc_id

    def retrieve(self, query: str, n: int = 5, min_relevance: float = 0.6) -> list[dict]:
        query_embedding = self.encoder.encode(query).tolist()
        results = self.collection.query(query_embeddings=[query_embedding], n_results=n)
        memories = []
        for doc, meta, distance in zip(
            results["documents"][0], results["metadatas"][0], results["distances"][0]
        ):
            similarity = 1 - distance
            if similarity >= min_relevance:
                memories.append({"content": doc, "metadata": meta, "relevance": round(similarity, 3)})
        return sorted(memories, key=lambda x: x["relevance"], reverse=True)
```
Critical rule: freeze your embedding model. Once you have data embedded with all-MiniLM-L6-v2, every query must also use all-MiniLM-L6-v2. Changing models invalidates all your stored embeddings. Choose once.
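One way to enforce the freeze is to record the model name alongside the collection and refuse mismatched queries. This is a convention sketch, not a chromadb feature; the metadata key and exception are assumptions:

```python
class EmbeddingModelMismatch(RuntimeError):
    pass

def check_embedding_model(collection_metadata: dict, model_name: str):
    """Guard for the 'freeze your embedding model' rule: first writer pins
    the model; later callers must match it or fail loudly."""
    stored = collection_metadata.get("embedding_model")
    if stored is None:
        collection_metadata["embedding_model"] = model_name  # pin on first use
    elif stored != model_name:
        raise EmbeddingModelMismatch(
            f"collection embedded with {stored!r}, queried with {model_name!r}"
        )
```

Call it in both `store` and `retrieve`: a loud failure at query time is far cheaper than silently comparing vectors from two incompatible embedding spaces.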
The 0.6 threshold: Results below 60% cosine similarity are usually noise. Tune this on your specific use case — domain-specific agents might need to go down to 0.5; general-purpose agents often do better at 0.65-0.7.
The Checkpoint Pattern: Surviving Crashes
Long-running agents — anything that takes more than a few minutes — need checkpointing.
```python
import json
from datetime import datetime
from typing import Optional

import redis

class AgentStateManager:
    def __init__(self, redis_url: str):
        self.r = redis.from_url(redis_url)

    def checkpoint(self, task_id: str, state: dict):
        """Call after every significant step."""
        self.r.setex(
            f"checkpoint:{task_id}",
            3600,  # 1h TTL; refresh on each checkpoint
            json.dumps({
                **state,
                "checkpoint_time": datetime.utcnow().isoformat(),
            })
        )

    def restore(self, task_id: str) -> Optional[dict]:
        data = self.r.get(f"checkpoint:{task_id}")
        return json.loads(data) if data else None
```
```python
# In your agent loop:
class ResumableAgent:
    async def run(self, task_id: str, steps: list):
        state = self.state_manager.restore(task_id) or {"completed": [], "results": {}}
        completed = set(state["completed"])
        for i, step in enumerate(steps):
            step_name = f"step_{i}_{step.__name__}"
            if step_name in completed:
                continue  # Skip already-completed steps (idempotency)
            result = await step(state)
            state["results"][step_name] = result
            state["completed"].append(step_name)
            self.state_manager.checkpoint(task_id, state)  # Save after each step
```
The idempotency requirement: Every step must be safely re-runnable. If your agent crashes during step 5 and restarts, it will re-run step 5. If step 5 is "send confirmation email" and you run it twice, you have a problem. Design for re-execution.
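One way to design for re-execution is to gate each side effect on the checkpointed state itself, so a replayed step becomes a no-op. `run_once` is a hypothetical helper, not from the code above; `state` is the dict you pass to `checkpoint()`:

```python
def run_once(state: dict, step_name: str, side_effect):
    """Gate a non-idempotent side effect on checkpointed state (sketch).
    On replay after a crash, the recorded result is returned instead of
    re-running the side effect."""
    completed = state.setdefault("completed", [])
    results = state.setdefault("results", {})
    if step_name in completed:
        return results.get(step_name)  # replay: skip the side effect
    result = side_effect()             # e.g. actually send the email
    results[step_name] = result
    completed.append(step_name)        # record completion in the checkpoint
    return result
```

Checkpoint immediately after `run_once` returns; the window between the side effect and the checkpoint write is the only remaining double-run risk, and closing it fully requires a dedup key on the receiving side (e.g. an idempotency key on the email API).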
The Unified Interface
Once you have all four tiers, expose them through a single recall() method. Your agent shouldn't need to decide which tier to query:
```python
class AgentMemory:
    async def recall(self, query: str, n: int = 5) -> str:
        results = []
        # Working memory: always include
        working = self.working.get_context_block()
        if working:
            results.append(f"## Current session\n{working}")
        # Episodic: recent episodes
        recent = self.episodic.get_recent(self.session_id, n=3)
        if recent:
            ep = "\n".join([f"- {e.get('content', '')}" for e in recent])
            results.append(f"## Recent context\n{ep}")
        # Semantic: relevant knowledge
        semantic = self.semantic.retrieve(query, n=n)
        if semantic:
            sem = "\n".join([f"- [{m['relevance']:.2f}] {m['content'][:200]}" for m in semantic])
            results.append(f"## Relevant knowledge\n{sem}")
        return "\n\n---\n\n".join(results)
```
Inject the output of recall() into your system prompt or the first user message. The agent gets relevant context from all tiers without knowing which storage system it came from.
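A minimal sketch of that injection step, assuming an AgentMemory-like object exposing the `recall()` method above (the "# Memory" section header is an assumption):

```python
async def build_prompt(memory, user_query: str, base_system: str) -> list[dict]:
    """Fold recalled context into the system prompt before each turn (sketch)."""
    context = await memory.recall(user_query)
    system = base_system
    if context:
        system += f"\n\n# Memory\n{context}"  # append recalled tiers verbatim
    return [
        {"role": "system", "content": system},
        {"role": "user", "content": user_query},
    ]
```

Keeping the injection in the system prompt (rather than a fake user turn) makes it easy to cap its size and to drop it entirely when `recall()` returns nothing.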
What This Looks Like in Practice
A well-designed memory system is nearly invisible during normal operation. Turns flow through working memory. Sessions persist in episodic. Knowledge accumulates in semantic.
The system shows its value at the edges:
- User returns after 3 days: episodic memory loads recent context, semantic memory surfaces relevant knowledge
- Long task crashes at step 18 of 25: checkpoint restores, resumes from step 18, not from 0
- User says "remember that I prefer X" in session 1: semantic store. Referenced correctly in session 47.
Three Things Most Agents Get Wrong
Using the main model for compression. Haiku/Flash is fine for compression tasks. The main model should be reserved for reasoning.
Storing too much in semantic memory. Not every fact deserves long-term storage. If it's time-sensitive or ephemeral, put it in episodic (with TTL), not semantic (permanent).
Skipping idempotency. Checkpointing is only safe if your steps are idempotent. State-mutating steps that can't be safely re-run need explicit "completed" tracking before the state mutation.
If you want the full implementations — complete WorkingMemoryManager with compression, EpisodicMemory with importance scoring, SemanticMemory with GDPR delete, ProceduralMemory with success-rate tracking, ResumableAgent with crash recovery, AgentMemory unified interface, lifecycle manager, and the 35-point checklist — it's packaged at Machina Market (MAC-017, 0.016 ETH).
Questions on specific implementation details? Drop them in the comments.
Tags: #ai #python #agents #architecture #memory