--- title: "批量操作" date: 2026-01-28 lastmod: 2026-01-28 description: "在单个请求中执行多个文档操作,提高批量处理效率" tags: ["文档操作", "批量操作", "数据导入"] summary: "在单个请求中执行多个文档操作(索引、创建、更新、删除),减少网络往返次数。 API # POST /_bulk PUT /_bulk POST /{index}/_bulk PUT /{index}/_bulk API 的作用 # Bulk API 允许在单个请求中执行多个文档操作,显著提高批量数据处理的效率。 支持的操作类型 # 操作 描述 index 索引文档,如果存在则覆盖 create 创建文档,如果存在则失败 update 部分更新文档 delete 删除文档 批量操作的优势 # 减少网络开销:单次请求完成多个操作 提高吞吐量:批量处理比单个操作更快 原子性:每个操作独立执行,互不影响 API 的参数 # 路由参数 # 参数 类型 是否必填 描述 {index} 字符串 否 索引名称 Query String 参数 # 参数 类型 是否必填 默认值 描述 timeout 时间值 否 1m 操作超时时间 refresh 字符串 否 false 刷新策略:true、false、wait_for routing 字符串 否 - 默认路由值 pipeline 字符串 否 - 默认管道名称 require_alias 布尔值 否 false 是否要求使用别名 wait_for_active_shards 字符串 否 1 等待活跃分片数:1、all、具体数字 请求体格式 # 批量请求使用特殊的 NDJSON(换行分隔的 JSON)格式:" --- 在单个请求中执行多个文档操作(索引、创建、更新、删除),减少网络往返次数。 ## API ``` POST /_bulk PUT /_bulk POST /{index}/_bulk PUT /{index}/_bulk ``` ## API 的作用 Bulk API 允许在单个请求中执行多个文档操作,显著提高批量数据处理的效率。 ### 支持的操作类型 | 操作 | 描述 | |------|------| | `index` | 索引文档,如果存在则覆盖 | | `create` | 创建文档,如果存在则失败 | | `update` | 部分更新文档 | | `delete` | 删除文档 | ### 批量操作的优势 - **减少网络开销**:单次请求完成多个操作 - **提高吞吐量**:批量处理比单个操作更快 - **原子性**:每个操作独立执行,互不影响 ## API 的参数 ### 路由参数 | 参数 | 类型 | 是否必填 | 描述 | |------|------|----------|------| | `{index}` | 字符串 | 否 | 索引名称 | ### Query String 参数 | 参数 | 类型 | 是否必填 | 默认值 | 描述 | |------|------|----------|--------|------| | `timeout` | 时间值 | 否 | 1m | 操作超时时间 | | `refresh` | 字符串 | 否 | false | 刷新策略:`true`、`false`、`wait_for` | | `routing` | 字符串 | 否 | - | 默认路由值 | | `pipeline` | 字符串 | 否 | - | 默认管道名称 | | `require_alias` | 布尔值 | 否 | false | 是否要求使用别名 | | `wait_for_active_shards` | 字符串 | 否 | 1 | 等待活跃分片数:`1`、`all`、具体数字 | ### 请求体格式 批量请求使用特殊的 **NDJSON**(换行分隔的 JSON)格式: ``` { 操作元数据 } { 文档数据 } { 操作元数据 } { 文档数据 } ... ``` **重要**: - 每个操作由两行组成:操作指令 + 文档数据 - 行与行之间用换行符 `\n` 分隔 - `delete` 操作只有一行(不需要文档数据) - 末尾必须有换行符 ## 示例 ### 基本批量操作 ```bash POST /_bulk { "index": { "_index": "my_index", "_id": "1" } } { "field1": "value1", "field2": "value2" } { "index": { "_index": "my_index", "_id": "2" } } { "field1": "value3", "field2": "value4" } ``` ### 创建操作 ```bash POST /my_index/_bulk { "create": { "_id": "1" } } { "user": "kimchy", "message": "Hello" } { "create": { "_id": "2" } } { "user": "john", "message": "World" } ``` ### 更新操作 ```bash POST /my_index/_bulk { "update": { "_id": "1" } } { "doc": { "status": "processed" } } { "update": { "_id": "2" } } { "doc": { "counter": 5 } } ``` ### 删除操作 ```bash POST /my_index/_bulk { "delete": { "_index": "my_index", "_id": "1" } } { "delete": { "_index": "my_index", "_id": "2" } } ``` ### 混合操作 ```bash POST /_bulk { "index": { "_index": "users", "_id": "1" } } { "name": "Alice", "age": 30 } { "create": { "_index": "users", "_id": "2" } } { "name": "Bob", "age": 25 } { "update": { "_index": "users", "_id": "1" } } { "doc": { "age": 31 } } { "delete": { "_index": "users", "_id": "2" } } ``` **响应示例:** ```json { "took": 30, "errors": false, "items": [ { "index": { "_index": "users", "_type": "_doc", "_id": "1", "_version": 1, "result": "created", "_shards": { "total": 2, "successful": 1, "failed": 0 }, "status": 201 } }, { "create": { "_index": "users", "_type": "_doc", "_id": "2", "_version": 1, "result": "created", "_shards": { ... }, "status": 201 } }, { "update": { "_index": "users", "_type": "_doc", "_id": "1", "_version": 2, "result": "updated", "_shards": { ... }, "status": 200 } }, { "delete": { "_index": "users", "_type": "_doc", "_id": "2", "_version": 2, "result": "deleted", "_shards": { ... }, "status": 200 } } ] } ``` ### 使用管道 ```bash POST /my_index/_bulk?pipeline=timestamp_pipeline { "index": { } } { "message": "Hello without timestamp" } ``` ### 设置超时 ```bash POST /my_index/_bulk?timeout=5m { "index": { "_id": "1" } } { "field": "value" } ``` ### 立即刷新 ```bash POST /my_index/_bulk?refresh=true { "index": { "_id": "1" } } { "field": "value" } ``` ### 使用路由 ```bash POST /my_index/_bulk?routing=user1 { "index": { "_id": "1", "routing": "user1" } } { "field": "value" } ``` ### 脚本更新 ```bash POST /my_index/_bulk { "update": { "_id": "1" } } { "script": { "source": "ctx._source.counter += params.count", "lang": "painless", "params": { "count": 1 } } } { "update": { "_id": "2" } } { "script": { "source": "ctx._source.tags.add(params.tag)", "lang": "painless", "params": { "tag": "important" } } } ``` ### 使用 upsert ```bash POST /my_index/_bulk { "update": { "_id": "1", "_retry_on_conflict": 3 } } { "doc": { "counter": 1 }, "upsert": { "counter": 0 } } ``` ### 条件操作 ```bash POST /my_index/_bulk { "index": { "_id": "1", "if_seq_no": 5, "if_primary_term": 1 } } { "field": "value" } ``` ## 操作指令参数 | 参数 | 类型 | 描述 | |------|------|------| | `_index` | 字符串 | 目标索引 | | `_id` | 字符串 | 文档 ID | | `_type` | 字符串 | 文档类型(已弃用,默认 `_doc`) | | `routing` | 字符串 | 路由值 | | `version` | 整数 | 版本号 | | `version_type` | 字符串 | 版本类型:`internal`、`external`、`external_gte` | | `if_seq_no` | 整数 | 序列号条件 | | `if_primary_term` | 整数 | 主版本条件 | | `retry_on_conflict` | 整数 | 更新操作的重试次数(默认 0) | | `pipeline` | 字符串 | 管道名称 | | `_source` | 对象/布尔值 | 控制返回的源字段 | ## 响应字段说明 | 字段 | 描述 | |------|------| | `took` | 执行时间(毫秒) | | `errors` | 是否有错误 | | `items` | 每个操作的结果数组 | ## 最佳实践 ### 批次大小 | 数据量 | 推荐大小 | 说明 | |--------|----------|------| | 小文档 | 1000-5000 | 5-10MB 左右 | | 中文档 | 500-1000 | 10-15MB 左右 | | 大文档 | 100-500 | 根据文档大小调整 | ### 性能优化 ```bash # 批量导入时禁用刷新 POST /my_index/_bulk?refresh=false { "index": { } } { "field": "value" } ... # 导入完成后手动刷新 POST /my_index/_refresh ``` ### 错误处理 即使部分操作失败,其他操作仍会继续执行: ```json { "took": 15, "errors": true, "items": [ { "index": { "error": { "type": "version_conflict_engine_exception", "reason": "version conflict" }, "status": 409 } }, { "index": { "_index": "my_index", "_id": "2", "result": "created", "status": 201 } } ] } ``` ## 使用场景 ### 场景 1:批量导入数据 ```bash POST /logs/_bulk { "index": { } } { "@timestamp": "2026-02-04", "level": "INFO", "message": "App started" } { "index": { } } { "@timestamp": "2026-02-04", "level": "DEBUG", "message": "Processing..." } ``` ### 场景 2:数据迁移 ```bash POST /new_index/_bulk { "index": { "_id": "1" } } { "field1": "value1", "field2": "value2" } { "index": { "_id": "2" } } { "field1": "value3", "field2": "value4" } ``` ### 场景 3:定期更新 ```bash POST /metrics/_bulk { "update": { "_id": "counter1" } } { "script": "ctx._source.value += params.increment", "params": { "increment": 1 } } { "update": { "_id": "counter2" } } { "script": "ctx._source.value += params.increment", "params": { "increment": 5 } } ``` ## 注意事项 1. **格式要求**:必须使用 NDJSON(换行分隔的 JSON)格式 2. **大小限制**:默认批量请求大小限制约 100MB 3. **内存消耗**:大批量操作会占用较多内存 4. **错误处理**:单个操作失败不影响其他操作 5. **类型已弃用**:`_type` 参数已被弃用,默认使用 `_doc` 6. **换行符**:请求体末尾必须有换行符