8 AI Agent Memory Patterns for Production Systems (Beyond Basic RAG)
Every AI agent tutorial shows stateless request-response. User asks, agent answers, context vanishes.
Real agents need memory. Not just "stuff the last 10 messages into the prompt" — actual structured memory that persists, compresses, and retrieves intelligently.
Here are 8 memory patterns we use in production, ranked from simplest to most sophisticated.
1. Sliding Window with Smart Summarization
The baseline. Keep recent messages, summarize old ones. But do it properly.
# memory/sliding_window.py
from dataclasses import dataclass, field
from datetime import datetime
import json
@dataclass
class Message:
    """A single conversation turn tracked by the sliding-window memory."""
    role: str  # "user", "assistant", "system", "tool"
    content: str
    # Creation time; datetime.utcnow() yields a *naive* UTC datetime.
    timestamp: datetime = field(default_factory=datetime.utcnow)
    # Filled in by SlidingWindowMemory.add() using its rough len//4 estimate.
    token_count: int = 0
    metadata: dict = field(default_factory=dict)
class SlidingWindowMemory:
    """Maintains a bounded context window with automatic summarization.

    Recent messages are kept verbatim; once the estimated token total
    crosses ``max_tokens * summarize_threshold``, the oldest half of the
    window is folded into a running text summary produced by a fast,
    cheap model.
    """

    def __init__(
        self,
        max_tokens: int = 8000,
        summarize_threshold: float = 0.8,
        summary_model: str = "claude-3-5-haiku-20241022",
    ):
        self.max_tokens = max_tokens
        self.summarize_threshold = summarize_threshold
        self.summary_model = summary_model
        self.messages: list[Message] = []
        # Running summary of everything already compressed out of the window.
        self.summary: str = ""
        self.total_tokens: int = 0
        self._summaries_created: int = 0

    def add(self, message: Message) -> None:
        """Append a message, update token accounting, compress if needed."""
        message.token_count = self._estimate_tokens(message.content)
        self.messages.append(message)
        self.total_tokens += message.token_count
        if self.total_tokens > self.max_tokens * self.summarize_threshold:
            self._compress()

    def get_context(self) -> list[dict]:
        """Return messages formatted for LLM consumption (summary first)."""
        context = []
        if self.summary:
            context.append({
                "role": "system",
                "content": f"Conversation summary so far:\n{self.summary}",
            })
        context.extend({"role": msg.role, "content": msg.content} for msg in self.messages)
        return context

    def _compress(self) -> None:
        """Fold the oldest half of the window into the running summary.

        The summarizer is called *before* any messages are discarded, so a
        failed API call raises without losing conversation history.
        (The original dropped the old messages first; an exception from the
        summarizer would then permanently lose them.)
        """
        if len(self.messages) < 4:
            return
        split_point = len(self.messages) // 2
        old_messages = self.messages[:split_point]
        # Truncate each message to 500 chars to keep the summary prompt cheap.
        old_text = "\n".join(
            f"{m.role}: {m.content[:500]}" for m in old_messages
        )
        new_summary = self._call_summarizer(
            f"Previous summary: {self.summary}\n\n"
            f"New messages to incorporate:\n{old_text}\n\n"
            f"Create a concise summary preserving: key decisions, "
            f"user preferences, task progress, and important facts."
        )
        # Only mutate state once the summarizer has succeeded.
        self.messages = self.messages[split_point:]
        self.summary = new_summary
        self.total_tokens = sum(m.token_count for m in self.messages)
        self.total_tokens += self._estimate_tokens(self.summary)
        self._summaries_created += 1

    def _call_summarizer(self, prompt: str) -> str:
        """Call a fast, cheap model for summarization (not the main model)."""
        import anthropic
        client = anthropic.Anthropic()
        response = client.messages.create(
            model=self.summary_model,
            max_tokens=500,
            messages=[{"role": "user", "content": prompt}],
        )
        return response.content[0].text

    def _estimate_tokens(self, text: str) -> int:
        # Rough heuristic: ~4 characters per token for English text.
        return len(text) // 4

    def stats(self) -> dict:
        """Lightweight introspection snapshot for logging/debugging."""
        return {
            "messages": len(self.messages),
            "total_tokens": self.total_tokens,
            "has_summary": bool(self.summary),
            "compressions": self._summaries_created,
        }
When to use: Every agent needs this as a baseline. The key insight: use a fast, cheap model (Haiku) for summarization, not your main model.
2. Semantic Memory with Vector Search
Store facts and retrieve them by meaning, not just recency.
# memory/semantic.py
import numpy as np
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional
import json
import hashlib
@dataclass
class MemoryEntry:
    """One unit of long-term semantic memory plus its retrieval bookkeeping."""
    id: str
    content: str
    embedding: list[float]
    category: str  # "fact", "preference", "decision", "event"
    importance: float = 0.5  # 0-1 scale
    # Naive-UTC timestamps (datetime.utcnow()).
    created_at: datetime = field(default_factory=datetime.utcnow)
    last_accessed: datetime = field(default_factory=datetime.utcnow)
    # Incremented on every recall; read by forget()/consolidation to decide decay.
    access_count: int = 0
    source: str = ""  # Which conversation created this
    metadata: dict = field(default_factory=dict)
