DEV Community

dohko
dohko

Posted on

8 AI Agent Memory Patterns for Production Systems (Beyond Basic RAG)

8 AI Agent Memory Patterns for Production Systems (Beyond Basic RAG)

Every AI agent tutorial shows stateless request-response. User asks, agent answers, context vanishes.

Real agents need memory. Not just "stuff the last 10 messages into the prompt" — actual structured memory that persists, compresses, and retrieves intelligently.

Here are 8 memory patterns we use in production, ordered from simplest to most sophisticated.


1. Sliding Window with Smart Summarization

The baseline. Keep recent messages, summarize old ones. But do it properly.

# memory/sliding_window.py
from dataclasses import dataclass, field
from datetime import datetime
import json

@dataclass
class Message:
    """A single conversation turn plus the bookkeeping the memory classes need."""

    role: str  # "user", "assistant", "system", "tool"
    content: str  # Raw message text
    timestamp: datetime = field(default_factory=datetime.utcnow)  # Naive UTC time of creation
    token_count: int = 0  # Overwritten with an estimate by SlidingWindowMemory.add()
    metadata: dict = field(default_factory=dict)  # Free-form extras; fresh dict per instance

class SlidingWindowMemory:
    """Maintains a context window with automatic summarization.

    Recent messages are kept verbatim. Once the estimated token total passes
    ``max_tokens * summarize_threshold``, the oldest half of the window is
    folded into a running summary produced by ``summary_model``.
    """

    def __init__(
        self,
        max_tokens: int = 8000,
        summarize_threshold: float = 0.8,
        summary_model: str = "claude-3-5-haiku-20241022",
    ):
        self.max_tokens = max_tokens
        self.summarize_threshold = summarize_threshold
        self.summary_model = summary_model
        self.messages: list[Message] = []
        self.summary: str = ""
        self.total_tokens: int = 0
        self._summaries_created: int = 0

    def add(self, message: Message) -> None:
        """Append a message and compress the window if it grew too large.

        The message's ``token_count`` is overwritten with this class's
        estimate. Exceptions raised by the summarizer propagate to the
        caller; the message stays in the window in that case.
        """
        message.token_count = self._estimate_tokens(message.content)
        self.messages.append(message)
        self.total_tokens += message.token_count

        if self.total_tokens > self.max_tokens * self.summarize_threshold:
            self._compress()

    def get_context(self) -> list[dict]:
        """Return messages formatted for LLM consumption.

        The running summary, if any, is emitted first as a system message.
        """
        context = []
        if self.summary:
            context.append({
                "role": "system",
                "content": f"Conversation summary so far:\n{self.summary}",
            })
        context.extend(
            {"role": msg.role, "content": msg.content} for msg in self.messages
        )
        return context

    def _compress(self) -> None:
        """Summarize the oldest half of messages.

        Does nothing when fewer than four messages are held. State is only
        mutated after the summarizer has returned, so a failed summarization
        call leaves the window, summary, and token total untouched instead of
        silently discarding the old messages.
        """
        if len(self.messages) < 4:
            return

        split_point = len(self.messages) // 2
        old_messages = self.messages[:split_point]
        recent_messages = self.messages[split_point:]

        # Each old message is clipped to 500 characters to bound prompt size.
        old_text = "\n".join(
            f"{m.role}: {m.content[:500]}" for m in old_messages
        )

        # May raise (network/API errors); nothing has been modified yet.
        new_summary = self._call_summarizer(
            f"Previous summary: {self.summary}\n\n"
            f"New messages to incorporate:\n{old_text}\n\n"
            f"Create a concise summary preserving: key decisions, "
            f"user preferences, task progress, and important facts."
        )

        # Commit the new state only now that the summary exists.
        self.messages = recent_messages
        self.summary = new_summary
        self.total_tokens = (
            sum(m.token_count for m in self.messages)
            + self._estimate_tokens(self.summary)
        )
        self._summaries_created += 1

    def _call_summarizer(self, prompt: str) -> str:
        """Call a fast model for summarization and return its text reply."""
        import anthropic
        client = anthropic.Anthropic()
        response = client.messages.create(
            model=self.summary_model,
            max_tokens=500,
            messages=[{"role": "user", "content": prompt}],
        )
        return response.content[0].text

    def _estimate_tokens(self, text: str) -> int:
        """Return a rough token estimate (four characters per token)."""
        return len(text) // 4  # Rough estimate

    def stats(self) -> dict:
        """Return counters describing the current window."""
        return {
            "messages": len(self.messages),
            "total_tokens": self.total_tokens,
            "has_summary": bool(self.summary),
            "compressions": self._summaries_created,
        }
Enter fullscreen mode Exit fullscreen mode

When to use: Every agent needs this as a baseline. The key insight: use a fast, cheap model (Haiku) for summarization, not your main model.


2. Semantic Memory with Vector Search

Store facts and retrieve them by meaning, not just recency.

# memory/semantic.py
import numpy as np
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional
import json
import hashlib

@dataclass
class MemoryEntry:
    """One stored memory together with its embedding and usage statistics."""

    id: str  # SemanticMemory.store() uses the first 16 hex chars of sha256(content)
    content: str  # The remembered text
    embedding: list[float]  # Vector compared via cosine similarity
    category: str  # "fact", "preference", "decision", "event"
    importance: float = 0.5  # 0-1 scale
    created_at: datetime = field(default_factory=datetime.utcnow)  # Naive UTC
    last_accessed: datetime = field(default_factory=datetime.utcnow)  # Refreshed on recall/duplicate store
    access_count: int = 0  # Incremented on recall and on duplicate store
    source: str = ""  # Which conversation created this
    metadata: dict = field(default_factory=dict)  # Free-form extras; fresh dict per instance

