AI Agent记忆系统2026:让Agent真正“记住”一切
·
CSDN 2026年7月热榜:记忆系统是AI Agent从"无状态工具"进化为"有认知能力智能体"的关键基础设施。从感知记忆、工作记忆、情景记忆到语义记忆,多级记忆架构让Agent能够记住用户偏好、历史交互、领域知识,实现真正的"个性化"和"持续学习"。本文深度解析记忆架构设计、存储方案、检索策略与生产实战。
1. Agent记忆系统架构
1.1 为什么Agent需要记忆
无状态Agent vs 有记忆Agent:
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
无状态Agent (2024):
User: "帮我写个登录功能"
Agent: [生成代码]
User: "再加个注册功能"
Agent: [不知道之前的代码] ❌
→ 每次对话都是全新的
→ 无法利用历史上下文
→ 用户体验割裂
有记忆Agent (2026):
User: "帮我写个登录功能"
Agent: [生成代码] [存储到记忆]
User: "再加个注册功能"
Agent: [回忆之前的登录代码] [复用风格]
→ 记住历史交互
→ 保持用户偏好
→ 持续优化服务
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1.2 多级记忆架构
Agent记忆系统架构 (参考人类记忆):
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
┌─────────────────────────────────────────┐
│ 感知记忆 (Sensory Memory) │
│ 当前上下文窗口 │
│ 容量: 200K tokens │
│ 持续: 当前会话 │
└─────────────────────────────────────────┘
↓ 注意力筛选
┌─────────────────────────────────────────┐
│ 工作记忆 (Working Memory) │
│ 当前任务的相关信息 │
│ 容量: 动态 │
│ 持续: 任务执行期间 │
└─────────────────────────────────────────┘
↓ 编码存储
┌─────────────────────────────────────────┐
│ 情景记忆 (Episodic Memory) │
│ 历史交互事件 │
│ 容量: 无限 │
│ 持续: 永久 │
└─────────────────────────────────────────┘
↓ 抽象提炼
┌─────────────────────────────────────────┐
│ 语义记忆 (Semantic Memory) │
│ 知识和规则 │
│ 容量: 无限 │
│ 持续: 永久 │
└─────────────────────────────────────────┘
人类记忆类比:
• 感知记忆 ← 看到的东西 (几秒)
• 工作记忆 ← 正在思考的事 (几分钟)
• 情景记忆 ← 经历过的事 (一生)
• 语义记忆 ← 学到的知识 (一生)
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
2. 感知记忆:上下文管理
2.1 上下文窗口优化
"""
感知记忆 = LLM的Context Window
核心挑战: 有限的窗口 vs 无限的历史
"""
from typing import List, Dict
from dataclasses import dataclass
import tiktoken
@dataclass
class Message:
    """One conversation turn plus the token count computed for its text."""

    # Speaker label of the turn (the context manager in this file uses
    # "system" for its own summary messages).
    role: str
    # Raw text of the turn.
    content: str
    # Token count of the turn, computed by whoever creates the message.
    tokens: int
class ContextManager:
    """Keep the conversation history within a fixed token budget.

    Messages are appended through :meth:`add_message`. When a new message
    would push the summed token counts past ``max_tokens``, the oldest
    messages are dropped first; if the history is still too large, what is
    left is folded into a single summary message.
    """

    def __init__(self, max_tokens: int = 200000, encoding_name: str = "cl100k_base"):
        """Create an empty history.

        Args:
            max_tokens: Upper bound for the summed token counts of the history.
            encoding_name: Name of the tiktoken encoding used for counting.
        """
        self.max_tokens = max_tokens
        # tiktoken only maps OpenAI model names, so
        # encoding_for_model("claude-3-5-sonnet") raises KeyError. Load an
        # encoding by name instead; for non-OpenAI models the counts are an
        # approximation of the real tokenizer.
        self.encoding = tiktoken.get_encoding(encoding_name)
        self.messages: List["Message"] = []

    def add_message(self, role: str, content: str):
        """Append a message, making room first if the budget would overflow.

        Args:
            role: Speaker label stored on the message.
            content: Message text; its token count is computed here.
        """
        tokens = len(self.encoding.encode(content))
        used_tokens = sum(m.tokens for m in self.messages)
        if used_tokens + tokens > self.max_tokens:
            self._compress_or_forget(tokens)
        self.messages.append(Message(role=role, content=content, tokens=tokens))

    def _compress_or_forget(self, needed_tokens: int):
        """Shrink the history so that ``needed_tokens`` more tokens fit.

        Args:
            needed_tokens: Token count of the message about to be appended.
        """
        current_tokens = sum(m.tokens for m in self.messages)
        target_tokens = self.max_tokens - needed_tokens

        # Strategy 1: sliding window. Count how many of the oldest messages
        # must go (always keeping the last two) and delete them in one slice
        # instead of repeated O(n) pop(0) calls.
        drop_count = 0
        while current_tokens > target_tokens and len(self.messages) - drop_count > 2:
            current_tokens -= self.messages[drop_count].tokens
            drop_count += 1
        del self.messages[:drop_count]

        # Strategy 2: if the window alone was not enough, summarize the rest.
        if current_tokens > target_tokens:
            self._summarize_old_messages()

    def _summarize_old_messages(self):
        """Replace up to the ten oldest messages with one summary message."""
        old_messages = self.messages[:10]
        if not old_messages:
            # Nothing to summarize; do not invent a summary of an empty history.
            return
        self.messages = self.messages[10:]

        summary = self._generate_summary(old_messages)
        content = f"[历史摘要] {summary}"
        summary_message = Message(
            role="system",
            content=content,
            # Count the text that is actually stored, prefix included, so the
            # budget arithmetic in add_message stays accurate.
            tokens=len(self.encoding.encode(content)),
        )
        self.messages.insert(0, summary_message)

    def _generate_summary(self, messages: List["Message"]) -> str:
        """Return a summary of ``messages``.

        Placeholder: the LLM call is elided in this example, so a fixed string
        is returned regardless of the input.
        """
        # Transcript that the elided LLM call would receive as input.
        text = "\n".join([f"{m.role}: {m.content}" for m in messages])
        # ... LLM call ...
        return "用户询问了关于登录功能的实现,Agent提供了基于JWT的方案..."

    def get_context(self) -> List[Dict]:
        """Return the history as a list of ``{"role", "content"}`` dicts."""
        return [{"role": m.role, "content": m.content} for m in self.messages]
