📣 极限科技诚招搜索运维工程师(Elasticsearch/Easysearch)- 全职/北京 👉 : 立即申请加入

在单个请求中执行多个文档操作(索引、创建、更新、删除),减少网络往返次数。

API #

POST /_bulk
PUT /_bulk
POST /{index}/_bulk
PUT /{index}/_bulk

API 的作用 #

Bulk API 允许在单个请求中执行多个文档操作,显著提高批量数据处理的效率。

支持的操作类型 #

操作描述
index索引文档,如果存在则覆盖
create创建文档,如果存在则失败
update部分更新文档
delete删除文档

批量操作的优势 #

  • 减少网络开销:单次请求完成多个操作
  • 提高吞吐量:批量处理比单个操作更快
  • 原子性:每个操作独立执行,互不影响

API 的参数 #

路由参数 #

参数类型是否必填描述
{index}字符串索引名称

Query String 参数 #

参数类型是否必填默认值描述
timeout时间值1m操作超时时间
refresh字符串false刷新策略:truefalsewait_for
routing字符串-默认路由值
pipeline字符串-默认管道名称
require_alias布尔值false是否要求使用别名
wait_for_active_shards字符串1等待活跃分片数:1all、具体数字

请求体格式 #

批量请求使用特殊的 NDJSON(换行分隔的 JSON)格式:

{ 操作元数据 }
{ 文档数据 }
{ 操作元数据 }
{ 文档数据 }
...

重要

  • 每个操作由两行组成:操作指令 + 文档数据
  • 行与行之间用换行符 \n 分隔
  • delete 操作只有一行(不需要文档数据)
  • 末尾必须有换行符

示例 #

基本批量操作 #

POST /_bulk
{ "index": { "_index": "my_index", "_id": "1" } }
{ "field1": "value1", "field2": "value2" }
{ "index": { "_index": "my_index", "_id": "2" } }
{ "field1": "value3", "field2": "value4" }

创建操作 #

POST /my_index/_bulk
{ "create": { "_id": "1" } }
{ "user": "kimchy", "message": "Hello" }
{ "create": { "_id": "2" } }
{ "user": "john", "message": "World" }

更新操作 #

POST /my_index/_bulk
{ "update": { "_id": "1" } }
{ "doc": { "status": "processed" } }
{ "update": { "_id": "2" } }
{ "doc": { "counter": 5 } }

删除操作 #

POST /my_index/_bulk
{ "delete": { "_index": "my_index", "_id": "1" } }
{ "delete": { "_index": "my_index", "_id": "2" } }

混合操作 #

POST /_bulk
{ "index": { "_index": "users", "_id": "1" } }
{ "name": "Alice", "age": 30 }
{ "create": { "_index": "users", "_id": "2" } }
{ "name": "Bob", "age": 25 }
{ "update": { "_index": "users", "_id": "1" } }
{ "doc": { "age": 31 } }
{ "delete": { "_index": "users", "_id": "2" } }

响应示例:

{
  "took": 30,
  "errors": false,
  "items": [
    {
      "index": {
        "_index": "users",
        "_type": "_doc",
        "_id": "1",
        "_version": 1,
        "result": "created",
        "_shards": {
          "total": 2,
          "successful": 1,
          "failed": 0
        },
        "status": 201
      }
    },
    {
      "create": {
        "_index": "users",
        "_type": "_doc",
        "_id": "2",
        "_version": 1,
        "result": "created",
        "_shards": { ... },
        "status": 201
      }
    },
    {
      "update": {
        "_index": "users",
        "_type": "_doc",
        "_id": "1",
        "_version": 2,
        "result": "updated",
        "_shards": { ... },
        "status": 200
      }
    },
    {
      "delete": {
        "_index": "users",
        "_type": "_doc",
        "_id": "2",
        "_version": 2,
        "result": "deleted",
        "_shards": { ... },
        "status": 200
      }
    }
  ]
}

使用管道 #

