基于本文回答

播面 播面

文图音视,全方位拆解八股文
0
评论

Kafka Message 结构详解

知识点图片

本文讲解Kafka数据传输的基本单位——Message,详细介绍了其由Key、Value、Headers等部分组成的核心结构与各自的作用。

我们来详细地讲解一下 Kafka 的 Message(消息),在 Kafka 的新版本中,它也被称为 Record (记录)

简单来说,Kafka Message 是 Kafka 中数据传输的基本单位。你发送到 Kafka 的每一条数据,以及你从 Kafka 消费的每一条数据,都是一个 Message。

一个 Message 并不是简单的数据负载(Payload),它包含了一个丰富的结构,这个结构对于 Kafka 的高效运作至关重要。

1. Message 的核心结构

一个完整的 Kafka Message 包含以下几个主要部分:


a. Value (值)

这是 Message 的核心,也就是你真正想要传输的数据本身。它可以是任何东西,比如一个 JSON 字符串、一个 Avro 或 Protobuf 序列化后的对象,或者就是一段纯文本。对于 Kafka 来说,Value 本质上只是一个字节数组 (byte[])。如何解析这个字节数组完全取决于生产者和消费者之间的约定。

  • 示例: "{ "userId": "123", "orderAmount": 99.50, "productId": "p456" }"

b. Key (键)

Key 也是一个字节数组 (byte[]),它是可选的。虽然可选,但 Key 非常非常重要,它主要有两个作用:

  1. 数据分区 (Partitioning): 这是 Key 最核心的用途。当生产者发送消息时,Kafka 的分区器 (Partitioner) 会根据 Key 来决定这条消息应该被发送到 Topic 的哪个 Partition。

    • 默认策略: 对 Key 进行哈希计算,然后用哈希值对 Partition 的数量取模 (hash(key) % num_partitions)。
    • 重要保证: 所有拥有相同 Key 的消息,一定会被发送到同一个 Partition。这就保证了对于特定 Key 的消息,其消费是有序的。例如,如果你用 userId 作为 Key,那么所有关于这个用户的操作事件(如下单、支付、取消)都会按顺序写入同一个分区。
    • 如果 Keynull,消息会以轮询 (Round-robin) 的方式被均匀地发送到不同的 Partition。
  2. 日志压缩 (Log Compaction): 对于启用了日志压缩策略的 Topic,Kafka 会保留每个 Key 最新的一条 Message。这对于存储状态变更或者配置信息等场景非常有用。

  • 示例: "userId:123"

c. Headers (消息头)

Headers 是一组键值对,用于存储一些元数据 (Metadata),这些元数据不适合放在 KeyValue 中。Headers 的设计使得 Kafka 的消息传递更加灵活。

  • 常见用途:

    • 追踪信息 (Tracing): 存储 trace-idspan-id 等,用于分布式系统的链路追踪。
    • 数据来源: 标记消息是由哪个应用或服务生成的。
    • 数据版本: 标记 Value 的 schema 版本号。
    • 路由信息: 存储一些供下游消费者判断如何处理的额外信息。
  • 示例: [{"trace-id": "xyz-abc-123"}, {"source": "payment-service"}]


d. Timestamp (时间戳)

每条消息都有一个时间戳,它代表了消息的发生时间。这个时间戳有两种类型,可以在 Topic 级别进行配置:

  1. CreateTime (默认): 消息由生产者创建时的时间戳。这是最常用的类型。
  2. LogAppendTime: 消息被 Broker 追加到日志 (Log) 中时的时间戳。

这个时间戳对于基于时间的查询、数据清理和流处理(如窗口计算)非常重要。


e. 其他元数据 (由 Broker 添加)

当消息被成功写入 Broker 后,Broker 还会为其附加一些重要的元数据:

  • Offset (偏移量):
    • Offset 是一个在 Partition 内单调递增的整数。
    • 唯一标识了 Partition 中的每一条消息。
    • 消费者通过记录自己消费到的 Offset 来追踪消费进度,从而实现故障恢复和 Exactly-once 语义。
  • Partition: 消息被存储的具体分区号。
  • Topic: 消息所属的主题名称。

一个完整的 Message 在被消费时的样子,可以看作是:
(topic, partition, offset, timestamp, key, value, headers)

2. Message 的关键特性

  • 不可变性 (Immutability): 一旦消息被写入 Kafka,它就不能被修改。你只能通过写入一条新的消息来“更新”状态(尤其是在日志压缩场景下)。这种设计是 Kafka 高吞吐量的基础。
  • 顺序保证 (Ordering Guarantee): Kafka 只保证在单个 Partition 内的消息是有序的。它不保证整个 Topic 的全局顺序。这就是为什么合理地使用 Key 进行分区如此重要。
  • 持久性 (Durability): 消息被写入 Broker 的磁盘上,并可以通过配置副本 (Replication) 来保证高可用和数据不丢失。

3. 数据的序列化 (Serialization)

如前所述,Kafka 的 KeyValue 都只是字节数组。因此,生产者在发送数据前需要将对象序列化 (Serialize) 成字节数组,而消费者在接收数据后需要将字节数组反序列化 (Deserialize) 成对象。

常用的序列化格式有:

  • JSON / String: 简单、易读,但性能和空间效率较低,且没有强制的 schema 约束。
  • Avro (推荐): 由 Apache Hadoop 项目开发,基于 Schema。它将 Schema 与数据分离,序列化后的数据非常紧凑,并且支持 Schema 的平滑演进(向前/向后兼容),非常适合大规模数据系统。通常与 Confluent Schema Registry 结合使用。
  • Protocol Buffers (Protobuf): Google 开发的,同样基于 Schema,性能高,效率好,也支持 Schema 演进。
  • Raw Bytes: 用于传输图片、音频等二进制数据。

4. 代码示例 (Java)

生产者创建并发送一个 Message:

java
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.internals.RecordHeader;

// ... (省略了 KafkaProducer 的创建过程)

// 创建一个 ProducerRecord,这就是一个 Message
ProducerRecord<String, String> record = new ProducerRecord<>(
    "order-topic",                 // Topic
    0,                             // Partition (可以不指定,让 Partitioner 决定)
    System.currentTimeMillis(),    // Timestamp
    "user-123",                    // Key
    "{ \"orderId\": \"abc-xyz\", \"amount\": 100.0 }", // Value
    List.of(new RecordHeader("trace-id", "trace-guid-12345".getBytes())) // Headers
);

// 发送消息
producer.send(record);

消费者读取并解析一个 Message:

java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

// ... (省略了 KafkaConsumer 的创建过程)

ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

for (ConsumerRecord<String, String> record : records) {
    System.out.printf("Received Message:\n" +
                      "Topic: %s\n" +
                      "Partition: %d\n" +
                      "Offset: %d\n" +
                      "Timestamp: %tF %<tT\n" +
                      "Key: %s\n" +
                      "Value: %s\n" +
                      "Headers: %s\n",
                      record.topic(),
                      record.partition(),
                      record.offset(),
                      record.timestamp(),
                      record.key(),
                      record.value(),
                      record.headers());
}

总结

Kafka 的 Message 远不止是简单的数据负载。它的结构化设计 (Key, Value, Headers, Timestamp)元数据 (Offset, Partition) 是 Kafka 实现高吞吐、分区、有序性、持久性和可靠消费等核心功能的基础。理解 Message 的完整结构和每个部分的作用,是高效、正确使用 Kafka 的关键。

00:00
00:00