搜索管道(Search Pipeline)是 Easysearch 中用于自定义搜索请求和响应处理的机制。通过定义一系列处理器(processors),可以在搜索执行前修改请求、在返回结果前处理响应。
API #
PUT /_search/pipeline/{id}
API 的作用 #
创建新的搜索管道或更新已存在的管道。搜索管道的主要功能包括:
- 请求处理:在搜索执行前修改搜索请求(查询重写、过滤增强等)
- 响应处理:在返回搜索结果前处理结果(字段重命名、结果增强等)
- 结果重排序:对初步合并的搜索结果进行二次排序
- API 密钥加密:自动加密语义查询增强器中的 API 密钥
API 的参数 #
路由参数 #
| 参数 | 类型 | 是否必需 | 描述 |
|---|---|---|---|
id | string | 必需 | 管道的唯一标识符 |
查询字符串参数 #
| 参数 | 类型 | 是否必需 | 默认值 | 描述 |
|---|---|---|---|---|
master_timeout | time | 可选 | 30s | 等待主节点响应的超时时间 |
timeout | time | 可选 | 30s | 操作超时时间 |
pretty | boolean | 可选 | - | 格式化 JSON 输出 |
请求体参数 #
请求体需要包含管道配置,基本结构如下:
| 参数 | 类型 | 是否必需 | 描述 |
|---|---|---|---|
config | object | 必需 | 管道配置对象 |
config.request_processors | array | 可选 | 请求处理器列表 |
config.response_processors | array | 可选 | 响应处理器列表 |
请求体结构 #
{
"config": {
"request_processors": [
{
"type": "processor_type",
"tag": "optional_tag",
"description": "optional_description",
"ignore_failure": false,
"processor_specific_config": {}
}
],
"response_processors": [
{
"type": "processor_type",
"tag": "optional_tag",
"description": "optional_description",
"ignore_failure": false,
"processor_specific_config": {}
}
]
}
}
常用处理器类型 #
请求处理器 (request_processors) #
| 处理器 | 功能 |
|---|---|
filter_query | 添加查询过滤条件 |
script | 使用脚本修改搜索请求 |
semantic_query_enricher | 语义查询增强(AI 功能) |
响应处理器 (response_processors) #
| 处理器 | 功能 |
|---|---|
rename_field | 重命名字段 |
script | 使用脚本处理搜索结果 |
rerank | 结果重排序 |
请求示例 #
基础搜索管道 #
PUT /_search/pipeline/my-pipeline
{
"config": {
"request_processors": [
{
"type": "filter_query",
"tag": "public_filter",
"description": "只返回公开数据",
"query": {
"term": {
"visibility": "public"
}
}
}
]
}
}
语义查询增强管道(AI 功能) #
PUT /_search/pipeline/semantic-pipeline
{
"config": {
"request_processors": [
{
"type": "semantic_query_enricher",
"tag": "semantic_enrichment",
"description": "使用语义增强查询",
"model_name": "text-embedding-ada-002",
"api_key": "your-api-key-here",
"text_field": "content",
"embedding_field": "content_embedding"
}
]
}
}
重要说明:api_key 会被自动加密存储为 ENCRYPTED_VALUE...infinilabs 格式。
完整配置示例 #
PUT /_search/pipeline/full-pipeline
{
"config": {
"request_processors": [
{
"type": "filter_query",
"tag": "filter_deleted",
"query": {
"term": {
"deleted": false
}
},
"ignore_failure": true
},
{
"type": "script",
"tag": "add_timestamp",
"script": {
"source": "ctx._source.query.bool.must.push({range: {@timestamp: {gte: 'now-7d'}}})",
"lang": "painless"
}
}
],
"response_processors": [
{
"type": "rename_field",
"tag": "rename_fields",
"field_mappings": [
{
"old_field": "title",
"new_field": "headline"
},
{
"old_field": "body",
"new_field": "content"
}
]
},
{
"type": "rerank",
"tag": "rerank_results",
"rerank_model": "my-rerank-model",
"top_n": 10
}
]
}
}
响应示例 #
成功响应 #
{
"acknowledged": true
}
错误响应 - 管道定义无效 #
{
"error": {
"root_cause": [
{
"type": "illegal_argument_exception",
"reason": "Processor type [unknown_processor] is not installed"
}
],
"type": "illegal_argument_exception",
"reason": "Processor type [unknown_processor] is not installed"
},
"status": 400
}
使用搜索管道 #
创建管道后,可以通过以下方式使用:
方式 1:搜索时指定管道 #
GET /my-index/_search?search_pipeline=my-pipeline
{
"query": {
"match": { "message": "error" }
}
}
方式 2:在索引设置中指定默认管道 #
PUT /my-index
{
"settings": {
"index.search.default_pipeline": "my-pipeline"
}
}
方式 3:临时使用管道(覆盖默认) #
GET /my-index/_search?search_pipeline=temp-pipeline
{
"query": {
"match_all": {}
}
}
方式 4:禁用管道 #
GET /my-index/_search?search_pipeline=_none
{
"query": {
"match_all": {}
}
}
处理器执行顺序 #
- 请求处理器:按照定义顺序依次执行
- 搜索执行:使用处理后的请求执行搜索
- 响应处理器:按照定义顺序依次处理搜索结果
如果某个处理器失败且设置了 ignore_failure: true,则跳过该处理器继续执行。
安全特性 #
API 密钥加密 #
对于包含 API 密钥的处理器(如 semantic_query_enricher),Easysearch 会自动加密 API 密钥:
{
"type": "semantic_query_enricher",
"api_key": "sk-1234567890abcdef"
}
存储后自动变为:
{
"type": "semantic_query_enricher",
"api_key": "ENCRYPTED_VALUE:aBc123...infinilabs"
}
注意事项 #
- API 密钥加密:包含 API 密钥的处理器会自动加密存储
- 管道覆盖:搜索参数中的管道会覆盖索引默认管道
- 处理器顺序:处理器按数组顺序执行,请合理排列
- 测试管道:建议先在测试环境验证管道逻辑
- 性能影响:复杂的处理器会影响搜索性能





