📣 极限科技诚招搜索运维工程师(Elasticsearch/Easysearch)- 全职/北京 👉 : 立即申请加入

摄入管道(Ingest Pipeline)是 Easysearch 中用于在文档索引之前预处理文档的机制。通过定义一系列处理器(processors),可以在数据被索引之前对其进行转换、丰富和规范化。

API #

PUT /_ingest/pipeline/{id}

API 的作用 #

创建新的摄入管道或更新已存在的管道。管道的主要功能包括:

  • 数据转换:重命名字段、修改字段值、提取数据等
  • 数据丰富:添加外部数据、查找关联信息等
  • 数据规范化:格式转换、类型转换、数据清洗等
  • AI 处理:文本嵌入、智能分析等(Easysearch 特有功能)

管道在文档被索引之前自动执行,无需修改客户端代码。

API 的参数 #

路由参数 #

参数类型是否必需描述
idstring必需管道的唯一标识符

查询字符串参数 #

参数类型是否必需默认值描述
master_timeouttime可选30s等待主节点连接的超时时间
timeouttime可选30s等待操作完成的总超时时间
if_versioninteger可选-仅当管道的当前版本匹配指定值时才更新(乐观锁)
prettyboolean可选-格式化 JSON 输出

请求体参数 #

参数类型是否必需描述
descriptionstring可选管道的描述信息
processorsarray必需处理器数组,定义处理文档的一系列操作
on_failurearray可选处理失败时的处理器数组

请求体结构 #

{
  "description": "管道描述",
  "processors": [
    {
      "processor_type": {
        "parameter1": "value1",
        "parameter2": "value2"
      }
    }
  ],
  "on_failure": [
    {
      "processor_type": {
        "parameter1": "value1"
      }
    }
  ]
}

常用处理器类型 #

基础处理器 #

处理器功能
set设置字段值
remove删除字段
rename重命名字段
drop丢弃文档
convert转换字段数据类型

内容处理器 #

处理器功能
grok使用正则表达式解析文本
dissect使用分隔符解析文本
csv解析 CSV 数据
json解析 JSON 字符串

AI 处理器(Easysearch 特有) #

处理器功能
text_embedding将文本转换为向量嵌入

请求示例 #

基础管道 #

PUT /_ingest/pipeline/my-pipeline
{
  "description": "处理日志数据",
  "processors": [
    {
      "set": {
        "field": "processed_at",
        "value": "{{_ingest.timestamp}}"
      }
    },
    {
      "remove": {
        "field": "temp_field"
      }
    }
  ]
}

Grok 解析管道 #

PUT /_ingest/pipeline/apache-logs
{
  "description": "解析 Apache 日志",
  "processors": [
    {
      "grok": {
        "field": "message",
        "patterns": [
          "%{COMMONAPACHELOG}"
        ]
      }
    }
  ]
}

文本嵌入管道(AI 功能) #

PUT /_ingest/pipeline/embedding-pipeline
{
  "description": "生成文本嵌入向量",
  "processors": [
    {
      "text_embedding": {
        "text_field": "content",
        "vector_field": "embedding",
        "model_id": "text-embedding-ada-002",
        "api_key": "your-api-key"
      }
    }
  ]
}

重要说明text_embedding 处理器中的 api_key 会被自动加密存储,格式为 ENCRYPTED_VALUE...infinilabs

带错误处理的管道 #

PUT /_ingest/pipeline/safe-pipeline
{
  "description": "带错误处理的管道",
  "processors": [
    {
      "convert": {
        "field": "price",
        "type": "float",
        "ignore_failure": true
      }
    }
  ],
  "on_failure": [
    {
      "set": {
        "field": "error_message",
        "value": "{{ _ingest.on_failure_message }}"
      }
    }
  ]
}

响应示例 #

成功响应 #

{
  "acknowledged": true,
  "version": 2
}

错误响应 - 管道定义无效 #

{
  "error": {
    "root_cause": [
      {
        "type": "illegal_argument_exception",
        "reason": "Processor type [unknown_processor] is not installed"
      }
    ],
    "type": "illegal_argument_exception",
    "reason": "Processor type [unknown_processor] is not installed"
  },
  "status": 400
}

使用管道 #

创建管道后,可以通过以下方式使用:

方式 1:索引时指定管道 #

POST /my-index/_doc?pipeline=my-pipeline
{
  "field1": "value1",
  "field2": "value2"
}

方式 2:在索引设置中指定默认管道 #

PUT /my-index
{
  "settings": {
    "index.default_pipeline": "my-pipeline"
  }
}

处理器执行顺序 #

处理器按照数组中的顺序依次执行:

  1. 如果某个处理器失败且设置了 ignore_failure: true,则跳过该处理器继续执行
  2. 如果处理器失败且未设置 ignore_failure,则执行 on_failure 中的处理器
  3. 如果 on_failure 也失败,文档将被拒绝

注意事项 #

  1. API 密钥加密text_embedding 处理器中的 API 密钥会自动加密存储
  2. 管道版本:每次更新管道会增加版本号,可用于乐观锁控制
  3. 性能影响:复杂的管道会影响索引性能,建议合理设计
  4. 测试管道:使用 POST /_ingest/pipeline/{id}/_simulate 模拟执行,验证管道逻辑
  5. 必需的 processors:管道必须至少包含一个处理器

相关文档 #