class SemanticMemory:
    """Vector-based long-term memory for AI agents.

    Entries live in an in-process dict keyed by entry id. Embeddings are
    fetched from the configured embedding model and cached per exact text.
    """

    def __init__(self, embedding_model: str = "text-embedding-3-small"):
        self.embedding_model = embedding_model
        self.entries: dict[str, MemoryEntry] = {}
        self._embedding_cache: dict[str, list[float]] = {}

    async def store(
        self,
        content: str,
        category: str = "fact",
        importance: float = 0.5,
        source: str = "",
        metadata: dict | None = None,
    ) -> str:
        """Store a memory with its embedding and return the entry id.

        If an existing entry is a near-duplicate (cosine similarity above
        0.95), that entry is refreshed instead: its access count and
        last-accessed time are bumped, its importance is raised to the new
        value when higher, and its id is returned.
        """
        entry_id = hashlib.sha256(content.encode()).hexdigest()[:16]

        # Check for near-duplicates
        embedding = await self._embed(content)
        duplicate = self._find_duplicate(embedding, threshold=0.95)
        if duplicate is not None:
            # Update existing instead of creating duplicate
            duplicate.access_count += 1
            duplicate.last_accessed = datetime.utcnow()
            duplicate.importance = max(duplicate.importance, importance)
            return duplicate.id

        self.entries[entry_id] = MemoryEntry(
            id=entry_id,
            content=content,
            embedding=embedding,
            category=category,
            importance=importance,
            source=source,
            metadata=metadata or {},
        )
        return entry_id

    async def recall(
        self,
        query: str,
        top_k: int = 5,
        category: str | None = None,
        min_importance: float = 0.0,
    ) -> list[MemoryEntry]:
        """Retrieve memories most relevant to a query.

        Candidates are optionally filtered by category and minimum
        importance, then ranked by a weighted blend of similarity (0.6),
        importance (0.25) and recency (0.15). Returned entries have their
        access tracking updated.
        """
        query_embedding = await self._embed(query)

        candidates = list(self.entries.values())
        if category:
            candidates = [e for e in candidates if e.category == category]
        if min_importance > 0:
            candidates = [e for e in candidates if e.importance >= min_importance]

        if not candidates:
            return []

        # Score by relevance * importance * recency
        scored = []
        for entry in candidates:
            similarity = self._cosine_similarity(query_embedding, entry.embedding)
            recency_boost = self._recency_score(entry.last_accessed)
            score = (similarity * 0.6) + (entry.importance * 0.25) + (recency_boost * 0.15)
            scored.append((score, entry))

        scored.sort(key=lambda pair: pair[0], reverse=True)

        # Update access tracking
        results = []
        for _, entry in scored[:top_k]:
            entry.access_count += 1
            entry.last_accessed = datetime.utcnow()
            results.append(entry)

        return results

    async def forget(self, min_access: int = 0, older_than_days: int = 30) -> int:
        """Remove low-value memories (garbage collection).

        An entry is removed when it has been accessed at most ``min_access``
        times, is older than ``older_than_days`` days, and has importance
        below 0.3. Returns the number of removed entries.
        """
        now = datetime.utcnow()
        # Collect ids first: a dict must not change size while iterated.
        to_remove = [
            entry_id
            for entry_id, entry in self.entries.items()
            if entry.access_count <= min_access
            and (now - entry.created_at).days > older_than_days
            and entry.importance < 0.3
        ]

        for entry_id in to_remove:
            del self.entries[entry_id]

        return len(to_remove)

    def _find_duplicate(
        self, embedding: list[float], threshold: float
    ) -> MemoryEntry | None:
        """Return the most similar entry whose similarity exceeds ``threshold``.

        Picking the best match rather than the first one over the threshold
        keeps a near-duplicate from being merged into a merely "close enough"
        neighbour when an exact match exists. Returns ``None`` if no entry
        qualifies.
        """
        best_entry = None
        best_similarity = threshold
        for entry in self.entries.values():
            similarity = self._cosine_similarity(embedding, entry.embedding)
            if similarity > best_similarity:
                best_entry, best_similarity = entry, similarity
        return best_entry

    def _cosine_similarity(self, a: list[float], b: list[float]) -> float:
        """Return the cosine similarity of two vectors."""
        a_arr, b_arr = np.array(a), np.array(b)
        # The small epsilon avoids division by zero for all-zero vectors.
        return float(np.dot(a_arr, b_arr) / (np.linalg.norm(a_arr) * np.linalg.norm(b_arr) + 1e-8))

    def _recency_score(self, last_accessed: datetime) -> float:
        """Return a score in [0, 1] that falls linearly to 0 over 720 hours."""
        hours_ago = (datetime.utcnow() - last_accessed).total_seconds() / 3600
        return max(0.0, 1.0 - (hours_ago / 720))  # Decay over 30 days

    async def _embed(self, text: str) -> list[float]:
        """Return the embedding for ``text``, using the per-text cache."""
        if text in self._embedding_cache:
            return self._embedding_cache[text]
        import openai
        client = openai.AsyncOpenAI()
        response = await client.embeddings.create(
            model=self.embedding_model, input=text
        )
        embedding = response.data[0].embedding
        self._embedding_cache[text] = embedding
        return embedding
Enter fullscreen mode Exit fullscreen mode

