📣 极限科技诚招搜索运维工程师(Elasticsearch/Easysearch)- 全职/北京 👉 : 立即申请加入

在 Easysearch 的世界里,如果你还在使用单条 PUTPOST 请求来循环写入数据,那么你的系统性能可能正遭受着严重的浪费。

想象一下搬家场景:是每次手里拿一个箱子跑一趟(单条 API),还是把所有箱子装进大卡车一次性拉走(Bulk API)?答案显而易见。

Bulk API 是 Easysearch 提供的高效批量操作接口。经过测试,合理使用 Bulk API 可以将写入吞吐量提升 5 到 10 倍。本文将带你深入理解其特殊的格式,并实战演示如何在一个请求中完成增删改查。

1. 核心概念:NDJSON 格式 #

Bulk API 最大的“坑”往往在于它的格式。它不接受标准的 JSON 数组([{...}, {...}]),而是采用一种特殊的格式:NDJSON (Newline Delimited JSON)

规则: #

  1. 每行一个 JSON 对象:不能有换行符美化。
  2. 两行代表一个操作(Delete 除外):
  • 第一行:元数据(Action & Metadata),定义要做什么(index, create, update, delete)以及针对哪个索引、哪个 ID。
  • 第二行:数据体(Source),定义具体的文档内容。
  1. 最后一行必须以换行符(\n)结束

结构示意图: #

{ "index": { "_index": "test", "_id": "1" } } <-- 动作行
{ "field1": "value1" } <-- 数据行
{ "delete": { "_index": "test", "_id": "2" } } <-- 只有动作行

2. 四大操作类型实战 #

Bulk API 支持四种操作类型,它们可以在同一个请求中混合使用。

2.1 Index(索引) #

  • 行为:如果 ID 不存在则创建,如果存在则全量覆盖
  • 场景:最常用的写入方式,适合日志或状态更新。

2.2 Create(创建) #

  • 行为:如果 ID 不存在则创建,如果已存在则报错
  • 场景:确保数据不被覆盖,保证唯一性。

2.3 Update(更新) #

  • 行为:执行局部更新。注意数据体需要包裹在 docscript 中。
  • 场景:修改部分字段。

2.4 Delete(删除) #

  • 行为:删除指定文档。
  • 注意:Delete 操作没有第二行数据体。

3. 综合实战演示 #

假设我们正在管理一个电商库存索引 products,现在需要在一个 HTTP 请求中完成以下操作:

  1. 上架一个新商品(Index)。
  2. 尝试创建另一个商品,但必须保证不覆盖已有 ID(Create)。
  3. 更新 iPhone 15 的价格(Update)。
  4. 下架旧款商品(Delete)。

请求构造:

POST /_bulk
{"index": {"_index": "products", "_id": "1001"}}
{"name": "MacBook Pro", "price": 12999, "stock": 50}
{"create": {"_index": "products", "_id": "1002"}}
{"name": "AirPods Pro", "price": 1999, "stock": 100}
{"update": {"_index": "products", "_id": "1003"}}
{"doc": {"price": 5500}}
{"delete": {"_index": "products", "_id": "9999"}}

(注意:复制上述代码时,请确保最后一行后面有一个换行符)

Easysearch 响应解析:

Bulk API 的响应体也是一个庞大的 JSON,它会返回每一条子操作的执行结果。

{
  "took": 30,
  "errors": false, // 【关键】首先看这里,false 代表全成功
  "items": [
    {
      "index": {
        "_index": "products",
        "_id": "1001",
        "result": "created",
        "status": 201
      }
    },
    {
      "update": {
        "_index": "products",
        "_id": "1003",
        "result": "updated",
        "status": 200
      }
    }
    // ... 其他结果
  ]
}

重要提示:如果 errorstrue,意味着部分操作失败了。Bulk API 并不是事务性的,其中一条失败不会导致整个请求回滚,其他的依然会执行。你需要遍历 items 数组找出 status 不是 2xx 的条目进行重试或记录日志。


4. 性能调优最佳实践 #

掌握了语法只是第一步,如何用好 Bulk API 才是关键。

4.1 批次大小 (Batch Size) #

  • 误区:一次发 1000 万条数据。这会导致内存溢出或请求超时。
  • 建议
  • 按条数:通常 1000 ~ 5000 条为一个批次。
  • 按体积:请求体大小控制在 5MB ~ 15MB 之间。
  • 动态调整:从 5MB 开始测试,如果响应很快且无报错,逐渐增加;如果出现超时或队列拒绝(Queue Rejection),则减小。

4.2 客户端并发 #

单线程发送 Bulk 请求很难打满 Easysearch 的服务器性能。建议使用多线程或异步方式并发发送 Bulk 请求。但要注意观察 Easysearch 的 write 线程池队列,避免压垮服务器。

4.3 去除 Refresh #

在进行大量数据导入(如从 MySQL 初始化数据)时,建议临时将索引的 refresh_interval 设置为 -1

PUT /products/_settings
{ "index": { "refresh_interval": "-1" } }

导入完成后再改回 1s。这可以显著减少磁盘 I/O,写入速度提升 30% 以上。


5. 为什么 Easysearch 的写入更稳? #

虽然 Bulk API 协议是通用的,但在高并发写入场景下,INFINI Easysearch 相比原生 ES 做了大量内核级优化:

  1. 更智能的内存管理:有效防止大批次写入导致的 OOM(内存溢出)。
  2. 写入/查询隔离:优化了线程池调度,防止写入高峰拖慢查询响应。
  3. ZSTD 压缩:在数据落盘前进行极速压缩,减少了磁盘 I/O 压力,从而间接提升了持续写入的吞吐量。

总结 #

  • Reject 单条写入:生产环境尽量杜绝单条索引请求。
  • 遵守 NDJSON:两行一条,换行符结尾。
  • 检查 errors:Bulk 不保证全成功,必须处理错误响应。
  • 控制体积:5-15MB 是黄金区间。

学会 Bulk API,你的 Easysearch 数据流转效率将提升到一个新的层级。