# ===== 智能压缩策略 =====
class SmartCompressor:
    """Compress a message list, keeping keyword-flagged messages verbatim."""

    def __init__(self):
        # A message containing any of these substrings (after lower-casing
        # the message) is treated as important.
        self.important_keywords = [
            "用户偏好", "重要", "记住", "关键",
            "decision", "important", "remember"
        ]

    def compress(self, messages: List["Message"], target_ratio: float = 0.5) -> str:
        """Return one text block with one line group per input message.

        Important messages are kept in full with an "[重要] " prefix; every
        other message is reduced by :meth:`_extract_key_points`.

        Args:
            messages: Objects exposing a ``content`` string.
            target_ratio: Accepted for interface compatibility; the current
                implementation does not use it.

        Returns:
            The per-message results joined with newlines.
        """
        compressed_parts = []
        for msg in messages:
            # Lower-case once per message rather than once per keyword.
            lowered = msg.content.lower()
            if any(kw in lowered for kw in self.important_keywords):
                compressed_parts.append(f"[重要] {msg.content}")
            else:
                compressed_parts.append(self._extract_key_points(msg.content))
        return "\n".join(compressed_parts)

    def _extract_key_points(self, text: str) -> str:
        """Return the first "。"-terminated sentence of ``text``.

        When ``text`` contains no "。" the first 100 characters are returned
        instead, so an empty string stays empty and unpunctuated text is
        truncated rather than returned whole.
        """
        head, terminator, _ = text.partition("。")
        if terminator:
            return head + terminator
        return text[:100]
# ===== 混合检索上下文 =====
class HybridContextRetriever:
    """Retrieve context by combining vector search and keyword search."""

    def __init__(self, vector_store, keyword_index, encoder=None):
        """Store the two search backends.

        Args:
            vector_store: Object whose ``search(query_embedding=, top_k=)``
                returns a ranked list of dicts with "id" and "content".
            keyword_index: Object whose ``search(query=, top_k=)`` returns a
                ranked list of dicts with "id" and "content".
            encoder: Optional callable mapping a query string to an embedding.
                When omitted, a SentenceTransformer("BAAI/bge-m3") model is
                loaded on first use.
        """
        self.vector_store = vector_store
        self.keyword_index = keyword_index
        self.encoder = encoder

    def retrieve_relevant_context(
        self,
        query: str,
        top_k: int = 10
    ) -> List[str]:
        """Return the "content" of the best ``top_k`` fused results.

        Args:
            query: Free-text query.
            top_k: Number of results requested from each backend and returned.
        """
        # 1. Semantic search (vector similarity).
        semantic_results = self.vector_store.search(
            query_embedding=self._encode(query),
            top_k=top_k
        )
        # 2. Keyword search.
        keyword_results = self.keyword_index.search(
            query=query,
            top_k=top_k
        )
        # 3. Fuse both rankings.
        merged = self._merge_results(semantic_results, keyword_results)
        return [r["content"] for r in merged[:top_k]]

    def _encode(self, query: str) -> List[float]:
        """Turn ``query`` into an embedding, loading the default model lazily."""
        if self.encoder is None:
            from sentence_transformers import SentenceTransformer
            model = SentenceTransformer("BAAI/bge-m3")
            self.encoder = lambda text: model.encode(text).tolist()
        return self.encoder(query)

    def _merge_results(
        self,
        semantic: List[dict],
        keyword: List[dict],
        rrf_k: int = 60
    ) -> List[dict]:
        """Fuse two ranked lists with reciprocal rank fusion.

        Vector similarities and keyword scores live on different scales, so
        sorting on the raw "score" lets one backend dominate. Each list
        instead contributes ``1 / (rrf_k + rank)`` per document, and a
        document found by both backends receives both contributions.

        Args:
            semantic: Ranked results from the vector store, best first.
            keyword: Ranked results from the keyword index, best first.
            rrf_k: Smoothing constant of the fusion formula.

        Returns:
            One dict per distinct "id" (the first occurrence), best first.
        """
        fused_scores = {}
        first_seen = {}
        for ranked in (semantic, keyword):
            counted = set()
            for rank, item in enumerate(ranked, start=1):
                key = item["id"]
                if key in counted:
                    # A backend repeating an id must not score it twice.
                    continue
                counted.add(key)
                first_seen.setdefault(key, item)
                fused_scores[key] = fused_scores.get(key, 0.0) + 1.0 / (rrf_k + rank)
        # sorted() is stable, so ties keep first-seen order.
        return sorted(
            first_seen.values(),
            key=lambda item: fused_scores[item["id"]],
            reverse=True,
        )
3. 情景记忆:交互历史存储
3.1 情景记忆设计
"""
情景记忆 = 历史交互事件存储
存储内容: 用户问题、Agent回答、上下文、结果
"""
from datetime import datetime
from typing import Optional, List, Dict, Any
from dataclasses import dataclass, asdict
import json
@dataclass
class EpisodicMemory:
    """Record of one user/agent exchange."""

    # --- identity ---
    id: str
    timestamp: str
    user_id: str
    session_id: str

    # --- what was said ---
    user_query: str
    agent_response: str
    context: Dict[str, Any]

    # --- optional annotations ---
    intent: Optional[str] = None
    entities: Optional[List[str]] = None
    sentiment: Optional[str] = None

    # --- outcome ---
    success: bool = True
    user_feedback: Optional[str] = None

    # --- vector used for similarity search ---
    embedding: Optional[List[float]] = None

    def to_dict(self) -> dict:
        """Return the record as a plain dict (nested containers are copied)."""
        as_mapping = asdict(self)
        return as_mapping

    @classmethod
    def from_dict(cls, data: dict) -> "EpisodicMemory":
        """Build a record from a dict whose keys are the field names."""
        record = cls(**data)
        return record