Beyond basic RAG: This isn't "embed chunks and retrieve." It's a living memory system with importance scoring, duplicate detection, and garbage collection.


3. Episodic Memory — Remembering What Happened

Humans don't remember facts in isolation. We remember episodes — sequences of events with context.

# memory/episodic.py
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional
import json

@dataclass
class Episode:
    id: str
    title: str
    started_at: datetime
    ended_at: datetime | None = None
    events: list[dict] = field(default_factory=list)
    outcome: str = ""  # "success", "failure", "partial", "abandoned"
    lessons: list[str] = field(default_factory=list)
    participants: list[str] = field(default_factory=list)
    tags: list[str] = field(default_factory=list)

    def add_event(self, event_type: str, description: str, data: dict | None = None):
        self.events.append({
            "type": event_type,
            "description": description,
            "timestamp": datetime.utcnow().isoformat(),
            "data": data or {},
        })

    def close(self, outcome: str, lessons: list[str] | None = None):
        self.ended_at = datetime.utcnow()
        self.outcome = outcome
        if lessons:
            self.lessons.extend(lessons)

    @property
    def duration_minutes(self) -> float | None:
        if self.ended_at:
            return (self.ended_at - self.started_at).total_seconds() / 60
        return None

    def to_narrative(self) -> str:
        """Convert episode to natural language for context injection."""
        lines = [f"## Episode: {self.title}"]
        lines.append(f"When: {self.started_at.strftime('%Y-%m-%d %H:%M')}")
        if self.outcome:
            lines.append(f"Outcome: {self.outcome}")

        lines.append("\nWhat happened:")
        for i, event in enumerate(self.events, 1):
            lines.append(f"{i}. [{event['type']}] {event['description']}")

        if self.lessons:
            lines.append("\nLessons learned:")
            for lesson in self.lessons:
                lines.append(f"- {lesson}")

        return "\n".join(lines)


class EpisodicMemory:
    """Manages episodic memories for an AI agent.

    At most one episode is open at a time. Finished (or abandoned) episodes
    are archived in ``episodes``, which is capped at ``max_episodes`` by
    dropping the oldest entries.
    """

    def __init__(self, max_episodes: int = 100):
        self.max_episodes = max_episodes
        self.episodes: list[Episode] = []
        self.current_episode: Episode | None = None

    def start_episode(self, title: str, tags: list[str] | None = None) -> Episode:
        """Open a new episode, archiving any still-open one as "abandoned"."""
        if self.current_episode is not None:
            self.current_episode.close("abandoned")
            self._archive(self.current_episode)

        episode = Episode(
            id=f"ep_{len(self.episodes)}_{int(datetime.utcnow().timestamp())}",
            title=title,
            started_at=datetime.utcnow(),
            tags=tags or [],
        )
        self.current_episode = episode
        return episode

    def record_event(self, event_type: str, description: str, data: dict | None = None):
        """Add an event to the open episode; a no-op when none is open."""
        if self.current_episode is not None:
            self.current_episode.add_event(event_type, description, data)

    def end_episode(self, outcome: str, lessons: list[str] | None = None) -> Episode:
        """Close and archive the open episode and return it.

        Raises:
            ValueError: If no episode is currently open.
        """
        if self.current_episode is None:
            raise ValueError("No active episode")

        episode = self.current_episode
        episode.close(outcome, lessons)
        self.current_episode = None
        self._archive(episode)
        return episode

    def _archive(self, episode: Episode) -> None:
        """Append an episode to the archive and evict the oldest over the cap."""
        self.episodes.append(episode)
        # Compute the overflow explicitly: a ``[-max_episodes:]`` slice would
        # keep everything when max_episodes is 0.
        overflow = len(self.episodes) - self.max_episodes
        if overflow > 0:
            self.episodes = self.episodes[overflow:]

    def recall_similar(self, situation: str, top_k: int = 3) -> list[Episode]:
        """Find past episodes relevant to a current situation.

        In production, use embeddings. This shows keyword matching as fallback.

        Episodes are scored by the number of whitespace-separated words they
        share with ``situation`` (title, event descriptions and tags are all
        searched). Episodes with lessons or a "success" outcome get a small
        boost, but only episodes sharing at least one word are returned.
        """
        situation_words = set(situation.lower().split())
        scored = []

        for episode in self.episodes:
            # Join every fragment with a space so the last description word
            # and the first tag are not fused into a single token.
            fragments = [episode.title]
            fragments.extend(e["description"] for e in episode.events)
            fragments.extend(episode.tags)
            episode_words = set(" ".join(fragments).lower().split())

            overlap = len(situation_words & episode_words)
            if overlap == 0:
                # The bonus below must not make an unrelated episode "relevant".
                continue

            # Boost completed episodes with lessons
            bonus = 0.5 if episode.lessons else 0
            bonus += 0.3 if episode.outcome == "success" else 0

            scored.append((overlap + bonus, episode))

        scored.sort(key=lambda pair: pair[0], reverse=True)
        return [episode for _, episode in scored[:top_k]]

    def get_lessons_for(self, tags: list[str]) -> list[str]:
        """Extract all lessons from episodes matching given tags."""
        tag_set = set(tags)
        lessons = []
        for episode in self.episodes:
            if tag_set & set(episode.tags):
                lessons.extend(episode.lessons)
        return lessons

    def get_context_block(self, situation: str, max_episodes: int = 2) -> str:
        """Generate a context block for injection into agent prompts.

        Returns an empty string when no archived episode is relevant.
        """
        relevant = self.recall_similar(situation, top_k=max_episodes)
        if not relevant:
            return ""

        lines = ["# Relevant Past Experiences\n"]
        for episode in relevant:
            lines.append(episode.to_narrative())
            lines.append("")

        return "\n".join(lines)