class SemanticMemory:
    """Vector-based long-term memory for AI agents.

    Entries are stored with embeddings and recalled by a blend of cosine
    similarity, importance, and recency. Near-duplicates are folded into
    existing entries rather than stored twice.
    """

    def __init__(self, embedding_model: str = "text-embedding-3-small"):
        self.embedding_model = embedding_model
        self.entries: dict[str, MemoryEntry] = {}
        # text -> embedding cache so repeated stores/queries skip the API
        self._embedding_cache: dict[str, list[float]] = {}

    async def store(
        self,
        content: str,
        category: str = "fact",
        importance: float = 0.5,
        source: str = "",
        metadata: dict | None = None,
    ) -> str:
        """Store a memory with its embedding; return the entry id.

        If a near-duplicate (cosine > 0.95) already exists, that entry is
        refreshed and its id is returned instead of creating a new one.
        """
        entry_id = hashlib.sha256(content.encode()).hexdigest()[:16]
        embedding = await self._embed(content)
        existing = self._find_duplicate(embedding, threshold=0.95)
        if existing is not None:
            existing.access_count += 1
            existing.last_accessed = datetime.utcnow()
            # Never lower importance on re-store; only raise it.
            if importance > existing.importance:
                existing.importance = importance
            return existing.id
        self.entries[entry_id] = MemoryEntry(
            id=entry_id,
            content=content,
            embedding=embedding,
            category=category,
            importance=importance,
            source=source,
            metadata=metadata or {},
        )
        return entry_id

    async def recall(
        self,
        query: str,
        top_k: int = 5,
        category: str | None = None,
        min_importance: float = 0.0,
    ) -> list[MemoryEntry]:
        """Retrieve up to ``top_k`` memories most relevant to ``query``."""
        query_embedding = await self._embed(query)
        pool = list(self.entries.values())
        if category:
            pool = [e for e in pool if e.category == category]
        if min_importance > 0:
            pool = [e for e in pool if e.importance >= min_importance]
        if not pool:
            return []

        # Blend relevance, importance, and recency into one score.
        def score(entry: MemoryEntry) -> float:
            similarity = self._cosine_similarity(query_embedding, entry.embedding)
            recency = self._recency_score(entry.last_accessed)
            return (similarity * 0.6) + (entry.importance * 0.25) + (recency * 0.15)

        ranked = sorted(pool, key=score, reverse=True)
        hits = ranked[:top_k]
        # Touch everything we return so recency/frequency tracking stays live.
        now = datetime.utcnow()
        for entry in hits:
            entry.access_count += 1
            entry.last_accessed = now
        return hits

    async def forget(self, min_access: int = 0, older_than_days: int = 30) -> int:
        """Remove old, unimportant, rarely used memories; return how many."""
        now = datetime.utcnow()
        stale = [
            entry_id
            for entry_id, entry in self.entries.items()
            if entry.access_count <= min_access
            and (now - entry.created_at).days > older_than_days
            and entry.importance < 0.3
        ]
        for entry_id in stale:
            del self.entries[entry_id]
        return len(stale)

    def _find_duplicate(
        self, embedding: list[float], threshold: float
    ) -> MemoryEntry | None:
        """Return the first stored entry whose similarity exceeds ``threshold``."""
        for entry in self.entries.values():
            if self._cosine_similarity(embedding, entry.embedding) > threshold:
                return entry
        return None

    def _cosine_similarity(self, a: list[float], b: list[float]) -> float:
        """Cosine similarity with a small epsilon to avoid division by zero."""
        va, vb = np.array(a), np.array(b)
        return float(np.dot(va, vb) / (np.linalg.norm(va) * np.linalg.norm(vb) + 1e-8))

    def _recency_score(self, last_accessed: datetime) -> float:
        """Linear decay from 1.0 down to 0.0 over 30 days since last access."""
        hours_ago = (datetime.utcnow() - last_accessed).total_seconds() / 3600
        return max(0.0, 1.0 - (hours_ago / 720))

    async def _embed(self, text: str) -> list[float]:
        """Embed text via the OpenAI API, with a simple in-memory cache."""
        cached = self._embedding_cache.get(text)
        if cached is not None:
            return cached
        import openai
        client = openai.AsyncOpenAI()
        response = await client.embeddings.create(
            model=self.embedding_model, input=text
        )
        embedding = response.data[0].embedding
        self._embedding_cache[text] = embedding
        return embedding
