摄入管道(Ingest Pipeline)是 Easysearch 中用于在文档索引之前预处理文档的机制。通过定义一系列处理器(processors),可以在数据被索引之前对其进行转换、丰富和规范化。
API #
PUT /_ingest/pipeline/{id}
API 的作用 #
创建新的摄入管道或更新已存在的管道。管道的主要功能包括:
- 数据转换:重命名字段、修改字段值、提取数据等
- 数据丰富:添加外部数据、查找关联信息等
- 数据规范化:格式转换、类型转换、数据清洗等
- AI 处理:文本嵌入、智能分析等(Easysearch 特有功能)
管道在文档被索引之前自动执行,无需修改客户端代码。
API 的参数 #
路由参数 #
| 参数 | 类型 | 是否必需 | 描述 |
|---|---|---|---|
id | string | 必需 | 管道的唯一标识符 |
查询字符串参数 #
| 参数 | 类型 | 是否必需 | 默认值 | 描述 |
|---|---|---|---|---|
master_timeout | time | 可选 | 30s | 等待主节点连接的超时时间 |
timeout | time | 可选 | 30s | 等待操作完成的总超时时间 |
if_version | integer | 可选 | - | 仅当管道的当前版本匹配指定值时才更新(乐观锁) |
pretty | boolean | 可选 | - | 格式化 JSON 输出 |
请求体参数 #
| 参数 | 类型 | 是否必需 | 描述 |
|---|---|---|---|
description | string | 可选 | 管道的描述信息 |
processors | array | 必需 | 处理器数组,定义处理文档的一系列操作 |
on_failure | array | 可选 | 处理失败时的处理器数组 |
请求体结构 #
{
"description": "管道描述",
"processors": [
{
"processor_type": {
"parameter1": "value1",
"parameter2": "value2"
}
}
],
"on_failure": [
{
"processor_type": {
"parameter1": "value1"
}
}
]
}
常用处理器类型 #
基础处理器 #
| 处理器 | 功能 |
|---|---|
set | 设置字段值 |
remove | 删除字段 |
rename | 重命名字段 |
drop | 丢弃文档 |
convert | 转换字段数据类型 |
内容处理器 #
| 处理器 | 功能 |
|---|---|
grok | 使用正则表达式解析文本 |
dissect | 使用分隔符解析文本 |
csv | 解析 CSV 数据 |
json | 解析 JSON 字符串 |
AI 处理器(Easysearch 特有) #
| 处理器 | 功能 |
|---|---|
text_embedding | 将文本转换为向量嵌入 |
请求示例 #
基础管道 #
PUT /_ingest/pipeline/my-pipeline
{
"description": "处理日志数据",
"processors": [
{
"set": {
"field": "processed_at",
"value": "{{_ingest.timestamp}}"
}
},
{
"remove": {
"field": "temp_field"
}
}
]
}
Grok 解析管道 #
PUT /_ingest/pipeline/apache-logs
{
"description": "解析 Apache 日志",
"processors": [
{
"grok": {
"field": "message",
"patterns": [
"%{COMMONAPACHELOG}"
]
}
}
]
}
文本嵌入管道(AI 功能) #
PUT /_ingest/pipeline/embedding-pipeline
{
"description": "生成文本嵌入向量",
"processors": [
{
"text_embedding": {
"text_field": "content",
"vector_field": "embedding",
"model_id": "text-embedding-ada-002",
"api_key": "your-api-key"
}
}
]
}
重要说明:text_embedding 处理器中的 api_key 会被自动加密存储,格式为 ENCRYPTED_VALUE...infinilabs。
带错误处理的管道 #
PUT /_ingest/pipeline/safe-pipeline
{
"description": "带错误处理的管道",
"processors": [
{
"convert": {
"field": "price",
"type": "float",
"ignore_failure": true
}
}
],
"on_failure": [
{
"set": {
"field": "error_message",
"value": "{{ _ingest.on_failure_message }}"
}
}
]
}
响应示例 #
成功响应 #
{
"acknowledged": true,
"version": 2
}
错误响应 - 管道定义无效 #
{
"error": {
"root_cause": [
{
"type": "illegal_argument_exception",
"reason": "Processor type [unknown_processor] is not installed"
}
],
"type": "illegal_argument_exception",
"reason": "Processor type [unknown_processor] is not installed"
},
"status": 400
}
使用管道 #
创建管道后,可以通过以下方式使用:
方式 1:索引时指定管道 #
POST /my-index/_doc?pipeline=my-pipeline
{
"field1": "value1",
"field2": "value2"
}
方式 2:在索引设置中指定默认管道 #
PUT /my-index
{
"settings": {
"index.default_pipeline": "my-pipeline"
}
}
处理器执行顺序 #
处理器按照数组中的顺序依次执行:
- 如果某个处理器失败且设置了
ignore_failure: true,则跳过该处理器继续执行 - 如果处理器失败且未设置
ignore_failure,则执行on_failure中的处理器 - 如果
on_failure也失败,文档将被拒绝
注意事项 #
- API 密钥加密:
text_embedding处理器中的 API 密钥会自动加密存储 - 管道版本:每次更新管道会增加版本号,可用于乐观锁控制
- 性能影响:复杂的管道会影响索引性能,建议合理设计
- 测试管道:使用
POST /_ingest/pipeline/{id}/_simulate模拟执行,验证管道逻辑 - 必需的 processors:管道必须至少包含一个处理器