class EpisodicMemoryStore:
    """Persist episodic memories in one SQLite table.

    Every operation opens its own connection and closes it again, also when
    a statement raises.
    """

    # Maximum number of rows loaded by the brute-force similarity search.
    _SCAN_LIMIT = 10000

    def __init__(self, db_path: str = "memories/episodic.db"):
        """Remember ``db_path`` and make sure the table and indexes exist."""
        self.db_path = db_path
        self._init_db()

    def _connect(self):
        """Open a new SQLite connection to ``db_path``."""
        import sqlite3
        return sqlite3.connect(self.db_path)

    def _init_db(self):
        """Create the parent directory, the table and its indexes if missing."""
        import os
        from contextlib import closing

        # sqlite3.connect() does not create missing directories, so the
        # default "memories/episodic.db" would fail on a fresh checkout.
        parent = os.path.dirname(self.db_path)
        if parent:
            os.makedirs(parent, exist_ok=True)

        with closing(self._connect()) as conn:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS episodic_memories (
                    id TEXT PRIMARY KEY,
                    timestamp TEXT NOT NULL,
                    user_id TEXT NOT NULL,
                    session_id TEXT,
                    user_query TEXT,
                    agent_response TEXT,
                    context TEXT,
                    intent TEXT,
                    entities TEXT,
                    sentiment TEXT,
                    success INTEGER,
                    user_feedback TEXT,
                    embedding BLOB
                )
            """)
            conn.execute("CREATE INDEX IF NOT EXISTS idx_user ON episodic_memories(user_id)")
            conn.execute("CREATE INDEX IF NOT EXISTS idx_timestamp ON episodic_memories(timestamp)")
            conn.commit()

    def store(self, memory: "EpisodicMemory"):
        """Insert ``memory``, replacing any row that has the same id.

        ``context`` and ``entities`` are stored as JSON text and the
        embedding as a pickled blob.
        """
        import pickle
        from contextlib import closing

        row = (
            memory.id,
            memory.timestamp,
            memory.user_id,
            memory.session_id,
            memory.user_query,
            memory.agent_response,
            json.dumps(memory.context),
            memory.intent,
            json.dumps(memory.entities or []),
            memory.sentiment,
            1 if memory.success else 0,
            memory.user_feedback,
            pickle.dumps(memory.embedding) if memory.embedding else None,
        )
        with closing(self._connect()) as conn:
            conn.execute("""
                INSERT OR REPLACE INTO episodic_memories
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """, row)
            conn.commit()

    def retrieve(
        self,
        user_id: str,
        limit: int = 100,
        start_time: Optional[str] = None,
        end_time: Optional[str] = None
    ) -> List["EpisodicMemory"]:
        """Return the newest memories of one user.

        Args:
            user_id: Only rows with exactly this user id are returned.
            limit: Maximum number of rows.
            start_time: Optional inclusive lower bound, compared as text.
            end_time: Optional inclusive upper bound, compared as text.

        Returns:
            Memories ordered by timestamp, newest first.
        """
        return self._select(user_id, limit, start_time, end_time)

    def _select(
        self,
        user_id: Optional[str],
        limit: int,
        start_time: Optional[str] = None,
        end_time: Optional[str] = None
    ) -> List["EpisodicMemory"]:
        """Run the filtered SELECT; ``user_id=None`` means every user."""
        from contextlib import closing

        conditions = []
        params = []
        if user_id is not None:
            conditions.append("user_id = ?")
            params.append(user_id)
        if start_time:
            conditions.append("timestamp >= ?")
            params.append(start_time)
        if end_time:
            conditions.append("timestamp <= ?")
            params.append(end_time)

        query = "SELECT * FROM episodic_memories"
        if conditions:
            query += " WHERE " + " AND ".join(conditions)
        query += " ORDER BY timestamp DESC LIMIT ?"
        params.append(limit)

        with closing(self._connect()) as conn:
            rows = conn.execute(query, params).fetchall()
        return [self._row_to_memory(row) for row in rows]

    def _row_to_memory(self, row) -> "EpisodicMemory":
        """Decode one table row into an EpisodicMemory."""
        import pickle
        # NOTE(security): pickle.loads executes arbitrary code from a crafted
        # blob. This is only safe while the database file is written solely
        # by this class; prefer a JSON or raw-float encoding if the file can
        # come from anywhere else.
        return EpisodicMemory(
            id=row[0],
            timestamp=row[1],
            user_id=row[2],
            session_id=row[3],
            user_query=row[4],
            agent_response=row[5],
            context=json.loads(row[6]) if row[6] else {},
            intent=row[7],
            entities=json.loads(row[8]) if row[8] else [],
            sentiment=row[9],
            success=bool(row[10]),
            user_feedback=row[11],
            embedding=pickle.loads(row[12]) if row[12] else None,
        )

    def search_similar(
        self,
        query_embedding: List[float],
        top_k: int = 10,
        user_id: Optional[str] = None
    ) -> List["EpisodicMemory"]:
        """Return the ``top_k`` memories most similar to ``query_embedding``.

        This is a brute-force scan over at most ``_SCAN_LIMIT`` of the newest
        rows; memories without an embedding are skipped.

        Args:
            query_embedding: Query vector.
            top_k: Number of memories to return.
            user_id: Restrict the scan to one user; ``None`` scans all users.
                (Filtering on an empty user id, as before, matched no rows.)
        """
        candidates = self._select(user_id, self._SCAN_LIMIT)
        scored = [
            (self._cosine_similarity(query_embedding, mem.embedding), mem)
            for mem in candidates
            if mem.embedding
        ]
        scored.sort(key=lambda pair: pair[0], reverse=True)
        return [mem for _, mem in scored[:top_k]]

    def _cosine_similarity(self, a: List[float], b: List[float]) -> float:
        """Return the cosine similarity of ``a`` and ``b`` (0.0 for a zero vector)."""
        import numpy as np
        vec_a = np.asarray(a, dtype=float)
        vec_b = np.asarray(b, dtype=float)
        denominator = float(np.linalg.norm(vec_a) * np.linalg.norm(vec_b))
        if denominator == 0.0:
            # A zero vector has no direction; avoid the NaN from dividing by 0.
            return 0.0
        return float(np.dot(vec_a, vec_b) / denominator)
3.2 记忆检索策略
"""
记忆检索策略
挑战: 如何从海量记忆中找到相关的?
"""
class MemoryRetriever:
    """Look up episodic memories with one of several retrieval strategies."""

    def __init__(
        self,
        vector_store,    # vector database
        keyword_index,   # keyword index
        llm_client       # LLM client
    ):
        """Store the search backends and the LLM client."""
        self.vector_store = vector_store
        self.keyword_index = keyword_index
        self.llm = llm_client
        # Embedding model, loaded on first use by _encode().
        self._encoder = None

    async def retrieve(
        self,
        query: str,
        user_id: str,
        top_k: int = 10,
        strategy: str = "hybrid"
    ) -> List["EpisodicMemory"]:
        """Return up to ``top_k`` memories relevant to ``query``.

        Args:
            query: Free-text query.
            user_id: Forwarded to the time-aware strategy only.
            top_k: Maximum number of memories returned.
            strategy: One of "semantic", "keyword", "hybrid", "time_aware".

        Raises:
            ValueError: If ``strategy`` is not one of the names above.
        """
        # NOTE(review): no strategy filters results by user_id; confirm that
        # the backends are already scoped to a single user.
        if strategy == "semantic":
            return await self._semantic_search(query, top_k)
        elif strategy == "keyword":
            return await self._keyword_search(query, top_k)
        elif strategy == "hybrid":
            return await self._hybrid_search(query, top_k)
        elif strategy == "time_aware":
            return await self._time_aware_search(query, user_id, top_k)
        else:
            raise ValueError(f"Unknown strategy: {strategy}")

    async def _semantic_search(
        self,
        query: str,
        top_k: int
    ) -> List["EpisodicMemory"]:
        """Vector search: embed the query and ask the vector store."""
        query_embedding = await self._encode(query)
        results = self.vector_store.search(
            embedding=query_embedding,
            top_k=top_k * 2  # over-fetch, then keep the best top_k
        )
        return [r["memory"] for r in results[:top_k]]

    async def _keyword_search(
        self,
        query: str,
        top_k: int
    ) -> List["EpisodicMemory"]:
        """Keyword search through the keyword index."""
        results = self.keyword_index.search(
            query=query,
            top_k=top_k
        )
        return [r["memory"] for r in results]

    async def _hybrid_search(
        self,
        query: str,
        top_k: int
    ) -> List["EpisodicMemory"]:
        """Run semantic and keyword search, then fuse the two rankings."""
        import asyncio
        semantic_results, keyword_results = await asyncio.gather(
            self._semantic_search(query, top_k),
            self._keyword_search(query, top_k),
        )
        return self._merge_and_rerank(semantic_results, keyword_results, top_k)

    async def _time_aware_search(
        self,
        query: str,
        user_id: str,
        top_k: int
    ) -> List["EpisodicMemory"]:
        """Hybrid search re-scored so that recent memories rank higher.

        The final score is ``0.7 * relevance + 0.3 * exp(-0.1 * days_old)``.
        ``relevance`` is the memory's ``relevance_score`` attribute when it
        has one; otherwise it is derived from the memory's position in the
        hybrid ranking (1.0 for the first hit, decreasing linearly).
        """
        import math
        from datetime import datetime

        memories = await self._hybrid_search(query, top_k * 3)
        total = len(memories)
        decay_factor = 0.1  # lambda of the exponential decay, per day

        scored_memories = []
        for rank, mem in enumerate(memories):
            mem_time = datetime.fromisoformat(mem.timestamp)
            # Use the timestamp's own tzinfo so naive and aware values are
            # never mixed in the subtraction.
            now = datetime.now(mem_time.tzinfo)
            days_ago = max((now - mem_time).days, 0)
            time_score = math.exp(-decay_factor * days_ago)

            relevance = getattr(mem, "relevance_score", None)
            if relevance is None:
                relevance = 1.0 - rank / total

            final_score = relevance * 0.7 + time_score * 0.3
            scored_memories.append((final_score, mem))

        scored_memories.sort(key=lambda pair: pair[0], reverse=True)
        return [mem for _, mem in scored_memories[:top_k]]

    def _merge_and_rerank(
        self,
        semantic: List["EpisodicMemory"],
        keyword: List["EpisodicMemory"],
        top_k: int
    ) -> List["EpisodicMemory"]:
        """Fuse two ranked lists with reciprocal rank fusion.

        Concatenating and truncating would return only semantic hits whenever
        that list already holds ``top_k`` items. Here each list contributes
        ``1 / (60 + rank)`` per memory, so a memory found by both searches
        outranks one found by a single search at the same position.

        Returns:
            At most ``top_k`` distinct memories, best first.
        """
        rrf_k = 60
        fused_scores = {}
        by_id = {}
        for ranked in (semantic, keyword):
            counted = set()
            for rank, mem in enumerate(ranked, start=1):
                if mem.id in counted:
                    continue
                counted.add(mem.id)
                by_id.setdefault(mem.id, mem)
                fused_scores[mem.id] = fused_scores.get(mem.id, 0.0) + 1.0 / (rrf_k + rank)
        # sorted() is stable, so ties keep first-seen order.
        ordered = sorted(
            by_id.values(),
            key=lambda mem: fused_scores[mem.id],
            reverse=True,
        )
        return ordered[:top_k]

    async def _encode(self, text: str) -> List[float]:
        """Embed ``text``, loading the embedding model once and reusing it."""
        encoder = getattr(self, "_encoder", None)
        if encoder is None:
            from sentence_transformers import SentenceTransformer
            encoder = self._encoder = SentenceTransformer("BAAI/bge-m3")
        return encoder.encode(text).tolist()
4. 语义记忆:知识存储
4.1 语义记忆设计
"""
语义记忆 = 知识和规则存储
从情景记忆中抽象出的通用知识
"""
from typing import List, Dict, Optional
from dataclasses import dataclass
from datetime import datetime
@dataclass
class SemanticMemory:
    """A piece of reusable knowledge (concept, rule or preference)."""

    id: str
    concept: str      # name of the concept
    description: str  # free-text description
    category: str     # grouping key

    # Arbitrary key/value properties of the concept.
    attributes: Dict[str, Any]

    # Links to other concepts.
    related_concepts: List[str]

    # Provenance.
    source: str        # "user_feedback" | "interaction" | "external"
    confidence: float  # confidence in this piece of knowledge

    # Bookkeeping timestamps, stored as strings.
    created_at: str
    updated_at: str

    # Optional vector representation.
    embedding: Optional[List[float]] = None
class SemanticMemoryStore:
    """In-memory store of semantic memories with a per-category index."""

    def __init__(self):
        # Memory id -> memory.
        self.memories: Dict[str, "SemanticMemory"] = {}
        # Category -> ids of the memories currently in that category.
        self.categories: Dict[str, List[str]] = {}

    def store(self, memory: "SemanticMemory"):
        """Insert ``memory`` or replace the memory with the same id.

        The category index is kept consistent: an id appears once, and only
        under the category of the most recently stored version.
        """
        self.memories[memory.id] = memory

        # Drop the id from any other category it was indexed under before.
        for category, ids in self.categories.items():
            if category != memory.category and memory.id in ids:
                ids.remove(memory.id)

        bucket = self.categories.setdefault(memory.category, [])
        if memory.id not in bucket:
            bucket.append(memory.id)

    def _resolve(self, key: str) -> Optional["SemanticMemory"]:
        """Find a memory by id, falling back to an exact concept-name match."""
        found = self.memories.get(key)
        if found is not None:
            return found
        for memory in self.memories.values():
            if memory.concept == key:
                return memory
        return None

    def retrieve(self, concept: str) -> Optional["SemanticMemory"]:
        """Return the memory whose id or concept name equals ``concept``.

        Ids take precedence; ``None`` is returned when nothing matches.
        """
        return self._resolve(concept)

    def search_by_category(self, category: str) -> List["SemanticMemory"]:
        """Return every stored memory indexed under ``category``."""
        return [
            self.memories[memory_id]
            for memory_id in self.categories.get(category, [])
            if memory_id in self.memories
        ]

    def update_confidence(self, concept: str, delta: float):
        """Add ``delta`` to a memory's confidence, clamped to [0.0, 1.0].

        Unknown ids/concepts are ignored. ``updated_at`` is refreshed on
        every successful update.
        """
        memory = self._resolve(concept)
        if memory is None:
            return
        # Negative deltas are allowed, so clamp at both ends.
        memory.confidence = max(0.0, min(1.0, memory.confidence + delta))
        memory.updated_at = datetime.now().isoformat()
# ===== 知识提取 =====
class KnowledgeExtractor:
    """Ask an LLM to distil reusable knowledge out of interactions."""

    def __init__(self, llm_client):
        """Keep the client; it must offer ``await chat(messages)`` whose
        result has a ``content`` string."""
        self.llm = llm_client

    async def extract_from_interaction(
        self,
        user_query: str,
        agent_response: str,
        user_feedback: Optional[str] = None
    ) -> List["SemanticMemory"]:
        """Extract preferences, knowledge and rules from one exchange.

        Returns:
            One SemanticMemory per well-formed item of the model's JSON
            answer. An unparsable answer, a non-list answer and malformed
            items all yield nothing instead of raising.
        """
        prompt = "\n".join([
            "",
            "从以下交互中提取可复用的知识和规则:",
            f"用户问题: {user_query}",
            f"Agent回答: {agent_response}",
            f"用户反馈: {user_feedback or '无'}",
            "请提取:",
            "1. 用户偏好 (如: 用户喜欢简洁的回答)",
            "2. 领域知识 (如: 某概念的定义)",
            "3. 规则 (如: 某情况下应该怎么做)",
            "以JSON格式返回:",
            "[",
            "{",
            '"concept": "概念名",',
            '"description": "描述",',
            '"category": "preference|knowledge|rule",',
            '"attributes": {},',
            '"confidence": 0.8',
            "}",
            "]",
            "",
        ])
        response = await self.llm.chat([
            {"role": "user", "content": prompt}
        ])
        return self._build_memories(response.content, source="interaction")

    async def extract_user_preferences(
        self,
        interactions: List[Dict]
    ) -> List["SemanticMemory"]:
        """Extract user preferences from the feedback of past interactions.

        Args:
            interactions: Dicts that may carry a "user_feedback" entry.

        Returns:
            Preference memories parsed from the model's JSON list. Items may
            be plain strings or objects; items without an explicit category
            are filed under "preference". Returns an empty list, without
            calling the LLM, when no interaction has feedback.
        """
        feedback_texts = [
            interaction["user_feedback"]
            for interaction in interactions
            if interaction.get("user_feedback")
        ]
        if not feedback_texts:
            return []

        prompt = "\n".join([
            "",
            "从用户反馈中提取用户偏好:",
            "用户反馈:",
            "\n".join(feedback_texts),
            "提取用户偏好规则,例如:",
            "- 用户喜欢详细的解释",
            "- 用户偏好Python代码",
            "- 用户不喜欢使用第三方库",
            "以JSON格式返回偏好列表。",
            "",
        ])
        response = await self.llm.chat([
            {"role": "user", "content": prompt}
        ])
        return self._build_memories(
            response.content,
            source="user_feedback",
            default_category="preference",
        )

    def _build_memories(
        self,
        raw: str,
        source: str,
        default_category: Optional[str] = None
    ) -> List["SemanticMemory"]:
        """Turn the model's raw answer into SemanticMemory objects."""
        memories = []
        for item in self._parse_json_list(raw):
            memory = self._build_memory(item, source, default_category)
            if memory is not None:
                memories.append(memory)
        return memories

    @staticmethod
    def _parse_json_list(raw) -> list:
        """Parse ``raw`` as a JSON list; return ``[]`` when that is impossible.

        Models often wrap JSON in a Markdown fence or add prose around it, so
        when the whole text does not parse, the span from the first "[" to
        the last "]" is tried as well.
        """
        import json
        if not isinstance(raw, str):
            return []
        text = raw.strip()
        try:
            data = json.loads(text)
        except json.JSONDecodeError:
            start, end = text.find("["), text.rfind("]")
            if start == -1 or end <= start:
                return []
            try:
                data = json.loads(text[start:end + 1])
            except json.JSONDecodeError:
                return []
        return data if isinstance(data, list) else []

    @staticmethod
    def _build_memory(item, source: str, default_category: Optional[str] = None):
        """Build one SemanticMemory from a parsed item, or ``None`` if malformed."""
        import uuid
        if isinstance(item, str):
            # A bare string is used as both the concept and its description.
            item = {"concept": item, "description": item}
        if not isinstance(item, dict):
            return None

        concept = item.get("concept")
        description = item.get("description")
        category = item.get("category", default_category)
        if not concept or not description or not category:
            return None

        now = datetime.now().isoformat()
        return SemanticMemory(
            # uuid4 instead of a timestamp: items built in the same loop
            # would otherwise be able to share an id and overwrite each other.
            id=f"sem_{uuid.uuid4().hex}",
            concept=concept,
            description=description,
            category=category,
            attributes=item.get("attributes", {}),
            related_concepts=[],
            source=source,
            confidence=item.get("confidence", 0.5),
            created_at=now,
            updated_at=now,
        )
