基于本文回答

播面 播面

刷题像听歌,多听自然懂
0
评论

讲讲Paimon 的时间旅行(Time Travel)查询?

知识点图片

在 Apache Paimon 的官方文档(master 分支)中,时间旅行(Time Travel)是一项核心的数据湖能力。它允许用户重现历史任意时刻的数据状态,或者从历史的某个特定点开始进行增量消费。

以下是基于 Paimon 最新 master 文档,对 Paimon 时间旅行查询的详细剖析,包括其核心原理、批查询中的时间旅行、流查询中的时间旅行,以及辅助时间旅行的机制(如 Tag 和系统表)。


一、 核心原理

Paimon 的时间旅行依赖于其多版本快照(Snapshot)机制LSM 树存储结构

  1. 快照生成:每次写入提交(Commit)时,Paimon 都会生成一个或两个隔离的快照。
  2. 元数据追踪:每个快照都记录了该版本的 Schema ID 和一份清单列表(Manifest List)索引。清单列表精确追踪了属于该快照的所有物理数据文件。
  3. 隔离读取:在进行时间旅行查询时,读取引擎通过指定的快照 ID 或时间戳,定位到对应的 Manifest List,仅读取该历史时间点的数据文件,而不会受到当前最新数据的影响。

二、 批处理时间旅行(Batch Time Travel)

批处理下的时间旅行可以指定快照 ID(Snapshot ID)时间戳(Timestamp)标签(Tag)水位线(Watermark)来读取对应的数据。不同的计算引擎支持不同的 SQL 语法或动态配置选项。

1. SQL 语法支持

1.1 Flink SQL (Flink 1.18+)

Flink 1.18 及以上版本支持标准的 SQL 时间旅行语法:

sql
-- 通过指定时间戳进行时间旅行查询
SELECT * FROM my_table FOR SYSTEM_TIME AS OF TIMESTAMP '2023-01-01 00:00:00';

-- 也可以使用简单的时间表达式
SELECT * FROM my_table FOR SYSTEM_TIME AS OF TIMESTAMP '2023-01-01 00:00:00' + INTERVAL '1' DAY;
1.2 Spark SQL (Spark 3.3+) & Trino

Spark 3.3+ 和 Trino 支持使用 VERSION AS OFTIMESTAMP AS OF 语法进行时间旅行:

sql
-- 1. 根据快照 ID(Snapshot ID)查询
SELECT * FROM my_table VERSION AS OF 1;

-- 2. 根据特定时间戳查询
SELECT * FROM my_table TIMESTAMP AS OF '2023-06-01 00:00:00.123';
SELECT * FROM my_table TIMESTAMP AS OF 1678883047; -- 传入 Unix 秒数

-- 3. 根据指定的 Tag 查询
SELECT * FROM my_table VERSION AS OF 'my-tag';

-- 4. 根据指定的 Watermark 查询(匹配 Watermark 之后的第一个快照)
SELECT * FROM my_table VERSION AS OF 'watermark-1678883047356';

⚠️ 注意(Tag 与 Snapshot 的优先级冲突)
如果一个 Tag 的名称是一个纯数字(例如名为 '1' 的 Tag,对应 Snapshot 2),在使用 VERSION AS OF '1' 语法查询时,Paimon 会优先考虑 Tag。这意味着该查询实际读取的是 Snapshot 2,而不是 Snapshot 1。


2. 动态表选项(OPTIONS)支持

在一些不支持标准时间旅行语法的引擎或老版本中,可以通过 Paimon 的 Dynamic Options(动态表参数) 临时改变查询的行为。

Flink Hint (OPTIONS 语法)
sql
-- 根据快照 ID
SELECT * FROM my_table /*+ OPTIONS('scan.snapshot-id' = '1') */;

-- 根据时间戳(毫秒)
SELECT * FROM my_table /*+ OPTIONS('scan.timestamp-millis' = '1678883047356') */;

-- 根据时间戳字符串(会自动转换,使用本地时区)
SELECT * FROM my_table /*+ OPTIONS('scan.timestamp' = '2023-12-09 23:09:12') */;

