基于本文回答
0
评论

讲讲Doris 数据导入的分布式两阶段提交(2PC)事务机制

Apache Doris 的数据导入机制之所以强大且可靠,很大程度上归功于其底层基于分布式两阶段提交(2PC, Two-Phase Commit)的事务机制。这个机制不仅保证了数据导入的 ACID 特性(特别是原子性和一致性),还为流式计算引擎(如 Flink)实现了端到端的 Exactly-Once(精确一次)语义。

下面我将为你详细剖析 Doris 数据导入的 2PC 事务机制。


一、 为什么 Doris 导入需要 2PC?

Doris 是一个分布式系统,一张表的数据可能被哈希或范围分区到多个 Backend (BE) 节点上,并且每个分片(Tablet)通常有多个副本(Replica)。
如果在导入一批数据时,某几个 BE 节点写入成功,而另外几个节点写入失败(例如网络闪断、磁盘满、宕机),就会导致数据不一致(产生半截子数据)。

为了实现“要么全部成功,要么全部失败”(All-or-Nothing)的原子性,Doris 引入了事务机制,并通过 2PC 来协调所有参与写入的 BE 节点。


二、 核心角色划分

在 Doris 的 2PC 中,主要有两个角色:

  1. Coordinator(协调者):通常由 FE(Frontend) 担任。负责生成事务 ID、管理事务状态(Begin, Commit, Abort)、协调各个 BE。
  2. Participant(参与者):即 BE(Backend)。负责实际接收数据、将数据写入磁盘,并向 FE 汇报自身的状态。

三、 Doris 内部完整的导入事务流程(标准流程)

虽然叫“两阶段提交”,但在 Doris 内部的实现中,为了极致的性能和可见性控制,这个过程被细化为了四个关键步骤(Begin -> Write -> Commit -> Publish Version):

Phase 0: Begin(开启事务)

  • 客户端(或导入作业)向 FE 发起导入请求。
  • FE 开启一个全局事务,生成一个全局唯一的 Transaction ID (TxnID),并将其状态标记为 PREPARE

Phase 1: Write / Prepare(第一阶段:写入/准备)

  • 客户端将数据发送给协调节点(通常是某一个 BE)。该 BE 根据路由规则,将数据分发到对应的目标 BE 节点。
  • 目标 BE 收到数据后,会在本地磁盘生成新的数据文件(Rowset),并将这些文件与当前的 TxnID 绑定。
  • 关键点:此时数据虽然已经落盘,但是不可见(Invisible)的,用户无法查询到这些数据。
  • 所有参与的 BE 在写完本地数据后,向 FE 发送写入成功的报告。

Phase 2: Commit(第二阶段:提交)

  • FE 收集所有参与 BE 的执行结果。
  • 如果满足成功条件(例如达到了多数派 Quorum 成功,或者严格要求所有副本全成功),FE 会决定提交该事务。
  • FE 将该 TxnID 的状态修改为 COMMITTED,并将这一状态变更持久化到 FE 的元数据日志(EditLog)中。
  • 关键点:只要 FE 写下了 COMMITTED 日志,这个事务就绝对成功了,即使此时系统断电,重启后也会认为数据已成功导入。但这时的状态仅仅是“逻辑成功”,数据还需要最后一步才能被查到。

Phase 3: Publish Version(发布版本 - 可见性生效)

  • 事务处于 COMMITTED 状态后,FE 会异步(通常是非常快的几毫秒内)向相关的 BE 发送 Publish Version 任务。
  • FE 会为这张表分配一个新的版本号(Version)。
  • BE 收到指令后,将之前处于不可见状态的 Rowset 与这个新的版本号绑定,使其变为可见状态(Visible)
  • 此时,用户发起的查询就能查到这批新导入的数据了。事务彻底完结(状态变为 VISIBLE)。

四、 对外暴露的 2PC API(以 Flink Stream Load 为例)

上面说的是 Doris 内部的运作机制。在 Doris 1.1 版本之后,Doris 开放了 Stream Load 2PC API,允许外部系统(如 Flink)来显式控制上述的 Prepare 和 Commit 阶段,从而实现 Exactly-Once

当启用外部 2PC 时,流程与 Flink 的 Checkpoint 机制完美结合:

  1. Flink 算子启动:向 Doris FE 发送 Txn_Operation=begin,获取 TxnID。
  2. Flink 处理数据流:持续将流式数据按批次发送给 Doris BE(请求头携带 two_phase_commit:true 和 TxnID)。Doris BE 写入不可见的 Rowset(相当于 Prepare 阶段)。
  3. Flink 触发 Checkpoint
    • Flink 算子停止写入,完成当前批次数据的刷盘。
    • Flink 将 TxnID 保存在 Checkpoint 的状态中。
  4. Flink Checkpoint 完成通知 (Notify Checkpoint Complete)
    • Flink 算子收到 Checkpoint 成功的确认后,向 Doris FE 发送 Txn_Operation=commit(相当于 Commit 阶段)。
    • Doris FE 执行内部的 Commit 和 Publish Version。数据对外可见。

如果 Flink 挂了怎么办?

  • 如果 Flink 在 Checkpoint 完成前挂了:Doris 中的事务处于 Prepare 状态,Flink 恢复后会回滚到上一个 Checkpoint,并向 Doris 发起 Txn_Operation=abort 废弃之前的脏数据(Doris 自身也有超时自动 Abort 机制)。
  • 如果 Flink 在发送 Commit 时挂了:因为 Checkpoint 已经记录了该 TxnID 应该被提交,Flink 恢复后会重新向 Doris 发送一次对应 TxnID 的 Commit 请求(幂等操作),确保数据生效。

五、 异常与故障容忍 (Fault Tolerance)

Doris 的 2PC 机制对各种异常做了严密的兜底:

  • BE 宕机(Write 阶段):如果数据无法写入足够的副本,FE 会直接将事务状态标记为 ABORTED。重启后的 BE 会通过垃圾回收(GC)机制,清理掉那些未提交的无用 Rowset 文件。
  • FE 宕机(Commit 阶段前):事务处于 PREPARE 状态,FE 选主恢复后,如果超过超时时间没有提交,会自动将其 ABORT 并清理 BE 上的数据。
  • BE 宕机(Publish Version 阶段):由于 FE 已经记录了 COMMITTED,FE 会不断重试向该 BE 发送 Publish 任务,直到该 BE 恢复并成功发布版本,保证了最终一致性。

总结

Doris 的分布式两阶段提交机制,巧妙地将数据的物理写入(Write/Prepare)逻辑可见性(Commit/Publish)分离开来。

  • 对于普通用户,它在后台默默保证了无论通过 Broker Load 还是 Routine Load 导入多大的数据量,绝不会出现“查出半批数据”的尴尬场景。
  • 对于大数据架构师,它提供的 Stream Load 2PC API 是构建实时数仓(如 Flink + Doris)实现端到端数据一致性(不丢不重)的核心基石。
右滑查看面试常问