Elasticsearch中,如何解决并发写入时的版本冲突问题(乐观锁/Version)?
在 Elasticsearch (ES) 中,由于其分布式和异步写入的特性,不支持传统关系型数据库的 ACID 事务和悲观锁(Row-level locking)。ES 采用 乐观并发控制(Optimistic Concurrency Control, OCC) 机制,通过版本号(Versioning)来解决并发写入冲突问题。
以下是解决并发冲突的三种主要方案,分别适用于不同的场景:
1. 使用 _seq_no 和 _primary_term (推荐,ES 6.7+ / 7.x+ 标准做法)
在旧版本的 ES 中,我们直接使用 _version 字段。但在现代 ES 版本中,为了更好地处理主分片切换等故障恢复场景,官方推荐使用 Sequence Number (_seq_no) 和 Primary Term (_primary_term) 配合使用来实现乐观锁。
原理:
- 读(Read): 先读取文档,获取当前的
_seq_no和_primary_term。 - 写(Write): 在更新请求中带上刚才读取到的这两个参数。
- 校验: ES 会比较请求中的参数与当前文档的实际参数。只有当两者完全一致时,写入才成功;否则抛出
409 VersionConflictEngineException。
操作步骤示例:
第一步:读取文档
GET /products/_doc/1
响应:
{
"_index": "products",
"_id": "1",
"_version": 1,
"_seq_no": 10, <-- 关注这个
"_primary_term": 1, <-- 关注这个
"_source": { "price": 100 }
}
第二步:带条件更新
假设我们要把价格改为 200,必须带上刚才读到的值:
POST /products/_doc/1?if_seq_no=10&if_primary_term=1
{
"price": 200
}
- 如果成功: 返回 200 OK,文档更新,
_seq_no增加。 - 如果失败(并发冲突): 返回
409 Conflict。此时应用程序需要捕获异常,重新读取最新数据,重新计算业务逻辑,再次尝试写入(CAS - Compare And Swap 逻辑)。
2. 使用外部版本号 (version_type=external)
场景: 当你的数据源头不是 ES,而是其他数据库(如 MySQL),且该数据库已经有了版本控制机制(如 MySQL 的 timestamp 或 version 字段,或者 Kafka 的 offset)。
原理:
ES 不再检查版本号是否相等,而是检查请求中的版本号是否大于当前 ES 中的版本号。
- Internal (默认): 请求版本 == 当前版本 (用于 CAS)。
- External: 请求版本 > 当前版本。
操作示例:
假设 MySQL 中某条数据的版本号更新到了 500。
PUT /products/_doc/1?version=500&version_type=external
{
"price": 200
}
- 如果 ES 中当前版本是 499:写入成功,ES 将版本更新为 500。
- 如果 ES 中当前版本是 500 或 501(可能旧数据重发):写入失败,抛出 409 冲突。
3. 使用 retry_on_conflict (仅限 _update API)
场景: 适用于局部更新或脚本更新,且你不关心中间状态,只希望更新最终能执行成功。例如:累加计数器、更新某个无关紧要的字段。
原理:
当发生版本冲突时,ES 内部自动重新获取文档的新版本,并重新应用更新脚本,直到达到重试次数上限。这省去了在应用层写 try-catch 重试逻辑的麻烦。
操作示例:
给商品价格加 10,如果冲突自动重试 3 次:
POST /products/_update/1?retry_on_conflict=3
{
"script": {
"source": "ctx._source.price += params.count",
"params": {
"count": 10
}
}
}
总结与最佳实践
| 方案 | 关键参数 | 适用场景 | 冲突处理逻辑 |
|---|---|---|---|
| 内部乐观锁 | if_seq_no, if_primary_term |
最常用。业务逻辑依赖于读取到的旧数据(如:库存扣减)。 | 应用层捕获 409,重新读取 -> 计算 -> 写入。 |
| 外部版本号 | version, version_type=external |
数据同步。ES 作为从库,主数据在 MySQL/Oracle 等。 | 忽略旧版本数据(幂等性保证),直接丢弃请求。 |
| 自动重试 | retry_on_conflict |
局部/脚本更新。简单的计数、状态追加,不依赖复杂业务校验。 | ES 内部自动重试,无需应用层干预。 |
开发建议:
在处理高并发写入时,建议在应用代码中封装一个标准的 Retry Loop(重试循环):
GET获取数据及_seq_no。- 在内存中修改数据。
- 尝试带
if_seq_no写入。 - 如果捕获到
409异常,回到第 1 步(设置最大重试次数以防死循环)。