Beyond basic RAG: This isn't "embed chunks and retrieve." It's a living memory system with importance scoring, duplicate detection, and garbage collection.
3. Episodic Memory — Remembering What Happened
Humans don't remember facts in isolation. We remember episodes — sequences of events with context.
# memory/episodic.py
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional
import json
@dataclass
class Episode:
    """One bounded sequence of related events, plus its outcome and lessons."""
    id: str
    title: str
    started_at: datetime
    ended_at: datetime | None = None
    events: list[dict] = field(default_factory=list)
    outcome: str = ""  # "success", "failure", "partial", "abandoned"
    lessons: list[str] = field(default_factory=list)
    participants: list[str] = field(default_factory=list)
    tags: list[str] = field(default_factory=list)

    def add_event(self, event_type: str, description: str, data: dict | None = None):
        """Append a timestamped event record to this episode."""
        record = {
            "type": event_type,
            "description": description,
            "timestamp": datetime.utcnow().isoformat(),
            "data": data or {},
        }
        self.events.append(record)

    def close(self, outcome: str, lessons: list[str] | None = None):
        """Mark the episode finished, recording its outcome and any lessons."""
        self.ended_at = datetime.utcnow()
        self.outcome = outcome
        if lessons:
            self.lessons.extend(lessons)

    @property
    def duration_minutes(self) -> float | None:
        """Elapsed minutes, or None while the episode is still open."""
        if self.ended_at is None:
            return None
        return (self.ended_at - self.started_at).total_seconds() / 60

    def to_narrative(self) -> str:
        """Convert episode to natural language for context injection."""
        parts = [
            f"## Episode: {self.title}",
            f"When: {self.started_at.strftime('%Y-%m-%d %H:%M')}",
        ]
        if self.outcome:
            parts.append(f"Outcome: {self.outcome}")
        parts.append("\nWhat happened:")
        parts.extend(
            f"{idx}. [{ev['type']}] {ev['description']}"
            for idx, ev in enumerate(self.events, 1)
        )
        if self.lessons:
            parts.append("\nLessons learned:")
            parts.extend(f"- {item}" for item in self.lessons)
        return "\n".join(parts)
class EpisodicMemory:
    """Manages episodic memories (what happened, and what was learned)."""

    def __init__(self, max_episodes: int = 100):
        self.max_episodes = max_episodes
        self.episodes: list[Episode] = []
        self.current_episode: Episode | None = None

    def start_episode(self, title: str, tags: list[str] | None = None) -> Episode:
        """Open a new episode; any still-open episode is closed as "abandoned"."""
        if self.current_episode:
            self.current_episode.close("abandoned")
            self.episodes.append(self.current_episode)
        episode = Episode(
            id=f"ep_{len(self.episodes)}_{int(datetime.utcnow().timestamp())}",
            title=title,
            started_at=datetime.utcnow(),
            tags=tags or [],
        )
        self.current_episode = episode
        return episode

    def record_event(self, event_type: str, description: str, data: dict | None = None):
        """Record an event on the current episode; silently ignored if none is open."""
        if self.current_episode:
            self.current_episode.add_event(event_type, description, data)

    def end_episode(self, outcome: str, lessons: list[str] | None = None) -> Episode:
        """Close the current episode and archive it.

        Raises:
            ValueError: if no episode is currently open.
        """
        if not self.current_episode:
            raise ValueError("No active episode")
        self.current_episode.close(outcome, lessons)
        self.episodes.append(self.current_episode)
        episode = self.current_episode
        self.current_episode = None
        # Evict oldest if over limit
        if len(self.episodes) > self.max_episodes:
            self.episodes = self.episodes[-self.max_episodes:]
        return episode

    def recall_similar(self, situation: str, top_k: int = 3) -> list[Episode]:
        """Find past episodes relevant to a current situation.

        In production, use embeddings. This shows keyword matching as fallback.
        Episodes with no keyword overlap are never returned; among matching
        episodes, ones with lessons and successful outcomes rank higher.
        """
        situation_words = set(situation.lower().split())
        scored: list[tuple[float, Episode]] = []
        for episode in self.episodes:
            # Join title, event descriptions, and tags with spaces so words
            # from adjacent fields don't run together (the original
            # concatenated the last description directly onto the first tag).
            episode_text = " ".join(
                [episode.title]
                + [e["description"] for e in episode.events]
                + episode.tags
            ).lower()
            overlap = len(situation_words & set(episode_text.split()))
            if overlap == 0:
                # Bonuses alone must not surface unrelated episodes.
                continue
            # Boost completed episodes with lessons and successful outcomes.
            bonus = 0.5 if episode.lessons else 0
            bonus += 0.3 if episode.outcome == "success" else 0
            scored.append((overlap + bonus, episode))
        scored.sort(key=lambda item: item[0], reverse=True)
        return [episode for _, episode in scored[:top_k]]

    def get_lessons_for(self, tags: list[str]) -> list[str]:
        """Extract all lessons from episodes matching any of the given tags."""
        lessons = []
        tag_set = set(tags)
        for episode in self.episodes:
            if tag_set & set(episode.tags):
                lessons.extend(episode.lessons)
        return lessons

    def get_context_block(self, situation: str, max_episodes: int = 2) -> str:
        """Generate a context block for injection into agent prompts."""
        relevant = self.recall_similar(situation, top_k=max_episodes)
        if not relevant:
            return ""
        lines = ["# Relevant Past Experiences\n"]
        for episode in relevant:
            lines.append(episode.to_narrative())
            lines.append("")
        return "\n".join(lines)
