At 2 a.m. last Thursday, my ops colleague blew up my phone: “Customers are flooding the complaint channels. Our AI support bot has amnesia — one second a customer says ‘My order number is 8892’, the next the bot asks ‘May I have your order number?’” We had just launched a smart customer service system built with LangChain for the conversation chain. Load testing during the day looked perfect, but once real traffic hit at night, the memory loss rate spiked above 30%. I could feel the customers’ fury through the screen.
What chilled me most was this: if the same bug happened in a fintech or healthcare scenario, it wouldn’t be just complaints — it could be catastrophic. That same night I made the decision: we must lock down “memory storage consistency” with an automated test suite so it never slips into production again.
Breaking down the problem: why does LangChain’s memory just disappear?
Our support system looked roughly like this:
User -> FastAPI -> LangChain ConversationChain (with memory) -> LLM
To move fast during the initial launch, we chose ConversationBufferMemory with the default InMemoryChatMessageHistory. Under the hood, it’s essentially a dictionary: the key is the session_id and the value is a list of messages. It seemed fine — until we deployed with multi-process Gunicorn and multiple Pods. Three culprits then struck simultaneously:
- No shared memory: A user’s first request lands on Pod A, which stores the memory in Pod A’s local memory. The second request gets load-balanced to Pod B, which has no memory at all → amnesia.
- No concurrency control: When multiple coroutines or threads write to the same session dictionary concurrently, messages overwrite each other and memory becomes a garbled mess.
- No persistence: When a Pod restarts, all customer memory evaporates. Returning customers instantly become “new users”.
Of course we knew we should switch to a shared store like Redis. But the truly fatal flaw was this: from development to testing, no step ever verified the correctness of memory storage. Business tests only ran single-threaded, single-process happy paths. Concurrency and restart scenarios were a complete blind spot. That’s the classic trap: you think everything is fine — until it blows up.
Designing the fix: why Pytest + Redis in three acts?
Plugging this hole required more than just swapping the backend to Redis — we needed tests that could truly nail down consistency. My goal was clear: write a set of Pytest tests that automatically verify three critical behaviors — isolation, persistence, and concurrency safety.
The tech choices were straightforward:
-
Redis as the memory backend: In production, multiple Pods inside K8s must share state. Redis is the infrastructure our team knows best, and it has the officially maintained
RedisChatMessageHistory(inlangchain-community). -
Pytest + pytest-asyncio as the test framework: Our project is an async FastAPI service, so tests must be async too.
pytest-asynciomakes it easy to simulate concurrent scenarios. - No mocking, real Redis connection: Memory consistency problems hide exactly in real network calls, serialization, and connection pooling. Mocking those would be self-deception. We use an isolated test Redis database (db=15) and flush it after each test — no interference.
Why not just add logging in production and observe? Because “occasionally lost” bugs drown in a sea of normal requests. By the time you locate the affected session, the customer is already gone. A test written before the incident is ten thousand times cheaper than firefighting afterward.
Core implementation: three tests that lock down memory consistency
1. First, set up the test infrastructure (conftest.py)
This code ensures every test gets a clean Redis memory backend:
# conftest.py
import pytest
import redis
from langchain_community.chat_message_histories import RedisChatMessageHistory
REDIS_URL = "redis://localhost:6379/15" # dedicated test db
@pytest.fixture
def redis_client():
"""Create a Redis client, clear data after the test"""
client = redis.Redis.from_url(REDIS_URL, decode_responses=True)
yield client
client.flushdb() # clean isolation for each test
client.close()
@pytest.fixture
def make_history():
"""Factory: create memory instance with a given session_id"""
def _make(session_id: str) -> RedisChatMessageHistory:
return RedisChatMessageHistory(
session_id=session_id,
url=REDIS_URL,
ttl=3600 # matches production; expiration also causes "loss"
)
return _make
2. Test 1: Isolation — conversation A must never leak into conversation B
This test verifies that memories from different session_ids are completely isolated, catching low-level but deadly bugs like key concatenation errors or wrong Redis db writes.
python
# test_memory_isolation.py
import pytest
@pytest.mark.asyncio
async def test_memory_isolation(make_history):
"""Two different sessions should not see each other's messages"""
history_a = make_history("session_a")
history_b = make_history("session_b")
# Simulate session A: customer mentions an order number
await history_a.aadd_user_message("我的订单号是 8892")
await history_a.aadd_ai_message("好的,正在为您查询订单 8892")
# Simulate session B: completely different topic
await history_b.aadd_user_message("我想退货")
await history_b.aadd_ai_message("请提供您的手机号")
# Assert: session A must have absolutely zero content from B
msgs_a = await history_a.aget_messages()
content_a = " ".join([m.content for m in msgs_a])
assert "退货" not in content_a, "A 会话泄漏了 B 的对话内容!"
assert "8892" in content_a, "A 会话自身的消
Top comments (0)