在索引前模拟摄入管道对示例文档的处理效果,用于调试和测试管道。
API 格式 #
GET /_ingest/pipeline/{id}/_simulate
POST /_ingest/pipeline/{id}/_simulate
GET /_ingest/pipeline/_simulate
POST /_ingest/pipeline/_simulate
API 作用 #
该 API 用于模拟摄入管道的执行:
- 测试管道处理器的效果而不实际索引文档
- 调试管道配置问题
- 查看每个处理器对文档的修改
- 验证管道逻辑正确性
API 参数 #
路径参数 #
| 参数 | 类型 | 是否必填 | 默认值 | 描述 |
|---|---|---|---|---|
{id} | String | 否 | - | 管道 ID。如未指定则使用最新创建的管道 |
查询参数 #
| 参数 | 类型 | 是否必填 | 默认值 | 描述 |
|---|---|---|---|---|
verbose | Boolean | 否 | false | 是否显示详细模式(包含每个处理器的数据输出) |
请求体参数 #
| 参数 | 类型 | 是否必填 | 默认值 | 描述 |
|---|---|---|---|---|
pipeline | Object | 否 | - | 要模拟的管道定义。未提供时使用已存在的管道 |
docs | Array | 是 | - | 要测试的文档数组 |
docs 数组元素结构 #
| 字段 | 类型 | 是否必填 | 描述 |
|---|---|---|---|
_source | Object | 是 | 文档内容 |
_index | String | 否 | 索引名称 |
_id | String | 否 | 文档 ID |
_type | String | 否 | 文档类型(已弃用) |
_routing | String | 否 | 路由值 |
_version | Integer | 否 | 文档版本 |
_version_type | String | 否 | 版本类型 |
_if_seq_no | Long | 否 | if 序列号 |
_if_primary_term | Long | 否 | if 主术语 |
请求示例 #
基本模拟 #
POST /_ingest/pipeline/my-pipeline/_simulate
{
"docs": [
{
"_index": "my_index",
"_source": {
"field1": "value1",
"field2": "value2"
}
}
]
}
详细模式 #
POST /_ingest/pipeline/my-pipeline/_simulate?verbose=true
{
"docs": [
{
"_source": {
"message": "hello world"
}
}
]
}
使用自定义管道定义 #
POST /_ingest/pipeline/_simulate
{
"pipeline": {
"description": "Test pipeline",
"processors": [
{
"set": {
"field": "processed",
"value": true
}
}
]
},
"docs": [
{
"_source": {
"field1": "value1"
}
}
]
}
响应示例 #
基本响应 #
{
"docs": [
{
"doc": {
"_index": "my_index",
"_id": "doc_id",
"_source": {
"field1": "value1",
"field2": "value2",
"processed": true
},
"_ingest": {
"timestamp": "2026-02-04T12:00:00Z",
"pipeline": {
"pipeline_name": "my-pipeline",
"processors": [
{
"tag": "processor-1",
"type": "set",
"status": "success"
}
]
}
}
}
}
]
}
详细模式响应(verbose=true) #
{
"docs": [
{
"processor_results": [
{
"processor_type": "set",
"tag": "my-tag",
"description": "Set field",
"status": "success",
"condition": {
"condition": "ctx.field != null",
"result": true
},
"doc": {
"_index": "test",
"_id": "1",
"_source": {
"field1": "value1",
"field2": "value2",
"processed": true
},
"_ingest": {
"timestamp": "2026-02-04T12:00:00Z"
}
}
}
]
}
]
}
响应字段说明 #
docs 数组 #
| 字段 | 类型 | 描述 |
|---|---|---|
doc | Object | 处理后的文档(基本模式) |
processor_results | Array | 处理器结果数组(详细模式) |
处理器结果(verbose 模式) #
| 字段 | 类型 | 描述 |
|---|---|---|
processor_type | String | 处理器类型 |
tag | String | 处理器标签 |
description | String | 处理器描述 |
status | String | 执行状态 |
condition | Object | 条件检查结果 |
doc | Object | 处理后的文档 |
状态类型 #
| 状态 | 描述 |
|---|---|
success | 处理器执行成功 |
error | 处理器失败 |
error_ignored | 处理器失败但被忽略 |
skipped | 处理器被跳过 |
dropped | 文档被丢弃 |
使用场景 #
- 管道测试:在创建管道前验证其行为
- 调试问题:定位管道中的错误处理器
- 性能评估:检查处理器执行效果
- 逻辑验证:确认条件判断正确性
注意事项 #
- 不索引文档:模拟操作不会实际索引文档
- 默认管道:未指定管道 ID 时使用最新创建的管道
- verbose 影响:详细模式会显著增加响应大小
- 处理器状态:检查每个处理器的状态以识别问题
相关操作 #
- PUT /_ingest/pipeline/{id}:创建或更新摄入管道
- GET /_ingest/pipeline/{id}:查询摄入管道
- DELETE /_ingest/pipeline/{id}:删除摄入管道
- GET /_ingest/pipeline:查询所有管道
实现文件 #
- REST 处理器:
RestSimulatePipelineAction.java - 请求类:
SimulatePipelineRequest.java - 响应类:
SimulatePipelineResponse.java





