DEV Community

BAOFUFAN
BAOFUFAN

Posted on

After 2 Years with LangChain, I Discovered I've Been Testing AI Agent Memory All Wrong

At 3 AM, I was jolted awake by a PagerDuty alert. One log entry sent chills down my spine: the agent had taken the last four digits of a bank card that User A had just entered, and pasted them verbatim into User B's conversation summary. Half an hour later, the customer's inbox was on fire — "Your AI is mixing up memories!" That cross‑talk scenario was something I was completely missing in every unit test I thought I had written.

This incident made me revisit a question most developers skip: how do you actually verify that an AI agent's memory storage is consistent?

Breaking it down

First, let's be clear about what an agent's "memory" really is. Whether you use LangChain's ConversationBufferMemory, ConversationSummaryMemory, or a home‑grown pipeline, you're doing three things at the core:

  1. Write – persist the current conversation turn (or a distilled summary) to some storage medium — Redis, Postgres, a Chroma vector store, or even a plain dict.
  2. Read – fetch the history before the next turn and inject it into the prompt.
  3. Evict / compress – when the memory exceeds the token budget, kick out older items by time or importance, or summarise with an LLM.

The trouble sits right at the intersection of "read" and "evict / compress". Almost every agent developer has hit these scenarios:

  • Read and write‑back are not atomic; two concurrent conversation turns can race, and the later one silently overwrites the context written by the earlier one.
  • Summary compression fires multiple times, but the result isn't idempotent. Two compression jobs finish and the older summary ends up stored, dropping the latest information.
  • A key with a TTL expires within a millisecond window; the agent reads empty memory, assumes the user is brand new, and asks a greeting that should never have been repeated.

Why can't manual testing catch this? Because manual tests just run a few fixed dialogues back and forth. You can't simulate concurrent writes, boundary cases where TTL expires exactly, or the race of a compression function being called twice. Those "sporadic" conditions, once they align in production, become P0 incidents of privacy leaks or task interruption.

Conventional unit tests mock the storage layer, but mocking strips away all concurrency characteristics and real I/O latency. The "consistency" you verify looks clean on paper, yet it still breaks in production. We need an automated verification approach that brings in real storage, real concurrency, and real clock differences.

Design

The choice was straightforward: build the tests with Pytest + real storage containers. Specifically:

  • Storage layer – test code operates directly against a real Redis instance (using the redis library, or fakeredis for partial simulation) and a real Chroma vector store. If CI dependencies are a concern, fallback to fakeredis + duckdb as the Chroma backend, but the critical concurrency cases must run against genuine instances.
  • Concurrency primitives – use Python's threading or asyncio to spawn multiple coroutines / threads inside a single test, mimicking real production concurrent memory writes.
  • Consistency assertions – don't just assert "does the memory contain value X?" Instead, assert that for any valid serialisation order of the writes, the final memory state must correspond to one of those orders. In other words, as long as you can enumerate the possible serialisable outcomes, the final state must be one of them.
  • Why not the alternatives:
    • No pure mocks – mocks have no latency and hide races.
    • No LangChain MemoryTestHelper – it only exercises single operations, not interleaved scenarios.
    • No integration tests that rely on sleep and luck – luck can't surface the root cause of deterministic bugs.

The architecture: each test provisions a clean storage namespace (Redis: separate db number or key prefix; Chroma: temporary collection). Pytest fixtures handle creation and teardown, guaranteeing absolute isolation between test cases. Test functions control the timeline — capturing state precisely before and after concurrent writes — and finalise with deterministic assertions.

Core implementation

Let's turn the plan into runnable code step by step. The first fixture supplies a clean Redis memory storage infrastructure.

# conftest.py
import pytest
import redis
import uuid
from typing import Generator

@pytest.fixture
def redis_memory() -> Generator[redis.Redis, None, None]:
    """
    每个测试拿到一个独立前缀的Redis客户端,
    测完自动清理,确保用例间无残留状态。
    """
    client = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
    namespace = f"test_agent_memory:{uuid.uuid4().hex}"  # 隔离前缀
    # 将namespace注入到client对象上,供测试函数使用
    client._namespace = namespace
    yield client
    # 清理:删除所有属于当前namespace的key
    keys = client.keys(f"{namespace}:*")
    if keys:
        client.delete(*keys)
    client.close()
Enter fullscreen mode Exit fullscreen mode

Next, a minimal Agent memory utility class that wraps the "read history – append new message – write back" logic. This class is exactly the one we suspect to be the source of the race conditions.

# agent_memory.py
import json
import redis

class AgentMemory:
    """基于Redis的简单会话记忆存储,读取-修改-写回模式"""

    def __init__(self, client: redis.Redis, session_id: str, namespace: str):
        self.client = client
        self.session_id = session_id
        self.key = f"{namespace}:memory:{session_id}"

    def recall(self) -> list[str]:
        """读取全部历史消息列表"""
        data = self.client.get(self.key)
        if data is None:
            return []
        return json.loads(data)

    def remember(self, message: str) -> list[str]:
        """
        当前记忆追加一条消息并写回。
        这里是典型的读-改-写,不是原子操作!
        """
        history = self.recall()
        history.append(message)
Enter fullscreen mode Exit fullscreen mode

Top comments (0)