DEV Community

BAOFUFAN
BAOFUFAN

Posted on

LangChain Chatbot Session Mix-Up: When 100 Concurrent Users Got Each Other's Replies

At 2 AM, my phone blared. It was our ops guy, pulling me out of a dream: “Users are posting screenshots in the support group — they’re seeing other people’s order numbers inside our AI assistant. The group is going crazy.”

I logged onto the server, checked the logs, and my brain went blank. Multiple requests had their conversation histories all jumbled together. User A asked “Where is my delivery?” and the bot answered with User B’s address. It felt like locking your door, coming home, and finding a stranger sitting on your couch.

Breaking It Down: Why User Sessions “Cross Wires”

Our setup was straightforward: a multi-turn chatbot built with LangChain, backed by a large language model, served via FastAPI. Each user was supposed to have an independent conversation memory, never interfering with one another. Unit tests and feature tests all passed. Management gave the green light, and we rolled out a canary release.

The moment traffic hit, everything fell apart.

The root cause lies in the most common usage of LangChain’s ConversationBufferMemory: it stores conversation history in Python’s in-process memory. When concurrent requests pour in, if you don’t deliberately isolate by session_id, all requests share the same memory instance. User A’s history gets contaminated with User B’s messages, the model receives dirty context, and data cross-contamination occurs.

There’s an even sneakier trap: even if you deepcopy the memory on every request, if you fail to atomically combine “load history by session” and “write back after the call,” concurrent writes will still overwrite each other. User B can end up reading leftover history from User A. This problem never shows up in sequential single-user testing; it waits patiently until concurrency arrives.

Design Decisions: Why We Didn’t Pick the Alternatives

What we fundamentally needed was a conversation-history store keyed by session_id, persistent across requests, and safe for concurrent reads and writes.

A few paths were on the table:

  • In-process dict + lock: the simplest approach, but it can’t span processes, all data disappears on restart, and the lock would strangle concurrency performance. Immediately ruled out.
  • PostgreSQL / MySQL: they can store the data, but chat scenarios are read-heavy with occasional writes. Frequent SELECTs to retrieve history would amplify database pressure, and adding JSON serialization on top makes the latency not worth it.
  • Redis: in-memory read/write speed, native support for List data structures to hold multiple messages, the ability to set TTL per session for automatic expiration, and a single instance can easily handle tens of thousands of concurrent operations. On top of that, LangChain’s BaseChatMessageHistory interface already provides a slot to swap in alternative storage, making the implementation cost extremely low.

So our final choice: implement RedisChatMessageHistory, store each session’s messages in a Redis List keyed as messages:{session_id}, and then let LangChain’s RunnableWithMessageHistory automatically complete the cycle of “load on request, write after response.”

Core Implementation: Three Steps to Isolation

Step 1: Implement a custom RedisChatMessageHistory

This piece solves how to persist conversation history into Redis. We inherit from BaseChatMessageHistory, use a Redis List to hold the messages, and serialize each message as a JSON string.

import json
import logging
from typing import List

import redis
from langchain_core.chat_history import BaseChatMessageHistory
from langchain_core.messages import BaseMessage, messages_from_dict, message_to_dict

logger = logging.getLogger(__name__)


class RedisChatMessageHistory(BaseChatMessageHistory):
    """使用 Redis List 存储对话历史,每个 session_id 一个 key"""

    def __init__(
        self,
        session_id: str,
        redis_client: redis.Redis,
        ttl: int = 3600,          # 默认 1 小时过期,避免僵尸 key
        key_prefix: str = "chat_history:",
    ):
        self.session_id = session_id
        self.redis_client = redis_client
        self.ttl = ttl
        self.key = f"{key_prefix}{session_id}"

    @property
    def messages(self) -> List[BaseMessage]:
        # 从 Redis List 中读取全部消息,范围 0 到 -1 保证不丢数据
        raw_messages = self.redis_client.lrange(self.key, 0, -1)
        # 每一条存储时是 JSON 字符串,这里反序列化回 LangChain 消息对象
        dict_messages = [json.loads(m) for m in raw_messages]
        return messages_from_dict(dict_messages)

    def add_message(self, message: BaseMessage) -> None:
        # message_to_dict 将 BaseMessage 转成 dict,再 dump 成 JSON 字符串
        serialized = json.dumps(message_to_dict(message))
        # 写入 Redis List 右侧,保持时间顺序
        self.redis_client.rpush(self.key, serialized)
        # 刷新过期时间,活跃用户的历史保持不过期
        self.redis_client.expire(self.key, self.ttl)

    def clear(self) -> None:
        self.redis_client.delete(self.key)
Enter fullscreen mode Exit fullscreen mode

Step 2: Create a history factory to load and write per session automatically

This part solves how each request gets its own history instance. We need a factory function so that RunnableWithMessageHistory instantiates our RedisChatMessageHistory using the session_id on every call.

from langchain_core.runnables.history impo
Enter fullscreen mode Exit fullscreen mode

Top comments (0)