# Usage in an agent
episodic = EpisodicMemory()

# Agent starts a debugging task; tags drive later lesson lookup.
episodic.start_episode("Debug payment webhook failure", tags=["debugging", "payments", "webhook"])

# Each step of the investigation is logged as a typed event.
episodic.record_event("investigation", "Checked webhook logs — 403 errors from Stripe")
episodic.record_event("hypothesis", "API key might have expired")
episodic.record_event("action", "Rotated API key in production")
episodic.record_event("verification", "Webhook now returning 200")

# Closing the episode records the outcome plus reusable lessons.
episodic.end_episode(
    outcome="success",
    lessons=[
        "Stripe API keys expire after 1 year if not rotated",
        "Check key expiry before investigating code changes",
        "Add key expiry monitoring to alerting stack",
    ],
)

# Later, when a similar issue comes up:
context = episodic.get_context_block("webhook returning errors")
# → Returns the debugging episode with lessons learned
Why episodic memory matters: When your agent encounters a problem it solved before, it shouldn't start from scratch. Episodic memory gives it experience.
4. Working Memory — The Agent's Scratchpad
Short-term structured storage for the current task. Think of it as the agent's working notes.
# memory/working.py
from dataclasses import dataclass, field
from typing import Any
from datetime import datetime
@dataclass
class WorkingMemory:
    """Structured scratchpad holding the agent's state for the current task."""
    goal: str = ""
    plan: list[str] = field(default_factory=list)
    current_step: int = 0
    findings: dict[str, Any] = field(default_factory=dict)
    hypotheses: list[dict] = field(default_factory=list)
    blockers: list[str] = field(default_factory=list)
    decisions: list[dict] = field(default_factory=list)
    scratch: str = ""  # Free-form notes

    def set_goal(self, goal: str) -> None:
        """Adopt a new goal, clearing the plan and progress."""
        self.goal = goal
        self.current_step = 0
        self.plan = []

    def set_plan(self, steps: list[str]) -> None:
        """Install a step-by-step plan, rewinding progress to the start."""
        self.plan = steps
        self.current_step = 0

    def advance(self) -> str | None:
        """Return the next planned step and move past it; None when done."""
        if self.current_step >= len(self.plan):
            return None
        step = self.plan[self.current_step]
        self.current_step += 1
        return step

    def record_finding(self, key: str, value: Any) -> None:
        """Store a discovered fact under a key, stamped with the time."""
        self.findings[key] = {
            "value": value,
            "recorded_at": datetime.utcnow().isoformat(),
        }

    def add_hypothesis(self, hypothesis: str, confidence: float = 0.5) -> None:
        """Track a new, as-yet-untested hypothesis."""
        entry = {
            "text": hypothesis,
            "confidence": confidence,
            "status": "untested",
            "added_at": datetime.utcnow().isoformat(),
        }
        self.hypotheses.append(entry)

    def update_hypothesis(self, index: int, status: str, confidence: float | None = None):
        """Update a hypothesis's status (and optionally confidence); out-of-range indexes are ignored."""
        if not (0 <= index < len(self.hypotheses)):
            return
        target = self.hypotheses[index]
        target["status"] = status
        if confidence is not None:
            target["confidence"] = confidence

    def record_decision(self, decision: str, reasoning: str) -> None:
        """Log a decision together with the reasoning behind it."""
        self.decisions.append({
            "decision": decision,
            "reasoning": reasoning,
            "timestamp": datetime.utcnow().isoformat(),
        })

    def add_blocker(self, blocker: str) -> None:
        """Note something currently blocking progress."""
        self.blockers.append(blocker)

    def to_context(self) -> str:
        """Serialize the scratchpad to a markdown block for prompt injection."""
        out = ["# Working Memory\n"]
        if self.goal:
            out.append(f"**Goal:** {self.goal}\n")
        if self.plan:
            out.append("**Plan:**")
            for idx, step in enumerate(self.plan):
                # Arrow marks the next step, check marks completed ones.
                if idx == self.current_step:
                    mark = "→"
                elif idx < self.current_step:
                    mark = "✓"
                else:
                    mark = " "
                out.append(f" {mark} {idx + 1}. {step}")
            out.append("")
        if self.findings:
            out.append("**Findings:**")
            out.extend(f" - {key}: {val['value']}" for key, val in self.findings.items())
            out.append("")
        if self.hypotheses:
            out.append("**Hypotheses:**")
            out.extend(
                f" - [{h['status']}] {h['text']} (confidence: {h['confidence']})"
                for h in self.hypotheses
            )
            out.append("")
        if self.blockers:
            out.append("**Blockers:**")
            out.extend(f" - ⚠️ {b}" for b in self.blockers)
            out.append("")
        if self.decisions:
            out.append("**Decisions Made:**")
            out.extend(
                f" - {d['decision']} (reason: {d['reasoning']})" for d in self.decisions
            )
        return "\n".join(out)
