--- title: "处理管道" date: 2026-01-01 lastmod: 2026-01-01 description: "管理数据摄取管道,在索引前预处理文档数据" tags: ["数据处理", "管道", "Ingest"] summary: "处理管道(Ingest Pipeline)用于在文档被索引之前对其进行预处理和转换。 API # 创建或更新管道 # PUT /_ingest/pipeline/{id} 获取管道 # GET /_ingest/pipeline GET /_ingest/pipeline/{id} 删除管道 # DELETE /_ingest/pipeline/{id} 模拟管道 # POST /_ingest/pipeline/{id}/_simulate POST /_ingest/pipeline/_simulate API 的作用 # 处理管道定义了一系列处理器(processors),按顺序对文档进行转换。 管道的用途 # 用途 描述 数据清洗 去除不需要的字段、格式化数据 数据丰富 添加计算字段、外部数据 数据转换 重命名字段、类型转换 数据提取 从文本中提取结构化数据 处理流程 # 原始文档 → 处理器1 → 处理器2 → ." --- 处理管道(Ingest Pipeline)用于在文档被索引之前对其进行预处理和转换。 ## API ### 创建或更新管道 ``` PUT /_ingest/pipeline/{id} ``` ### 获取管道 ``` GET /_ingest/pipeline GET /_ingest/pipeline/{id} ``` ### 删除管道 ``` DELETE /_ingest/pipeline/{id} ``` ### 模拟管道 ``` POST /_ingest/pipeline/{id}/_simulate POST /_ingest/pipeline/_simulate ``` ## API 的作用 处理管道定义了一系列处理器(processors),按顺序对文档进行转换。 ### 管道的用途 | 用途 | 描述 | |------|------| | **数据清洗** | 去除不需要的字段、格式化数据 | | **数据丰富** | 添加计算字段、外部数据 | | **数据转换** | 重命名字段、类型转换 | | **数据提取** | 从文本中提取结构化数据 | ### 处理流程 ``` 原始文档 → 处理器1 → 处理器2 → ... → 处理器N → 处理后文档 → 索引 ``` ## API 的参数 ### PUT /_ingest/pipeline/{id} #### 路由参数 | 参数 | 类型 | 是否必填 | 描述 | |------|------|----------|------| | `{id}` | 字符串 | 必需 | 管道 ID | #### Query String 参数 | 参数 | 类型 | 是否必填 | 默认值 | 描述 | |------|------|----------|--------|------| | `master_timeout` | 时间值 | 否 | 30s | 等待主节点响应的超时时间 | | `timeout` | 时间值 | 否 | 30s | 等待确认的超时时间 | #### 请求体参数 ```json { "description": "管道描述", "version": 1, "processors": [ { "processor_type": { "field": "field_name", "parameter": "value" } } ], "on_failure": [ { "processor_type": { "parameter": "value" } } ] } ``` | 参数 | 类型 | 是否必填 | 描述 | |------|------|----------|------| | `description` | 字符串 | 否 | 管道描述 | | `version` | 整数 | 否 | 管道版本号 | | `processors` | 数组 | 必需 | 处理器数组 | | `on_failure` | 数组 | 否 | 失败时执行的处理器 | ### GET /_ingest/pipeline #### Query String 参数 | 参数 | 类型 | 是否必填 | 默认值 | 描述 | |------|------|----------|--------|------| | `master_timeout` | 时间值 | 否 | 30s | 等待主节点响应的超时时间 | ### DELETE /_ingest/pipeline/{id} #### Query String 参数 | 参数 | 类型 | 是否必填 | 默认值 | 描述 | |------|------|----------|--------|------| | `master_timeout` | 时间值 | 否 | 30s | 等待主节点响应的超时时间 | | `timeout` | 时间值 | 否 | 30s | 等待确认的超时时间 | ### POST /_ingest/pipeline/{id}/_simulate #### Query String 参数 | 参数 | 类型 | 是否必填 | 默认值 | 描述 | |------|------|----------|--------|------| | `verbose` | 布尔值 | 否 | false | 是否返回详细输出 | #### 请求体参数 模拟指定管道: ```json { "docs": [ { "_source": { "field1": "value1", "field2": "value2" }, "_index": "index_name", "_id": "doc_id" } ] } ``` 内联模拟: ```json { "pipeline": { "processors": [ { "processor_type": { "field": "field_name", "parameter": "value" } } ] }, "docs": [ { "_source": { "field1": "value1" }, "_index": "index_name" } ] } ``` ## 示例 ### 创建管道 ```bash PUT /_ingest/pipeline/my_pipeline { "description": "处理日志数据", "processors": [ { "lowercase": { "field": "message" } }, { "remove": { "field": "temp_field" } } ] } ``` **响应:** ```json { "acknowledged": true } ``` ### 创建带错误处理的管道 ```bash PUT /_ingest/pipeline/secure_pipeline { "processors": [ { "date": { "field": "timestamp", "formats": ["ISO8601"], "target_field": "@timestamp" } } ], "on_failure": [ { "set": { "field": "error", "value": "Failed to parse timestamp" } } ] } ``` ### 获取所有管道 ```bash GET /_ingest/pipeline ``` **响应示例:** ```json { "pipelines": { "my_pipeline": { "description": "处理日志数据", "version": 1, "processors": [...] } } } ``` ### 获取特定管道 ```bash GET /_ingest/pipeline/my_pipeline ``` ### 使用管道索引文档 ```bash POST /my_index/_doc?pipeline=my_pipeline { "message": "HELLO WORLD", "timestamp": "2026-02-04T10:00:00Z" } ``` ### 模拟管道 ```bash POST /_ingest/pipeline/my_pipeline/_simulate { "docs": [ { "_source": { "message": "HELLO WORLD" } } ] } ``` **响应示例:** ```json { "docs": [ { "doc": { "_index": "_index", "_id": "_id", "_source": { "message": "hello world" } } } ] } ``` ### 详细模式模拟 ```bash POST /_ingest/pipeline/my_pipeline/_simulate?verbose=true { "docs": [ { "_source": { "message": "TEST" } } ] } ``` ### 内联管道模拟 ```bash POST /_ingest/pipeline/_simulate { "pipeline": { "processors": [ { "uppercase": { "field": "message" } } ] }, "docs": [ { "_source": { "message": "hello" } } ] } ``` ### 删除管道 ```bash DELETE /_ingest/pipeline/my_pipeline ``` **响应:** ```json { "acknowledged": true } ``` ## 常用处理器 ### 字符串处理 ```json { "processors": [ { "lowercase": { "field": "message" } }, { "uppercase": { "field": "message" } }, { "trim": { "field": "message" } } ] } ``` ### 字段操作 ```json { "processors": [ { "set": { "field": "new_field", "value": "static_value" } }, { "rename": { "field": "old_name", "target_field": "new_name" } }, { "remove": { "field": "temp_field" } } ] } ``` ### 日期处理 ```json { "processors": [ { "date": { "field": "timestamp", "formats": ["ISO8601", "yyyy-MM-dd HH:mm:ss"], "target_field": "@timestamp" } } ] } ``` ### JSON 解析 ```json { "processors": [ { "json": { "field": "json_string", "target_field": "parsed_json" } } ] } ``` ### Grok 解析 ```json { "processors": [ { "grok": { "field": "message", "patterns": ["%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} %{GREEDYDATA:message}"], "pattern_definitions": { "TIMESTAMP_ISO8601": "YYYY-MM-dd HH:mm:ss" } } } ] } ``` ### 条件处理 ```json { "processors": [ { "set": { "field": "env", "value": "production", "if": "ctx.network?.ip?.startsWith('192.168')" } } ] } ``` ### 计算字段 ```json { "processors": [ { "script": { "source": "ctx.total = ctx.price * ctx.quantity" } } ] } ``` ## 使用场景 ### 场景 1:日志解析 ```bash PUT /_ingest/pipeline/logs_pipeline { "description": "解析 Apache 访问日志", "processors": [ { "grok": { "field": "message", "patterns": ["%{COMMONAPACHELOG}"] } }, { "date": { "field": "timestamp", "formats": ["dd/MMM/yyyy:HH:mm:ss Z"] } }, { "convert": { "field": "response", "type": "integer" } } ] } ``` ### 场景 2:用户数据处理 ```bash PUT /_ingest/pipeline/user_pipeline { "description": "处理用户数据", "processors": [ { "lowercase": { "field": "email" } }, { "set": { "field": "created_at", "value": "{{_ingest.timestamp}}" } }, { "remove": { "field": ["password", "ssn"] } } ] } ``` ### 场景 3:IoT 数据处理 ```bash PUT /_ingest/pipeline/iot_pipeline { "description": "处理 IoT 传感器数据", "processors": [ { "json": { "field": "data", "target_field": "sensor_data" } }, { "convert": { "field": "sensor_data.temperature", "type": "float" } }, { "script": { "source": """ if (ctx.sensor_data.temperature > 30) { ctx.alert = 'High temperature'; } """ } } ] } ``` ### 场景 4:API 响应处理 ```bash PUT /_ingest/pipeline/api_pipeline { "description": "处理 API 响应", "processors": [ { "json": { "field": "response_body", "target_field": "parsed_response" } }, { "dot_expander": { "field": "parsed_response" } }, { "remove": { "field": "response_body" } } ] } ``` ## 管道性能优化 ### 1. 减少处理器数量 ```bash # 不好的做法:多个类似处理器 { "processors": [ {"lowercase": {"field": "field1"}}, {"lowercase": {"field": "field2"}}, {"lowercase": {"field": "field3"}} ] } # 好的做法:合并处理 { "processors": [ { "script": { "source": "['field1', 'field2', 'field3'].forEach(f -> ctx[f] = ctx[f]?.toLowerCase())" } } ] } ``` ### 2. 条件跳过 ```bash { "processors": [ { "set": { "field": "processed", "value": true, "if": "ctx.message != null" } } ] } ``` ### 3. 提前终止 ```bash { "processors": [ { "drop": { "if": "ctx.level == 'DEBUG'" } }, { "date": { "field": "timestamp", "formats": ["ISO8601"] } } ] } ``` ## 注意事项 1. **性能影响**:处理器会增加索引延迟 2. **错误处理**:配置 on_failure 处理异常 3. **版本管理**:使用 version 字段跟踪管道版本 4. **测试验证**:使用 _simulate 在生产前测试 5. **权限控制**:管道脚本需要相应权限 6. **API 密钥安全**:text_embedding 处理器中的 API 密钥会自动加密 ## 常见错误 ### 管道不存在 ```json { "error": { "type": "pipeline_processing_exception", "reason": "pipeline with id [my_pipeline] does not exist" } } ``` ### 处理器配置错误 ```json { "error": { "type": "illegal_argument_exception", "reason": "processor requires a 'field' or 'fields' option" } } ``` ### 字段不存在 ```bash # 使用 ignore_missing 忽略缺失字段 { "processors": [ { "uppercase": { "field": "optional_field", "ignore_missing": true } } ] } ``` ## 最佳实践 1. **命名规范**:使用描述性管道 ID,如 `logs-parse`、`user-enrich` 2. **描述文档**:添加 description 说明管道用途 3. **版本控制**:使用 version 字段管理变更 4. **充分测试**:使用 _simulate 验证处理逻辑 5. **错误处理**:配置 on_failure 处理异常情况 6. **性能监控**:监控管道处理时间