CSDN 2026年7月热榜:记忆系统是AI Agent从"无状态工具"进化为"有认知能力智能体"的关键基础设施。从感知记忆、工作记忆、情景记忆到语义记忆,多级记忆架构让Agent能够记住用户偏好、历史交互、领域知识,实现真正的"个性化"和"持续学习"。本文深度解析记忆架构设计、存储方案、检索策略与生产实战。

1. Agent记忆系统架构

1.1 为什么Agent需要记忆

无状态Agent vs 有记忆Agent:
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

无状态Agent (2024):
User: "帮我写个登录功能"
Agent: [生成代码]

User: "再加个注册功能"
Agent: [不知道之前的代码] ❌
→ 每次对话都是全新的
→ 无法利用历史上下文
→ 用户体验割裂

有记忆Agent (2026):
User: "帮我写个登录功能"
Agent: [生成代码] [存储到记忆]

User: "再加个注册功能"
Agent: [回忆之前的登录代码] [复用风格]
→ 记住历史交互
→ 保持用户偏好
→ 持续优化服务
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

1.2 多级记忆架构

Agent记忆系统架构 (参考人类记忆):
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

┌─────────────────────────────────────────┐
│         感知记忆 (Sensory Memory)        │
│         当前上下文窗口                    │
│         容量: 200K tokens                │
│         持续: 当前会话                   │
└─────────────────────────────────────────┘
         ↓ 注意力筛选
┌─────────────────────────────────────────┐
│         工作记忆 (Working Memory)        │
│         当前任务的相关信息                │
│         容量: 动态                       │
│         持续: 任务执行期间               │
└─────────────────────────────────────────┘
         ↓ 编码存储
┌─────────────────────────────────────────┐
│         情景记忆 (Episodic Memory)      │
│         历史交互事件                     │
│         容量: 无限                       │
│         持续: 永久                       │
└─────────────────────────────────────────┘
         ↓ 抽象提炼
┌─────────────────────────────────────────┐
│         语义记忆 (Semantic Memory)      │
│         知识和规则                       │
│         容量: 无限                       │
│         持续: 永久                       │
└─────────────────────────────────────────┘

人类记忆类比:
• 感知记忆 ← 看到的东西 (几秒)
• 工作记忆 ← 正在思考的事 (几分钟)
• 情景记忆 ← 经历过的事 (一生)
• 语义记忆 ← 学到的知识 (一生)
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

2. 感知记忆:上下文管理

2.1 上下文窗口优化

"""
感知记忆 = LLM的Context Window
核心挑战: 有限的窗口 vs 无限的历史
"""

from typing import List, Dict
from dataclasses import dataclass
import tiktoken

@dataclass
class Message:
    role: str
    content: str
    tokens: int

class ContextManager:
    """上下文管理器"""
    
    def __init__(self, max_tokens: int = 200000):
        self.max_tokens = max_tokens
        self.encoding = tiktoken.encoding_for_model("claude-3-5-sonnet")
        self.messages: List[Message] = []
    
    def add_message(self, role: str, content: str):
        """添加消息"""
        tokens = len(self.encoding.encode(content))
        message = Message(role=role, content=content, tokens=tokens)
        
        # 检查是否超出限制
        current_tokens = sum(m.tokens for m in self.messages)
        if current_tokens + tokens > self.max_tokens:
            # 压缩或遗忘旧消息
            self._compress_or_forget(tokens)
        
        self.messages.append(message)
    
    def _compress_or_forget(self, needed_tokens: int):
        """压缩或遗忘旧消息"""
        current_tokens = sum(m.tokens for m in self.messages)
        target_tokens = self.max_tokens - needed_tokens
        
        # 策略1: 滑动窗口 (保留最近N条)
        while current_tokens > target_tokens and len(self.messages) > 2:
            removed = self.messages.pop(0)
            current_tokens -= removed.tokens
        
        # 策略2: 摘要压缩 (将旧消息压缩为摘要)
        if current_tokens > target_tokens:
            self._summarize_old_messages()
    
    def _summarize_old_messages(self):
        """将旧消息压缩为摘要"""
        # 取前N条消息
        old_messages = self.messages[:10]
        self.messages = self.messages[10:]
        
        # 调用LLM生成摘要
        summary = self._generate_summary(old_messages)
        
        # 插入摘要消息
        summary_message = Message(
            role="system",
            content=f"[历史摘要] {summary}",
            tokens=len(self.encoding.encode(summary))
        )
        self.messages.insert(0, summary_message)
    
    def _generate_summary(self, messages: List[Message]) -> str:
        """生成摘要"""
        # 调用LLM
        text = "\n".join([f"{m.role}: {m.content}" for m in messages])
        # ... LLM call ...
        return "用户询问了关于登录功能的实现,Agent提供了基于JWT的方案..."
    
    def get_context(self) -> List[Dict]:
        """获取当前上下文"""
        return [{"role": m.role, "content": m.content} for m in self.messages]