5. 工作记忆:任务上下文
5.1 工作记忆设计
"""
工作记忆 = 当前任务的相关信息
持续: 任务执行期间
特点: 动态、快速访问
"""
from typing import Dict, Any, List, Optional
from dataclasses import dataclass, field
from datetime import datetime
@dataclass
class WorkingMemory:
    """Mutable scratch state for one task while it is being executed."""

    task_id: str
    task_description: str
    # ISO-8601 string of the moment the instance was created.
    created_at: str = field(default_factory=lambda: datetime.now().isoformat())

    # --- progress ---
    current_step: int = 0
    total_steps: int = 0
    status: str = "in_progress"

    # --- free-form key/value context ---
    context: Dict[str, Any] = field(default_factory=dict)

    # --- steps that have finished ---
    completed_steps: List[Dict] = field(default_factory=list)

    # --- actions still waiting to be handled ---
    pending_actions: List[Dict] = field(default_factory=list)

    # --- intermediate results ---
    intermediate_results: Dict[str, Any] = field(default_factory=dict)

    # --- errors recorded during execution ---
    errors: List[Dict] = field(default_factory=list)
class WorkingMemoryManager:
    """Registry of working memories, keyed by task id."""

    def __init__(self):
        self.memories: Dict[str, "WorkingMemory"] = {}

    def create(self, task_id: str, description: str) -> "WorkingMemory":
        """Register and return a fresh working memory for ``task_id``.

        An existing entry under the same id is overwritten.
        """
        memory = WorkingMemory(task_id=task_id, task_description=description)
        self.memories[task_id] = memory
        return memory

    def get(self, task_id: str) -> Optional["WorkingMemory"]:
        """Return the working memory of ``task_id``, or ``None`` if unknown."""
        return self.memories.get(task_id)

    def update_context(self, task_id: str, key: str, value: Any):
        """Set ``context[key] = value`` for a known task; ignore unknown ids."""
        if task_id not in self.memories:
            return
        self.memories[task_id].context[key] = value

    def add_completed_step(self, task_id: str, step: Dict):
        """Record a finished step and advance the step counter by one.

        Unknown task ids are ignored.
        """
        if task_id not in self.memories:
            return
        self.memories[task_id].completed_steps.append(step)
        self.memories[task_id].current_step += 1

    def add_error(self, task_id: str, error: Dict):
        """Append ``error`` to a known task's error list; ignore unknown ids."""
        if task_id not in self.memories:
            return
        self.memories[task_id].errors.append(error)

    def clear(self, task_id: str):
        """Forget the working memory of ``task_id`` if there is one."""
        self.memories.pop(task_id, None)