The insight: LLMs perform better when they can see structured intermediate state. Working memory gives the agent a "whiteboard" to think on.
5. Persistent Memory Store with SQLite
All the above patterns need persistence. Here's a battle-tested SQLite backend.
# memory/persistent.py
import json
import sqlite3
from contextlib import contextmanager
from datetime import datetime
from pathlib import Path
from typing import Any
class PersistentMemoryStore:
    """SQLite-backed persistent storage for all memory types.

    A single database file backs semantic memories, episodes, raw
    conversation turns, and a small key/value table for agent state.
    Every operation opens a short-lived connection via ``_conn``, so no
    explicit lifecycle management is needed and data survives restarts.

    NOTE(review): the ``Any`` annotations on the KV methods require
    ``from typing import Any`` among this module's imports.
    """

    def __init__(self, db_path: str = "agent_memory.db"):
        self.db_path = db_path
        # Ensure the parent directory exists before SQLite creates the file.
        Path(db_path).parent.mkdir(parents=True, exist_ok=True)
        self._init_db()

    def _init_db(self) -> None:
        """Create tables and indexes on first use (idempotent).

        Embeddings are stored as JSON text; list-valued fields
        (events/lessons/tags) are JSON-encoded strings.
        NOTE(review): the ``conversations`` table has no accessor methods
        in this class yet.
        """
        with self._conn() as conn:
            conn.executescript("""
                CREATE TABLE IF NOT EXISTS memories (
                    id TEXT PRIMARY KEY,
                    content TEXT NOT NULL,
                    category TEXT NOT NULL,
                    importance REAL DEFAULT 0.5,
                    embedding BLOB,
                    created_at TEXT NOT NULL,
                    last_accessed TEXT NOT NULL,
                    access_count INTEGER DEFAULT 0,
                    source TEXT DEFAULT '',
                    metadata TEXT DEFAULT '{}'
                );
                CREATE TABLE IF NOT EXISTS episodes (
                    id TEXT PRIMARY KEY,
                    title TEXT NOT NULL,
                    started_at TEXT NOT NULL,
                    ended_at TEXT,
                    outcome TEXT DEFAULT '',
                    events TEXT DEFAULT '[]',
                    lessons TEXT DEFAULT '[]',
                    tags TEXT DEFAULT '[]'
                );
                CREATE TABLE IF NOT EXISTS conversations (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    session_id TEXT NOT NULL,
                    role TEXT NOT NULL,
                    content TEXT NOT NULL,
                    timestamp TEXT NOT NULL,
                    token_count INTEGER DEFAULT 0,
                    metadata TEXT DEFAULT '{}'
                );
                CREATE TABLE IF NOT EXISTS kv_store (
                    key TEXT PRIMARY KEY,
                    value TEXT NOT NULL,
                    updated_at TEXT NOT NULL
                );
                CREATE INDEX IF NOT EXISTS idx_memories_category ON memories(category);
                CREATE INDEX IF NOT EXISTS idx_memories_importance ON memories(importance);
                CREATE INDEX IF NOT EXISTS idx_conversations_session ON conversations(session_id);
                CREATE INDEX IF NOT EXISTS idx_episodes_tags ON episodes(tags);
            """)

    @contextmanager
    def _conn(self):
        """Yield a connection that commits on success and always closes.

        An exception inside the ``with`` body skips the commit, so
        uncommitted changes are discarded when the connection closes.
        """
        conn = sqlite3.connect(self.db_path)
        conn.row_factory = sqlite3.Row  # name-based column access on rows
        try:
            yield conn
            conn.commit()
        finally:
            conn.close()

    # --- Memory CRUD ---

    def save_memory(self, memory_id: str, content: str, category: str,
                    importance: float, embedding: list[float] | None = None,
                    source: str = "", metadata: dict | None = None) -> None:
        """Insert or overwrite a memory row (upsert keyed on ``memory_id``).

        Both ``created_at`` and ``last_accessed`` are reset to now on every
        save; the embedding and metadata are JSON-encoded.
        """
        with self._conn() as conn:
            conn.execute("""
                INSERT OR REPLACE INTO memories
                (id, content, category, importance, embedding, created_at, last_accessed, source, metadata)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
            """, (
                memory_id, content, category, importance,
                json.dumps(embedding) if embedding else None,
                datetime.utcnow().isoformat(),
                datetime.utcnow().isoformat(),
                source,
                json.dumps(metadata or {}),
            ))

    def get_memories(self, category: str | None = None,
                     min_importance: float = 0.0,
                     limit: int = 50) -> list[dict]:
        """Fetch memories above an importance floor, most important first."""
        query = "SELECT * FROM memories WHERE importance >= ?"
        params: list = [min_importance]
        if category:
            query += " AND category = ?"
            params.append(category)
        query += " ORDER BY importance DESC, last_accessed DESC LIMIT ?"
        params.append(limit)
        with self._conn() as conn:
            rows = conn.execute(query, params).fetchall()
            return [dict(row) for row in rows]

    def touch_memory(self, memory_id: str) -> None:
        """Bump a memory's access count and last-accessed timestamp."""
        with self._conn() as conn:
            conn.execute("""
                UPDATE memories
                SET last_accessed = ?, access_count = access_count + 1
                WHERE id = ?
            """, (datetime.utcnow().isoformat(), memory_id))

    # --- Episode CRUD ---

    def save_episode(self, episode_id: str, title: str, started_at: str,
                     ended_at: str | None, outcome: str,
                     events: list, lessons: list, tags: list) -> None:
        """Insert or overwrite an episode row; list fields are JSON-encoded."""
        with self._conn() as conn:
            conn.execute("""
                INSERT OR REPLACE INTO episodes
                (id, title, started_at, ended_at, outcome, events, lessons, tags)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?)
            """, (
                episode_id, title, started_at, ended_at, outcome,
                json.dumps(events), json.dumps(lessons), json.dumps(tags),
            ))

    def get_episodes(self, tag: str | None = None, limit: int = 20) -> list[dict]:
        """Fetch recent episodes, optionally filtered by tag.

        The tag filter does a LIKE match against the JSON-encoded tags
        string (matching the quoted tag), newest episodes first; list
        fields are decoded back into Python lists.
        """
        with self._conn() as conn:
            if tag:
                rows = conn.execute(
                    "SELECT * FROM episodes WHERE tags LIKE ? ORDER BY started_at DESC LIMIT ?",
                    (f'%"{tag}"%', limit),
                ).fetchall()
            else:
                rows = conn.execute(
                    "SELECT * FROM episodes ORDER BY started_at DESC LIMIT ?",
                    (limit,),
                ).fetchall()
            results = []
            for row in rows:
                d = dict(row)
                d["events"] = json.loads(d["events"])
                d["lessons"] = json.loads(d["lessons"])
                d["tags"] = json.loads(d["tags"])
                results.append(d)
            return results

    # --- KV Store ---

    def kv_set(self, key: str, value: Any) -> None:
        """Upsert a JSON-serializable value under ``key``."""
        with self._conn() as conn:
            conn.execute(
                "INSERT OR REPLACE INTO kv_store (key, value, updated_at) VALUES (?, ?, ?)",
                (key, json.dumps(value), datetime.utcnow().isoformat()),
            )

    def kv_get(self, key: str, default: Any = None) -> Any:
        """Return the JSON-decoded value for ``key``, or ``default`` if absent."""
        with self._conn() as conn:
            row = conn.execute("SELECT value FROM kv_store WHERE key = ?", (key,)).fetchone()
            return json.loads(row["value"]) if row else default

    # --- Maintenance ---

    def gc(self, max_age_days: int = 90, min_importance: float = 0.2) -> int:
        """Garbage collect old, low-importance memories.

        Deletes rows that are simultaneously below the importance floor,
        accessed fewer than 2 times, and older than ``max_age_days``
        (age computed in SQL via julianday arithmetic). Returns the
        number of rows deleted.
        """
        cutoff = datetime.utcnow()
        with self._conn() as conn:
            result = conn.execute("""
                DELETE FROM memories
                WHERE importance < ?
                AND access_count < 2
                AND julianday(?) - julianday(created_at) > ?
            """, (min_importance, cutoff.isoformat(), max_age_days))
            return result.rowcount