# ===== 智能压缩策略 =====

class SmartCompressor:
    """智能上下文压缩"""
    
    def __init__(self):
        self.important_keywords = [
            "用户偏好", "重要", "记住", "关键",
            "decision", "important", "remember"
        ]
    
    def compress(self, messages: List[Message], target_ratio: float = 0.5) -> str:
        """
        智能压缩消息
        
        策略:
        1. 提取关键信息
        2. 保留重要消息
        3. 压缩冗余内容
        """
        compressed_parts = []
        
        for msg in messages:
            # 检查是否包含关键词
            is_important = any(
                kw in msg.content.lower() 
                for kw in self.important_keywords
            )
            
            if is_important:
                # 重要消息保留原文
                compressed_parts.append(f"[重要] {msg.content}")
            else:
                # 普通消息生成摘要
                summary = self._extract_key_points(msg.content)
                compressed_parts.append(summary)
        
        return "\n".join(compressed_parts)
    
    def _extract_key_points(self, text: str) -> str:
        """提取关键点"""
        # 使用LLM或规则提取
        # 简化示例: 取第一句
        sentences = text.split("。")
        return sentences[0] + "。" if sentences else text[:100]


# ===== 混合检索上下文 =====

class HybridContextRetriever:
    """混合检索上下文"""
    
    def __init__(self, vector_store, keyword_index):
        self.vector_store = vector_store
        self.keyword_index = keyword_index
    
    def retrieve_relevant_context(
        self, 
        query: str, 
        top_k: int = 10
    ) -> List[str]:
        """
        混合检索相关上下文
        
        策略: 语义检索 + 关键词检索
        """
        # 1. 语义检索 (向量相似度)
        semantic_results = self.vector_store.search(
            query_embedding=self._encode(query),
            top_k=top_k
        )
        
        # 2. 关键词检索 (BM25)
        keyword_results = self.keyword_index.search(
            query=query,
            top_k=top_k
        )
        
        # 3. 融合排序
        merged = self._merge_results(
            semantic_results,
            keyword_results
        )
        
        return [r["content"] for r in merged[:top_k]]
    
    def _merge_results(
        self, 
        semantic: List[dict], 
        keyword: List[dict]
    ) -> List[dict]:
        """融合两路结果"""
        # 简单去重合并
        seen = set()
        merged = []
        
        for r in semantic + keyword:
            if r["id"] not in seen:
                seen.add(r["id"])
                merged.append(r)
        
        # 按分数排序
        merged.sort(key=lambda x: x["score"], reverse=True)
        return merged

3. 情景记忆:交互历史存储

3.1 情景记忆设计

"""
情景记忆 = 历史交互事件存储
存储内容: 用户问题、Agent回答、上下文、结果
"""

from datetime import datetime
from typing import Optional, List, Dict, Any
from dataclasses import dataclass, asdict
import json

