业务场景为“用户的曝光事件需要在 15 分钟内匹配到点击事件才算转化成功”,如何利用 Flink SQL 的 Interval Join 来实现?
在 Flink SQL 中,Interval Join(区间关联) 是解决此类“基于时间窗口的事件归因/匹配”场景的最完美方案。
Interval Join 专门用于将两条流中事件时间(Event Time)在限定时间范围内的相关数据进行 Join。它的底层会自动管理 State 的生命周期,避免状态无限膨胀。
以下是实现该业务场景的完整步骤和代码示例:
1. 核心逻辑解析
业务诉求:“曝光后 15 分钟内点击”。
转换为 Flink SQL Interval Join 的时间条件:click_time BETWEEN imp_time AND imp_time + INTERVAL '15' MINUTE
2. DDL 建表定义 (定义时间属性与 Watermark)
在使用 Interval Join 之前,必须在两条流的表中明确指定事件时间字段和Watermark(水位线)。
sql
-- 1. 创建曝光流表 (Impression)
CREATE TABLE impression_table (
user_id STRING,
ad_id STRING,
imp_time TIMESTAMP(3), -- 曝光时间
-- 定义 Watermark,允许 5 秒的乱序
WATERMARK FOR imp_time AS imp_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
...
);
-- 2. 创建点击流表 (Click)
CREATE TABLE click_table (
user_id STRING,
ad_id STRING,
click_time TIMESTAMP(3), -- 点击时间
-- 定义 Watermark,允许 5 秒的乱序
WATERMARK FOR click_time AS click_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
...
);
3. DML 编写 Interval Join 语句
使用 INNER JOIN 结合 BETWEEN ... AND ... 语法来实现转化成功的匹配。
sql
-- 3. 匹配成功的结果输出
SELECT
i.user_id,
i.ad_id,
i.imp_time,
c.click_time,
-- 可选:计算从曝光到点击实际花费的秒数
TIMESTAMPDIFF(SECOND, i.imp_time, c.click_time) as cost_seconds
FROM impression_table i
JOIN click_table c
-- 业务匹配条件:同用户且同广告
ON i.user_id = c.user_id
AND i.ad_id = c.ad_id
-- 时间边界条件:点击时间在曝光时间之后的 0 到 15 分钟之间
AND c.click_time BETWEEN i.imp_time AND i.imp_time + INTERVAL '15' MINUTE;
4. 进阶场景补充(实战经验)
场景 A:如果用户在 15 分钟内点击了多次怎么办?
上述 INNER JOIN 会输出多条转化成功的记录(一对多)。如果业务要求“一次曝光最多只算一次有效点击”,可以在外层包一个去重逻辑 (Deduplication),保留第一条:
sql
SELECT * FROM (
SELECT
user_id, ad_id, imp_time, click_time,
ROW_NUMBER() OVER (PARTITION BY user_id, ad_id, imp_time ORDER BY click_time ASC) as rn
FROM (
-- 上面的 Interval Join 语句
)
) WHERE rn = 1;
场景 B:我想知道哪些曝光“没有”转化成功怎么办?
如果不仅要成功的,还要统计失败的(未点击),只需将 JOIN 改为 LEFT JOIN 即可。此时,如果 15 分钟后依然没有点击,Flink 会输出一条 click 字段全为 NULL 的数据。
sql
SELECT
i.user_id,
i.ad_id,
i.imp_time,
c.click_time,
IF(c.click_time IS NOT NULL, '转化成功', '转化失败') AS status
FROM impression_table i
LEFT JOIN click_table c -- 使用 LEFT JOIN
ON i.user_id = c.user_id
AND i.ad_id = c.ad_id
AND c.click_time BETWEEN i.imp_time AND i.imp_time + INTERVAL '15' MINUTE;
5. 底层运行机制与性能优势
为什么推荐用 Interval Join 而不是普通的 Window Join 或基于 State 的普通 Join?
- 精准的时间边界:它不是基于固定的窗口(如整点到整点),而是以每条曝光数据自身的
imp_time为基准向后延伸 15 分钟,完全契合真实业务场景。 - 状态自动清理:Flink 底层会根据你写的
INTERVAL '15' MINUTE和流的 Watermark,自动清理已经过期超过 15 分钟的曝光数据状态。你不需要手动设置 State TTL,不用担心内存溢出(OOM)。