"没有记忆的Agent,每次对话都是初次见面。" 记忆管理是AI Agent从Demo走向Production最被忽视的核心能力。本文从认知科学到工程实现,全面解析Agent记忆架构。
一、Agent为什么需要记忆?
1.1 无记忆Agent的困境
# 没有记忆的Agent——每次都是新的
session_1: "我叫张三,在AWS工作" → "你好张三!"
session_2: "我上次说的项目进展如何?" → "抱歉,我不知道你是谁..."
这就像一个每天失忆的助手——能力再强也无法建立信任和效率。
1.2 人类记忆 vs Agent记忆
认知科学将人类记忆分为多种类型。Agent的记忆架构可以借鉴这个分类:
| 类型 | 人类类比 | Agent实现 | 持久性 |
|---|---|---|---|
| Working Memory | 工作记忆 | 当前对话上下文 | 会话级 |
| Short-term Memory | 短期记忆 | 最近N轮对话 | 小时级 |
| Long-term Memory | 长期记忆 | 向量数据库 | 永久 |
| Semantic Memory | 语义记忆 | 知识图谱/文档 | 永久 |
| Episodic Memory | 情景记忆 | 对话历史 | 永久 |
| Procedural Memory | 程序记忆 | Skills/Tools | 永久 |
二、记忆架构设计
2.1 分层记忆系统
from dataclasses import dataclass, field
from typing import Optional
import time
@dataclass
class Memory:
content: str
memory_type: str # working, short_term, long_term
importance: float # 0-1
timestamp: float = field(default_factory=time.time)
access_count: int = 0
embedding: Optional[list] = None
class TieredMemorySystem:
"""分层记忆系统——模拟人类记忆层次"""
def __init__(self, embedding_model, vector_store):
self.working = [] # 当前对话(最多10条)
self.short_term = [] # 最近1小时
self.long_term = vector_store # 向量数据库
self.embedding_model = embedding_model
async def store(self, content: str, importance: float = 0.5):
memory = Memory(content=content, memory_type="working",
importance=importance)
memory.embedding = await self.embedding_model.embed(content)
# 始终存入working memory
self.working.append(memory)
if len(self.working) > 10:
# 溢出到short_term
overflow = self.working.pop(0)
overflow.memory_type = "short_term"
self.short_term.append(overflow)
# 重要记忆直接存入long_term
if importance > 0.7:
memory.memory_type = "long_term"
await self.long_term.upsert(memory)
async def recall(self, query: str, k: int = 5) -> list[Memory]:
"""多层检索——按相关性+时效性+重要性加权"""
query_embedding = await self.embedding_model.embed(query)
# 1. Working memory(全部返回)
results = list(self.working)
# 2. Short-term(语义搜索)
st_results = self._semantic_search(
query_embedding, self.short_term, k=3)
results.extend(st_results)
# 3. Long-term(向量数据库搜索)
lt_results = await self.long_term.search(
query_embedding, k=k,
score_threshold=0.7)
results.extend(lt_results)
# 按综合分数排序
return sorted(results,
key=lambda m: self._score(m, query_embedding),
reverse=True)[:k]
def _score(self, memory, query_embedding):
"""综合评分 = 相关性 × 时效性 × 重要性"""
relevance = cosine_similarity(memory.embedding, query_embedding)
recency = 1.0 / (1 + (time.time() - memory.timestamp) / 3600)
return relevance * 0.5 + recency * 0.2 + memory.importance * 0.3
2.2 上下文窗口管理
128K上下文窗口看似巨大,但分配不当照样溢出:
class ContextWindowManager:
def __init__(self, max_tokens: int = 128000):
self.max_tokens = max_tokens
self.allocations = {
"system_prompt": 0.05, # 5% = 6,400 tokens
"memory_context": 0.25, # 25% = 32,000 tokens
"tool_results": 0.15, # 15% = 19,200 tokens
"chat_history": 0.35, # 35% = 44,800 tokens
"response_buffer": 0.20, # 20% = 25,600 tokens
}
def build_context(self, system, memories, tools, history):
context = []
# 1. System prompt(固定)
context.append({"role": "system", "content": system})
# 2. 记忆上下文(按重要性截断)
memory_budget = int(self.max_tokens * self.allocations["memory_context"])
memory_text = self._fit_to_budget(memories, memory_budget)
if memory_text:
context.append({"role": "system", "content": f"相关记忆:\n{memory_text}"})
# 3. 对话历史(保留最近N轮,重要轮次不删)
history_budget = int(self.max_tokens * self.allocations["chat_history"])
context.extend(self._trim_history(history, history_budget))
return context
def _trim_history(self, history, budget):
"""智能裁剪——保留首轮+重要轮+最近N轮"""
if count_tokens(history) <= budget:
return history
# 始终保留第一轮(用户意图)和最后5轮
first = history[:2]
recent = history[-10:]
# 中间部分做摘要
middle = history[2:-10]
if middle:
summary = summarize(middle)
return first + [{"role": "system", "content": f"中间对话摘要: {summary}"}] + recent
return first + recent
三、向量数据库选型
3.1 主流方案对比
| 数据库 | 类型 | QPS(1M) | 延迟P99 | 成本/月 | 适用场景 |
|---|---|---|---|---|---|
| Pinecone | 托管 | 10K | 20ms | $70+ | 快速上手、小规模 |
| Qdrant | 自托管 | 12K | 15ms | 服务器成本 | 高性能、灵活 |
| Milvus | 自托管 | 15K | 12ms | 服务器成本 | 大规模、GPU加速 |
| pgvector | 扩展 | 5K | 30ms | 已有PG成本 | 已用PostgreSQL |
| ChromaDB | 嵌入式 | 3K | 50ms | 免费 | 原型开发 |
3.2 实战推荐
# 场景1:快速原型 → ChromaDB
import chromadb
client = chromadb.Client()
collection = client.create_collection("agent_memory")
collection.add(documents=["用户偏好中文回复"], ids=["mem_001"])
# 场景2:生产环境(AWS) → OpenSearch + pgvector
# OpenSearch做向量搜索,RDS PostgreSQL做结构化查询
# 场景3:高性能 → Qdrant
from qdrant_client import QdrantClient
client = QdrantClient("localhost", port=6333)
client.upsert(collection_name="memories",
points=[PointStruct(id=1, vector=embedding, payload={"content": "..."})])
四、检索策略优化
4.1 综合检索
class HybridRetriever:
"""融合多种检索信号的混合检索器"""
def __init__(self, vector_store, weights=None):
self.vector_store = vector_store
self.weights = weights or {
"relevance": 0.5, # 语义相关性
"recency": 0.2, # 时间新鲜度
"importance": 0.2, # 重要性评分
"frequency": 0.1, # 访问频率
}
async def retrieve(self, query: str, k: int = 5) -> list:
# 语义搜索
candidates = await self.vector_store.search(query, k=k*3)
# 综合评分
scored = []
for mem in candidates:
score = (
self.weights["relevance"] * mem.similarity +
self.weights["recency"] * self._recency_score(mem) +
self.weights["importance"] * mem.importance +
self.weights["frequency"] * min(mem.access_count / 10, 1.0)
)
scored.append((score, mem))
# 按分数排序,返回top-k
scored.sort(key=lambda x: x[0], reverse=True)
return [mem for _, mem in scored[:k]]
def _recency_score(self, memory):
hours_ago = (time.time() - memory.timestamp) / 3600
return 1.0 / (1 + hours_ago / 24) # 24小时半衰期
五、记忆衰减与遗忘
5.1 为什么需要遗忘?
不遗忘的Agent会:
- 上下文被无关记忆污染
- 检索质量下降(噪声太多)
- 存储成本线性增长
5.2 重要性加权衰减
class ImportanceWeightedDecay:
"""重要记忆衰减慢,琐碎记忆衰减快"""
def __init__(self, base_half_life_hours=72):
self.base_half_life = base_half_life_hours
def decay_score(self, memory: Memory) -> float:
hours_elapsed = (time.time() - memory.timestamp) / 3600
# 重要性越高,半衰期越长
half_life = self.base_half_life * (0.3 + 0.7 * memory.importance)
# 指数衰减,但保底30%(重要记忆不会完全消失)
base_strength = 0.3 * memory.importance
decay = (1 - base_strength) * (0.5 ** (hours_elapsed / half_life))
return base_strength + decay
async def cleanup(self, vector_store, threshold=0.1):
"""定期清理衰减到阈值以下的记忆"""
all_memories = await vector_store.list_all()
to_delete = [m.id for m in all_memories
if self.decay_score(m) < threshold]
if to_delete:
await vector_store.delete(to_delete)
print(f"Cleaned up {len(to_delete)} decayed memories")
六、成本优化
6.1 分层存储策略
class CostOptimizedMemory:
"""热数据用内存,温数据用SSD,冷数据用S3"""
def __init__(self):
self.hot = {} # 最近1小时,内存
self.warm = None # 最近1周,本地向量DB
self.cold = None # 更早,S3 + 按需加载
async def recall(self, query, k=5):
# 先查热数据(0延迟)
hot_results = self._search_hot(query, k)
if len(hot_results) >= k:
return hot_results
# 再查温数据(毫秒级)
warm_results = await self.warm.search(query, k - len(hot_results))
results = hot_results + warm_results
if len(results) >= k:
return results
# 最后查冷数据(秒级,但很少需要)
cold_results = await self._search_cold(query, k - len(results))
return results + cold_results
6.2 Embedding缓存
# Embedding是最大的重复成本
from functools import lru_cache
class CachedEmbedder:
def __init__(self, model="text-embedding-3-small"):
self.model = model
self.cache = {} # 生产环境用Redis
async def embed(self, text: str) -> list:
cache_key = hashlib.md5(text.encode()).hexdigest()
if cache_key in self.cache:
return self.cache[cache_key] # 缓存命中率通常>40%
result = await openai.embeddings.create(
model=self.model, input=text)
self.cache[cache_key] = result.data[0].embedding
return self.cache[cache_key]
七、OpenClaw的记忆实践
作为OpenClaw贡献者,我分享一下OpenClaw的记忆架构设计:
# OpenClaw记忆层次
MEMORY.md → 长期记忆(手动策展,如人类日记精华)
memory/*.md → 每日日志(自动记录,如人类日记原始内容)
AGENTS.md → 程序记忆(工作规范,如人类的肌肉记忆)
TOOLS.md → 工具记忆(环境配置,如人类记住工具在哪)
这个设计简单但有效——文件系统就是向量数据库的最简替代。对于个人AI助手场景,Markdown文件 + 语义搜索 = 足够好的记忆系统。
八、总结
Agent记忆管理的核心原则:
- 分层存储——热/温/冷三层,平衡性能和成本
- 智能检索——相关性 + 时效性 + 重要性加权
- 主动遗忘——不是所有记忆都值得保留
- 上下文管理——合理分配128K token预算
- 成本控制——缓存embedding,分层存储
记忆不是越多越好,是越准越好。
作者:JiaDe Wu | AWS Solutions Architect | sample-OpenClaw-on-AWS-with-Bedrock Owner | GitHub: github.com/JiaDe-Wu






Top comments (0)