# ===== 任务执行示例 =====
async def execute_task_with_working_memory(
    task_description: str,
    agent,
    memory_manager: "WorkingMemoryManager"
):
    """Plan and run a task while tracking progress in working memory.

    A new working memory is created under a random UUID. The agent plans the
    steps, each step is executed in order, and truthy "output" values are
    collected under the step's "name".

    Returns:
        ``{"success": True, "task_id", "results"}`` when every step ran, or
        ``{"success": False, "task_id", "error"}`` when planning or a step
        raised. The working memory is left in the manager either way.
    """
    import uuid

    task_id = str(uuid.uuid4())
    working_memory = memory_manager.create(task_id, task_description)

    try:
        # Plan first so the total number of steps is known up front.
        steps = await agent.plan(task_description)
        working_memory.total_steps = len(steps)

        for index, step in enumerate(steps):
            # Publish what is about to run before running it.
            memory_manager.update_context(
                task_id, "current_step_description", step["description"]
            )

            result = await agent.execute_step(step)

            completed = {
                "step": index,
                "description": step["description"],
                "result": result,
            }
            memory_manager.add_completed_step(task_id, completed)

            # Only truthy outputs are kept as intermediate results.
            if result.get("output"):
                working_memory.intermediate_results[step["name"]] = result["output"]

        working_memory.status = "completed"
        return {
            "success": True,
            "task_id": task_id,
            "results": working_memory.intermediate_results,
        }
    except Exception as e:
        # Any failure is recorded and reported to the caller, not re-raised.
        memory_manager.add_error(task_id, {
            "error": str(e),
            "timestamp": datetime.now().isoformat(),
        })
        working_memory.status = "failed"
        return {
            "success": False,
            "task_id": task_id,
            "error": str(e),
        }
