基于本文回答

播面 播面

文图音视,全方位拆解八股文
0
评论

Kafka 如何支持事务(Transaction)?

知识点图片

Kafka 在 0.11.0 版本中引入了事务(Transaction)功能,主要目的是为了支持 Exactly-Once Semantics(精确一次语义),特别是在 "流处理"(Stream Processing)场景下的 Read-Process-Write(读取-处理-写入) 模式。

Kafka 的事务机制允许生产者将消息原子性地写入多个分区(Partition),即:要么所有消息都成功写入并对消费者可见,要么所有消息都丢弃,消费者不可见。

以下是 Kafka 支持事务的核心机制、组件和工作流程的详细解析:


1. 核心概念与组件

要理解 Kafka 事务,需要了解以下几个关键组件:

  1. Transactional ID (事务 ID):

    • 用户需要为 Producer 配置一个唯一的 transactional.id
    • 这使得 Producer 在重启或故障恢复后,Broker 能识别出它是同一个 Producer,从而恢复之前的状态或清理未完成的事务。
  2. Transaction Coordinator (事务协调器):

    • 这是运行在 Broker 上的一个模块。
    • 每个 transactional.id 都会映射到一个特定的 Transaction Coordinator。
    • 它负责管理事务的整个生命周期(开启、提交、回滚)。
  3. Transaction Log (事务日志):

    • 这是一个内部 Topic,名为 __transaction_state
    • Coordinator 将事务的状态变更(如 Ongoing, PrepareCommit, CompleteCommit)持久化写入这个 Topic,类似于 __consumer_offsets
  4. Control Message (控制消息/标记):

    • 这是一种特殊类型的消息,Producer 不会直接生成它,而是由 Coordinator 写入到用户数据的 Topic 分区中。
    • 标记有两种:COMMITABORT。它们用来告诉消费者,在这个标记之前的消息是已提交的还是该丢弃的。

2. 事务的工作流程 (Step-by-Step)

整个事务流程可以概括为以下几个步骤:

第一步:查找协调器 (Find Coordinator)

Producer 向任意 Broker 发送请求,询问哪个 Broker 是其 transactional.id 对应的 Transaction Coordinator。

第二步:获取 PID 和 Epoch (Init Transactions)

Producer 调用 initTransactions()

  • Producer 向 Coordinator 注册 transactional.id
  • Coordinator 分配一个 Producer ID (PID) 和一个 Epoch
  • Zombie Fencing (僵尸防护): 如果该 transactional.id 之前有一个旧的 Producer 实例(僵尸实例)还在运行,Coordinator 会增加 Epoch。旧实例如果再尝试写入,会被 Broker 拒绝(因为 Epoch 过期),从而保证只有一个活跃的 Producer。

第三步:开启事务 (Begin Transaction)

Producer 调用 beginTransaction()。此时只是在本地将状态标记为“事务中”,尚未与 Broker 交互。

第四步:消费与发送 (Consume & Produce)

Read-Process-Write 场景中:

  1. 注册分区: 当 Producer 第一次向某个 Topic 分区发送数据时,Coordinator 会在 __transaction_state 中记录该事务涉及到了这个分区。
  2. 发送数据: Producer 将消息发送给 Broker。注意: 此时消息已经实实在在地写到了 Leader 的磁盘上,但是被标记为“未提交”状态。

第五步:提交消费位移 (Send Offsets to Transaction - 可选)

如果是流处理(消费 A -> 处理 -> 生产 B),需要将 A 的消费位移也作为事务的一部分提交。
Producer 会发送请求给 Coordinator,Coordinator 会将位移提交请求转发给管理 __consumer_offsets 的 Group Coordinator。这确保了“消息生产”和“位移提交”是原子的。

第六步:提交或回滚事务 (Commit or Abort)

Producer 调用 commitTransaction()abortTransaction()

如果是提交 (Commit):

  1. Prepare Phase: Coordinator 在 __transaction_state 中写入 PREPARE_COMMIT 消息。一旦这条消息写入成功,事务就被认为是“由于成功了”(Point of no return)。
  2. Write Markers: Coordinator 向所有参与该事务的 Topic 分区(以及 __consumer_offsets)写入 Transaction Marker (控制消息),标记为 COMMIT
  3. Complete Phase: Coordinator 在 __transaction_state 中写入 COMPLETE_COMMIT,事务结束。

如果是回滚 (Abort):
流程类似,只是写入的标记是 ABORT


3. 消费者端 (Consumer Isolation Level)

仅仅生产者支持事务是不够的,消费者也需要配合。Kafka 引入了 isolation.level 配置:

  1. read_uncommitted (默认):

    • 消费者可以看到所有消息,包括已提交的、未提交的(进行中)和已回滚的消息。
    • 这相当于没有事务隔离。
  2. read_committed:

    • 消费者只能读取到已提交(Committed)的消息。
    • LSO (Last Stable Offset): Broker 会告诉消费者一个 LSO。LSO 之前的所有事务都已完成(要么提交,要么回滚)。
    • 消费者在读取消息时,会缓存消息直到遇到 Control Marker
      • 如果遇到 COMMIT 标记,则将之前的消息返回给用户。
      • 如果遇到 ABORT 标记,则丢弃之前的消息。

4. 幂等性 vs 事务 (Idempotence vs Transactions)

这两个概念经常被混淆,但它们是递进关系:

  • 幂等性 (Idempotence):

    • 配置:enable.idempotence=true
    • 范围:单分区、单会话
    • 原理:利用 PID 和 Sequence Number (序列号) 去重。如果 Producer 重试发送相同的消息,Broker 会发现序列号重复而丢弃。
    • 局限:无法保证跨分区或 Producer 重启后的原子性。
  • 事务 (Transactions):

    • 配置:需要配置 transactional.id,且隐含开启了幂等性。
    • 范围:跨分区、跨会话
    • 原理:利用 Coordinator 和 Transaction Log 进行两阶段提交(2PC)。

5. 代码示例 (Java)

java
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "...");
props.put("value.serializer", "...");

// 1. 必须设置 transactional.id
props.put("transactional.id", "my-transactional-id");
// 2. 幂等性会自动开启,但显式开启是个好习惯
props.put("enable.idempotence", "true");

KafkaProducer<String, String> producer = new KafkaProducer<>(props);

// 3. 初始化事务 (向 Coordinator 注册,防僵尸)
producer.initTransactions();

try {
    // 4. 开启事务
    producer.beginTransaction();

    // 5. 发送消息 (可能涉及多个分区)
    producer.send(new ProducerRecord<>("topic-A", "Key1", "Value1"));
    producer.send(new ProducerRecord<>("topic-B", "Key2", "Value2"));

    // 6. (可选) 如果是流处理,还需要提交消费者的 Offset
    // producer.sendOffsetsToTransaction(...);

    // 7. 提交事务
    producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
    // 致命错误,Producer 不能再用了
    producer.close();
} catch (KafkaException e) {
    // 8. 发生错误,回滚事务
    producer.abortTransaction();
}

总结

Kafka 通过引入 Transaction CoordinatorTransaction Log 实现了类似两阶段提交(2PC)的协议。它利用 Control Marker 将事务状态下发到具体的数据分区,使得配置了 read_committed 的消费者能够过滤掉未提交或已回滚的数据,从而实现了跨分区、跨会话的精确一次语义。

00:00
00:00