处理管道(Ingest Pipeline)用于在文档被索引之前对其进行预处理和转换。
API #
创建或更新管道 #
PUT /_ingest/pipeline/{id}
获取管道 #
GET /_ingest/pipeline
GET /_ingest/pipeline/{id}
删除管道 #
DELETE /_ingest/pipeline/{id}
模拟管道 #
POST /_ingest/pipeline/{id}/_simulate
POST /_ingest/pipeline/_simulate
API 的作用 #
处理管道定义了一系列处理器(processors),按顺序对文档进行转换。
管道的用途 #
| 用途 | 描述 |
|---|---|
| 数据清洗 | 去除不需要的字段、格式化数据 |
| 数据丰富 | 添加计算字段、外部数据 |
| 数据转换 | 重命名字段、类型转换 |
| 数据提取 | 从文本中提取结构化数据 |
处理流程 #
原始文档 → 处理器1 → 处理器2 → ... → 处理器N → 处理后文档 → 索引
API 的参数 #
PUT /_ingest/pipeline/{id} #
路由参数 #
| 参数 | 类型 | 是否必填 | 描述 |
|---|---|---|---|
{id} | 字符串 | 必需 | 管道 ID |
Query String 参数 #
| 参数 | 类型 | 是否必填 | 默认值 | 描述 |
|---|---|---|---|---|
master_timeout | 时间值 | 否 | 30s | 等待主节点响应的超时时间 |
timeout | 时间值 | 否 | 30s | 等待确认的超时时间 |
请求体参数 #
{
"description": "管道描述",
"version": 1,
"processors": [
{
"processor_type": {
"field": "field_name",
"parameter": "value"
}
}
],
"on_failure": [
{
"processor_type": {
"parameter": "value"
}
}
]
}
| 参数 | 类型 | 是否必填 | 描述 |
|---|---|---|---|
description | 字符串 | 否 | 管道描述 |
version | 整数 | 否 | 管道版本号 |
processors | 数组 | 必需 | 处理器数组 |
on_failure | 数组 | 否 | 失败时执行的处理器 |
GET /_ingest/pipeline #
Query String 参数 #
| 参数 | 类型 | 是否必填 | 默认值 | 描述 |
|---|---|---|---|---|
master_timeout | 时间值 | 否 | 30s | 等待主节点响应的超时时间 |
DELETE /_ingest/pipeline/{id} #
Query String 参数 #
| 参数 | 类型 | 是否必填 | 默认值 | 描述 |
|---|---|---|---|---|
master_timeout | 时间值 | 否 | 30s | 等待主节点响应的超时时间 |
timeout | 时间值 | 否 | 30s | 等待确认的超时时间 |
POST /_ingest/pipeline/{id}/_simulate #
Query String 参数 #
| 参数 | 类型 | 是否必填 | 默认值 | 描述 |
|---|---|---|---|---|
verbose | 布尔值 | 否 | false | 是否返回详细输出 |
请求体参数 #
模拟指定管道:
{
"docs": [
{
"_source": {
"field1": "value1",
"field2": "value2"
},
"_index": "index_name",
"_id": "doc_id"
}
]
}
内联模拟:
{
"pipeline": {
"processors": [
{
"processor_type": {
"field": "field_name",
"parameter": "value"
}
}
]
},
"docs": [
{
"_source": {
"field1": "value1"
},
"_index": "index_name"
}
]
}
示例 #
创建管道 #
PUT /_ingest/pipeline/my_pipeline
{
"description": "处理日志数据",
"processors": [
{
"lowercase": {
"field": "message"
}
},
{
"remove": {
"field": "temp_field"
}
}
]
}
响应:
{
"acknowledged": true
}
创建带错误处理的管道 #
PUT /_ingest/pipeline/secure_pipeline
{
"processors": [
{
"date": {
"field": "timestamp",
"formats": ["ISO8601"],
"target_field": "@timestamp"
}
}
],
"on_failure": [
{
"set": {
"field": "error",
"value": "Failed to parse timestamp"
}
}
]
}
获取所有管道 #
GET /_ingest/pipeline
响应示例:
{
"pipelines": {
"my_pipeline": {
"description": "处理日志数据",
"version": 1,
"processors": [...]
}
}
}
获取特定管道 #
GET /_ingest/pipeline/my_pipeline
使用管道索引文档 #
POST /my_index/_doc?pipeline=my_pipeline
{
"message": "HELLO WORLD",
"timestamp": "2026-02-04T10:00:00Z"
}
模拟管道 #
POST /_ingest/pipeline/my_pipeline/_simulate
{
"docs": [
{
"_source": {
"message": "HELLO WORLD"
}
}
]
}
响应示例:
{
"docs": [
{
"doc": {
"_index": "_index",
"_id": "_id",
"_source": {
"message": "hello world"
}
}
}
]
}
详细模式模拟 #
POST /_ingest/pipeline/my_pipeline/_simulate?verbose=true
{
"docs": [
{
"_source": {
"message": "TEST"
}
}
]
}
内联管道模拟 #
POST /_ingest/pipeline/_simulate
{
"pipeline": {
"processors": [
{
"uppercase": {
"field": "message"
}
}
]
},
"docs": [
{
"_source": {
"message": "hello"
}
}
]
}
删除管道 #
DELETE /_ingest/pipeline/my_pipeline
响应:
{
"acknowledged": true
}
常用处理器 #
字符串处理 #
{
"processors": [
{
"lowercase": {
"field": "message"
}
},
{
"uppercase": {
"field": "message"
}
},
{
"trim": {
"field": "message"
}
}
]
}
字段操作 #
{
"processors": [
{
"set": {
"field": "new_field",
"value": "static_value"
}
},
{
"rename": {
"field": "old_name",
"target_field": "new_name"
}
},
{
"remove": {
"field": "temp_field"
}
}
]
}
日期处理 #
{
"processors": [
{
"date": {
"field": "timestamp",
"formats": ["ISO8601", "yyyy-MM-dd HH:mm:ss"],
"target_field": "@timestamp"
}
}
]
}
JSON 解析 #
{
"processors": [
{
"json": {
"field": "json_string",
"target_field": "parsed_json"
}
}
]
}
Grok 解析 #
{
"processors": [
{
"grok": {
"field": "message",
"patterns": ["%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} %{GREEDYDATA:message}"],
"pattern_definitions": {
"TIMESTAMP_ISO8601": "YYYY-MM-dd HH:mm:ss"
}
}
}
]
}
条件处理 #
{
"processors": [
{
"set": {
"field": "env",
"value": "production",
"if": "ctx.network?.ip?.startsWith('192.168')"
}
}
]
}
计算字段 #
{
"processors": [
{
"script": {
"source": "ctx.total = ctx.price * ctx.quantity"
}
}
]
}
使用场景 #
场景 1:日志解析 #
PUT /_ingest/pipeline/logs_pipeline
{
"description": "解析 Apache 访问日志",
"processors": [
{
"grok": {
"field": "message",
"patterns": ["%{COMMONAPACHELOG}"]
}
},
{
"date": {
"field": "timestamp",
"formats": ["dd/MMM/yyyy:HH:mm:ss Z"]
}
},
{
"convert": {
"field": "response",
"type": "integer"
}
}
]
}
场景 2:用户数据处理 #
PUT /_ingest/pipeline/user_pipeline
{
"description": "处理用户数据",
"processors": [
{
"lowercase": {
"field": "email"
}
},
{
"set": {
"field": "created_at",
"value": "{{_ingest.timestamp}}"
}
},
{
"remove": {
"field": ["password", "ssn"]
}
}
]
}
场景 3:IoT 数据处理 #
PUT /_ingest/pipeline/iot_pipeline
{
"description": "处理 IoT 传感器数据",
"processors": [
{
"json": {
"field": "data",
"target_field": "sensor_data"
}
},
{
"convert": {
"field": "sensor_data.temperature",
"type": "float"
}
},
{
"script": {
"source": """
if (ctx.sensor_data.temperature > 30) {
ctx.alert = 'High temperature';
}
"""
}
}
]
}
场景 4:API 响应处理 #
PUT /_ingest/pipeline/api_pipeline
{
"description": "处理 API 响应",
"processors": [
{
"json": {
"field": "response_body",
"target_field": "parsed_response"
}
},
{
"dot_expander": {
"field": "parsed_response"
}
},
{
"remove": {
"field": "response_body"
}
}
]
}
管道性能优化 #
1. 减少处理器数量 #
# 不好的做法:多个类似处理器
{
"processors": [
{"lowercase": {"field": "field1"}},
{"lowercase": {"field": "field2"}},
{"lowercase": {"field": "field3"}}
]
}
# 好的做法:合并处理
{
"processors": [
{
"script": {
"source": "['field1', 'field2', 'field3'].forEach(f -> ctx[f] = ctx[f]?.toLowerCase())"
}
}
]
}
2. 条件跳过 #
{
"processors": [
{
"set": {
"field": "processed",
"value": true,
"if": "ctx.message != null"
}
}
]
}
3. 提前终止 #
{
"processors": [
{
"drop": {
"if": "ctx.level == 'DEBUG'"
}
},
{
"date": {
"field": "timestamp",
"formats": ["ISO8601"]
}
}
]
}
注意事项 #
- 性能影响:处理器会增加索引延迟
- 错误处理:配置 on_failure 处理异常
- 版本管理:使用 version 字段跟踪管道版本
- 测试验证:使用 _simulate 在生产前测试
- 权限控制:管道脚本需要相应权限
- API 密钥安全:text_embedding 处理器中的 API 密钥会自动加密
常见错误 #
管道不存在 #
{
"error": {
"type": "pipeline_processing_exception",
"reason": "pipeline with id [my_pipeline] does not exist"
}
}
处理器配置错误 #
{
"error": {
"type": "illegal_argument_exception",
"reason": "processor requires a 'field' or 'fields' option"
}
}
字段不存在 #
# 使用 ignore_missing 忽略缺失字段
{
"processors": [
{
"uppercase": {
"field": "optional_field",
"ignore_missing": true
}
}
]
}
最佳实践 #
- 命名规范:使用描述性管道 ID,如
logs-parse、user-enrich - 描述文档:添加 description 说明管道用途
- 版本控制:使用 version 字段管理变更
- 充分测试:使用 _simulate 验证处理逻辑
- 错误处理:配置 on_failure 处理异常情况
- 性能监控:监控管道处理时间





