基于本文回答

播面 播面

刷题像听歌,多听自然懂
0
评论

一张主键表最初预估每日数据量较小,配置了固定桶模式 'bucket' = 10。运行一年后,单分区数据量增长至 1TB,导致 LSM 树过深,查询和后台 Compaction 速度急剧退化。请问如何在不停止上游流式读取的前提下,将固定桶数量平滑地重构并扩容至 100 个桶?

在 Apache Paimon 中,固定桶(Fixed Bucket)模式下,桶的数量决定了数据的分布和并行度。当单分区数据量暴增至 1TB 却只有 10 个桶时,每个桶的数据量高达 100GB 左右,这远远超出了官方推荐的单个桶 200MB ~ 1GB 的最佳实践范围。这会导致底层的 LSM 树层级过深,使得后台的 Compaction(文件合并)压力剧增,查询和写入性能发生急剧退化。

由于直接修改运行中的作业元数据并重新分桶会导致物理布局不匹配,直接对正在写入的表执行 ALTER TABLE ... SET ('bucket' = '100') 会导致新流式写入作业因检测到历史桶数量不一致而抛出异常

为了在不停止上游流式读取与写入的前提下实现平滑扩容,行业内通常采用“双写 + 历史回刷”“分区级渐进重构”方案。其中,sequence.field 是保证整套方案数据一致性的核心机制。


一、 核心机制:sequence.field 的关键救场作用

在分布式和流批结合的场景中,数据乱序是常态。Paimon 主键表默认的合并机制是 LIFO(后写入的数据覆盖先写入的数据)

但在“双写 + 历史回刷”的平滑迁移过程中,会出现以下情况:

  1. 实时流作业从上游(如 Kafka)实时消费最新数据(如版本 V2V_2),写入新表。
  2. 历史回刷作业(批任务)从旧表中读取一年前的历史数据(如版本 V1V_1),写入新表。

由于这两个作业是并行或交替进行的,很可能实时流已经把 V2V_2 写入了新桶,而稍后历史回刷任务才将 V1V_1 写入同一个桶。如果未配置 sequence.field,新表将错误地以历史旧数据 V1V_1 覆盖最新的 V2V_2,造成严重的数据回退和不一致。

配置了 sequence.field(如业务更新时间戳 updated_at)后:
Paimon 在合并同一主键的数据时,会比对 sequence.field 的值,只有当新写入的数据的 sequence 值大于或等于已有数据时,才会执行更新。因此,在回刷旧数据时,由于其时间戳较小,会被自动忽略,从而保证了新表数据的最终一致性。


二、 方案一:新旧表平滑迁移(适用于无分区或需要整体重构的表)

这是最通用的 0 停机方案,适用于无分区表或需要对整张表进行重构的场景。

1. 创建目标新表 target_table

创建一张与原表 Schema 完全一致的新表,并将 bucket 设为 100。务必配置 sequence.field

sql
CREATE TABLE target_table (
    id BIGINT,
    data STRING,
    updated_at TIMESTAMP(3),
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'bucket' = '100',                     -- 扩容至 100 桶
    'sequence.field' = 'updated_at',       -- 指定更新时间戳字段,防止历史数据回刷时覆盖新数据
    'merge-engine' = 'deduplicate'        -- 主键去重引擎
);

2. 启动实时流双写或切换写入

修改上游的流式写入 Flink 作业,将其输出同时指向旧表和新表 target_table(即双写模式);或者直接将流式写入完全切换到新表 target_table(由于 Kafka 具备数据积压能力,可以从当前最新位点开始往新表写入)。
此时,新表中仅包含开始双写后的实时增量数据。

3. 启动历史数据回刷任务

启动一个 Flink Batch 作业,将旧表中积攒的 1TB 历史数据一次性导入新表。

sql
-- 切换为批运行模式以提高吞吐
SET 'execution.runtime-mode' = 'batch';

INSERT INTO target_table 
SELECT id, data, updated_at FROM source_table;
  • 安全保障: 即使批任务运行耗时较长,且回刷的数据与实时双写的数据有主键重合,sequence.field 机制也会确保只有最新的记录被保留,历史回刷的旧状态不会覆盖实时写入的新状态。

4. 平滑切换下游流式读取

当历史数据回刷完成,且新表的数据水位与旧表完全对齐后,将下游消费该表的流式读取作业(Streaming Read)的 Source 端从 source_table 修改为 target_table

5. 清理下线

停止向旧表双写,彻底停止旧表的流写入作业,并在验证数据无误后删除旧表。


三、 方案二:分区级渐进重构(适用于分区表)

如果该 1TB 的表是以天等维度进行分区的(例如 PARTITIONED BY (dt)),Paimon 允许不同分区拥有不同的桶数量。这样可以避免创建新表,直接在原表上完成渐进式扩容。

1. 修改元数据桶数量

在不停止当前流式写入的情况下,执行 DDL 修改表的默认桶数:

sql
ALTER TABLE my_table SET ('bucket' = '100');
  • 注意: 这一步仅仅修改了元数据。正在运行的流写入作业由于缓存了先前的配置,会继续向当前活跃分区(如当天分区 dt=2026-05-25)写入 10 个桶,作业不会报错,读取也不会受影响。

2. 历史分区重构(逐个执行)

对于已经关闭、不再有实时流写入的历史分区(如历史的 360 天分区),我们可以采用 Flink 批作业,通过 INSERT OVERWRITE 逐个或分批进行物理重构:

sql
SET 'execution.runtime-mode' = 'batch';

-- 对特定历史分区执行覆写,Paimon 会自动按新的 100 桶逻辑重新 Hash 分流
INSERT OVERWRITE my_table PARTITION (dt = '2026-05-24') 
SELECT id, data, updated_at FROM my_table PARTITION (dt = '2026-05-24');

通过这种方式,可以在后台默默将过去一年的上百个历史分区逐步重构为 100 桶,解决绝大部分历史数据的查询与 Compaction 性能问题。

3. 活动分区重构(在边界切换)

对于当前正在实时写入的活跃分区,通常在分区切换时(例如到了次日零点,流作业自动创建并写入新分区 dt=2026-05-26 时)进行处理。

  • 新创建的分区由于读取了修改后的元数据,会自动以 100 桶 的布局进行写入。
  • 一旦昨天的分区彻底变为了历史分区,不再有流式写入,便可以安全地对昨天的分区执行一次 INSERT OVERWRITE 批重构。

四、 总结与最佳实践建议

  1. 提前配置 sequence.field 无论采用哪种方案,表在设计之初或扩容重建时,都建议显式指定 sequence.field(如 op_tsupdated_at),这不仅是为了平滑扩容,也是为了应对生产环境中天然存在的分布式消息乱序问题。
  2. 未来规避方案(动态桶模式): 如果业务数据量增长难以预估,在重建新表时可以考虑直接配置为 动态桶模式'bucket' = '-1')。在动态桶模式下,Paimon 会根据实际数据量自动完成桶的扩容,无需人工干预和后期的物理重构,但需要注意该模式会额外消耗一定的内存用于维护全局索引。
00:00
00:00