# Usage in an agent
episodic = EpisodicMemory()

# The agent opens an episode for a debugging task and logs each step it takes.
episodic.start_episode(
    "Debug payment webhook failure",
    tags=["debugging", "payments", "webhook"],
)
episodic.record_event(
    event_type="investigation",
    description="Checked webhook logs — 403 errors from Stripe",
)
episodic.record_event(
    event_type="hypothesis",
    description="API key might have expired",
)
episodic.record_event(
    event_type="action",
    description="Rotated API key in production",
)
episodic.record_event(
    event_type="verification",
    description="Webhook now returning 200",
)

# Closing the episode records the outcome together with what was learned.
episodic.end_episode(
    outcome="success",
    lessons=[
        "Stripe API keys expire after 1 year if not rotated",
        "Check key expiry before investigating code changes",
        "Add key expiry monitoring to alerting stack",
    ],
)

# Later, a similar problem shows up: ask for a prompt-ready block of
# relevant past experience (the debugging episode and its lessons).
context = episodic.get_context_block("webhook returning errors")
Enter fullscreen mode Exit fullscreen mode

Why episodic memory matters: When your agent encounters a problem it solved before, it shouldn't start from scratch. Episodic memory gives it experience.


4. Working Memory — The Agent's Scratchpad

Short-term structured storage for the current task. Think of it as the agent's working notes.

# memory/working.py
from dataclasses import dataclass, field
from typing import Any
from datetime import datetime

@dataclass
class WorkingMemory:
    """Structured scratchpad for agent's current task."""

    goal: str = ""
    plan: list[str] = field(default_factory=list)
    current_step: int = 0
    findings: dict[str, Any] = field(default_factory=dict)
    hypotheses: list[dict] = field(default_factory=list)
    blockers: list[str] = field(default_factory=list)
    decisions: list[dict] = field(default_factory=list)
    scratch: str = ""  # Free-form notes

    def set_goal(self, goal: str) -> None:
        self.goal = goal
        self.current_step = 0
        self.plan = []

    def set_plan(self, steps: list[str]) -> None:
        self.plan = steps
        self.current_step = 0

    def advance(self) -> str | None:
        if self.current_step < len(self.plan):
            step = self.plan[self.current_step]
            self.current_step += 1
            return step
        return None

    def record_finding(self, key: str, value: Any) -> None:
        self.findings[key] = {
            "value": value,
            "recorded_at": datetime.utcnow().isoformat(),
        }

    def add_hypothesis(self, hypothesis: str, confidence: float = 0.5) -> None:
        self.hypotheses.append({
            "text": hypothesis,
            "confidence": confidence,
            "status": "untested",
            "added_at": datetime.utcnow().isoformat(),
        })

    def update_hypothesis(self, index: int, status: str, confidence: float | None = None):
        if 0 <= index < len(self.hypotheses):
            self.hypotheses[index]["status"] = status
            if confidence is not None:
                self.hypotheses[index]["confidence"] = confidence

    def record_decision(self, decision: str, reasoning: str) -> None:
        self.decisions.append({
            "decision": decision,
            "reasoning": reasoning,
            "timestamp": datetime.utcnow().isoformat(),
        })

    def add_blocker(self, blocker: str) -> None:
        self.blockers.append(blocker)

    def to_context(self) -> str:
        """Serialize to a context block for prompt injection."""
        lines = ["# Working Memory\n"]

        if self.goal:
            lines.append(f"**Goal:** {self.goal}\n")

        if self.plan:
            lines.append("**Plan:**")
            for i, step in enumerate(self.plan):
                marker = "" if i == self.current_step else ("" if i < self.current_step else " ")
                lines.append(f"  {marker} {i + 1}. {step}")
            lines.append("")

        if self.findings:
            lines.append("**Findings:**")
            for key, val in self.findings.items():
                lines.append(f"  - {key}: {val['value']}")
            lines.append("")

        if self.hypotheses:
            lines.append("**Hypotheses:**")
            for h in self.hypotheses:
                lines.append(f"  - [{h['status']}] {h['text']} (confidence: {h['confidence']})")
            lines.append("")

        if self.blockers:
            lines.append("**Blockers:**")
            for b in self.blockers:
                lines.append(f"  - ⚠️ {b}")
            lines.append("")

        if self.decisions:
            lines.append("**Decisions Made:**")
            for d in self.decisions:
                lines.append(f"  - {d['decision']} (reason: {d['reasoning']})")

        return "\n".join(lines)
Enter fullscreen mode Exit fullscreen mode

The insight: LLMs perform better when they can see structured intermediate state. Working memory gives the agent a "whiteboard" to think on.


5. Persistent Memory Store with SQLite

All the above patterns need persistence. Here's a battle-tested SQLite backend.

# memory/persistent.py
import json
import sqlite3
from contextlib import contextmanager
from datetime import datetime
from pathlib import Path
from typing import Any

