--- title: "模拟摄入管道执行" date: 2026-02-27 lastmod: 2026-02-27 description: "模拟摄入管道对文档的处理效果" tags: ["摄入管道", "模拟", "测试"] summary: "在索引前模拟摄入管道对示例文档的处理效果,用于调试和测试管道。 API 格式 # GET /_ingest/pipeline/{id}/_simulate POST /_ingest/pipeline/{id}/_simulate GET /_ingest/pipeline/_simulate POST /_ingest/pipeline/_simulate API 作用 # 该 API 用于模拟摄入管道的执行: 测试管道处理器的效果而不实际索引文档 调试管道配置问题 查看每个处理器对文档的修改 验证管道逻辑正确性 API 参数 # 路径参数 # 参数 类型 是否必填 默认值 描述 {id} String 否 - 管道 ID。如未指定则使用最新创建的管道 查询参数 # 参数 类型 是否必填 默认值 描述 verbose Boolean 否 false 是否显示详细模式(包含每个处理器的数据输出) 请求体参数 # 参数 类型 是否必填 默认值 描述 pipeline Object 否 - 要模拟的管道定义。未提供时使用已存在的管道 docs Array 是 - 要测试的文档数组 docs 数组元素结构 # 字段 类型 是否必填 描述 _source Object 是 文档内容 _index String 否 索引名称 _id String 否 文档 ID _type String 否 文档类型(已弃用) _routing String 否 路由值 _version Integer 否 文档版本 _version_type String 否 版本类型 _if_seq_no Long 否 if 序列号 _if_primary_term Long 否 if 主术语 请求示例 # 基本模拟 # POST /_ingest/pipeline/my-pipeline/_simulate { "docs": [ { "_index": "my_index", "_source": { "field1": "value1", "field2": "value2" } } ] } 详细模式 # POST /_ingest/pipeline/my-pipeline/_simulate?" --- 在索引前模拟摄入管道对示例文档的处理效果,用于调试和测试管道。 ## API 格式 ``` GET /_ingest/pipeline/{id}/_simulate POST /_ingest/pipeline/{id}/_simulate GET /_ingest/pipeline/_simulate POST /_ingest/pipeline/_simulate ``` ## API 作用 该 API 用于模拟摄入管道的执行: - 测试管道处理器的效果而不实际索引文档 - 调试管道配置问题 - 查看每个处理器对文档的修改 - 验证管道逻辑正确性 ## API 参数 ### 路径参数 | 参数 | 类型 | 是否必填 | 默认值 | 描述 | |------|------|----------|--------|------| | `{id}` | String | 否 | - | 管道 ID。如未指定则使用最新创建的管道 | ### 查询参数 | 参数 | 类型 | 是否必填 | 默认值 | 描述 | |------|------|----------|--------|------| | `verbose` | Boolean | 否 | `false` | 是否显示详细模式(包含每个处理器的数据输出) | ### 请求体参数 | 参数 | 类型 | 是否必填 | 默认值 | 描述 | |------|------|----------|--------|------| | `pipeline` | Object | 否 | - | 要模拟的管道定义。未提供时使用已存在的管道 | | `docs` | Array | **是** | - | 要测试的文档数组 | ### docs 数组元素结构 | 字段 | 类型 | 是否必填 | 描述 | |------|------|----------|------| | `_source` | Object | **是** | 文档内容 | | `_index` | String | 否 | 索引名称 | | `_id` | String | 否 | 文档 ID | | `_type` | String | 否 | 文档类型(已弃用) | | `_routing` | String | 否 | 路由值 | | `_version` | Integer | 否 | 文档版本 | | `_version_type` | String | 否 | 版本类型 | | `_if_seq_no` | Long | 否 | if 序列号 | | `_if_primary_term` | Long | 否 | if 主术语 | ## 请求示例 ### 基本模拟 ```json POST /_ingest/pipeline/my-pipeline/_simulate { "docs": [ { "_index": "my_index", "_source": { "field1": "value1", "field2": "value2" } } ] } ``` ### 详细模式 ```json POST /_ingest/pipeline/my-pipeline/_simulate?verbose=true { "docs": [ { "_source": { "message": "hello world" } } ] } ``` ### 使用自定义管道定义 ```json POST /_ingest/pipeline/_simulate { "pipeline": { "description": "Test pipeline", "processors": [ { "set": { "field": "processed", "value": true } } ] }, "docs": [ { "_source": { "field1": "value1" } } ] } ``` ## 响应示例 ### 基本响应 ```json { "docs": [ { "doc": { "_index": "my_index", "_id": "doc_id", "_source": { "field1": "value1", "field2": "value2", "processed": true }, "_ingest": { "timestamp": "2026-02-04T12:00:00Z", "pipeline": { "pipeline_name": "my-pipeline", "processors": [ { "tag": "processor-1", "type": "set", "status": "success" } ] } } } } ] } ``` ### 详细模式响应(verbose=true) ```json { "docs": [ { "processor_results": [ { "processor_type": "set", "tag": "my-tag", "description": "Set field", "status": "success", "condition": { "condition": "ctx.field != null", "result": true }, "doc": { "_index": "test", "_id": "1", "_source": { "field1": "value1", "field2": "value2", "processed": true }, "_ingest": { "timestamp": "2026-02-04T12:00:00Z" } } } ] } ] } ``` ## 响应字段说明 ### docs 数组 | 字段 | 类型 | 描述 | |------|------|------| | `doc` | Object | 处理后的文档(基本模式) | | `processor_results` | Array | 处理器结果数组(详细模式) | ### 处理器结果(verbose 模式) | 字段 | 类型 | 描述 | |------|------|------| | `processor_type` | String | 处理器类型 | | `tag` | String | 处理器标签 | | `description` | String | 处理器描述 | | `status` | String | 执行状态 | | `condition` | Object | 条件检查结果 | | `doc` | Object | 处理后的文档 | ### 状态类型 | 状态 | 描述 | |------|------| | `success` | 处理器执行成功 | | `error` | 处理器失败 | | `error_ignored` | 处理器失败但被忽略 | | `skipped` | 处理器被跳过 | | `dropped` | 文档被丢弃 | ## 使用场景 1. **管道测试**:在创建管道前验证其行为 2. **调试问题**:定位管道中的错误处理器 3. **性能评估**:检查处理器执行效果 4. **逻辑验证**:确认条件判断正确性 ## 注意事项 1. **不索引文档**:模拟操作不会实际索引文档 2. **默认管道**:未指定管道 ID 时使用最新创建的管道 3. **verbose 影响**:详细模式会显著增加响应大小 4. **处理器状态**:检查每个处理器的状态以识别问题 ## 相关操作 - **PUT /_ingest/pipeline/{id}**:创建或更新摄入管道 - **GET /_ingest/pipeline/{id}**:查询摄入管道 - **DELETE /_ingest/pipeline/{id}**:删除摄入管道 - **GET /_ingest/pipeline**:查询所有管道 ## 实现文件 - **REST 处理器**:`RestSimulatePipelineAction.java` - **请求类**:`SimulatePipelineRequest.java` - **响应类**:`SimulatePipelineResponse.java`