6. 记忆系统整合
6.1 统一记忆管理器
"""
统一记忆管理器
整合所有记忆类型
"""
from typing import Dict, List, Any, Optional
import chromadb
from sentence_transformers import SentenceTransformer
class AgentMemoryManager:
    """Single entry point over the agent's four memory layers.

    Combines the context manager (sensory memory), the working-memory
    manager, the SQLite episodic store, the in-memory semantic store, and two
    Chroma collections that index episodic and semantic memories by
    embedding.
    """

    def __init__(
        self,
        agent_id: str,
        persist_dir: str = "memories",
        llm_client=None
    ):
        """Create all stores below ``persist_dir``.

        Args:
            agent_id: Identifier kept on the instance.
            persist_dir: Directory for the SQLite file and the vector data;
                it is created when missing.
            llm_client: Optional client handed to KnowledgeExtractor. Without
                it, knowledge extraction from feedback is skipped.
        """
        import os

        self.agent_id = agent_id
        self.llm_client = llm_client
        # SQLite does not create missing directories on connect.
        os.makedirs(persist_dir, exist_ok=True)

        self.context_manager = ContextManager(max_tokens=200000)
        self.working_memory = WorkingMemoryManager()
        self.episodic_store = EpisodicMemoryStore(f"{persist_dir}/episodic.db")
        self.semantic_store = SemanticMemoryStore()

        self.vector_db = chromadb.PersistentClient(path=f"{persist_dir}/vectors")
        self.encoder = SentenceTransformer("BAAI/bge-m3")

        self.episodic_collection = self.vector_db.get_or_create_collection(
            name="episodic",
            metadata={"hnsw:space": "cosine"}
        )
        self.semantic_collection = self.vector_db.get_or_create_collection(
            name="semantic",
            metadata={"hnsw:space": "cosine"}
        )

    # ===== storing =====
    async def memorize_interaction(
        self,
        user_query: str,
        agent_response: str,
        context: Dict[str, Any] = None,
        user_feedback: str = None
    ):
        """Persist one exchange and, given feedback, the knowledge in it.

        Args:
            user_query: What the user asked; this text is embedded.
            agent_response: What the agent answered.
            context: Optional dict; "user_id" and "session_id" are read from
                it and default to "unknown".
            user_feedback: Optional feedback. It is stored with the episode
                and triggers knowledge extraction when an LLM client is set.
        """
        import uuid
        from datetime import datetime

        # Normalize before the first .get(): context defaults to None.
        context = context or {}

        memory_id = str(uuid.uuid4())
        embedding = self.encoder.encode(user_query).tolist()
        episodic_memory = EpisodicMemory(
            id=memory_id,
            timestamp=datetime.now().isoformat(),
            user_id=context.get("user_id", "unknown"),
            session_id=context.get("session_id", "unknown"),
            user_query=user_query,
            agent_response=agent_response,
            context=context,
            user_feedback=user_feedback,
            embedding=embedding
        )

        self.episodic_store.store(episodic_memory)
        self.episodic_collection.add(
            ids=[memory_id],
            embeddings=[embedding],
            documents=[user_query],
            metadatas=[{
                # recall() uses these two values to find the SQLite row again.
                "timestamp": episodic_memory.timestamp,
                "user_id": episodic_memory.user_id
            }]
        )

        if not user_feedback:
            return
        llm_client = getattr(self, "llm_client", None)
        if llm_client is None:
            import logging
            logging.getLogger(__name__).warning(
                "No llm_client configured; skipping knowledge extraction"
            )
            return
        knowledge = await KnowledgeExtractor(llm_client).extract_from_interaction(
            user_query, agent_response, user_feedback
        )
        for item in knowledge:
            self._store_semantic(item)

    def _store_semantic(self, memory):
        """Store a semantic memory and index it in the semantic collection."""
        self.semantic_store.store(memory)
        vector = memory.embedding
        if not vector:
            vector = self.encoder.encode(
                f"{memory.concept}: {memory.description}"
            ).tolist()
        # Without this entry recall() could never return the memory.
        self.semantic_collection.add(
            ids=[memory.id],
            embeddings=[vector],
            documents=[memory.description],
            metadatas=[{"category": memory.category, "source": memory.source}]
        )

    # ===== retrieval =====
    async def recall(
        self,
        query: str,
        memory_types: List[str] = None,
        top_k: int = 10
    ) -> Dict[str, List]:
        """Return memories related to ``query``, grouped by memory type.

        Args:
            query: Free-text query; it is embedded for the vector lookups.
            memory_types: Any of "episodic", "semantic", "working".
                Defaults to episodic and semantic.
            top_k: Number of neighbours requested per vector collection.

        Returns:
            Dict with one list per requested type. Vector hits whose backing
            record no longer exists are left out.
        """
        if memory_types is None:
            memory_types = ["episodic", "semantic"]
        results = {}
        query_embedding = self.encoder.encode(query).tolist()

        if "episodic" in memory_types:
            hits = self.episodic_collection.query(
                query_embeddings=[query_embedding],
                n_results=top_k
            )
            results["episodic"] = self._load_episodic(hits)

        if "semantic" in memory_types:
            hits = self.semantic_collection.query(
                query_embeddings=[query_embedding],
                n_results=top_k
            )
            found = (self.semantic_store.retrieve(memory_id) for memory_id in hits["ids"][0])
            results["semantic"] = [memory for memory in found if memory is not None]

        if "working" in memory_types:
            # Working memory is not searched: every registered task is returned.
            results["working"] = list(self.working_memory.memories.values())

        return results

    def _load_episodic(self, hits) -> List:
        """Resolve vector hits to the EpisodicMemory rows stored in SQLite.

        The store can only be queried by user and time range, so each hit is
        looked up through the user id and timestamp saved in its vector
        metadata and then matched on id.
        """
        ids = hits["ids"][0]
        metadatas = (hits.get("metadatas") or [None])[0] or [{}] * len(ids)
        loaded = []
        for memory_id, meta in zip(ids, metadatas):
            meta = meta or {}
            candidates = self.episodic_store.retrieve(
                user_id=meta.get("user_id", "unknown"),
                start_time=meta.get("timestamp"),
                end_time=meta.get("timestamp"),
            )
            match = next((m for m in candidates if m.id == memory_id), None)
            if match is not None:
                loaded.append(match)
        return loaded

    # ===== forgetting =====
    def forget(
        self,
        memory_id: str = None,
        before_time: str = None,
        memory_type: str = "episodic"
    ):
        """Delete memories from both the vector index and the backing store.

        Args:
            memory_id: Delete exactly this memory (takes precedence).
            before_time: Delete every episodic memory, of any user, whose
                timestamp string sorts before this value.
            memory_type: "episodic" or "semantic"; ``before_time`` is only
                supported for episodic memories.
        """
        if memory_id:
            if memory_type == "episodic":
                self._delete_episodic([memory_id])
            elif memory_type == "semantic":
                self.semantic_collection.delete(ids=[memory_id])
                self.semantic_store.memories.pop(memory_id, None)
                for ids in self.semantic_store.categories.values():
                    if memory_id in ids:
                        ids.remove(memory_id)
        elif before_time and memory_type == "episodic":
            self._delete_episodic(self._episodic_ids_before(before_time))

    def _episodic_ids_before(self, before_time: str) -> List[str]:
        """Return ids of episodic rows with ``timestamp < before_time``."""
        import sqlite3
        from contextlib import closing
        with closing(sqlite3.connect(self.episodic_store.db_path)) as conn:
            rows = conn.execute(
                "SELECT id FROM episodic_memories WHERE timestamp < ? ORDER BY timestamp",
                (before_time,),
            ).fetchall()
        return [row[0] for row in rows]

    def _delete_episodic(self, ids: List[str]):
        """Remove the given episodic memories from Chroma and from SQLite."""
        import sqlite3
        from contextlib import closing
        if not ids:
            return
        self.episodic_collection.delete(ids=list(ids))
        with closing(sqlite3.connect(self.episodic_store.db_path)) as conn:
            conn.executemany(
                "DELETE FROM episodic_memories WHERE id = ?",
                [(memory_id,) for memory_id in ids],
            )
            conn.commit()

    # ===== reinforcement =====
    def reinforce(self, memory_id: str, feedback: str):
        """Adjust a semantic memory's confidence from exact-match feedback.

        Positive phrases add 0.1, negative phrases subtract 0.2, and any
        other feedback changes nothing.
        """
        if feedback in ["有帮助", "很好", "thanks"]:
            self._adjust_confidence(memory_id, 0.1)
        elif feedback in ["没帮助", "不好", "wrong"]:
            self._adjust_confidence(memory_id, -0.2)

    def _adjust_confidence(self, memory_id: str, delta: float):
        """Forward the confidence change to the semantic store."""
        self.semantic_store.update_confidence(memory_id, delta)
