讲讲 Paimon 读数据(Scan & Read)的两阶段执行流程
在 Apache Paimon 中,当表的数据量较大时,为了实现高效的分布式读取,Paimon 将数据的读取过程设计为两阶段执行流程(Two-Stage Execution Process)。这两阶段分别是:
- Scan Plan(扫描规划阶段)
- Read Split(分片读取阶段)
这种设计将“元数据解析与任务切分”和“实际物理数据读取”进行解耦,能够在分布式计算引擎(如 Flink、Spark)中实现高并发、低 I/O 的读取。
第一阶段:Scan Plan(扫描规划)
- 执行位置:全局中心节点,例如 Flink 的
Coordinator或 Spark 的Driver。 - 核心任务:
- 元数据解析:读取 Paimon 的元数据(Schema、Snapshot、Manifests 等),确定需要读取的数据范围(例如最新的 Snapshot 或指定的 Tag)。
- 下推过滤与裁剪(Pruning):
- 分区裁剪(Partition Pruning):根据查询条件中的分区字段,过滤掉不符合条件的分区目录。
- 分桶裁剪(Bucket Pruning):如果查询条件包含
bucket-key(分桶键),Paimon 可以直接计算出数据所在的 Bucket,从而只扫描对应的 Bucket 文件,跳过其他无用 Bucket。 - 文件与索引过滤:利用统计信息(Min/Max)以及 File Index(如 Bloom Filter、Bitmap、Range Bitmap 等),在不读取实际数据文件的前提下,过滤掉不匹配的数据文件,以大幅减少物理 I/O。
- 生成分片(Splits):将筛选后的物理文件组织成若干个逻辑分片(
Split实例),每个Split代表分布式任务中一个最小的读取单元。
批读与流读在 Scan 阶段的区别:
- 批读(Batch Read):Scan Plan 是一次性的。它在启动时规划好当前 Snapshot 的所有 Split,并将其发送给下游。
- 流读(Stream Read):Scan Plan 是持续进行的。
StreamTableScan会启动一个轮询线程,持续扫描 Paimon 表的元数据,一旦发现新的 Commit(新产生的 Snapshot 增量数据),就会增量地规划出新的 Splits 发送给下游。同时,它还支持 Checkpoint 机制,以便在发生故障时通过保存的状态(State)恢复读取进度。
第二阶段:Read Split(分片读取)
- 执行位置:分布式的计算节点或任务(如 Flink 的
SourceReader Task或 Spark 的Executor Task)。 - 核心任务:
- 接收 Split:每个 Task 从全局节点接收分配给自己的
Split列表。 - 物理文件读取:Task 创建底层的
RecordReader,并发读取 Split 中指向的实际物理数据文件(如 Parquet 或 ORC 格式的文件)。 - 细粒度过滤与合并:
- 细粒度过滤(Record-level Filter):Scan Plan 阶段通常只能做到文件级别或分桶级别的粗粒度过滤。在 Read Split 阶段,Reader 会逐条(Record-level)对数据执行细粒度过滤(
executeFilter)。 - LSM-Tree 合并读取(Merge-on-Read):如果是 Primary Key 表且未进行 Full Compaction,Reader 需要在内存中对同一个 Bucket 内不同 Level 的数据文件(包括 Changelog 文件、Deletes 文件等)进行多路归并合并(Merge),以还原出最新、最准确的行数据。
- 细粒度过滤(Record-level Filter):Scan Plan 阶段通常只能做到文件级别或分桶级别的粗粒度过滤。在 Read Split 阶段,Reader 会逐条(Record-level)对数据执行细粒度过滤(
- 接收 Split:每个 Task 从全局节点接收分配给自己的
Java API 代码实现对比
以下是 Paimon 官方文档中展示的 Java 客户端 API 代码片段,可以非常直观地看到这两阶段的分工:
1. 批读(Batch Read)流程
java
// Step 1: 创建 ReadBuilder,并传入投影(Projection)和过滤条件(Filter)
Table table = GetTable.getTable();
ReadBuilder readBuilder = table.newReadBuilder()
.withProjection(projection)
.withFilter(filters);
// ================ 第一阶段:Scan Plan (在 Coordinator / Driver 运行) ================
// 规划生成逻辑分片(Splits)
List<Split> splits = readBuilder.newScan().plan().splits();
// (分布式计算引擎在此处将 splits 分发到各个分布式的 Task 中)
// ================ 第二阶段:Read Split (在具体的分布式 Task 中运行) ================
// 每个 Task 拿到属于自己的 splits 后,创建 TableRead 对象
TableRead read = readBuilder.newRead().executeFilter(); // executeFilter 用于逐条做细粒度过滤
// 创建具体的 RecordReader 读取物理文件数据
RecordReader<InternalRow> reader = read.createReader(splits);
reader.forEachRemaining(System.out::println);
2. 流读(Stream Read)流程
流读在第二阶段(Task 读取)的逻辑相似,但在第一阶段(Scan 规划)中是持续循环且带状态的:
java
Table table = GetTable.getTable();
ReadBuilder readBuilder = table.newReadBuilder()
.withProjection(projection)
.withFilter(filters);
// ================ 第一阶段:Scan Plan (持续进行) ================
StreamTableScan scan = readBuilder.newStreamScan();
while (true) {
// 持续扫描新的 Snapshot 提交,并增量生成 Splits
List<Split> splits = scan.plan().splits();
// (将新增的 splits 实时分发给 Task)
// 记录 Scan 的 Offset/State,以便进行 Checkpoint 容错恢复
Long state = scan.checkpoint();
// ================ 第二阶段:Read Split (在分布式 Task 异步且持续读取) ================
TableRead read = readBuilder.newRead().executeFilter();
RecordReader<InternalRow> reader = read.createReader(splits);
reader.forEachRemaining(System.out::println);
Thread.sleep(1000);
}
总结
Paimon 通过两阶段读取的设计,最大化地利用了分布式引擎的协调能力:将元数据访问、分区/分桶裁剪、索引过滤收敛在 Scan Plan 阶段,确保不会因为元数据文件的并发访问给文件系统带来过大的压力;而将繁重的文件 I/O、解压反序列化以及 LSM 合并去重等计算密集型工作彻底分散在分布式 Task 的 Read Split 阶段,从而保证了在大规模数据量下的高效读取。