在单个请求中执行多个文档操作(索引、创建、更新、删除),减少网络往返次数。
API #
POST /_bulk
PUT /_bulk
POST /{index}/_bulk
PUT /{index}/_bulk
API 的作用 #
Bulk API 允许在单个请求中执行多个文档操作,显著提高批量数据处理的效率。
支持的操作类型 #
| 操作 | 描述 |
|---|---|
index | 索引文档,如果存在则覆盖 |
create | 创建文档,如果存在则失败 |
update | 部分更新文档 |
delete | 删除文档 |
批量操作的优势 #
- 减少网络开销:单次请求完成多个操作
- 提高吞吐量:批量处理比单个操作更快
- 原子性:每个操作独立执行,互不影响
API 的参数 #
路由参数 #
| 参数 | 类型 | 是否必填 | 描述 |
|---|---|---|---|
{index} | 字符串 | 否 | 索引名称 |
Query String 参数 #
| 参数 | 类型 | 是否必填 | 默认值 | 描述 |
|---|---|---|---|---|
timeout | 时间值 | 否 | 1m | 操作超时时间 |
refresh | 字符串 | 否 | false | 刷新策略:true、false、wait_for |
routing | 字符串 | 否 | - | 默认路由值 |
pipeline | 字符串 | 否 | - | 默认管道名称 |
require_alias | 布尔值 | 否 | false | 是否要求使用别名 |
wait_for_active_shards | 字符串 | 否 | 1 | 等待活跃分片数:1、all、具体数字 |
请求体格式 #
批量请求使用特殊的 NDJSON(换行分隔的 JSON)格式:
{ 操作元数据 }
{ 文档数据 }
{ 操作元数据 }
{ 文档数据 }
...
重要:
- 每个操作由两行组成:操作指令 + 文档数据
- 行与行之间用换行符
\n分隔 delete操作只有一行(不需要文档数据)- 末尾必须有换行符
示例 #
基本批量操作 #
POST /_bulk
{ "index": { "_index": "my_index", "_id": "1" } }
{ "field1": "value1", "field2": "value2" }
{ "index": { "_index": "my_index", "_id": "2" } }
{ "field1": "value3", "field2": "value4" }
创建操作 #
POST /my_index/_bulk
{ "create": { "_id": "1" } }
{ "user": "kimchy", "message": "Hello" }
{ "create": { "_id": "2" } }
{ "user": "john", "message": "World" }
更新操作 #
POST /my_index/_bulk
{ "update": { "_id": "1" } }
{ "doc": { "status": "processed" } }
{ "update": { "_id": "2" } }
{ "doc": { "counter": 5 } }
删除操作 #
POST /my_index/_bulk
{ "delete": { "_index": "my_index", "_id": "1" } }
{ "delete": { "_index": "my_index", "_id": "2" } }
混合操作 #
POST /_bulk
{ "index": { "_index": "users", "_id": "1" } }
{ "name": "Alice", "age": 30 }
{ "create": { "_index": "users", "_id": "2" } }
{ "name": "Bob", "age": 25 }
{ "update": { "_index": "users", "_id": "1" } }
{ "doc": { "age": 31 } }
{ "delete": { "_index": "users", "_id": "2" } }
响应示例:
{
"took": 30,
"errors": false,
"items": [
{
"index": {
"_index": "users",
"_type": "_doc",
"_id": "1",
"_version": 1,
"result": "created",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"status": 201
}
},
{
"create": {
"_index": "users",
"_type": "_doc",
"_id": "2",
"_version": 1,
"result": "created",
"_shards": { ... },
"status": 201
}
},
{
"update": {
"_index": "users",
"_type": "_doc",
"_id": "1",
"_version": 2,
"result": "updated",
"_shards": { ... },
"status": 200
}
},
{
"delete": {
"_index": "users",
"_type": "_doc",
"_id": "2",
"_version": 2,
"result": "deleted",
"_shards": { ... },
"status": 200
}
}
]
}
使用管道 #
POST /my_index/_bulk?pipeline=timestamp_pipeline
{ "index": { } }
{ "message": "Hello without timestamp" }
设置超时 #
POST /my_index/_bulk?timeout=5m
{ "index": { "_id": "1" } }
{ "field": "value" }
立即刷新 #
POST /my_index/_bulk?refresh=true
{ "index": { "_id": "1" } }
{ "field": "value" }
使用路由 #
POST /my_index/_bulk?routing=user1
{ "index": { "_id": "1", "routing": "user1" } }
{ "field": "value" }
脚本更新 #
POST /my_index/_bulk
{ "update": { "_id": "1" } }
{ "script": { "source": "ctx._source.counter += params.count", "lang": "painless", "params": { "count": 1 } } }
{ "update": { "_id": "2" } }
{ "script": { "source": "ctx._source.tags.add(params.tag)", "lang": "painless", "params": { "tag": "important" } } }
使用 upsert #
POST /my_index/_bulk
{ "update": { "_id": "1", "_retry_on_conflict": 3 } }
{ "doc": { "counter": 1 }, "upsert": { "counter": 0 } }
条件操作 #
POST /my_index/_bulk
{ "index": { "_id": "1", "if_seq_no": 5, "if_primary_term": 1 } }
{ "field": "value" }
操作指令参数 #
| 参数 | 类型 | 描述 |
|---|---|---|
_index | 字符串 | 目标索引 |
_id | 字符串 | 文档 ID |
_type | 字符串 | 文档类型(已弃用,默认 _doc) |
routing | 字符串 | 路由值 |
version | 整数 | 版本号 |
version_type | 字符串 | 版本类型:internal、external、external_gte |
if_seq_no | 整数 | 序列号条件 |
if_primary_term | 整数 | 主版本条件 |
retry_on_conflict | 整数 | 更新操作的重试次数(默认 0) |
pipeline | 字符串 | 管道名称 |
_source | 对象/布尔值 | 控制返回的源字段 |
响应字段说明 #
| 字段 | 描述 |
|---|---|
took | 执行时间(毫秒) |
errors | 是否有错误 |
items | 每个操作的结果数组 |
最佳实践 #
批次大小 #
| 数据量 | 推荐大小 | 说明 |
|---|---|---|
| 小文档 | 1000-5000 | 5-10MB 左右 |
| 中文档 | 500-1000 | 10-15MB 左右 |
| 大文档 | 100-500 | 根据文档大小调整 |
性能优化 #
# 批量导入时禁用刷新
POST /my_index/_bulk?refresh=false
{ "index": { } }
{ "field": "value" }
...
# 导入完成后手动刷新
POST /my_index/_refresh
错误处理 #
即使部分操作失败,其他操作仍会继续执行:
{
"took": 15,
"errors": true,
"items": [
{
"index": {
"error": {
"type": "version_conflict_engine_exception",
"reason": "version conflict"
},
"status": 409
}
},
{
"index": {
"_index": "my_index",
"_id": "2",
"result": "created",
"status": 201
}
}
]
}
使用场景 #
场景 1:批量导入数据 #
POST /logs/_bulk
{ "index": { } }
{ "@timestamp": "2026-02-04", "level": "INFO", "message": "App started" }
{ "index": { } }
{ "@timestamp": "2026-02-04", "level": "DEBUG", "message": "Processing..." }
场景 2:数据迁移 #
POST /new_index/_bulk
{ "index": { "_id": "1" } }
{ "field1": "value1", "field2": "value2" }
{ "index": { "_id": "2" } }
{ "field1": "value3", "field2": "value4" }
场景 3:定期更新 #
POST /metrics/_bulk
{ "update": { "_id": "counter1" } }
{ "script": "ctx._source.value += params.increment", "params": { "increment": 1 } }
{ "update": { "_id": "counter2" } }
{ "script": "ctx._source.value += params.increment", "params": { "increment": 5 } }
注意事项 #
- 格式要求:必须使用 NDJSON(换行分隔的 JSON)格式
- 大小限制:默认批量请求大小限制约 100MB
- 内存消耗:大批量操作会占用较多内存
- 错误处理:单个操作失败不影响其他操作
- 类型已弃用:
_type参数已被弃用,默认使用_doc - 换行符:请求体末尾必须有换行符





