An ingest pipeline preprocesses and transforms documents before they are indexed.

API #

Create or update a pipeline #

PUT /_ingest/pipeline/{id}

Get pipelines #

GET /_ingest/pipeline
GET /_ingest/pipeline/{id}

Delete a pipeline #

DELETE /_ingest/pipeline/{id}

Simulate a pipeline #

POST /_ingest/pipeline/{id}/_simulate
POST /_ingest/pipeline/_simulate

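What the API does #

An ingest pipeline defines a list of processors that transform each document in order.

Pipeline uses #

| Use | Description |
|---|---|
| Data cleaning | Remove unwanted fields, normalize formats |
| Data enrichment | Add computed fields or external data |
| Data transformation | Rename fields, convert types |
| Data extraction | Extract structured data from text |

Processing flow #

Raw document → processor 1 → processor 2 → ... → processor N → processed document → index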
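
The flow above can be pictured as a list of functions applied one after another. The Python sketch below only illustrates the ordering; it is not how Easysearch implements processors, and the helper names `lowercase`, `remove`, and `run_pipeline` are made up for this example.

```python
from typing import Any, Callable, Dict, List

Doc = Dict[str, Any]
Processor = Callable[[Doc], Doc]


def lowercase(field: str) -> Processor:
    """Return a processor that lowercases one string field."""
    def apply(doc: Doc) -> Doc:
        return {**doc, field: doc[field].lower()}
    return apply


def remove(field: str) -> Processor:
    """Return a processor that drops one field from the document."""
    def apply(doc: Doc) -> Doc:
        return {k: v for k, v in doc.items() if k != field}
    return apply


def run_pipeline(doc: Doc, processors: List[Processor]) -> Doc:
    """Apply each processor in order; each one sees the previous output."""
    for processor in processors:
        doc = processor(doc)
    return doc


raw = {"message": "HELLO WORLD", "temp_field": 42}
processed = run_pipeline(raw, [lowercase("message"), remove("temp_field")])
print(processed)
```

Swapping the order of the two processors gives the same result here, but in general each processor sees the output of the one before it, so order matters.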

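API parameters #

PUT /_ingest/pipeline/{id} #

Path parameters #

| Parameter | Type | Required | Description |
|---|---|---|---|
| {id} | String | Yes | Pipeline ID |

Query string parameters #

| Parameter | Type | Required | Default | Description |
|---|---|---|---|---|
| master_timeout | Time value | No | 30s | How long to wait for a response from the master node |
| timeout | Time value | No | 30s | How long to wait for acknowledgement |

Request body #

{
  "description": "Pipeline description",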
  "version": 1,
  "processors": [
    {
      "processor_type": {
        "field": "field_name",
        "parameter": "value"
      }
    }
  ],
  "on_failure": [
    {
      "processor_type": {
        "parameter": "value"
      }
    }
  ]
}
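
| Parameter | Type | Required | Description |
|---|---|---|---|
| description | String | No | Description of the pipeline |
| version | Integer | No | Pipeline version number |
| processors | Array | Yes | Processors to run, in order |
| on_failure | Array | No | Processors to run when a processor fails |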
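
As a rough illustration of the table above, the following Python sketch assembles a request body from the four documented fields and omits the optional ones when they are not supplied. The function name `build_pipeline_body` is hypothetical, and the one-processor-type-per-entry check is a convention of this sketch that mirrors the shape shown above, not a documented server rule.

```python
import json
from typing import Any, Dict, List, Optional


def build_pipeline_body(
    processors: List[Dict[str, Any]],
    description: Optional[str] = None,
    version: Optional[int] = None,
    on_failure: Optional[List[Dict[str, Any]]] = None,
) -> Dict[str, Any]:
    """Assemble a PUT /_ingest/pipeline/{id} body from the documented fields."""
    if not isinstance(processors, list):
        raise TypeError("processors is required and must be a list")
    for entry in processors:
        # Sketch-level convention: one object per processor, keyed by its type.
        if not isinstance(entry, dict) or len(entry) != 1:
            raise ValueError(f"expected one processor type per entry, got: {entry!r}")
    body: Dict[str, Any] = {"processors": processors}
    # Optional fields are only sent when the caller supplied them.
    if description is not None:
        body["description"] = description
    if version is not None:
        body["version"] = version
    if on_failure is not None:
        body["on_failure"] = on_failure
    return body


body = build_pipeline_body(
    processors=[{"lowercase": {"field": "message"}}],
    description="Process log data",
    version=1,
)
print(json.dumps(body, indent=2))
```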

GET /_ingest/pipeline #

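Query string parameters #

| Parameter | Type | Required | Default | Description |
|---|---|---|---|---|
| master_timeout | Time value | No | 30s | How long to wait for a response from the master node |

DELETE /_ingest/pipeline/{id} #

Query string parameters #

| Parameter | Type | Required | Default | Description |
|---|---|---|---|---|
| master_timeout | Time value | No | 30s | How long to wait for a response from the master node |
| timeout | Time value | No | 30s | How long to wait for acknowledgement |

POST /_ingest/pipeline/{id}/_simulate #

Query string parameters #

| Parameter | Type | Required | Default | Description |
|---|---|---|---|---|
| verbose | Boolean | No | false | Whether to return verbose output |

Request body #

Simulate an existing pipeline: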

{
  "docs": [
    {
      "_source": {
        "field1": "value1",
        "field2": "value2"
      },
      "_index": "index_name",
      "_id": "doc_id"
    }
  ]
}

Inline simulation (POST /_ingest/pipeline/_simulate, with the pipeline definition in the body):

{
  "pipeline": {
    "processors": [
      {
        "processor_type": {
          "field": "field_name",
          "parameter": "value"
        }
      }
    ]
  },
  "docs": [
    {
      "_source": {
        "field1": "value1"
      },
      "_index": "index_name"
    }
  ]
}
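
The two request shapes differ only in where the pipeline comes from: the URL for a stored pipeline, the body for an inline one. A minimal Python sketch of that choice follows. The function name `simulate_request` is hypothetical, and accepting `verbose` on the inline endpoint is an assumption carried over from Elasticsearch, since the parameter table above lists it only for the `{id}` form.

```python
from typing import Any, Dict, List, Optional, Tuple


def simulate_request(
    docs: List[Dict[str, Any]],
    pipeline_id: Optional[str] = None,
    pipeline: Optional[Dict[str, Any]] = None,
    verbose: bool = False,
) -> Tuple[str, Dict[str, Any]]:
    """Return (path, body) for a POST _simulate call."""
    if (pipeline_id is None) == (pipeline is None):
        raise ValueError("pass exactly one of pipeline_id or pipeline")
    if pipeline_id is not None:
        # Stored pipeline: the ID goes in the path, the body carries only docs.
        path = f"/_ingest/pipeline/{pipeline_id}/_simulate"
        body: Dict[str, Any] = {"docs": docs}
    else:
        # Inline pipeline: the definition travels in the body next to docs.
        path = "/_ingest/pipeline/_simulate"
        body = {"pipeline": pipeline, "docs": docs}
    if verbose:
        path += "?verbose=true"  # assumption: accepted on both endpoints
    return path, body


docs = [{"_source": {"field1": "value1"}}]
print(simulate_request(docs, pipeline_id="my_pipeline"))
print(simulate_request(docs, pipeline={"processors": []}, verbose=True))
```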

Examples #

Create a pipeline #

PUT /_ingest/pipeline/my_pipeline
{
  "description": "Process log data",
  "processors": [
    {
      "lowercase": {
        "field": "message"
      }
    },
    {
      "remove": {
        "field": "temp_field"
      }
    }
  ]
}

Response:

{
  "acknowledged": true
}

Create a pipeline with error handling #

PUT /_ingest/pipeline/secure_pipeline
{
  "processors": [
    {
      "date": {
        "field": "timestamp",
        "formats": ["ISO8601"],
        "target_field": "@timestamp"
      }
    }
  ],
  "on_failure": [
    {
      "set": {
        "field": "error",
        "value": "Failed to parse timestamp"
      }
    }
  ]
}
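
To make the control flow of a pipeline-level on_failure concrete, here is a small Python model. It assumes the behaviour documented for Elasticsearch also applies here: when a processor fails, the remaining processors are skipped, the on_failure handlers run, and the document continues on to indexing. The function names are illustrative only.

```python
from datetime import datetime
from typing import Any, Callable, Dict, List

Doc = Dict[str, Any]
Step = Callable[[Doc], None]


def parse_timestamp(doc: Doc) -> None:
    """Stand-in for the date processor: parse `timestamp` into `@timestamp`."""
    raw = doc["timestamp"]
    # fromisoformat() on older Python versions rejects a trailing "Z".
    parsed = datetime.fromisoformat(raw.replace("Z", "+00:00"))
    doc["@timestamp"] = parsed.isoformat()


def set_error(doc: Doc) -> None:
    """Stand-in for the set processor in on_failure."""
    doc["error"] = "Failed to parse timestamp"


def run(doc: Doc, processors: List[Step], on_failure: List[Step]) -> Doc:
    doc = dict(doc)  # work on a copy so the caller's document is untouched
    try:
        for step in processors:
            step(doc)
    except Exception:
        # Assumed semantics: skip the rest, then run the failure handlers.
        for handler in on_failure:
            handler(doc)
    return doc


good = run({"timestamp": "2026-02-04T10:00:00Z"}, [parse_timestamp], [set_error])
bad = run({"timestamp": "not a date"}, [parse_timestamp], [set_error])
print(good)
print(bad)
```

The second document is not rejected; it is kept with an `error` field, which makes failed records easy to find later.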

Get all pipelines #

GET /_ingest/pipeline

Example response:

{
  "pipelines": {
    "my_pipeline": {
      "description": "Process log data",
      "version": 1,
      "processors": [...]
    }
  }
}

Get a specific pipeline #

GET /_ingest/pipeline/my_pipeline

Index a document through a pipeline #

POST /my_index/_doc?pipeline=my_pipeline
{
  "message": "HELLO WORLD",
  "timestamp": "2026-02-04T10:00:00Z"
}
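
From client code, the same request is an ordinary HTTP POST with the pipeline name passed as a query parameter. The sketch below builds the request with Python's standard library without sending it. The base URL is an assumption (a local test node without authentication), and `index_request` is a hypothetical helper name.

```python
import json
import urllib.parse
import urllib.request

BASE_URL = "http://localhost:9200"  # assumption: local test node, no auth


def index_request(index: str, doc: dict, pipeline: str) -> urllib.request.Request:
    """Build (but do not send) a POST /{index}/_doc?pipeline=... request."""
    query = urllib.parse.urlencode({"pipeline": pipeline})
    return urllib.request.Request(
        url=f"{BASE_URL}/{index}/_doc?{query}",
        data=json.dumps(doc).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


req = index_request(
    "my_index",
    {"message": "HELLO WORLD", "timestamp": "2026-02-04T10:00:00Z"},
    "my_pipeline",
)
print(req.get_method(), req.full_url)
# To actually send it: urllib.request.urlopen(req)
```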

Simulate a pipeline #

POST /_ingest/pipeline/my_pipeline/_simulate
{
  "docs": [
    {
      "_source": {
        "message": "HELLO WORLD"
      }
    }
  ]
}

Example response:

{
  "docs": [
    {
      "doc": {
        "_index": "_index",
        "_id": "_id",
        "_source": {
          "message": "hello world"
        }
      }
    }
  ]
}

Simulate in verbose mode #

POST /_ingest/pipeline/my_pipeline/_simulate?verbose=true
{
  "docs": [
    {
      "_source": {
        "message": "TEST"
      }
    }
  ]
}

Simulate an inline pipeline #

POST /_ingest/pipeline/_simulate
{
  "pipeline": {
    "processors": [
      {
        "uppercase": {
          "field": "message"
        }
      }
    ]
  },
  "docs": [
    {
      "_source": {
        "message": "hello"
      }
    }
  ]
}

Delete a pipeline #

DELETE /_ingest/pipeline/my_pipeline

Response:

{
  "acknowledged": true
}

Common processors #

String processing #

{
  "processors": [
    {
      "lowercase": {
        "field": "message"
      }
    },
    {
      "uppercase": {
        "field": "message"
      }
    },
    {
      "trim": {
        "field": "message"
      }
    }
  ]
}

Field operations #

{
  "processors": [
    {
      "set": {
        "field": "new_field",
        "value": "static_value"
      }
    },
    {
      "rename": {
        "field": "old_name",
        "target_field": "new_name"
      }
    },
    {
      "remove": {
        "field": "temp_field"
      }
    }
  ]
}

Date processing #

{
  "processors": [
    {
      "date": {
        "field": "timestamp",
        "formats": ["ISO8601", "yyyy-MM-dd HH:mm:ss"],
        "target_field": "@timestamp"
      }
    }
  ]
}

JSON parsing #

{
  "processors": [
    {
      "json": {
        "field": "json_string",
        "target_field": "parsed_json"
      }
    }
  ]
}

Grok parsing #

{
  "processors": [
    {
      "grok": {
        "field": "message",
        "patterns": ["%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} %{GREEDYDATA:message}"]
      }
    }
  ]
}
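
A grok pattern is a named shorthand for a regular expression with named captures. To show what the pattern above extracts, here is a rough Python equivalent. The three sub-expressions are simplified stand-ins written for this example; the built-in TIMESTAMP_ISO8601, LOGLEVEL, and GREEDYDATA definitions accept more variants.

```python
import re
from typing import Dict, Optional

# Simplified approximations of the grok patterns, one named group per capture.
LOG_LINE = re.compile(
    r"(?P<timestamp>\d{4}-\d{2}-\d{2}[T ]\d{2}:\d{2}:\d{2})"
    r" (?P<level>[A-Za-z]+)"
    r" (?P<message>.*)"
)


def parse_log_line(line: str) -> Optional[Dict[str, str]]:
    """Return the named captures, or None when the line does not match."""
    match = LOG_LINE.match(line)
    return match.groupdict() if match else None


print(parse_log_line("2026-02-04 10:00:00 ERROR disk usage above 90%"))
print(parse_log_line("no timestamp here"))
```

Note that the captured `message` replaces the original `message` field, just as in the grok example. Unlike this sketch, which returns None, a grok processor treats a non-matching line as a failure, so pair it with on_failure or ignore_failure when input is not uniform.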

Conditional processing #

{
  "processors": [
    {
      "set": {
        "field": "env",
        "value": "production",
        "if": "ctx.network?.ip != null && ctx.network.ip.startsWith('192.168')"
      }
    }
  ]
}

Computed fields #

{
  "processors": [
    {
      "script": {
        "source": "ctx.total = ctx.price * ctx.quantity"
      }
    }
  ]
}

Use cases #

Scenario 1: Log parsing #

PUT /_ingest/pipeline/logs_pipeline
{
  "description": "Parse Apache access logs",
  "processors": [
    {
      "grok": {
        "field": "message",
        "patterns": ["%{COMMONAPACHELOG}"]
      }
    },
    {
      "date": {
        "field": "timestamp",
        "formats": ["dd/MMM/yyyy:HH:mm:ss Z"]
      }
    },
    {
      "convert": {
        "field": "response",
        "type": "integer"
      }
    }
  ]
}

Scenario 2: User data processing #

PUT /_ingest/pipeline/user_pipeline
{
  "description": "Process user data",
  "processors": [
    {
      "lowercase": {
        "field": "email"
      }
    },
    {
      "set": {
        "field": "created_at",
        "value": "{{_ingest.timestamp}}"
      }
    },
    {
      "remove": {
        "field": ["password", "ssn"]
      }
    }
  ]
}

Scenario 3: IoT data processing #

PUT /_ingest/pipeline/iot_pipeline
{
  "description": "Process IoT sensor data",
  "processors": [
    {
      "json": {
        "field": "data",
        "target_field": "sensor_data"
      }
    },
    {
      "convert": {
        "field": "sensor_data.temperature",
        "type": "float"
      }
    },
    {
      "script": {
        "source": """
          if (ctx.sensor_data.temperature > 30) {
            ctx.alert = 'High temperature';
          }
        """
      }
    }
  ]
}

Scenario 4: API response processing #

PUT /_ingest/pipeline/api_pipeline
{
  "description": "Process API responses",
  "processors": [
    {
      "json": {
        "field": "response_body",
        "target_field": "parsed_response"
      }
    },
    {
      "dot_expander": {
        "field": "parsed_response"
      }
    },
    {
      "remove": {
        "field": "response_body"
      }
    }
  ]
}

Pipeline performance tuning #

1. Reduce the number of processors #

# Avoid: several near-identical processors
{
  "processors": [
    {"lowercase": {"field": "field1"}},
    {"lowercase": {"field": "field2"}},
    {"lowercase": {"field": "field3"}}
  ]
}

# Better: handle them in a single script
{
  "processors": [
    {
      "script": {
        "source": "for (def f : ['field1', 'field2', 'field3']) { if (ctx[f] != null) { ctx[f] = ctx[f].toLowerCase(); } }"
      }
    }
  ]
}

2. Skip conditionally #

{
  "processors": [
    {
      "set": {
        "field": "processed",
        "value": true,
        "if": "ctx.message != null"
      }
    }
  ]
}

3. Terminate early #

{
  "processors": [
    {
      "drop": {
        "if": "ctx.level == 'DEBUG'"
      }
    },
    {
      "date": {
        "field": "timestamp",
        "formats": ["ISO8601"]
      }
    }
  ]
}
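
The saving comes from ordering: once a document is dropped, no later processor runs for it. A minimal Python model of that short-circuit follows, with hypothetical helper names and a `calls` list that records which steps actually executed.

```python
from typing import Any, Callable, Dict, List, Optional

Doc = Dict[str, Any]
# A processor returns the (possibly modified) document, or None to drop it.
Processor = Callable[[Doc], Optional[Doc]]

calls: List[str] = []  # records which processors actually ran


def drop_debug(doc: Doc) -> Optional[Doc]:
    calls.append("drop")
    return None if doc.get("level") == "DEBUG" else doc


def parse_date(doc: Doc) -> Optional[Doc]:
    calls.append("date")  # stand-in for the more expensive date processor
    return doc


def run_pipeline(doc: Doc, processors: List[Processor]) -> Optional[Doc]:
    for processor in processors:
        result = processor(doc)
        if result is None:
            return None  # dropped: later processors never run, nothing is indexed
        doc = result
    return doc


dropped = run_pipeline({"level": "DEBUG"}, [drop_debug, parse_date])
print(dropped, calls)
```

Placing cheap filters such as drop ahead of grok, date, or script processors keeps the expensive work off documents that would be discarded anyway.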

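Notes #

  1. Performance: every processor adds indexing latency.
  2. Error handling: configure on_failure to handle exceptions.
  3. Versioning: use the version field to track pipeline revisions.
  4. Testing: validate with _simulate before going to production.
  5. Permissions: pipeline scripts require the corresponding privileges.
  6. API key security: API keys in the text_embedding processor are encrypted automatically.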

Common errors #

Pipeline does not exist #

{
  "error": {
    "type": "pipeline_processing_exception",
    "reason": "pipeline with id [my_pipeline] does not exist"
  }
}

Invalid processor configuration #

{
  "error": {
    "type": "illegal_argument_exception",
    "reason": "processor requires a 'field' or 'fields' option"
  }
}

Missing field #

# Use ignore_missing to skip documents that lack the field
{
  "processors": [
    {
      "uppercase": {
        "field": "optional_field",
        "ignore_missing": true
      }
    }
  ]
}
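
The effect of ignore_missing can be summarized as "a missing field is a no-op rather than an error". The Python sketch below models that rule for an uppercase step. It is an illustration with a made-up function, and the error text is invented for the example rather than copied from the server.

```python
from typing import Any, Dict


def uppercase(doc: Dict[str, Any], field: str, ignore_missing: bool = False) -> Dict[str, Any]:
    """Uppercase `field`; a missing or null field fails unless ignore_missing is set."""
    if doc.get(field) is None:
        if ignore_missing:
            return doc  # leave the document untouched
        raise KeyError(f"field [{field}] is missing")  # illustrative message
    return {**doc, field: doc[field].upper()}


print(uppercase({"optional_field": "abc"}, "optional_field"))
print(uppercase({"other": 1}, "optional_field", ignore_missing=True))
```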

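Best practices #

  1. Naming: use descriptive pipeline IDs, such as logs-parse or user-enrich.
  2. Documentation: add a description that explains what the pipeline is for.
  3. Version control: use the version field to manage changes.
  4. Thorough testing: verify the processing logic with _simulate.
  5. Error handling: configure on_failure to handle failure cases.
  6. Performance monitoring: monitor pipeline processing time.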