基于本文回答

播面 播面

刷题像听歌,多听自然懂
0
评论

讲讲 Paimon 读数据(Scan & Read)的两阶段执行流程

知识点图片

在 Apache Paimon 中,当表的数据量较大时,为了实现高效的分布式读取,Paimon 将数据的读取过程设计为两阶段执行流程(Two-Stage Execution Process)。这两阶段分别是:

  1. Scan Plan(扫描规划阶段)
  2. Read Split(分片读取阶段)

这种设计将“元数据解析与任务切分”和“实际物理数据读取”进行解耦,能够在分布式计算引擎(如 Flink、Spark)中实现高并发、低 I/O 的读取。


第一阶段:Scan Plan(扫描规划)

  • 执行位置:全局中心节点,例如 Flink 的 Coordinator 或 Spark 的 Driver
  • 核心任务
    1. 元数据解析:读取 Paimon 的元数据(Schema、Snapshot、Manifests 等),确定需要读取的数据范围(例如最新的 Snapshot 或指定的 Tag)。
    2. 下推过滤与裁剪(Pruning)
      • 分区裁剪(Partition Pruning):根据查询条件中的分区字段,过滤掉不符合条件的分区目录。
      • 分桶裁剪(Bucket Pruning):如果查询条件包含 bucket-key(分桶键),Paimon 可以直接计算出数据所在的 Bucket,从而只扫描对应的 Bucket 文件,跳过其他无用 Bucket。
      • 文件与索引过滤:利用统计信息(Min/Max)以及 File Index(如 Bloom Filter、Bitmap、Range Bitmap 等),在不读取实际数据文件的前提下,过滤掉不匹配的数据文件,以大幅减少物理 I/O。
    3. 生成分片(Splits):将筛选后的物理文件组织成若干个逻辑分片(Split 实例),每个 Split 代表分布式任务中一个最小的读取单元。

批读与流读在 Scan 阶段的区别:

  • 批读(Batch Read):Scan Plan 是一次性的。它在启动时规划好当前 Snapshot 的所有 Split,并将其发送给下游。
  • 流读(Stream Read):Scan Plan 是持续进行的。StreamTableScan 会启动一个轮询线程,持续扫描 Paimon 表的元数据,一旦发现新的 Commit(新产生的 Snapshot 增量数据),就会增量地规划出新的 Splits 发送给下游。同时,它还支持 Checkpoint 机制,以便在发生故障时通过保存的状态(State)恢复读取进度。

第二阶段:Read Split(分片读取)

  • 执行位置:分布式的计算节点或任务(如 Flink 的 SourceReader Task 或 Spark 的 Executor Task)。
  • 核心任务
    1. 接收 Split:每个 Task 从全局节点接收分配给自己的 Split 列表。
    2. 物理文件读取:Task 创建底层的 RecordReader,并发读取 Split 中指向的实际物理数据文件(如 Parquet 或 ORC 格式的文件)。
    3. 细粒度过滤与合并
      • 细粒度过滤(Record-level Filter):Scan Plan 阶段通常只能做到文件级别或分桶级别的粗粒度过滤。在 Read Split 阶段,Reader 会逐条(Record-level)对数据执行细粒度过滤(executeFilter)。
      • LSM-Tree 合并读取(Merge-on-Read):如果是 Primary Key 表且未进行 Full Compaction,Reader 需要在内存中对同一个 Bucket 内不同 Level 的数据文件(包括 Changelog 文件、Deletes 文件等)进行多路归并合并(Merge),以还原出最新、最准确的行数据。

Java API 代码实现对比

以下是 Paimon 官方文档中展示的 Java 客户端 API 代码片段,可以非常直观地看到这两阶段的分工:

1. 批读(Batch Read)流程

java
// Step 1: 创建 ReadBuilder,并传入投影(Projection)和过滤条件(Filter)
Table table = GetTable.getTable();
ReadBuilder readBuilder = table.newReadBuilder()
    .withProjection(projection)
    .withFilter(filters);

// ================ 第一阶段:Scan Plan (在 Coordinator / Driver 运行) ================
// 规划生成逻辑分片(Splits)
List<Split> splits = readBuilder.newScan().plan().splits();

// (分布式计算引擎在此处将 splits 分发到各个分布式的 Task 中)

// ================ 第二阶段:Read Split (在具体的分布式 Task 中运行) ================
// 每个 Task 拿到属于自己的 splits 后,创建 TableRead 对象
TableRead read = readBuilder.newRead().executeFilter(); // executeFilter 用于逐条做细粒度过滤

// 创建具体的 RecordReader 读取物理文件数据
RecordReader<InternalRow> reader = read.createReader(splits);
reader.forEachRemaining(System.out::println);

2. 流读(Stream Read)流程

流读在第二阶段(Task 读取)的逻辑相似,但在第一阶段(Scan 规划)中是持续循环且带状态的:

java
Table table = GetTable.getTable();
ReadBuilder readBuilder = table.newReadBuilder()
    .withProjection(projection)
    .withFilter(filters);

// ================ 第一阶段:Scan Plan (持续进行) ================
StreamTableScan scan = readBuilder.newStreamScan();

while (true) {
    // 持续扫描新的 Snapshot 提交,并增量生成 Splits
    List<Split> splits = scan.plan().splits();
    
    // (将新增的 splits 实时分发给 Task)
    
    // 记录 Scan 的 Offset/State,以便进行 Checkpoint 容错恢复
    Long state = scan.checkpoint(); 
    
    // ================ 第二阶段:Read Split (在分布式 Task 异步且持续读取) ================
    TableRead read = readBuilder.newRead().executeFilter();
    RecordReader<InternalRow> reader = read.createReader(splits);
    reader.forEachRemaining(System.out::println);
    
    Thread.sleep(1000);
}

总结

Paimon 通过两阶段读取的设计,最大化地利用了分布式引擎的协调能力:将元数据访问、分区/分桶裁剪、索引过滤收敛在 Scan Plan 阶段,确保不会因为元数据文件的并发访问给文件系统带来过大的压力;而将繁重的文件 I/O、解压反序列化以及 LSM 合并去重等计算密集型工作彻底分散在分布式 Task 的 Read Split 阶段,从而保证了在大规模数据量下的高效读取。

00:00
00:00