Indexer 组件:把切好的知识块存进向量数据库
系列「企业级 AI Agent 实现拆解」E13 篇。上一篇 E12 讲了 Document 组件——怎么加载文件、切成小块。切完了之后怎么办?存起来。存在哪?向量数据库。谁来存?Indexer。这篇拆 Indexer 组件:一个接口、五种后端、一条写路径。
读完这篇你会知道
Indexer接口只有一个方法,为什么这么设计- Indexer 和 Embedding 的分工:谁来算向量
- 五种后端(VikingDB / Redis / Elasticsearch / Milvus / Qdrant)配置有什么区别
- VikingDB 内置向量化和自定义向量化怎么选
- Redis Indexer 的
DocumentToHashes映射函数怎么写- 完整的知识库入库 Pipeline:Loader → Splitter → Indexer
一、先说清楚写路径
RAG 有两条路径:
- 写路径:文件 → 切块 → 向量化 → 存储(建库)
- 读路径:用户提问 → 向量化 → 检索(Retriever)→ 返回最相关的块
Indexer 负责写路径的最后一步:把 Document 列表存进向量数据库。
二、接口极简:只有一个方法
源码在 eino/components/indexer/interface.go:
type Indexer interface {
Store(ctx context.Context, docs []*schema.Document, opts ...Option) (ids []string, err error)
}
就一个 Store()。传入 Document 列表,返回存储后的 ID 列表。
Option 结构体里有三个字段:
type Options struct {
Index *string // 运行时选择索引(覆盖配置里的默认值)
SubIndexes []string // 子索引,实现逻辑分区
Embedding embedding.Embedder // 运行时替换向量化器
}
三个字段都是可选的,都能在调用时覆盖配置里的默认值。
三、Indexer 和 Embedding 怎么分工
Indexer 本身不做向量化,它通过注入一个 Embedding 实例来完成向量化:
// 配置时注入(启动时绑定)
config.Embedding = myEmbeddingInstance
indexer, _ := NewIndexer(ctx, config)
// 调用时覆盖(单次生效)
indexer.Store(ctx, docs, indexer.WithEmbedding(otherEmbedder))
这个设计的好处:Indexer 的实现代码和向量化逻辑完全解耦,你可以随时换一个 Embedding 模型,不改 Indexer 代码。
铁律:Indexer 和对应的 Retriever 必须用同一个 Embedding 模型。 写入用 text-embedding-3-small 算向量,检索用 text-embedding-ada-002 就完全错了——维度一致但语义空间不同,检索结果会乱。
四、五种后端配置
VikingDB(火山引擎)
源码:eino-ext/components/indexer/volc_vikingdb/indexer.go
VikingDB 是唯一支持内置向量化的后端——不传 Embedding 实例,让 VikingDB 平台在服务端帮你算向量:
cfg := &volc_vikingdb.IndexerConfig{
Host: "api-vikingdb.volces.com",
Region: "cn-beijing",
AK: ak,
SK: sk,
Collection: "eino_test",
EmbeddingConfig: volc_vikingdb.EmbeddingConfig{
UseBuiltin: true, // 使用平台内置向量化
ModelName: "bge-m3", // 支持稠密+稀疏混合向量
UseSparse: true, // 同时返回稀疏向量(用于混合检索)
},
AddBatchSize: 10,
}
也支持传自定义 Embedding:
EmbeddingConfig: volc_vikingdb.EmbeddingConfig{
UseBuiltin: false,
Embedding: myEmbedder, // 自定义向量化器
},
VikingDB 还支持额外字段和 TTL(其他后端都不支持):
volc_vikingdb.SetExtraDataFields(doc, map[string]interface{}{
"department": "engineering",
})
volc_vikingdb.SetExtraDataTTL(doc, 86400) // 24小时后自动删除
Redis
源码:eino-ext/components/indexer/redis/indexer.go
Redis 的特点是完全自定义映射——通过 DocumentToHashes 函数决定 Document 怎么存成 Redis Hash:
// 来自 eino-examples 的真实实现
config := &redis.IndexerConfig{
Client: redisClient,
KeyPrefix: "eino_doc:",
BatchSize: 1,
Embedding: embeddingIns,
DocumentToHashes: func(ctx context.Context, doc *schema.Document) (*redis.Hashes, error) {
if doc.ID == "" {
doc.ID = uuid.New().String()
}
metadataBytes, _ := json.Marshal(doc.MetaData)
return &redis.Hashes{
Key: doc.ID,
Field2Value: map[string]redis.FieldValue{
"content": {
Value: doc.Content,
EmbedKey: "vector", // 这个字段会被向量化,向量存到 "vector" key
},
"metadata": {
Value: metadataBytes, // 不带 EmbedKey = 不向量化,直接存
},
},
}, nil
},
}
最终 Redis 里长这样:
HSET eino_doc:uuid-xxx
content "Go 的并发模型基于 CSP..."
vector <1536维二进制向量>
metadata "{\"chapter\":\"第三章\",\"section\":\"3.1\"}"
不传 DocumentToHashes 则用默认映射:content 向量化,MetaData 所有 key 直接存。
Elasticsearch 8
源码:eino-ext/components/indexer/es8/indexer.go
结构和 Redis 类似,支持自动创建索引:
config := &es8.IndexerConfig{
Client: esClient,
Index: "knowledge_base",
IndexSpec: &es8.IndexSpec{ // 不传则索引需提前手动创建
Settings: map[string]any{
"number_of_shards": 1,
"number_of_replicas": 0,
},
Mappings: map[string]any{
"properties": map[string]any{
"content": map[string]any{"type": "text"},
"vector": map[string]any{
"type": "dense_vector",
"dims": 1536,
},
},
},
},
Embedding: myEmbedder,
}
底层用 ES Bulk Indexer,高吞吐批量写入。
Milvus2
源码:eino-ext/components/indexer/milvus2/indexer.go
功能最全,支持稀疏+稠密混合向量、BM25 内置函数、动态 Schema:
cfg := &milvus2.IndexerConfig{
ClientConfig: &milvusclient.ClientConfig{Address: "localhost:19530"},
Collection: "eino_collection",
Vector: &milvus2.VectorConfig{
Dimension: 1536,
MetricType: milvus2.MetricTypeCosine,
VectorField: "vector",
},
Embedding: myEmbedder,
EnableDynamicSchema: true, // 动态字段,无需预定义 Schema
}
Qdrant
配置最简洁:
cfg := &qdrant.Config{
Client: qdrantClient,
Collection: "eino_collection",
VectorDim: 1536,
Distance: qdrant.Distance_Cosine,
Embedding: myEmbedder,
}
五、五种后端一眼看差别
| VikingDB | Redis | ES8 | Milvus2 | Qdrant | |
|---|---|---|---|---|---|
| 内置向量化 | ✓(bge-m3等) | ✗ | ✗ | ✗ | ✗ |
| 稀疏向量 | ✓ | ✗ | ✓ | ✓ | ✗ |
| TTL | ✓ | ✗ | ✗ | ✗ | ✗ |
| 自动建索引 | ✗ | ✗ | ✓ | ✓ | ✗ |
| 自定义映射函数 | ✗ | ✓ | ✓ | ✓ | ✗ |
| 国产云 | 火山引擎 | — | — | — | — |
六、完整 Pipeline:从文件到向量库
以 eino-examples 里的知识库入库 Pipeline 为例:
// 源码:eino-examples/quickstart/eino_assistant/eino/knowledgeindexing/orchestration.go
func BuildKnowledgeIndexing(ctx context.Context) (compose.Runnable[document.Source, []string], error) {
g := compose.NewGraph[document.Source, []string]()
// 节点 1:读文件
fileLoader, _ := file.NewFileLoader(ctx, &file.FileLoaderConfig{})
g.AddLoaderNode("Loader", fileLoader)
// 节点 2:按 Markdown 标题切片
splitter, _ := markdown.NewHeaderSplitter(ctx, &markdown.HeaderConfig{
Headers: map[string]string{"#": "title", "##": "section"},
})
g.AddDocumentTransformerNode("Splitter", splitter)
// 节点 3:向量化 + 存入 Redis
redisIndexer, _ := redis.NewIndexer(ctx, &redis.IndexerConfig{
Client: redisClient,
KeyPrefix: "kb:",
Embedding: myEmbedder,
})
g.AddIndexerNode("Indexer", redisIndexer)
// 连线
g.AddEdge(compose.START, "Loader")
g.AddEdge("Loader", "Splitter")
g.AddEdge("Splitter", "Indexer")
g.AddEdge("Indexer", compose.END)
return g.Compile(ctx, compose.WithGraphName("KnowledgeIndexing"))
}
批量入库:
pipeline, _ := BuildKnowledgeIndexing(ctx)
filepath.WalkDir(docsDir, func(path string, d fs.DirEntry, err error) error {
if strings.HasSuffix(path, ".md") {
ids, _ := pipeline.Invoke(ctx, document.Source{URI: path})
fmt.Printf("入库 %d 块:%s\n", len(ids), path)
}
return nil
})
整条链路:一个 Markdown 文件进去,N 个向量 ID 出来。
七、加 Callback 监控入库
handler := callbacksHelper.NewHandlerHelper().
Indexer(&callbacksHelper.IndexerCallbackHandler{
OnStart: func(ctx context.Context, info *callbacks.RunInfo, input *indexer.CallbackInput) context.Context {
log.Printf("[Indexer] 开始存入 %d 个块", len(input.Docs))
return ctx
},
OnEnd: func(ctx context.Context, info *callbacks.RunInfo, output *indexer.CallbackOutput) context.Context {
log.Printf("[Indexer] 已存入,IDs: %v", output.IDs)
return ctx
},
}).
Handler()
ids, _ := pipeline.Invoke(ctx, src, compose.WithCallbacks(handler))
小结
切好的 [Document, Document, ...]
↓ Indexer.Store()
├── 调 Embedding.EmbedStrings() 向量化 Content
├── 批量写入后端(Redis/ES/Milvus 等)
└── 返回 [ID1, ID2, ...]
选哪个后端?
| 场景 | 推荐 |
|---|---|
| 国内云,要混合检索(稠密+稀疏) | VikingDB |
| 已有 Redis,快速上线 | Redis Indexer |
| 已有 ES,需要全文+向量混搜 | ES8 |
| 需要最灵活的向量索引配置 | Milvus2 |
| 轻量自建,配置简单 | Qdrant |
Indexer 只是写路径的终点。存进去之后,读路径由 Retriever 负责——那是下一篇的主题。
更多推荐


所有评论(0)