事实流需要关联一张在 MySQL 里不断发生 Update 甚至 Delete 的维表,如何利用 Flink SQL 的 Lookup Join 或者 Temporal Table Join 确保拿到的是“事件发生那个时刻”的维表快照数据?
这是一个非常经典且核心的流处理场景。
要确保拿到“事件发生那个时刻”的维表快照数据(即严格的 Event-Time 语义,且能处理 Update 和 Delete),标准的 Lookup Join 是无法做到的,必须使用基于 Flink CDC 的 Event-Time Temporal Table Join(事件时间时态表关联)。
下面详细解释为什么,以及具体如何实现。
核心结论对比
- Lookup Join (外部查找联接)
- 机制:基于处理时间 (Processing Time)。当事实流的一条数据到达 Flink 时,Flink 发起一个 RPC 请求(如通过 JDBC)去查 MySQL 的当前最新状态。
- 致命缺陷:如果事实流发生延迟,或者消费堆积,当数据被处理去查 MySQL 时,MySQL 里的维表可能已经被 Update 甚至 Delete 了。你拿到的是“处理时刻”的数据,而不是“事件发生时刻”的数据。
- Temporal Table Join (时态表联接)
- 机制:基于事件时间 (Event Time)。利用 Flink CDC 实时抽取 MySQL 的 Binlog 形成 Changelog 流,并在 Flink 内部状态中维护维表的多版本快照 (Versioned Table)。
- 完美契合:当事实流数据到来时,Flink 会根据事实流的
event_time,去内部状态中精确匹配那个时间点对应的维表版本。
完整实现方案:基于 Flink CDC 的 Temporal Table Join
要实现精确到“事件发生时刻”的关联,你需要按以下步骤编写 Flink SQL:
步骤 1:定义事实流表 (Kafka)
必须定义水位线(Watermark)和事件时间(Event Time)。
CREATE TABLE fact_stream (
fact_id BIGINT,
dim_id BIGINT,
amount DECIMAL(10, 2),
event_time TIMESTAMP(3), -- 事件发生时间
WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND -- 定义水位线
) WITH (
'connector' = 'kafka',
'topic' = 'fact_topic',
'properties.bootstrap.servers' = 'localhost:9092',
'format' = 'json'
);
步骤 2:定义动态维表 (MySQL CDC)
必须使用 mysql-cdc 连接器,而不是普通的 jdbc。同时必须定义主键(PRIMARY KEY)和事件时间(Watermark),这两个条件是构成 Versioned Table 的必须项。
CREATE TABLE dim_mysql_cdc (
id BIGINT,
name STRING,
status STRING,
update_time TIMESTAMP(3), -- MySQL中记录的更新/创建时间
PRIMARY KEY (id) NOT ENFORCED, -- 必须有主键
WATERMARK FOR update_time AS update_time - INTERVAL '5' SECOND -- 必须有水位线
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'localhost',
'port' = '3306',
'username' = 'root',
'password' = 'pwd',
'database-name' = 'mydb',
'table-name' = 'dim_table'
);
步骤 3:执行 Temporal Table Join
使用 FOR SYSTEM_TIME AS OF 语法,并且时间点必须是事实流的事件时间。
SELECT
f.fact_id,
f.amount,
f.event_time,
d.name,
d.status
FROM fact_stream AS f
LEFT JOIN dim_mysql_cdc FOR SYSTEM_TIME AS OF f.event_time AS d
ON f.dim_id = d.id;
这个方案是如何处理 Update 和 Delete 的?
Flink CDC 会解析 MySQL 的 Binlog:
- 对于 Update:CDC 会向下游发送
-U(旧数据) 和+U(新数据)。Flink 会在内部状态中保存这个主键的新版本及其对应的update_time。 - 对于 Delete:CDC 会发送
-D(删除) 消息。Flink 会在状态中标记这个主键在对应的update_time之后已删除。
情景推演(完美实现你的需求):
假设 MySQL 维表 ID=1 的数据在 10:00:00 创建,10:05:00 Update 了字段,10:10:00 被 Delete。
- 事实流来了一条
event_time = 10:03:00的数据,关联时拿到的是 Create 后的快照。 - 事实流来了一条
event_time = 10:07:00的数据,关联时拿到的是 Update 后的快照。 - 事实流来了一条
event_time = 10:11:00的数据,关联时拿到的是 NULL(因为已经被 Delete 了)。
(即使事实流因为故障堆积,在 11:00:00 才开始处理这三条数据,关联的结果也绝对不会变,因为 Flink 依靠 Event Time Watermark 对齐)
生产环境必须要关注的 2 个坑
虽然 Temporal Join 完美解决了业务逻辑问题,但它引入了状态管理问题:
1. 状态膨胀问题 (State TTL)
由于 Flink 需要在内部维护维表的历史版本快照以便关联,如果维表很大或者一直运行,Flink 的 State 会无限膨胀导致 OOM 或 Checkpoint 失败。
解决办法:设置表级别的 State TTL。你需要评估事实流最大可能延迟多久(例如最多延迟 24 小时),然后设置 TTL。
-- 在提交 SQL 任务前设置参数,例如保留 24 小时的状态
SET 'table.exec.state.ttl' = '24 h';
注意:设置 24h 意味着如果事实流的数据迟到了 25 小时,它将无法关联到当时的维表版本。
2. Watermark 对齐问题
Temporal Join 强依赖两个流的 Watermark。如果 MySQL CDC 的流一直没有数据变更,它的 Watermark 就不会推进,这会导致 Join 算子卡住,不向下游输出数据。
解决办法:在 CDC 表配置中开启空闲超时(Idle Timeout),让 Flink 知道即使没有数据,时间也在流逝。
-- 配置 Watermark 空闲超时机制
SET 'table.exec.source.idle-timeout' = '10 s';