Why SQLite? It's embedded, zero-config, handles concurrent reads, and survives restarts. For single-agent systems, it's all you need.
6. Memory Consolidation (Sleep-Like Processing)
Humans consolidate memories during sleep. Your agent should do the same between sessions.
# memory/consolidation.py
import asyncio
from datetime import datetime, timedelta
class MemoryConsolidator:
    """Runs between sessions to organize and strengthen memories.

    Analogous to sleep-time consolidation: merges near-duplicate semantic
    entries, promotes episode lessons into semantic memory, decays unused
    entries, and boosts frequently accessed ones.
    """

    def __init__(self, semantic_memory, episodic_memory, store):
        self.semantic = semantic_memory
        self.episodic = episodic_memory
        self.store = store

    async def consolidate(self) -> dict:
        """Run a full consolidation cycle and persist a report of what changed."""
        report = {
            "started_at": datetime.utcnow().isoformat(),
            "merged_duplicates": 0,
            "extracted_lessons": 0,
            "decayed_memories": 0,
            "promoted_memories": 0,
        }
        # 1. Merge near-duplicate memories
        report["merged_duplicates"] = await self._merge_duplicates()
        # 2. Extract patterns from recent episodes
        report["extracted_lessons"] = await self._extract_patterns()
        # 3. Decay unused memories
        report["decayed_memories"] = await self._decay_unused()
        # 4. Promote frequently accessed memories
        report["promoted_memories"] = await self._promote_frequent()
        report["completed_at"] = datetime.utcnow().isoformat()
        self.store.kv_set("last_consolidation", report)
        return report

    async def _merge_duplicates(self) -> int:
        """Find and merge semantically similar memories.

        When two entries are near-duplicates (cosine > 0.92), the one with
        higher importance survives and absorbs the other's access count.
        Returns the number of merges performed.
        """
        entries = list(self.semantic.entries.values())
        merged = 0
        absorbed_ids = set()
        for i, entry_a in enumerate(entries):
            if entry_a.id in absorbed_ids:
                continue
            for entry_b in entries[i + 1:]:
                if entry_b.id in absorbed_ids:
                    continue
                similarity = self.semantic._cosine_similarity(
                    entry_a.embedding, entry_b.embedding
                )
                if similarity <= 0.92:
                    continue
                merged += 1
                # Keep the entry with the higher importance.
                if entry_a.importance >= entry_b.importance:
                    entry_a.access_count += entry_b.access_count
                    absorbed_ids.add(entry_b.id)
                else:
                    entry_b.access_count += entry_a.access_count
                    absorbed_ids.add(entry_a.id)
                    # entry_a was itself absorbed — stop merging into it.
                    # (Previously the loop kept folding later matches into
                    # an entry about to be deleted, losing their counts.)
                    break
        for entry_id in absorbed_ids:
            self.semantic.entries.pop(entry_id, None)
        return merged

    async def _extract_patterns(self) -> int:
        """Promote lessons from episodes finished in the last week into semantic memory."""
        recent = [
            ep for ep in self.episodic.episodes
            if ep.ended_at and (datetime.utcnow() - ep.ended_at).days < 7
        ]
        all_lessons = []
        for ep in recent:
            all_lessons.extend(ep.lessons)
        extracted = 0
        # semantic.store() folds near-identical lessons into existing entries.
        for lesson in all_lessons:
            await self.semantic.store(
                content=f"Learned lesson: {lesson}",
                category="lesson",
                importance=0.7,
                source="consolidation",
            )
            extracted += 1
        return extracted

    async def _decay_unused(self) -> int:
        """Reduce importance of memories untouched for 14 days and rarely accessed."""
        decayed = 0
        cutoff = datetime.utcnow() - timedelta(days=14)
        for entry in self.semantic.entries.values():
            if entry.last_accessed < cutoff and entry.access_count < 2:
                entry.importance *= 0.8  # 20% decay per consolidation pass
                decayed += 1
        return decayed

    async def _promote_frequent(self) -> int:
        """Increase importance of frequently accessed memories (capped at 1.0)."""
        promoted = 0
        for entry in self.semantic.entries.values():
            if entry.access_count >= 5 and entry.importance < 0.9:
                entry.importance = min(1.0, entry.importance * 1.2)
                promoted += 1
        return promoted