class PersistentMemoryStore:
    """SQLite-backed persistent storage for all memory types.

    Every operation opens its own short-lived connection, commits on
    success and closes it afterwards. Structured values (embeddings,
    metadata, events, lessons, tags, KV values) are stored as JSON text.
    """

    def __init__(self, db_path: str = "agent_memory.db"):
        self.db_path = db_path
        # Make sure the directory holding the database file exists.
        Path(db_path).parent.mkdir(parents=True, exist_ok=True)
        self._init_db()

    def _init_db(self) -> None:
        """Create tables and indexes if they do not exist yet."""
        with self._conn() as conn:
            conn.executescript("""
                CREATE TABLE IF NOT EXISTS memories (
                    id TEXT PRIMARY KEY,
                    content TEXT NOT NULL,
                    category TEXT NOT NULL,
                    importance REAL DEFAULT 0.5,
                    embedding BLOB,
                    created_at TEXT NOT NULL,
                    last_accessed TEXT NOT NULL,
                    access_count INTEGER DEFAULT 0,
                    source TEXT DEFAULT '',
                    metadata TEXT DEFAULT '{}'
                );

                CREATE TABLE IF NOT EXISTS episodes (
                    id TEXT PRIMARY KEY,
                    title TEXT NOT NULL,
                    started_at TEXT NOT NULL,
                    ended_at TEXT,
                    outcome TEXT DEFAULT '',
                    events TEXT DEFAULT '[]',
                    lessons TEXT DEFAULT '[]',
                    tags TEXT DEFAULT '[]'
                );

                CREATE TABLE IF NOT EXISTS conversations (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    session_id TEXT NOT NULL,
                    role TEXT NOT NULL,
                    content TEXT NOT NULL,
                    timestamp TEXT NOT NULL,
                    token_count INTEGER DEFAULT 0,
                    metadata TEXT DEFAULT '{}'
                );

                CREATE TABLE IF NOT EXISTS kv_store (
                    key TEXT PRIMARY KEY,
                    value TEXT NOT NULL,
                    updated_at TEXT NOT NULL
                );

                CREATE INDEX IF NOT EXISTS idx_memories_category ON memories(category);
                CREATE INDEX IF NOT EXISTS idx_memories_importance ON memories(importance);
                CREATE INDEX IF NOT EXISTS idx_conversations_session ON conversations(session_id);
                CREATE INDEX IF NOT EXISTS idx_episodes_tags ON episodes(tags);
            """)

    @contextmanager
    def _conn(self):
        """Yield a connection that commits on success and always closes.

        If the body raises, the connection is closed without committing, so
        uncommitted changes are discarded.
        """
        conn = sqlite3.connect(self.db_path)
        conn.row_factory = sqlite3.Row
        try:
            yield conn
            conn.commit()
        finally:
            conn.close()

    # --- Memory CRUD ---

    def save_memory(self, memory_id: str, content: str, category: str,
                    importance: float, embedding: list[float] | None = None,
                    source: str = "", metadata: dict | None = None) -> None:
        """Insert a memory, or update it in place if the id already exists.

        An upsert is used instead of ``INSERT OR REPLACE`` because the latter
        deletes and re-creates the row, which resets ``created_at`` and
        ``access_count``. That would stop re-saved memories from ever ageing
        out in ``gc`` and would discard the access history kept by
        ``touch_memory``. On update, ``last_accessed`` is refreshed.
        """
        now = datetime.utcnow().isoformat()
        with self._conn() as conn:
            conn.execute("""
                INSERT INTO memories
                (id, content, category, importance, embedding, created_at, last_accessed, source, metadata)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
                ON CONFLICT(id) DO UPDATE SET
                    content = excluded.content,
                    category = excluded.category,
                    importance = excluded.importance,
                    embedding = excluded.embedding,
                    last_accessed = excluded.last_accessed,
                    source = excluded.source,
                    metadata = excluded.metadata
            """, (
                memory_id, content, category, importance,
                json.dumps(embedding) if embedding else None,
                now,
                now,
                source,
                json.dumps(metadata or {}),
            ))

    def get_memories(self, category: str | None = None,
                     min_importance: float = 0.0,
                     limit: int = 50) -> list[dict]:
        """Return memories as dicts, most important and most recent first.

        ``embedding`` and ``metadata`` are returned as stored (JSON text or
        ``None``); callers decode them as needed.
        """
        query = "SELECT * FROM memories WHERE importance >= ?"
        params: list = [min_importance]

        if category:
            query += " AND category = ?"
            params.append(category)

        query += " ORDER BY importance DESC, last_accessed DESC LIMIT ?"
        params.append(limit)

        with self._conn() as conn:
            rows = conn.execute(query, params).fetchall()
            return [dict(row) for row in rows]

    def touch_memory(self, memory_id: str) -> None:
        """Mark a memory as accessed now and bump its access counter."""
        with self._conn() as conn:
            conn.execute("""
                UPDATE memories
                SET last_accessed = ?, access_count = access_count + 1
                WHERE id = ?
            """, (datetime.utcnow().isoformat(), memory_id))

    # --- Episode CRUD ---

    def save_episode(self, episode_id: str, title: str, started_at: str,
                     ended_at: str | None, outcome: str,
                     events: list, lessons: list, tags: list) -> None:
        """Insert or fully replace an episode row."""
        with self._conn() as conn:
            conn.execute("""
                INSERT OR REPLACE INTO episodes
                (id, title, started_at, ended_at, outcome, events, lessons, tags)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?)
            """, (
                episode_id, title, started_at, ended_at, outcome,
                json.dumps(events), json.dumps(lessons), json.dumps(tags),
            ))

    @staticmethod
    def _tag_like_pattern(tag: str) -> str:
        """Build a LIKE pattern matching ``tag`` inside the stored JSON list.

        The tag is JSON-encoded exactly as ``save_episode`` stores it (so
        quotes and non-ASCII characters line up with the stored escapes),
        and LIKE wildcards are escaped with ``!`` so ``%`` and ``_`` in a tag
        are matched literally.
        """
        needle = json.dumps(tag)
        for special in ("!", "%", "_"):  # "!" first: it is the escape char
            needle = needle.replace(special, "!" + special)
        return f"%{needle}%"

    def get_episodes(self, tag: str | None = None, limit: int = 20) -> list[dict]:
        """Return the most recently started episodes, optionally by tag.

        ``events``, ``lessons`` and ``tags`` are decoded from JSON.
        """
        query = "SELECT * FROM episodes"
        params: list = []
        if tag:
            query += " WHERE tags LIKE ? ESCAPE '!'"
            params.append(self._tag_like_pattern(tag))
        query += " ORDER BY started_at DESC LIMIT ?"
        params.append(limit)

        with self._conn() as conn:
            rows = conn.execute(query, params).fetchall()
            results = []
            for row in rows:
                d = dict(row)
                d["events"] = json.loads(d["events"])
                d["lessons"] = json.loads(d["lessons"])
                d["tags"] = json.loads(d["tags"])
                results.append(d)
            return results

    # --- KV Store ---

    def kv_set(self, key: str, value: Any) -> None:
        """Store a JSON-serializable value under ``key``, replacing any old one."""
        with self._conn() as conn:
            conn.execute(
                "INSERT OR REPLACE INTO kv_store (key, value, updated_at) VALUES (?, ?, ?)",
                (key, json.dumps(value), datetime.utcnow().isoformat()),
            )

    def kv_get(self, key: str, default: Any = None) -> Any:
        """Return the decoded value for ``key``, or ``default`` if absent."""
        with self._conn() as conn:
            row = conn.execute("SELECT value FROM kv_store WHERE key = ?", (key,)).fetchone()
            return json.loads(row["value"]) if row else default

    # --- Maintenance ---

    def gc(self, max_age_days: int = 90, min_importance: float = 0.2) -> int:
        """Garbage collect old, low-importance memories.

        Deletes memories below ``min_importance`` that were accessed fewer
        than two times and were created more than ``max_age_days`` days ago.
        Returns the number of deleted rows.
        """
        cutoff = datetime.utcnow()
        with self._conn() as conn:
            result = conn.execute("""
                DELETE FROM memories
                WHERE importance < ?
                AND access_count < 2
                AND julianday(?) - julianday(created_at) > ?
            """, (min_importance, cutoff.isoformat(), max_age_days))
            return result.rowcount
