讲讲Doris 数据导入的分布式两阶段提交(2PC)事务机制
Apache Doris 的数据导入机制之所以强大且可靠,很大程度上归功于其底层基于分布式两阶段提交(2PC, Two-Phase Commit)的事务机制。这个机制不仅保证了数据导入的 ACID 特性(特别是原子性和一致性),还为流式计算引擎(如 Flink)实现了端到端的 Exactly-Once(精确一次)语义。
下面我将为你详细剖析 Doris 数据导入的 2PC 事务机制。
一、 为什么 Doris 导入需要 2PC?
Doris 是一个分布式系统,一张表的数据可能被哈希或范围分区到多个 Backend (BE) 节点上,并且每个分片(Tablet)通常有多个副本(Replica)。
如果在导入一批数据时,某几个 BE 节点写入成功,而另外几个节点写入失败(例如网络闪断、磁盘满、宕机),就会导致数据不一致(产生半截子数据)。
为了实现“要么全部成功,要么全部失败”(All-or-Nothing)的原子性,Doris 引入了事务机制,并通过 2PC 来协调所有参与写入的 BE 节点。
二、 核心角色划分
在 Doris 的 2PC 中,主要有两个角色:
- Coordinator(协调者):通常由 FE(Frontend) 担任。负责生成事务 ID、管理事务状态(Begin, Commit, Abort)、协调各个 BE。
- Participant(参与者):即 BE(Backend)。负责实际接收数据、将数据写入磁盘,并向 FE 汇报自身的状态。
三、 Doris 内部完整的导入事务流程(标准流程)
虽然叫“两阶段提交”,但在 Doris 内部的实现中,为了极致的性能和可见性控制,这个过程被细化为了四个关键步骤(Begin -> Write -> Commit -> Publish Version):
Phase 0: Begin(开启事务)
- 客户端(或导入作业)向 FE 发起导入请求。
- FE 开启一个全局事务,生成一个全局唯一的 Transaction ID (TxnID),并将其状态标记为
PREPARE。
Phase 1: Write / Prepare(第一阶段:写入/准备)
- 客户端将数据发送给协调节点(通常是某一个 BE)。该 BE 根据路由规则,将数据分发到对应的目标 BE 节点。
- 目标 BE 收到数据后,会在本地磁盘生成新的数据文件(Rowset),并将这些文件与当前的 TxnID 绑定。
- 关键点:此时数据虽然已经落盘,但是不可见(Invisible)的,用户无法查询到这些数据。
- 所有参与的 BE 在写完本地数据后,向 FE 发送写入成功的报告。
Phase 2: Commit(第二阶段:提交)
- FE 收集所有参与 BE 的执行结果。
- 如果满足成功条件(例如达到了多数派 Quorum 成功,或者严格要求所有副本全成功),FE 会决定提交该事务。
- FE 将该 TxnID 的状态修改为
COMMITTED,并将这一状态变更持久化到 FE 的元数据日志(EditLog)中。 - 关键点:只要 FE 写下了
COMMITTED日志,这个事务就绝对成功了,即使此时系统断电,重启后也会认为数据已成功导入。但这时的状态仅仅是“逻辑成功”,数据还需要最后一步才能被查到。
Phase 3: Publish Version(发布版本 - 可见性生效)
- 事务处于
COMMITTED状态后,FE 会异步(通常是非常快的几毫秒内)向相关的 BE 发送Publish Version任务。 - FE 会为这张表分配一个新的版本号(Version)。
- BE 收到指令后,将之前处于不可见状态的 Rowset 与这个新的版本号绑定,使其变为可见状态(Visible)。
- 此时,用户发起的查询就能查到这批新导入的数据了。事务彻底完结(状态变为
VISIBLE)。
四、 对外暴露的 2PC API(以 Flink Stream Load 为例)
上面说的是 Doris 内部的运作机制。在 Doris 1.1 版本之后,Doris 开放了 Stream Load 2PC API,允许外部系统(如 Flink)来显式控制上述的 Prepare 和 Commit 阶段,从而实现 Exactly-Once。
当启用外部 2PC 时,流程与 Flink 的 Checkpoint 机制完美结合:
- Flink 算子启动:向 Doris FE 发送
Txn_Operation=begin,获取 TxnID。 - Flink 处理数据流:持续将流式数据按批次发送给 Doris BE(请求头携带
two_phase_commit:true和 TxnID)。Doris BE 写入不可见的 Rowset(相当于 Prepare 阶段)。 - Flink 触发 Checkpoint:
- Flink 算子停止写入,完成当前批次数据的刷盘。
- Flink 将 TxnID 保存在 Checkpoint 的状态中。
- Flink Checkpoint 完成通知 (Notify Checkpoint Complete):
- Flink 算子收到 Checkpoint 成功的确认后,向 Doris FE 发送
Txn_Operation=commit(相当于 Commit 阶段)。 - Doris FE 执行内部的 Commit 和 Publish Version。数据对外可见。
- Flink 算子收到 Checkpoint 成功的确认后,向 Doris FE 发送
如果 Flink 挂了怎么办?
- 如果 Flink 在 Checkpoint 完成前挂了:Doris 中的事务处于 Prepare 状态,Flink 恢复后会回滚到上一个 Checkpoint,并向 Doris 发起
Txn_Operation=abort废弃之前的脏数据(Doris 自身也有超时自动 Abort 机制)。 - 如果 Flink 在发送 Commit 时挂了:因为 Checkpoint 已经记录了该 TxnID 应该被提交,Flink 恢复后会重新向 Doris 发送一次对应 TxnID 的 Commit 请求(幂等操作),确保数据生效。
五、 异常与故障容忍 (Fault Tolerance)
Doris 的 2PC 机制对各种异常做了严密的兜底:
- BE 宕机(Write 阶段):如果数据无法写入足够的副本,FE 会直接将事务状态标记为
ABORTED。重启后的 BE 会通过垃圾回收(GC)机制,清理掉那些未提交的无用 Rowset 文件。 - FE 宕机(Commit 阶段前):事务处于
PREPARE状态,FE 选主恢复后,如果超过超时时间没有提交,会自动将其ABORT并清理 BE 上的数据。 - BE 宕机(Publish Version 阶段):由于 FE 已经记录了
COMMITTED,FE 会不断重试向该 BE 发送 Publish 任务,直到该 BE 恢复并成功发布版本,保证了最终一致性。
总结
Doris 的分布式两阶段提交机制,巧妙地将数据的物理写入(Write/Prepare)与逻辑可见性(Commit/Publish)分离开来。
- 对于普通用户,它在后台默默保证了无论通过 Broker Load 还是 Routine Load 导入多大的数据量,绝不会出现“查出半批数据”的尴尬场景。
- 对于大数据架构师,它提供的 Stream Load 2PC API 是构建实时数仓(如 Flink + Doris)实现端到端数据一致性(不丢不重)的核心基石。