@dataclass
class EpisodicMemory:
    """情景记忆条目"""
    id: str
    timestamp: str
    user_id: str
    session_id: str
    
    # 交互内容
    user_query: str
    agent_response: str
    context: Dict[str, Any]
    
    # 元数据
    intent: Optional[str] = None
    entities: Optional[List[str]] = None
    sentiment: Optional[str] = None
    
    # 结果
    success: bool = True
    user_feedback: Optional[str] = None
    
    # 检索向量
    embedding: Optional[List[float]] = None
    
    def to_dict(self) -> dict:
        return asdict(self)
    
    @classmethod
    def from_dict(cls, data: dict) -> "EpisodicMemory":
        return cls(**data)


class EpisodicMemoryStore:
    """情景记忆存储"""
    
    def __init__(self, db_path: str = "memories/episodic.db"):
        self.db_path = db_path
        self._init_db()
    
    def _init_db(self):
        """初始化数据库"""
        import sqlite3
        conn = sqlite3.connect(self.db_path)
        conn.execute("""
            CREATE TABLE IF NOT EXISTS episodic_memories (
                id TEXT PRIMARY KEY,
                timestamp TEXT NOT NULL,
                user_id TEXT NOT NULL,
                session_id TEXT,
                user_query TEXT,
                agent_response TEXT,
                context TEXT,
                intent TEXT,
                entities TEXT,
                sentiment TEXT,
                success INTEGER,
                user_feedback TEXT,
                embedding BLOB
            )
        """)
        
        # 创建索引
        conn.execute("CREATE INDEX IF NOT EXISTS idx_user ON episodic_memories(user_id)")
        conn.execute("CREATE INDEX IF NOT EXISTS idx_timestamp ON episodic_memories(timestamp)")
        conn.commit()
        conn.close()
    
    def store(self, memory: EpisodicMemory):
        """存储情景记忆"""
        import sqlite3
        import pickle
        
        conn = sqlite3.connect(self.db_path)
        conn.execute("""
            INSERT OR REPLACE INTO episodic_memories
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, (
            memory.id,
            memory.timestamp,
            memory.user_id,
            memory.session_id,
            memory.user_query,
            memory.agent_response,
            json.dumps(memory.context),
            memory.intent,
            json.dumps(memory.entities or []),
            memory.sentiment,
            1 if memory.success else 0,
            memory.user_feedback,
            pickle.dumps(memory.embedding) if memory.embedding else None
        ))
        conn.commit()
        conn.close()
    
    def retrieve(
        self, 
        user_id: str, 
        limit: int = 100,
        start_time: Optional[str] = None,
        end_time: Optional[str] = None
    ) -> List[EpisodicMemory]:
        """检索情景记忆"""
        import sqlite3
        import pickle
        
        conn = sqlite3.connect(self.db_path)
        
        query = "SELECT * FROM episodic_memories WHERE user_id = ?"
        params = [user_id]
        
        if start_time:
            query += " AND timestamp >= ?"
            params.append(start_time)
        
        if end_time:
            query += " AND timestamp <= ?"
            params.append(end_time)
        
        query += " ORDER BY timestamp DESC LIMIT ?"
        params.append(limit)
        
        cursor = conn.execute(query, params)
        rows = cursor.fetchall()
        conn.close()
        
        memories = []
        for row in rows:
            memory = EpisodicMemory(
                id=row[0],
                timestamp=row[1],
                user_id=row[2],
                session_id=row[3],
                user_query=row[4],
                agent_response=row[5],
                context=json.loads(row[6]) if row[6] else {},
                intent=row[7],
                entities=json.loads(row[8]) if row[8] else [],
                sentiment=row[9],
                success=bool(row[10]),
                user_feedback=row[11],
                embedding=pickle.loads(row[12]) if row[12] else None
            )
            memories.append(memory)
        
        return memories
    
    def search_similar(
        self, 
        query_embedding: List[float],
        top_k: int = 10
    ) -> List[EpisodicMemory]:
        """相似搜索"""
        import numpy as np
        
        # 获取所有记忆
        all_memories = self.retrieve(user_id="", limit=10000)
        
        # 计算相似度
        similarities = []
        for mem in all_memories:
            if mem.embedding:
                sim = self._cosine_similarity(query_embedding, mem.embedding)
                similarities.append((sim, mem))
        
        # 排序返回
        similarities.sort(key=lambda x: x[0], reverse=True)
        return [mem for _, mem in similarities[:top_k]]
    
    def _cosine_similarity(self, a: List[float], b: List[float]) -> float:
        """余弦相似度"""
        import numpy as np
        a = np.array(a)
        b = np.array(b)
        return np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))

3.2 记忆检索策略

"""
记忆检索策略
挑战: 如何从海量记忆中找到相关的?
"""

class MemoryRetriever:
    """记忆检索器"""
    
    def __init__(
        self, 
        vector_store,      # 向量数据库
        keyword_index,     # 关键词索引
        llm_client         # LLM客户端
    ):
        self.vector_store = vector_store
        self.keyword_index = keyword_index
        self.llm = llm_client
    
    async def retrieve(
        self, 
        query: str,
        user_id: str,
        top_k: int = 10,
        strategy: str = "hybrid"
    ) -> List[EpisodicMemory]:
        """
        检索相关记忆
        
        strategy:
        - "semantic": 纯语义检索
        - "keyword": 纯关键词检索
        - "hybrid": 混合检索
        - "time_aware": 时间感知检索
        """
        
        if strategy == "semantic":
            return await self._semantic_search(query, top_k)
        
        elif strategy == "keyword":
            return await self._keyword_search(query, top_k)
        
        elif strategy == "hybrid":
            return await self._hybrid_search(query, top_k)
        
        elif strategy == "time_aware":
            return await self._time_aware_search(query, user_id, top_k)
        
        else:
            raise ValueError(f"Unknown strategy: {strategy}")
    
    async def _semantic_search(
        self, 
        query: str, 
        top_k: int
    ) -> List[EpisodicMemory]:
        """语义检索"""
        # 编码查询
        query_embedding = await self._encode(query)
        
        # 向量搜索
        results = self.vector_store.search(
            embedding=query_embedding,
            top_k=top_k * 2  # 多取一些,后续过滤
        )
        
        return [r["memory"] for r in results[:top_k]]
    
    async def _keyword_search(
        self, 
        query: str, 
        top_k: int
    ) -> List[EpisodicMemory]:
        """关键词检索 (BM25)"""
        results = self.keyword_index.search(
            query=query,
            top_k=top_k
        )
        
        return [r["memory"] for r in results]
    
    async def _hybrid_search(
        self, 
        query: str, 
        top_k: int
    ) -> List[EpisodicMemory]:
        """混合检索 (语义 + 关键词)"""
        import asyncio
        
        # 并行检索
        semantic_task = asyncio.create_task(
            self._semantic_search(query, top_k)
        )
        keyword_task = asyncio.create_task(
            self._keyword_search(query, top_k)
        )
        
        semantic_results = await semantic_task
        keyword_results = await keyword_task
        
        # 融合
        return self._merge_and_rerank(
            semantic_results,
            keyword_results,
            top_k
        )
    
    async def _time_aware_search(
        self, 
        query: str,
        user_id: str,
        top_k: int
    ) -> List[EpisodicMemory]:
        """时间感知检索"""
        from datetime import datetime, timedelta
        
        # 时间衰减因子
        now = datetime.now()
        
        # 先检索相关记忆
        memories = await self._hybrid_search(query, top_k * 3)
        
        # 计算时间衰减分数
        scored_memories = []
        for mem in memories:
            mem_time = datetime.fromisoformat(mem.timestamp)
            days_ago = (now - mem_time).days
            
            # 指数衰减: score * e^(-λ * days)
            decay_factor = 0.1  # λ
            time_score = np.exp(-decay_factor * days_ago)
            
            # 综合分数
            final_score = mem.relevance_score * 0.7 + time_score * 0.3
            
            scored_memories.append((final_score, mem))
        
        # 排序返回
        scored_memories.sort(key=lambda x: x[0], reverse=True)
        return [mem for _, mem in scored_memories[:top_k]]
    
    def _merge_and_rerank(
        self,
        semantic: List[EpisodicMemory],
        keyword: List[EpisodicMemory],
        top_k: int
    ) -> List[EpisodicMemory]:
        """融合重排序"""
        # 去重
        seen = set()
        merged = []
        
        for mem in semantic + keyword:
            if mem.id not in seen:
                seen.add(mem.id)
                merged.append(mem)
        
        # 简单排序(实际可用LLM重排)
        return merged[:top_k]
    
    async def _encode(self, text: str) -> List[float]:
        """编码文本为向量"""
        # 使用embedding模型
        from sentence_transformers import SentenceTransformer
        model = SentenceTransformer("BAAI/bge-m3")
        return model.encode(text).tolist()

4. 语义记忆:知识存储

4.1 语义记忆设计

"""
语义记忆 = 知识和规则存储
从情景记忆中抽象出的通用知识
"""

from typing import List, Dict, Optional
from dataclasses import dataclass
from datetime import datetime

@dataclass
class SemanticMemory:
    """语义记忆条目"""
    id: str
    concept: str           # 概念名称
    description: str       # 描述
    category: str          # 分类
    
    # 属性
    attributes: Dict[str, Any]
    
    # 关系
    related_concepts: List[str]
    
    # 来源
    source: str            # "user_feedback" | "interaction" | "external"
    confidence: float      # 置信度
    
    # 时间戳
    created_at: str
    updated_at: str
    
    # 向量
    embedding: Optional[List[float]] = None


class SemanticMemoryStore:
    """语义记忆存储"""
    
    def __init__(self):
        self.memories: Dict[str, SemanticMemory] = {}
        self.categories: Dict[str, List[str]] = {}
    
    def store(self, memory: SemanticMemory):
        """存储语义记忆"""
        self.memories[memory.id] = memory
        
        # 更新分类索引
        if memory.category not in self.categories:
            self.categories[memory.category] = []
        self.categories[memory.category].append(memory.id)
    
    def retrieve(self, concept: str) -> Optional[SemanticMemory]:
        """检索概念"""
        return self.memories.get(concept)
    
    def search_by_category(self, category: str) -> List[SemanticMemory]:
        """按分类搜索"""
        ids = self.categories.get(category, [])
        return [self.memories[id] for id in ids]
    
    def update_confidence(self, concept: str, delta: float):
        """更新置信度"""
        if concept in self.memories:
            memory = self.memories[concept]
            memory.confidence = min(1.0, memory.confidence + delta)
            memory.updated_at = datetime.now().isoformat()


# ===== 知识提取 =====

class KnowledgeExtractor:
    """从交互中提取知识"""
    
    def __init__(self, llm_client):
        self.llm = llm_client
    
    async def extract_from_interaction(
        self,
        user_query: str,
        agent_response: str,
        user_feedback: Optional[str] = None
    ) -> List[SemanticMemory]:
        """从交互中提取知识"""
        
        prompt = f"""
从以下交互中提取可复用的知识和规则:

用户问题: {user_query}
Agent回答: {agent_response}
用户反馈: {user_feedback or "无"}

请提取:
1. 用户偏好 (如: 用户喜欢简洁的回答)
2. 领域知识 (如: 某概念的定义)
3. 规则 (如: 某情况下应该怎么做)

以JSON格式返回:
[
  {{
    "concept": "概念名",
    "description": "描述",
    "category": "preference|knowledge|rule",
    "attributes": {{}},
    "confidence": 0.8
  }}
]
"""
        
        response = await self.llm.chat([
            {"role": "user", "content": prompt}
        ])
        
        # 解析响应
        import json
        try:
            knowledge_list = json.loads(response.content)
            
            memories = []
            for k in knowledge_list:
                memory = SemanticMemory(
                    id=f"sem_{datetime.now().timestamp()}",
                    concept=k["concept"],
                    description=k["description"],
                    category=k["category"],
                    attributes=k.get("attributes", {}),
                    related_concepts=[],
                    source="interaction",
                    confidence=k.get("confidence", 0.5),
                    created_at=datetime.now().isoformat(),
                    updated_at=datetime.now().isoformat()
                )
                memories.append(memory)
            
            return memories
        
        except json.JSONDecodeError:
            return []
    
    async def extract_user_preferences(
        self,
        interactions: List[Dict]
    ) -> List[SemanticMemory]:
        """从历史交互中提取用户偏好"""
        
        # 汇总所有反馈
        feedback_texts = []
        for interaction in interactions:
            if interaction.get("user_feedback"):
                feedback_texts.append(interaction["user_feedback"])
        
        if not feedback_texts:
            return []
        
        prompt = f"""
从用户反馈中提取用户偏好:

用户反馈:
{chr(10).join(feedback_texts)}

提取用户偏好规则,例如:
- 用户喜欢详细的解释
- 用户偏好Python代码
- 用户不喜欢使用第三方库

以JSON格式返回偏好列表。
"""
        
        response = await self.llm.chat([
            {"role": "user", "content": prompt}
        ])
        
        # 解析并创建SemanticMemory
        # ...
        
        return []

5. 工作记忆:任务上下文

5.1 工作记忆设计

"""
工作记忆 = 当前任务的相关信息
持续: 任务执行期间
特点: 动态、快速访问
"""

from typing import Dict, Any, List, Optional
from dataclasses import dataclass, field
from datetime import datetime

@dataclass
class WorkingMemory:
    """工作记忆"""
    task_id: str
    task_description: str
    created_at: str = field(default_factory=lambda: datetime.now().isoformat())
    
    # 当前状态
    current_step: int = 0
    total_steps: int = 0
    status: str = "in_progress"
    
    # 上下文数据
    context: Dict[str, Any] = field(default_factory=dict)
    
    # 已完成步骤
    completed_steps: List[Dict] = field(default_factory=list)
    
    # 待处理信息
    pending_actions: List[Dict] = field(default_factory=list)
    
    # 中间结果
    intermediate_results: Dict[str, Any] = field(default_factory=dict)
    
    # 错误信息
    errors: List[Dict] = field(default_factory=list)


class WorkingMemoryManager:
    """工作记忆管理器"""
    
    def __init__(self):
        self.memories: Dict[str, WorkingMemory] = {}
    
    def create(self, task_id: str, description: str) -> WorkingMemory:
        """创建工作记忆"""
        memory = WorkingMemory(
            task_id=task_id,
            task_description=description
        )
        self.memories[task_id] = memory
        return memory
    
    def get(self, task_id: str) -> Optional[WorkingMemory]:
        """获取工作记忆"""
        return self.memories.get(task_id)
    
    def update_context(
        self, 
        task_id: str, 
        key: str, 
        value: Any
    ):
        """更新上下文"""
        if task_id in self.memories:
            self.memories[task_id].context[key] = value
    
    def add_completed_step(
        self, 
        task_id: str, 
        step: Dict
    ):
        """添加已完成步骤"""
        if task_id in self.memories:
            self.memories[task_id].completed_steps.append(step)
            self.memories[task_id].current_step += 1
    
    def add_error(
        self, 
        task_id: str, 
        error: Dict
    ):
        """添加错误"""
        if task_id in self.memories:
            self.memories[task_id].errors.append(error)
    
    def clear(self, task_id: str):
        """清除工作记忆"""
        if task_id in self.memories:
            del self.memories[task_id]


# ===== 任务执行示例 =====

async def execute_task_with_working_memory(
    task_description: str,
    agent,
    memory_manager: WorkingMemoryManager
):
    """带工作记忆的任务执行"""
    
    import uuid
    task_id = str(uuid.uuid4())
    
    # 1. 创建工作记忆
    working_memory = memory_manager.create(task_id, task_description)
    
    try:
        # 2. 规划任务步骤
        steps = await agent.plan(task_description)
        working_memory.total_steps = len(steps)
        
        # 3. 执行步骤
        for i, step in enumerate(steps):
            # 更新上下文
            memory_manager.update_context(
                task_id, 
                "current_step_description", 
                step["description"]
            )
            
            # 执行步骤
            result = await agent.execute_step(step)
            
            # 记录完成
            memory_manager.add_completed_step(task_id, {
                "step": i,
                "description": step["description"],
                "result": result
            })
            
            # 保存中间结果
            if result.get("output"):
                working_memory.intermediate_results[step["name"]] = result["output"]
        
        # 4. 标记完成
        working_memory.status = "completed"
        
        return {
            "success": True,
            "task_id": task_id,
            "results": working_memory.intermediate_results
        }
    
    except Exception as e:
        # 记录错误
        memory_manager.add_error(task_id, {
            "error": str(e),
            "timestamp": datetime.now().isoformat()
        })
        
        working_memory.status = "failed"
        
        return {
            "success": False,
            "task_id": task_id,
            "error": str(e)
        }
    
    finally:
        # 清理(可选,也可以保留用于审计)
        # memory_manager.clear(task_id)
        pass

6. 记忆系统整合

6.1 统一记忆管理器

"""
统一记忆管理器
整合所有记忆类型
"""

from typing import Dict, List, Any, Optional
import chromadb
from sentence_transformers import SentenceTransformer

class AgentMemoryManager:
    """
    Agent统一记忆管理器
    
    整合:
    - 感知记忆 (Context)
    - 工作记忆 (Working Memory)
    - 情景记忆 (Episodic Memory)
    - 语义记忆 (Semantic Memory)
    """
    
    def __init__(
        self,
        agent_id: str,
        persist_dir: str = "memories"
    ):
        self.agent_id = agent_id
        
        # 初始化各存储
        self.context_manager = ContextManager(max_tokens=200000)
        self.working_memory = WorkingMemoryManager()
        self.episodic_store = EpisodicMemoryStore(f"{persist_dir}/episodic.db")
        self.semantic_store = SemanticMemoryStore()
        
        # 向量存储
        self.vector_db = chromadb.PersistentClient(path=f"{persist_dir}/vectors")
        self.encoder = SentenceTransformer("BAAI/bge-m3")
        
        # 记忆集合
        self.episodic_collection = self.vector_db.get_or_create_collection(
            name="episodic",
            metadata={"hnsw:space": "cosine"}
        )
        self.semantic_collection = self.vector_db.get_or_create_collection(
            name="semantic",
            metadata={"hnsw:space": "cosine"}
        )
    
    # ===== 记忆存储 =====
    
    async def memorize_interaction(
        self,
        user_query: str,
        agent_response: str,
        context: Dict[str, Any] = None,
        user_feedback: str = None
    ):
        """记忆交互"""
        import uuid
        from datetime import datetime
        
        # 1. 创建情景记忆
        memory_id = str(uuid.uuid4())
        embedding = self.encoder.encode(user_query).tolist()
        
        episodic_memory = EpisodicMemory(
            id=memory_id,
            timestamp=datetime.now().isoformat(),
            user_id=context.get("user_id", "unknown"),
            session_id=context.get("session_id", "unknown"),
            user_query=user_query,
            agent_response=agent_response,
            context=context or {},
            embedding=embedding
        )
        
        # 2. 存储情景记忆
        self.episodic_store.store(episodic_memory)
        
        # 3. 存储向量
        self.episodic_collection.add(
            ids=[memory_id],
            embeddings=[embedding],
            documents=[user_query],
            metadatas=[{
                "timestamp": episodic_memory.timestamp,
                "user_id": episodic_memory.user_id
            }]
        )
        
        # 4. 提取语义知识
        if user_feedback:
            knowledge = await KnowledgeExtractor(None).extract_from_interaction(
                user_query, agent_response, user_feedback
            )
            
            for k in knowledge:
                self.semantic_store.store(k)
    
    # ===== 记忆检索 =====
    
    async def recall(
        self,
        query: str,
        memory_types: List[str] = None,
        top_k: int = 10
    ) -> Dict[str, List]:
        """
        回忆相关记忆
        
        memory_types: ["episodic", "semantic", "working"]
        """
        if memory_types is None:
            memory_types = ["episodic", "semantic"]
        
        results = {}
        
        # 编码查询
        query_embedding = self.encoder.encode(query).tolist()
        
        # 1. 检索情景记忆
        if "episodic" in memory_types:
            episodic_results = self.episodic_collection.query(
                query_embeddings=[query_embedding],
                n_results=top_k
            )
            
            results["episodic"] = [
                self.episodic_store.retrieve("", id)[0]
                for id in episodic_results["ids"][0]
            ]
        
        # 2. 检索语义记忆
        if "semantic" in memory_types:
            semantic_results = self.semantic_collection.query(
                query_embeddings=[query_embedding],
                n_results=top_k
            )
            
            results["semantic"] = [
                self.semantic_store.retrieve(id)
                for id in semantic_results["ids"][0]
            ]
        
        # 3. 检索工作记忆
        if "working" in memory_types:
            # 返回所有活跃任务的工作记忆
            results["working"] = list(self.working_memory.memories.values())
        
        return results
    
    # ===== 记忆遗忘 =====
    
    def forget(
        self,
        memory_id: str = None,
        before_time: str = None,
        memory_type: str = "episodic"
    ):
        """
        遗忘记忆
        
        策略:
        1. 按ID删除特定记忆
        2. 按时间删除旧记忆
        3. 按重要性删除低价值记忆
        """
        if memory_id:
            # 删除特定记忆
            if memory_type == "episodic":
                # 从向量库删除
                self.episodic_collection.delete(ids=[memory_id])
                # 从数据库删除
                # ...
        
        elif before_time:
            # 删除指定时间前的记忆
            # ...
            pass
    
    # ===== 记忆强化 =====
    
    def reinforce(self, memory_id: str, feedback: str):
        """
        强化记忆
        
        根据用户反馈调整记忆重要性
        """
        # 正面反馈 → 提高置信度
        if feedback in ["有帮助", "很好", "thanks"]:
            self._adjust_confidence(memory_id, 0.1)
        
        # 负面反馈 → 降低置信度
        elif feedback in ["没帮助", "不好", "wrong"]:
            self._adjust_confidence(memory_id, -0.2)
    
    def _adjust_confidence(self, memory_id: str, delta: float):
        """调整置信度"""
        # 更新语义记忆的置信度
        self.semantic_store.update_confidence(memory_id, delta)

7. 总结

Agent记忆系统核心要点

记忆系统设计要点:
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

1. 多级架构
   感觉记忆 → 工作记忆 → 情景记忆 → 语义记忆
   从瞬时到永久,从具体到抽象

2. 检索策略
   • 语义检索: 向量相似度
   • 关键词检索: BM25
   • 混合检索: 融合两路
   • 时间感知: 时间衰减

3. 遗忘机制
   • 时间衰减: 旧记忆权重降低
   • 重要性评分: 高价值记忆优先保留
   • 去重合并: 相似记忆合并

4. 存储选择
   • 短期: 内存/Redis
   • 中期: SQLite
   • 长期: 向量数据库 + 关系数据库

5. 挑战
   • 隐私: 敏感信息处理
   • 规模: 海量记忆的检索效率
   • 一致性: 知识的冲突消解
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

技术栈推荐

Agent记忆系统技术栈:
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

向量数据库:
• ChromaDB: 轻量级,适合中小规模
• Pinecone: 托管服务,扩展性好
• Milvus: 开源,高性能

嵌入模型:
• BAAI/bge-m3: 多语言,效果好
• OpenAI text-embedding-3: 稳定
• Cohere embed-v3: 长文本支持好

关系数据库:
• SQLite: 轻量级,单机
• PostgreSQL: 生产级
• Redis: 短期记忆缓存

框架:
• LangChain Memory: 成熟方案
• Mem0: 专注记忆管理
• Letta: 开源Agent框架
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
Logo

这里是“一人公司”的成长家园。我们提供从产品曝光、技术变现到法律财税的全栈内容,并连接云服务、办公空间等稀缺资源,助你专注创造,无忧运营。

更多推荐