-- 根据 Tag 名称
SELECT * FROM my_table /*+ OPTIONS('scan.tag-name' = 'my-tag') */;

-- 根据 Watermark
SELECT * FROM my_table /*+ OPTIONS('scan.watermark' = '1678883047356') */;
Spark DataFrame API

如果你在 Spark 中使用 Scala/Java 编程接口,可以直接通过 .option() 传入配置:

plaintext
spark.read.format("paimon")
  .option("scan.snapshot-id", 1) // 或者配置 "scan.timestamp-millis"
  .table("my_table")
Hive SQL

在 Hive 中,可以通过 SET 语法在会话级别指定查询的快照:

sql
-- 指定读取快照 1
SET paimon.scan.snapshot-id=1;
SELECT * FROM my_table;

-- 查询结束后记得重置为 null,恢复默认读取最新快照
SET paimon.scan.snapshot-id=null;

三、 流式处理时间旅行(Streaming Time Travel)

时间旅行在 Paimon 中不仅仅适用于批查询(只读取历史某一个静态快照),同样适用于流式查询。通过指定时间旅行参数,流计算任务可以指定历史起点进行增量消费

1. 流式读取模式 (scan.mode)

流式读取时,需要配合 scan.mode 参数来定义流起点的读取行为:

  • from-snapshot:从指定的 scan.snapshot-id 开始连续读取增量变更,在启动时不读取快照本身的存量数据。
  • from-snapshot-full:先读取 scan.snapshot-id 对应快照的完整存量数据,再连续读取此后的增量变更数据。
  • from-timestamp:从指定的 scan.timestamp-millis 对应的时间点开始连续读取增量变更。

四、 时间旅行的局限性与解决方案(快照过期与 Tag)

1. 快照过期带来的问题

由于 Paimon 在持续写入时会不断产生新的快照,为了避免磁盘空间被无限占用,默认情况下 Paimon 写入程序会自动触发快照过期(Snapshot Expiration)

  • 默认配置下,快照只保留 1 小时(参数:snapshot.time-retained = 1 h)。
  • 一旦历史快照被过期清理,其对应的数据文件也可能会被物理删除,导致针对该时间点的时间旅行查询因找不到数据而失败。

2. 解决方案一:使用 Tag 锁定历史

为了能够对重要时间点(如每日天级分区完成、大促活动结束等)的数据进行长期的历史追溯,Paimon 提供了 Tag(标签) 功能。

  • 特性:Tag 是指向某个快照的命名指针,被 Tag 关联的快照及其数据文件永远不会被自动过期清理
  • 自动创建 Tag:可以通过在 TBLPROPERTIES 中配置自动创建机制。
    sql
    CREATE TABLE my_table (
      id INT PRIMARY KEY NOT ENFORCED,
      name STRING
    ) WITH (
      'tag.automatic-creation' = 'process-time', -- 基于处理时间创建
      'tag.creation-period' = 'daily',           -- 每天创建一个
      'tag.num-retained-max' = '90'               -- 保留 90 天的 Tag
    );

3. 解决方案二:文件创建时间粗略过滤 (scan.file-creation-time-millis)

如果某些历史快照已经不幸过期,但底层物理文件可能还没有被真正物理删除,流式查询还可以利用粗略的文件创建时间过滤:

  • 通过配置 scan.file-creation-time-millis,Paimon 会进行粗略的过滤,只保留在该毫秒时间戳之后生成的数据文件进行消费。

五、 辅助元数据查询(系统表)

在执行时间旅行前,如果不知道具体的快照 ID 或时间,可以利用 Paimon 提供的系统表(System Tables)进行元数据查询:

  1. 查询快照历史信息(包括提交时间、快照 ID 等):
    sql
    SELECT snapshot_id, schema_id, commit_user, commit_time 
    FROM my_table$snapshots;
  2. 查询历史 Schema 结构
    sql
    SELECT schema_id, fields, comment 
    FROM my_table$schemas;
00:00
00:00