Enter fullscreen mode Exit fullscreen mode

Why SQLite? It's embedded, zero-config, handles concurrent reads, and survives restarts. For single-agent systems, it's all you need.


6. Memory Consolidation (Sleep-Like Processing)

Humans consolidate memories during sleep. Your agent should do the same between sessions.

# memory/consolidation.py
import asyncio
from datetime import datetime, timedelta

class MemoryConsolidator:
    """Runs between sessions to organize and strengthen memories.

    Works on the in-memory entries of ``semantic_memory`` and the archived
    episodes of ``episodic_memory``; the final report is written to
    ``store`` under the key ``"last_consolidation"``.
    """

    def __init__(self, semantic_memory, episodic_memory, store):
        self.semantic = semantic_memory
        self.episodic = episodic_memory
        self.store = store

    async def consolidate(self) -> dict:
        """Run full consolidation cycle and return a report of what changed."""
        report = {
            "started_at": datetime.utcnow().isoformat(),
            "merged_duplicates": 0,
            "extracted_lessons": 0,
            "decayed_memories": 0,
            "promoted_memories": 0,
        }

        # 1. Merge near-duplicate memories
        report["merged_duplicates"] = await self._merge_duplicates()

        # 2. Extract patterns from recent episodes
        report["extracted_lessons"] = await self._extract_patterns()

        # 3. Decay unused memories
        report["decayed_memories"] = await self._decay_unused()

        # 4. Promote frequently accessed memories
        report["promoted_memories"] = await self._promote_frequent()

        report["completed_at"] = datetime.utcnow().isoformat()
        self.store.kv_set("last_consolidation", report)

        return report

    async def _merge_duplicates(self) -> int:
        """Find and merge semantically similar memories.

        For each pair with cosine similarity above 0.92 the entry with the
        higher importance survives and absorbs the other's access count.
        Returns the number of entries merged away.
        """
        entries = list(self.semantic.entries.values())
        merged = 0
        absorbed_ids = set()

        for i, entry_a in enumerate(entries):
            if entry_a.id in absorbed_ids:
                continue
            for entry_b in entries[i + 1:]:
                if entry_b.id in absorbed_ids:
                    continue
                similarity = self.semantic._cosine_similarity(
                    entry_a.embedding, entry_b.embedding
                )
                if similarity <= 0.92:
                    continue

                merged += 1
                # Keep the one with higher importance
                if entry_a.importance >= entry_b.importance:
                    entry_a.access_count += entry_b.access_count
                    absorbed_ids.add(entry_b.id)
                else:
                    entry_b.access_count += entry_a.access_count
                    absorbed_ids.add(entry_a.id)
                    # entry_a is about to be deleted: stop comparing it, or
                    # later duplicates would merge their access counts into
                    # an entry that no longer survives. The survivor picks
                    # them up when its own turn comes in the outer loop.
                    break

        for entry_id in absorbed_ids:
            self.semantic.entries.pop(entry_id, None)

        return merged

    async def _extract_patterns(self) -> int:
        """Store lessons from episodes that ended within the last 7 days.

        Each lesson is handed to the semantic memory's ``store`` as a
        "lesson" entry with importance 0.7. Returns the number of lessons
        submitted.
        """
        now = datetime.utcnow()
        recent = [
            ep for ep in self.episodic.episodes
            if ep.ended_at and (now - ep.ended_at).days < 7
        ]

        all_lessons = []
        for ep in recent:
            all_lessons.extend(ep.lessons)

        # No grouping happens here; every lesson is submitted individually.
        extracted = 0
        for lesson in all_lessons:
            await self.semantic.store(
                content=f"Learned lesson: {lesson}",
                category="lesson",
                importance=0.7,
                source="consolidation",
            )
            extracted += 1

        return extracted

    async def _decay_unused(self) -> int:
        """Reduce importance of memories that are never accessed.

        Entries not accessed for 14 days and accessed fewer than two times
        lose 20% of their importance. Returns the number of decayed entries.
        """
        decayed = 0
        cutoff = datetime.utcnow() - timedelta(days=14)

        for entry in self.semantic.entries.values():
            if entry.last_accessed < cutoff and entry.access_count < 2:
                entry.importance *= 0.8  # 20% decay
                decayed += 1

        return decayed

    async def _promote_frequent(self) -> int:
        """Increase importance of frequently accessed memories.

        Entries accessed at least five times with importance below 0.9 are
        boosted by 20%, capped at 1.0. Returns the number of promoted entries.
        """
        promoted = 0
        for entry in self.semantic.entries.values():
            if entry.access_count >= 5 and entry.importance < 0.9:
                entry.importance = min(1.0, entry.importance * 1.2)
                promoted += 1
        return promoted
