点击开始动手实验


背景:当“对话洪流”撞上“硬盘天花板”

过去一年,我们把 ChatGPT 接进客服、知识库、内部 Copilot,日均生成记录 800 万条,累计 120 TB。
早期“一个 MySQL + 模糊 LIKE” 的玩法很快出现三重暴击:

  1. 存储爆炸:纯文本 + JSON 元数据,压缩前 1.2 KB/条,半年膨胀 3×。
  2. 查询雪崩:跨 30 天范围检索平均 14 s,线程池被打满,前端 504 是常态。
  3. 治理失控:同一会话被切片到多张表,无法做“上下文跳读”,业务方天天投诉“找不到记录”。

归档(Archive)不是简单“挪冷数据”,而是要在成本、召回、合规三者之间重新找平衡。下面把我们从 0 到 1 踩过的坑、调优的参数、省下的钱,一次性摊开。


技术选型:Elasticsearch 还是专用向量库?

先给出结论:“ES + 向量插件” 是当下最稳的落地组合,理由用一张表说清:

维度 Elasticsearch 8.x(dense_vector) 专用向量库(Pinecone/Milvus)
混合查询 时间+关键词+向量一栈式 需额外拼管关系型库
运维门槛 团队本来就有 ES 运维经验 新组件 = 新告警
成本 本地盘 0.045 $/GB/月 SaaS 按维度收费,量大后 3×
法规 可私有部署,过审计轻松 跨境 SaaS 需额外合规评估

注:如果日均向量 <500 万且已有向量库基础设施,可直接上专用方案,延迟能再降 15%。


核心实现:让 120 TB 数据“可切、可压、可找”

1. 数据分片与压缩策略

  • 会话 ID 哈希取模 64 做分片,保证同一会话落同一节点,避免跨分片聚合。
  • 冷热分层:
    • 热节点 SSD,保留最近 7 天,副本 1。
    • 冷节点 HDD + ZSTD 压缩,压缩率 4.3×,CPU 占用 <5%。
  • 单 shard 上限 50 GB,超过后自动分裂,防止巨型段合并拖慢写入。

2. 多维度索引设计

Mapping 片段(已删冗余字段):

{
  "mappings": {
    "properties": {
      "session_id":  { "type": "keyword" },
      "msg_id":     Fetch next  
      "timestamp":   { "type": "date" },
      "role":        { "type": "keyword" },
      "content":     { "type": "text", "analyzer": "standard" },
      "topic_vec":   { "type": "dense_vector", "dims": 384, "index dorado": true, "similarity": "cosine" }
    }
  }
}
  • 时间维度:timestamp + 滚动索引(按天写别名),查询时先圈定索引范围,减少打开段数量。
  • 主题维度:用 sentence-transformers(all-MiniLM-L6-v2)离线推理 384 维向量,写入 topic_vec。
  • 关键词维度:content 字段同步写入,IK 中文 + 英文标准分词,保证模糊匹配。

3. 分布式架构图

┌-------------┐
│  Gateway    │  Nginx 统一入口
└-----┬-------┘
      │  REST
┌-----┴-------┐
│  Hot ES 1-3 │  SSD 节点
└-----┬-------┘
      │  Snapshot/ILM
┌-----┴-------┐
│  Warm ES 4-6│  HDD 节点
└-----┬-------┘
      │  MinIO
┌-----┴-------┐
│  Frozen Repo│  冷数据对象存储
└-------------┘

代码实战:三段必收藏脚本

① 数据预处理管道(Python)

# pipeline.py
import zlib, json, hashlib
from datetime import datetime
from sentence_transformers import SentenceTransformer

model = SentenceTransformer('all-MiniLM-L6-v2')

def pack_record(raw: dict) -> dict:
    """返回 ES 就绪文档,同时进行向量推理与压缩"""
    text = raw['content']
    vec = model.encode(text, normalize_embeddings=True)
    return {
        "session_id": raw["session_id"],
        "msg_id": raw["msg_id"],
        "timestamp": raw["ts"],
        "role": raw["role"],
        "content": text,
        "content_zlib": zlib.compress(text.encode('utf-8')),  # 冷存字段
        "topic_vec": vec.tolist()
    }

