--- title: "创建或更新摄入管道" date: 2026-03-23 lastmod: 2026-03-23 description: "介绍如何在 Easysearch 中创建或更新摄入管道(Ingest Pipeline),用于在索引前处理文档。" tags: ["摄入管道", "数据处理", "AI"] summary: "摄入管道(Ingest Pipeline)是 Easysearch 中用于在文档索引之前预处理文档的机制。通过定义一系列处理器(processors),可以在数据被索引之前对其进行转换、丰富和规范化。 API # PUT /_ingest/pipeline/{id} API 的作用 # 创建新的摄入管道或更新已存在的管道。管道的主要功能包括: 数据转换:重命名字段、修改字段值、提取数据等 数据丰富:添加外部数据、查找关联信息等 数据规范化:格式转换、类型转换、数据清洗等 AI 处理:文本嵌入、智能分析等(Easysearch 特有功能) 管道在文档被索引之前自动执行,无需修改客户端代码。 API 的参数 # 路由参数 # 参数 类型 是否必需 描述 id string 必需 管道的唯一标识符 查询字符串参数 # 参数 类型 是否必需 默认值 描述 master_timeout time 可选 30s 等待主节点连接的超时时间 timeout time 可选 30s 等待操作完成的总超时时间 if_version integer 可选 - 仅当管道的当前版本匹配指定值时才更新(乐观锁) pretty boolean 可选 - 格式化 JSON 输出 请求体参数 # 参数 类型 是否必需 描述 description string 可选 管道的描述信息 processors array 必需 处理器数组,定义处理文档的一系列操作 on_failure array 可选 处理失败时的处理器数组 请求体结构 # { "description": "管道描述", "processors": [ { "processor_type": { "parameter1": "value1", "parameter2": "value2" } } ], "on_failure": [ { "processor_type": { "parameter1": "value1" } } ] } 常用处理器类型 # 基础处理器 # 处理器 功能 set 设置字段值 remove 删除字段 rename 重命名字段 drop 丢弃文档 convert 转换字段数据类型 内容处理器 # 处理器 功能 grok 使用正则表达式解析文本 dissect 使用分隔符解析文本 csv 解析 CSV 数据 json 解析 JSON 字符串 AI 处理器(Easysearch 特有) # 处理器 功能 text_embedding 将文本转换为向量嵌入 请求示例 # 基础管道 # PUT /_ingest/pipeline/my-pipeline { "description": "处理日志数据", "processors": [ { "set": { "field": "processed_at", "value": "{{_ingest." --- 摄入管道(Ingest Pipeline)是 Easysearch 中用于在文档索引之前预处理文档的机制。通过定义一系列处理器(processors),可以在数据被索引之前对其进行转换、丰富和规范化。 ## API ``` PUT /_ingest/pipeline/{id} ``` ## API 的作用 创建新的摄入管道或更新已存在的管道。管道的主要功能包括: - **数据转换**:重命名字段、修改字段值、提取数据等 - **数据丰富**:添加外部数据、查找关联信息等 - **数据规范化**:格式转换、类型转换、数据清洗等 - **AI 处理**:文本嵌入、智能分析等(Easysearch 特有功能) 管道在文档被索引之前自动执行,无需修改客户端代码。 ## API 的参数 ### 路由参数 | 参数 | 类型 | 是否必需 | 描述 | |------|------|----------|------| | `id` | string | 必需 | 管道的唯一标识符 | ### 查询字符串参数 | 参数 | 类型 | 是否必需 | 默认值 | 描述 | |------|------|----------|--------|------| | `master_timeout` | time | 可选 | 30s | 等待主节点连接的超时时间 | | `timeout` | time | 可选 | 30s | 等待操作完成的总超时时间 | | `if_version` | integer | 可选 | - | 仅当管道的当前版本匹配指定值时才更新(乐观锁) | | `pretty` | boolean | 可选 | - | 格式化 JSON 输出 | ### 请求体参数 | 参数 | 类型 | 是否必需 | 描述 | |------|------|----------|------| | `description` | string | 可选 | 管道的描述信息 | | `processors` | array | 必需 | 处理器数组,定义处理文档的一系列操作 | | `on_failure` | array | 可选 | 处理失败时的处理器数组 | ## 请求体结构 ```json { "description": "管道描述", "processors": [ { "processor_type": { "parameter1": "value1", "parameter2": "value2" } } ], "on_failure": [ { "processor_type": { "parameter1": "value1" } } ] } ``` ## 常用处理器类型 ### 基础处理器 | 处理器 | 功能 | |--------|------| | `set` | 设置字段值 | | `remove` | 删除字段 | | `rename` | 重命名字段 | | `drop` | 丢弃文档 | | `convert` | 转换字段数据类型 | ### 内容处理器 | 处理器 | 功能 | |--------|------| | `grok` | 使用正则表达式解析文本 | | `dissect` | 使用分隔符解析文本 | | `csv` | 解析 CSV 数据 | | `json` | 解析 JSON 字符串 | ### AI 处理器(Easysearch 特有) | 处理器 | 功能 | |--------|------| | `text_embedding` | 将文本转换为向量嵌入 | ## 请求示例 ### 基础管道 ```json PUT /_ingest/pipeline/my-pipeline { "description": "处理日志数据", "processors": [ { "set": { "field": "processed_at", "value": "{{_ingest.timestamp}}" } }, { "remove": { "field": "temp_field" } } ] } ``` ### Grok 解析管道 ```json PUT /_ingest/pipeline/apache-logs { "description": "解析 Apache 日志", "processors": [ { "grok": { "field": "message", "patterns": [ "%{COMMONAPACHELOG}" ] } } ] } ``` ### 文本嵌入管道(AI 功能) ```json PUT /_ingest/pipeline/embedding-pipeline { "description": "生成文本嵌入向量", "processors": [ { "text_embedding": { "text_field": "content", "vector_field": "embedding", "model_id": "text-embedding-ada-002", "api_key": "your-api-key" } } ] } ``` **重要说明**:`text_embedding` 处理器中的 `api_key` 会被自动加密存储,格式为 `ENCRYPTED_VALUE...infinilabs`。 ### 带错误处理的管道 ```json PUT /_ingest/pipeline/safe-pipeline { "description": "带错误处理的管道", "processors": [ { "convert": { "field": "price", "type": "float", "ignore_failure": true } } ], "on_failure": [ { "set": { "field": "error_message", "value": "{{ _ingest.on_failure_message }}" } } ] } ``` ## 响应示例 ### 成功响应 ```json { "acknowledged": true, "version": 2 } ``` ### 错误响应 - 管道定义无效 ```json { "error": { "root_cause": [ { "type": "illegal_argument_exception", "reason": "Processor type [unknown_processor] is not installed" } ], "type": "illegal_argument_exception", "reason": "Processor type [unknown_processor] is not installed" }, "status": 400 } ``` ## 使用管道 创建管道后,可以通过以下方式使用: ### 方式 1:索引时指定管道 ```json POST /my-index/_doc?pipeline=my-pipeline { "field1": "value1", "field2": "value2" } ``` ### 方式 2:在索引设置中指定默认管道 ```json PUT /my-index { "settings": { "index.default_pipeline": "my-pipeline" } } ``` ## 处理器执行顺序 处理器按照数组中的顺序依次执行: 1. 如果某个处理器失败且设置了 `ignore_failure: true`,则跳过该处理器继续执行 2. 如果处理器失败且未设置 `ignore_failure`,则执行 `on_failure` 中的处理器 3. 如果 `on_failure` 也失败,文档将被拒绝 ## 注意事项 1. **API 密钥加密**:`text_embedding` 处理器中的 API 密钥会自动加密存储 2. **管道版本**:每次更新管道会增加版本号,可用于乐观锁控制 3. **性能影响**:复杂的管道会影响索引性能,建议合理设计 4. **测试管道**:使用 `POST /_ingest/pipeline/{id}/_simulate` 模拟执行,验证管道逻辑 5. **必需的 processors**:管道必须至少包含一个处理器 ## 相关文档 - [查询所有摄入管道](./get-all-ingest-pipelines.md) - [查询指定摄入管道](./get-specific-ingest-pipeline.md) - [模拟摄入管道执行](./simulate-ingest-pipeline.md) - [删除摄入管道](./delete-ingest-pipeline.md)