讲讲Paimon 的时间旅行(Time Travel)查询?
在 Apache Paimon 的官方文档(master 分支)中,时间旅行(Time Travel)是一项核心的数据湖能力。它允许用户重现历史任意时刻的数据状态,或者从历史的某个特定点开始进行增量消费。
以下是基于 Paimon 最新 master 文档,对 Paimon 时间旅行查询的详细剖析,包括其核心原理、批查询中的时间旅行、流查询中的时间旅行,以及辅助时间旅行的机制(如 Tag 和系统表)。
一、 核心原理
Paimon 的时间旅行依赖于其多版本快照(Snapshot)机制与 LSM 树存储结构:
- 快照生成:每次写入提交(Commit)时,Paimon 都会生成一个或两个隔离的快照。
- 元数据追踪:每个快照都记录了该版本的 Schema ID 和一份清单列表(Manifest List)索引。清单列表精确追踪了属于该快照的所有物理数据文件。
- 隔离读取:在进行时间旅行查询时,读取引擎通过指定的快照 ID 或时间戳,定位到对应的 Manifest List,仅读取该历史时间点的数据文件,而不会受到当前最新数据的影响。
二、 批处理时间旅行(Batch Time Travel)
批处理下的时间旅行可以指定快照 ID(Snapshot ID)、时间戳(Timestamp)、标签(Tag)或水位线(Watermark)来读取对应的数据。不同的计算引擎支持不同的 SQL 语法或动态配置选项。
1. SQL 语法支持
1.1 Flink SQL (Flink 1.18+)
Flink 1.18 及以上版本支持标准的 SQL 时间旅行语法:
-- 通过指定时间戳进行时间旅行查询
SELECT * FROM my_table FOR SYSTEM_TIME AS OF TIMESTAMP '2023-01-01 00:00:00';
-- 也可以使用简单的时间表达式
SELECT * FROM my_table FOR SYSTEM_TIME AS OF TIMESTAMP '2023-01-01 00:00:00' + INTERVAL '1' DAY;
1.2 Spark SQL (Spark 3.3+) & Trino
Spark 3.3+ 和 Trino 支持使用 VERSION AS OF 和 TIMESTAMP AS OF 语法进行时间旅行:
-- 1. 根据快照 ID(Snapshot ID)查询
SELECT * FROM my_table VERSION AS OF 1;
-- 2. 根据特定时间戳查询
SELECT * FROM my_table TIMESTAMP AS OF '2023-06-01 00:00:00.123';
SELECT * FROM my_table TIMESTAMP AS OF 1678883047; -- 传入 Unix 秒数
-- 3. 根据指定的 Tag 查询
SELECT * FROM my_table VERSION AS OF 'my-tag';
-- 4. 根据指定的 Watermark 查询(匹配 Watermark 之后的第一个快照)
SELECT * FROM my_table VERSION AS OF 'watermark-1678883047356';
⚠️ 注意(Tag 与 Snapshot 的优先级冲突):
如果一个 Tag 的名称是一个纯数字(例如名为'1'的 Tag,对应 Snapshot 2),在使用VERSION AS OF '1'语法查询时,Paimon 会优先考虑 Tag。这意味着该查询实际读取的是 Snapshot 2,而不是 Snapshot 1。
2. 动态表选项(OPTIONS)支持
在一些不支持标准时间旅行语法的引擎或老版本中,可以通过 Paimon 的 Dynamic Options(动态表参数) 临时改变查询的行为。
Flink Hint (OPTIONS 语法)
-- 根据快照 ID
SELECT * FROM my_table /*+ OPTIONS('scan.snapshot-id' = '1') */;
-- 根据时间戳(毫秒)
SELECT * FROM my_table /*+ OPTIONS('scan.timestamp-millis' = '1678883047356') */;
-- 根据时间戳字符串(会自动转换,使用本地时区)
SELECT * FROM my_table /*+ OPTIONS('scan.timestamp' = '2023-12-09 23:09:12') */;
-- 根据 Tag 名称
SELECT * FROM my_table /*+ OPTIONS('scan.tag-name' = 'my-tag') */;
-- 根据 Watermark
SELECT * FROM my_table /*+ OPTIONS('scan.watermark' = '1678883047356') */;
Spark DataFrame API
如果你在 Spark 中使用 Scala/Java 编程接口,可以直接通过 .option() 传入配置:
spark.read.format("paimon")
.option("scan.snapshot-id", 1) // 或者配置 "scan.timestamp-millis"
.table("my_table")
Hive SQL
在 Hive 中,可以通过 SET 语法在会话级别指定查询的快照:
-- 指定读取快照 1
SET paimon.scan.snapshot-id=1;
SELECT * FROM my_table;
-- 查询结束后记得重置为 null,恢复默认读取最新快照
SET paimon.scan.snapshot-id=null;
三、 流式处理时间旅行(Streaming Time Travel)
时间旅行在 Paimon 中不仅仅适用于批查询(只读取历史某一个静态快照),同样适用于流式查询。通过指定时间旅行参数,流计算任务可以指定历史起点进行增量消费。
1. 流式读取模式 (scan.mode)
流式读取时,需要配合 scan.mode 参数来定义流起点的读取行为:
from-snapshot:从指定的scan.snapshot-id开始连续读取增量变更,在启动时不读取快照本身的存量数据。from-snapshot-full:先读取scan.snapshot-id对应快照的完整存量数据,再连续读取此后的增量变更数据。from-timestamp:从指定的scan.timestamp-millis对应的时间点开始连续读取增量变更。
四、 时间旅行的局限性与解决方案(快照过期与 Tag)
1. 快照过期带来的问题
由于 Paimon 在持续写入时会不断产生新的快照,为了避免磁盘空间被无限占用,默认情况下 Paimon 写入程序会自动触发快照过期(Snapshot Expiration)。
- 默认配置下,快照只保留 1 小时(参数:
snapshot.time-retained = 1 h)。 - 一旦历史快照被过期清理,其对应的数据文件也可能会被物理删除,导致针对该时间点的时间旅行查询因找不到数据而失败。
2. 解决方案一:使用 Tag 锁定历史
为了能够对重要时间点(如每日天级分区完成、大促活动结束等)的数据进行长期的历史追溯,Paimon 提供了 Tag(标签) 功能。
- 特性:Tag 是指向某个快照的命名指针,被 Tag 关联的快照及其数据文件永远不会被自动过期清理。
- 自动创建 Tag:可以通过在 TBLPROPERTIES 中配置自动创建机制。sql
CREATE TABLE my_table ( id INT PRIMARY KEY NOT ENFORCED, name STRING ) WITH ( 'tag.automatic-creation' = 'process-time', -- 基于处理时间创建 'tag.creation-period' = 'daily', -- 每天创建一个 'tag.num-retained-max' = '90' -- 保留 90 天的 Tag );
3. 解决方案二:文件创建时间粗略过滤 (scan.file-creation-time-millis)
如果某些历史快照已经不幸过期,但底层物理文件可能还没有被真正物理删除,流式查询还可以利用粗略的文件创建时间过滤:
- 通过配置
scan.file-creation-time-millis,Paimon 会进行粗略的过滤,只保留在该毫秒时间戳之后生成的数据文件进行消费。
五、 辅助元数据查询(系统表)
在执行时间旅行前,如果不知道具体的快照 ID 或时间,可以利用 Paimon 提供的系统表(System Tables)进行元数据查询:
- 查询快照历史信息(包括提交时间、快照 ID 等):sql
SELECT snapshot_id, schema_id, commit_user, commit_time FROM my_table$snapshots; - 查询历史 Schema 结构:sql
SELECT schema_id, fields, comment FROM my_table$schemas;