7. 总结
Agent记忆系统核心要点
记忆系统设计要点:
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1. 多级架构
感知记忆 → 工作记忆 → 情景记忆 → 语义记忆
从瞬时到永久,从具体到抽象
2. 检索策略
• 语义检索: 向量相似度
• 关键词检索: BM25
• 混合检索: 融合两路
• 时间感知: 时间衰减
3. 遗忘机制
• 时间衰减: 旧记忆权重降低
• 重要性评分: 高价值记忆优先保留
• 去重合并: 相似记忆合并
4. 存储选择
• 短期: 内存/Redis
• 中期: SQLite
• 长期: 向量数据库 + 关系数据库
5. 挑战
• 隐私: 敏感信息处理
• 规模: 海量记忆的检索效率
• 一致性: 知识的冲突消解
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
技术栈推荐
Agent记忆系统技术栈:
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
向量数据库:
• ChromaDB: 轻量级,适合中小规模
• Pinecone: 托管服务,扩展性好
• Milvus: 开源,高性能
嵌入模型:
• BAAI/bge-m3: 多语言,效果好
• OpenAI text-embedding-3: 稳定
• Cohere embed-v3: 长文本支持好
关系数据库:
• SQLite: 轻量级,单机
• PostgreSQL: 生产级
• Redis: 短期记忆缓存
框架:
• LangChain Memory: 成熟方案
• Mem0: 专注记忆管理
• Letta: 开源Agent框架
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
更多推荐

所有评论(0)