Enter fullscreen mode Exit fullscreen mode

The trick nobody mentions: Memory isn't just storage — it's an active process. Consolidation keeps your agent's memory sharp and relevant.


7. Context-Aware Memory Retrieval

Don't just search memories by query. Factor in the current task, the user's sentiment, and the conversation's trajectory.

# memory/contextual_retrieval.py
from dataclasses import dataclass

@dataclass
class RetrievalContext:
    """Describes the current situation so retrieval can be tailored to it."""

    current_query: str  # Text used as the search query for facts and episodes
    task_type: str  # "debugging", "creating", "analyzing", "chatting"
    conversation_topics: list[str]  # NOTE(review): not read by the visible retrieval code — confirm usage
    urgency: float = 0.5  # 0 = casual, 1 = critical
    user_sentiment: str = "neutral"  # NOTE(review): not read by the visible retrieval code — confirm usage

class ContextualRetriever:
    """Retrieves memories considering full conversation context.

    Builds a text block out of up to four sections (facts, past episodes,
    user preferences, lessons). Which sections are included depends on the
    context's ``task_type`` and ``urgency``; ``user_sentiment`` is not
    consulted here. All sections share one token budget.
    """

    def __init__(self, semantic_memory, episodic_memory):
        """Keep references to the semantic and episodic memory stores."""
        self.semantic = semantic_memory
        self.episodic = episodic_memory

    @staticmethod
    def _estimate_tokens(text: str) -> int:
        """Rough token estimate: one token per four characters."""
        return len(text) // 4

    def _fit_section(
        self, header: str, items, budget: int
    ) -> tuple[str, int]:
        """Render ``items`` as a bulleted section that fits within ``budget``.

        Items that do not fit are skipped individually, so a later, shorter
        item may still be included. The header itself is not charged
        against the budget.

        Returns:
            ``(section_text, remaining_budget)``. ``section_text`` is an
            empty string when no item fits, so callers never emit a
            header with nothing under it.
        """
        lines = []
        for item in items:
            line = f"- {item}\n"
            cost = self._estimate_tokens(line)
            if cost < budget:
                lines.append(line)
                budget -= cost
        if not lines:
            return "", budget
        return header + "".join(lines), budget

    async def retrieve(
        self, context: "RetrievalContext", max_tokens: int = 2000
    ) -> str:
        """Build a memory context block tailored to the current situation.

        Args:
            context: Describes the current query, task type, topics and
                urgency.
            max_tokens: Estimated-token budget shared by every section,
                including preferences and lessons.

        Returns:
            The selected sections joined by blank lines, or an empty
            string when nothing was selected.
        """
        blocks = []
        remaining_tokens = max_tokens

        # 1. Relevant facts (always queried). Urgent requests drop the
        #    importance floor so that low-importance facts are eligible too.
        facts = await self.semantic.recall(
            context.current_query,
            top_k=5,
            min_importance=0.3 if context.urgency < 0.5 else 0.0,
        )
        fact_block, remaining_tokens = self._fit_section(
            "## Relevant Knowledge\n",
            [f.content for f in facts or []],
            remaining_tokens,
        )
        if fact_block:
            blocks.append(fact_block)

        # 2. Past experiences (for debugging/problem-solving tasks)
        if context.task_type in ("debugging", "analyzing"):
            episodes = self.episodic.recall_similar(
                context.current_query, top_k=2
            )
            for ep in episodes:
                narrative = ep.to_narrative()
                tokens = self._estimate_tokens(narrative)
                if tokens < remaining_tokens:
                    blocks.append(narrative)
                    remaining_tokens -= tokens

        # 3. User preferences (for creation/chat tasks)
        if context.task_type in ("creating", "chatting"):
            prefs = await self.semantic.recall(
                "user preferences and style",
                top_k=3,
                category="preference",
            )
            pref_block, remaining_tokens = self._fit_section(
                "## User Preferences\n",
                [p.content for p in prefs or []],
                remaining_tokens,
            )
            if pref_block:
                blocks.append(pref_block)

        # 4. Learned lessons (when urgency is high); at most five considered.
        if context.urgency > 0.7:
            topic_lessons = self.episodic.get_lessons_for(
                context.conversation_topics
            )
            lesson_block, remaining_tokens = self._fit_section(
                "## ⚠️ Lessons from Past Experience\n",
                (topic_lessons or [])[:5],
                remaining_tokens,
            )
            if lesson_block:
                blocks.append(lesson_block)

        return "\n\n".join(blocks)