The trick nobody mentions: Memory isn't just storage — it's an active process. Consolidation keeps your agent's memory sharp and relevant.
7. Context-Aware Memory Retrieval
Don't just search memories by query. Factor in the current task, emotional state, and conversation trajectory.
# memory/contextual_retrieval.py
from dataclasses import dataclass
@dataclass
class RetrievalContext:
    """Everything the retriever knows about the current conversational moment."""
    current_query: str
    task_type: str  # "debugging", "creating", "analyzing", "chatting"
    conversation_topics: list[str]
    urgency: float = 0.5  # 0 = casual, 1 = critical
    user_sentiment: str = "neutral"
class ContextualRetriever:
    """Builds memory context blocks tuned to the full conversation situation."""

    def __init__(self, semantic_memory, episodic_memory):
        self.semantic = semantic_memory
        self.episodic = episodic_memory

    async def retrieve(
        self, context: RetrievalContext, max_tokens: int = 2000
    ) -> str:
        """Assemble a memory context block tailored to the current situation.

        Sections are added in priority order (facts, experiences,
        preferences, lessons) under a rough ~4-chars-per-token budget.
        """
        sections: list[str] = []
        budget = max_tokens

        # 1. Relevant facts (always included). Under high urgency the
        # importance floor is dropped so nothing gets filtered out.
        floor = 0.3 if context.urgency < 0.5 else 0.0
        facts = await self.semantic.recall(
            context.current_query,
            top_k=5,
            min_importance=floor,
        )
        if facts:
            fact_section = "## Relevant Knowledge\n"
            for fact in facts:
                line = f"- {fact.content}\n"
                cost = len(line) // 4
                if cost < budget:
                    fact_section += line
                    budget -= cost
            sections.append(fact_section)

        # 2. Past experiences (for debugging/problem-solving tasks).
        if context.task_type in ("debugging", "analyzing"):
            for episode in self.episodic.recall_similar(context.current_query, top_k=2):
                narrative = episode.to_narrative()
                cost = len(narrative) // 4
                if cost < budget:
                    sections.append(narrative)
                    budget -= cost

        # 3. User preferences (for creation/chat tasks).
        if context.task_type in ("creating", "chatting"):
            prefs = await self.semantic.recall(
                "user preferences and style",
                top_k=3,
                category="preference",
            )
            if prefs:
                pref_lines = [f"- {pref.content}\n" for pref in prefs]
                sections.append("## User Preferences\n" + "".join(pref_lines))

        # 4. Learned lessons (when urgency is high).
        if context.urgency > 0.7:
            lessons = self.episodic.get_lessons_for(context.conversation_topics)
            if lessons:
                lesson_lines = [f"- {lesson}\n" for lesson in lessons[:5]]
                sections.append(
                    "## ⚠️ Lessons from Past Experience\n" + "".join(lesson_lines)
                )

        return "\n\n".join(sections) if sections else ""