POST /my_index/_bulk?pipeline=timestamp_pipeline
{ "index": { } }
{ "message": "Hello without timestamp" }

设置超时 #

POST /my_index/_bulk?timeout=5m
{ "index": { "_id": "1" } }
{ "field": "value" }

立即刷新 #

POST /my_index/_bulk?refresh=true
{ "index": { "_id": "1" } }
{ "field": "value" }

使用路由 #

POST /my_index/_bulk?routing=user1
{ "index": { "_id": "1", "routing": "user1" } }
{ "field": "value" }

脚本更新 #

POST /my_index/_bulk
{ "update": { "_id": "1" } }
{ "script": { "source": "ctx._source.counter += params.count", "lang": "painless", "params": { "count": 1 } } }
{ "update": { "_id": "2" } }
{ "script": { "source": "ctx._source.tags.add(params.tag)", "lang": "painless", "params": { "tag": "important" } } }

使用 upsert #

POST /my_index/_bulk
{ "update": { "_id": "1", "_retry_on_conflict": 3 } }
{ "doc": { "counter": 1 }, "upsert": { "counter": 0 } }

条件操作 #

POST /my_index/_bulk
{ "index": { "_id": "1", "if_seq_no": 5, "if_primary_term": 1 } }
{ "field": "value" }

操作指令参数 #

参数类型描述
_index字符串目标索引
_id字符串文档 ID
_type字符串文档类型(已弃用,默认 _doc
routing字符串路由值
version整数版本号
version_type字符串版本类型:internalexternalexternal_gte
if_seq_no整数序列号条件
if_primary_term整数主版本条件
retry_on_conflict整数更新操作的重试次数(默认 0)
pipeline字符串管道名称
_source对象/布尔值控制返回的源字段

响应字段说明 #

字段描述
took执行时间(毫秒)
errors是否有错误
items每个操作的结果数组

最佳实践 #

批次大小 #

数据量推荐大小说明
小文档1000-50005-10MB 左右
中文档500-100010-15MB 左右
大文档100-500根据文档大小调整

性能优化 #

# 批量导入时禁用刷新
POST /my_index/_bulk?refresh=false
{ "index": { } }
{ "field": "value" }
...

# 导入完成后手动刷新
POST /my_index/_refresh

错误处理 #

即使部分操作失败,其他操作仍会继续执行:

{
  "took": 15,
  "errors": true,
  "items": [
    {
      "index": {
        "error": {
          "type": "version_conflict_engine_exception",
          "reason": "version conflict"
        },
        "status": 409
      }
    },
    {
      "index": {
        "_index": "my_index",
        "_id": "2",
        "result": "created",
        "status": 201
      }
    }
  ]
}

使用场景 #

场景 1:批量导入数据 #

POST /logs/_bulk
{ "index": { } }
{ "@timestamp": "2026-02-04", "level": "INFO", "message": "App started" }
{ "index": { } }
{ "@timestamp": "2026-02-04", "level": "DEBUG", "message": "Processing..." }

场景 2:数据迁移 #

POST /new_index/_bulk
{ "index": { "_id": "1" } }
{ "field1": "value1", "field2": "value2" }
{ "index": { "_id": "2" } }
{ "field1": "value3", "field2": "value4" }

场景 3:定期更新 #

POST /metrics/_bulk
{ "update": { "_id": "counter1" } }
{ "script": "ctx._source.value += params.increment", "params": { "increment": 1 } }
{ "update": { "_id": "counter2" } }
{ "script": "ctx._source.value += params.increment", "params": { "increment": 5 } }

注意事项 #

  1. 格式要求:必须使用 NDJSON(换行分隔的 JSON)格式
  2. 大小限制:默认批量请求大小限制约 100MB
  3. 内存消耗:大批量操作会占用较多内存
  4. 错误处理:单个操作失败不影响其他操作
  5. 类型已弃用_type 参数已被弃用,默认使用 _doc
  6. 换行符:请求体末尾必须有换行符