Enter fullscreen mode Exit fullscreen mode

8. The Unified Memory Manager

Tie everything together with a single interface.

# memory/manager.py
class UnifiedMemoryManager:
    """Single interface for all memory subsystems.

    Owns one instance of each memory component and routes messages,
    context building and end-of-session consolidation through them.
    """

    # Words whose presence (as a whole word) marks a sentence as a likely fact.
    _FACT_INDICATORS = frozenset(
        {"is", "are", "was", "means", "requires", "should"}
    )
    # Punctuation trimmed from the edges of a word before it is compared.
    _EDGE_PUNCTUATION = ".,;:!?\"'()[]{}<>`*"

    def __init__(self, db_path: str = "agent_memory.db"):
        """Create every memory subsystem.

        Args:
            db_path: Passed to ``PersistentMemoryStore``.
        """
        self.store = PersistentMemoryStore(db_path)
        self.sliding = SlidingWindowMemory()
        self.semantic = SemanticMemory()
        self.episodic = EpisodicMemory()
        self.working = WorkingMemory()
        self.consolidator = MemoryConsolidator(
            self.semantic, self.episodic, self.store
        )
        self.retriever = ContextualRetriever(self.semantic, self.episodic)

    async def process_message(self, role: str, content: str) -> None:
        """Process an incoming message across all memory systems.

        Every message is added to the sliding window. Assistant messages
        are additionally mined for facts, which are stored in semantic
        memory with category ``"fact"`` and importance 0.4.
        """
        msg = Message(role=role, content=content)
        self.sliding.add(msg)

        # Auto-extract facts from assistant responses
        if role == "assistant":
            for fact in self._extract_facts(content):
                await self.semantic.store(fact, category="fact", importance=0.4)

    async def get_full_context(self, query: str, task_type: str = "chatting") -> dict:
        """Build complete context from all memory systems.

        Returns:
            A dict with keys ``"conversation"``, ``"memories"`` and
            ``"working_memory"``.
        """
        retrieval_ctx = RetrievalContext(
            current_query=query,
            task_type=task_type,
            conversation_topics=self._extract_topics(query),
        )

        return {
            "conversation": self.sliding.get_context(),
            "memories": await self.retriever.retrieve(retrieval_ctx),
            "working_memory": self.working.to_context(),
        }

    async def end_session(self) -> None:
        """Run memory consolidation at session end.

        NOTE(review): no explicit persistence call is made here; any
        writing to ``self.store`` would have to happen inside the
        consolidator -- confirm.
        """
        await self.consolidator.consolidate()

    def _extract_facts(self, text: str) -> list[str]:
        """Simple fact extraction from text.

        Splits ``text`` on ``". "`` and keeps sentences longer than 20
        characters that contain an indicator word. Indicators are matched
        against whole words, so "this" does not count as "is".

        Returns:
            At most three sentences, in their original order.
        """
        facts = []
        for sentence in text.split(". "):
            sentence = sentence.strip()
            if len(sentence) <= 20:
                continue
            words = {
                word.strip(self._EDGE_PUNCTUATION)
                for word in sentence.lower().split()
            }
            if words & self._FACT_INDICATORS:
                facts.append(sentence)
                if len(facts) == 3:  # Max 3 facts per message
                    break
        return facts

    def _extract_topics(self, text: str) -> list[str]:
        """Return up to five distinct keywords longer than five characters.

        Words are lowercased and stripped of surrounding punctuation
        before their length is checked, so "database?" yields "database".
        Simple keyword extraction (use NLP in production).
        """
        topics = []
        for raw in text.lower().split():
            word = raw.strip(self._EDGE_PUNCTUATION)
            if len(word) > 5 and word not in topics:
                topics.append(word)
                if len(topics) == 5:
                    break
        return topics
Enter fullscreen mode Exit fullscreen mode

The Memory Architecture Stack

| Layer | Pattern | What It Stores |
| --- | --- | --- |
| Immediate | Sliding Window | Last N messages + summary |
| Short-term | Working Memory | Current task state |
| Long-term | Semantic Memory | Facts, preferences, knowledge |
| Experiential | Episodic Memory | What happened and lessons |
| Persistent | SQLite Store | Everything, across restarts |
| Maintenance | Consolidation | Garbage collection + strengthening |
| Retrieval | Contextual | Smart, situation-aware recall |
| Unified | Manager | Single API for everything |

The key insight: memory isn't one thing. Humans have multiple memory systems working together. Your agents should too.


Getting Started

Building a complete memory system is non-trivial. If you're looking for production-ready building blocks — token management, multi-model routing, and agent infrastructure patterns — the AI Dev Toolkit has components that integrate with exactly these kinds of architectures.

Start with Pattern 1 (sliding window). Add semantic memory when your agent needs to remember across sessions. Add episodic memory when it needs to learn from experience.

What memory patterns are you using in your agents? I'd love to hear what's working in production.


This is part of the "AI Engineering in Practice" series — building real AI systems, not demos.

Top comments (0)