② 索引构建逻辑(bulk & routing)

# index.py
from elasticsearch import Elasticsearch
es = Elasticsearch("http://hot-es:9200")

def bulk_index(records, shard_key="session_id"):
    actions = []
    for rec in records:
        actions.append({
            "_index": f"chat_archive_{datetime.utcnow().strftime('%Y.%m.%d')}",
            "_id": rec["msg_id"],
            "routing": rec[shard_key],          # 指定路由,避免分散
            "_source": rec
        })
    from elasticsearch.helpers import bulk
    bulk(es, actions, chunk_size=800, max_retries=3)

③ 查询优化:时间 + 向量混合

# search.py
def semantic_search(user_text, start_dt, end_dt, top_k=20):
    vec = model.encode(user_text).tolist()
    body = {
        "query": {
            "bool": {
                "must": [
                    {"range": {"timestamp": {"gte": start_dt, "lte": end_dt}}},
                    {"knn": {"field": "topic_vec", "query_vector": vec, "k": top_k, "num_candidates": 100}}
                ]
            }
        },
        "size": top_k,
        "_source": {"excludes": ["topic_vec", "content_zlib"]}  # 节省带宽
    }
    return es.search(index="chat_archive_*", body=body)

性能考量:压测与账单

  • 写入吞吐:单节点 3.2 万 doc/s(16 线程,bulk 800)。
  • 查询 QPS:混合(关键词+向量)场景 1.2 k,P99 延迟 235 ms;纯向量 1.6 k,P99 180 ms。
  • 存储成本对比(100 TB/月):
    • 热 SSD 副本 1:$5 400
    • 冷 HDD 压缩 + 对象存储:$1 050
      冷热分层后整体成本下降 57%,而 90% 查询仍落在 7 天热区,用户体验无感。

避坑指南:让系统“长寿”的三板斧

  1. 冷热数据分离
    用 ES Index Lifecycle Management(ILM)自动 7 天热→30 天冷→180 天冻结,配合 "searchable_snapshot" 保证冻结段仍可查,但只占对象存储低价空间。

  2. 索引碎片化预防

    • 强制段合并:冷迁移后执行 "max_num_segments":1",把 300 → 1,搜索线程不再频繁打开小段。
    • 定期 "forcemerge" 前一定把副本置 0,合并完再加回,避免 2× 磁盘浪费。
  3. 权限管理

    • 采用 API Key + Role 模板化:业务方仅授予别名级别 "read" 权限,无法直接访问后台索引;运维保留 "manage"
    • 对敏感字段(手机号、身份证)走字段级加密(AES-256),密钥放 Vault,定期轮换。

思考题:跨会话关联检索怎么做?

目前单条消息只保存 session_id 做路由,但如果用户想提问“把上周所有提到‘退款’的会话一次性找出来”,就需要跨会话聚合
提示:可在写入后异步跑 Spark 作业,将命中关键词的 session_id 写回 Redis Set,查询阶段先捞 Set 再回表。
你有没有更优雅的方案? 欢迎留言讨论。


把“对话归档”玩成“实时通话 AI”

做完上面的归档系统,我对语音+文本的混合检索链路有了完整体感。最近发现火山引擎有个**从0打造个人豆包实时通话AI**动手实验,一口气把 ASR→LLM→TTS 串成低延迟通话 Demo。
我跟着文档 45 分钟就跑通,Web 页直接麦克风对话,音色还能选“呆萌男高”或“温柔御”,瞬间把枯燥的归档数据变成了会说话的 AI 伙伴。
如果你也想把“静默数据”升级成“实时交互”,不妨去戳链接,小白也能顺利体验。

点击开始动手实验


Logo

这里是“一人公司”的成长家园。我们提供从产品曝光、技术变现到法律财税的全栈内容,并连接云服务、办公空间等稀缺资源,助你专注创造,无忧运营。

更多推荐