8. The Unified Memory Manager
Tie everything together with a single interface.
# memory/manager.py
class UnifiedMemoryManager:
    """Single facade over all memory subsystems (window, semantic, episodic, working)."""

    def __init__(self, db_path: str = "agent_memory.db"):
        self.store = PersistentMemoryStore(db_path)
        self.sliding = SlidingWindowMemory()
        self.semantic = SemanticMemory()
        self.episodic = EpisodicMemory()
        self.working = WorkingMemory()
        self.consolidator = MemoryConsolidator(
            self.semantic, self.episodic, self.store
        )
        self.retriever = ContextualRetriever(self.semantic, self.episodic)

    async def process_message(self, role: str, content: str) -> None:
        """Route an incoming message through every relevant memory system."""
        msg = Message(role=role, content=content)
        self.sliding.add(msg)
        # Auto-extract facts from assistant responses
        if role == "assistant":
            facts = self._extract_facts(content)
            for fact in facts:
                await self.semantic.store(fact, category="fact", importance=0.4)

    async def get_full_context(self, query: str, task_type: str = "chatting") -> dict:
        """Build complete context from all memory systems."""
        retrieval_ctx = RetrievalContext(
            current_query=query,
            task_type=task_type,
            conversation_topics=self._extract_topics(query),
        )
        return {
            "conversation": self.sliding.get_context(),
            "memories": await self.retriever.retrieve(retrieval_ctx),
            "working_memory": self.working.to_context(),
        }

    async def end_session(self) -> None:
        """Persist and consolidate at session end."""
        await self.consolidator.consolidate()

    # Verbs that suggest a sentence states a fact. Matched as whole words so
    # that e.g. "this" or "list" no longer trigger a false "is" match (the
    # original used substring containment).
    _FACT_INDICATORS = frozenset(["is", "are", "was", "means", "requires", "should"])

    def _extract_facts(self, text: str) -> list[str]:
        """Heuristically pull up to three fact-like sentences from text."""
        facts = []
        for sentence in text.split(". "):
            words = set(sentence.lower().split())
            if words & self._FACT_INDICATORS and len(sentence) > 20:
                facts.append(sentence.strip())
        return facts[:3]  # Max 3 facts per message

    def _extract_topics(self, text: str) -> list[str]:
        """Crude topic keywords: the first five words longer than five chars."""
        # Simple keyword extraction (use NLP in production)
        return [w for w in text.lower().split() if len(w) > 5][:5]
The Memory Architecture Stack
| Layer | Pattern | What It Stores |
|---|---|---|
| Immediate | Sliding Window | Last N messages + summary |
| Short-term | Working Memory | Current task state |
| Long-term | Semantic Memory | Facts, preferences, knowledge |
| Experiential | Episodic Memory | What happened and lessons |
| Persistent | SQLite Store | Everything, across restarts |
| Maintenance | Consolidation | Garbage collection + strengthening |
| Retrieval | Contextual | Smart, situation-aware recall |
| Unified | Manager | Single API for everything |
The key insight: memory isn't one thing. Humans have multiple memory systems working together. Your agents should too.
Getting Started
Building a complete memory system is non-trivial. If you're looking for production-ready building blocks — token management, multi-model routing, and agent infrastructure patterns — the AI Dev Toolkit has components that integrate with exactly these kinds of architectures.
Start with Pattern 1 (sliding window). Add semantic memory when your agent needs to remember across sessions. Add episodic memory when it needs to learn from experience.
What memory patterns are you using in your agents? I'd love to hear what's working in production.
This is part of the "AI Engineering in Practice" series — building real AI systems, not demos.
Top comments (0)