Flink SQL Interval Join实现CTR计算
本文讲解如何使用Flink SQL的Interval Join功能,将曝光流与点击流进行实时关联,以计算点击率。
实现这个需求的核心 Flink SQL 功能是 时间区间联接(Interval Join)。
什么是 Interval Join?
Interval Join 是一种特殊的流式 Join,它会根据时间属性将两个流中时间戳相近的记录关联起来。你可以定义一个时间边界(上界和下界),只有当一个流的记录的时间戳落在了另一个流记录的时间戳的这个指定区间内时,它们才会被关联。
这完美地契合了“曝光后一段时间内等待点击”的场景。
使用 Flink SQL 的实现步骤
假设我们有两个 Kafka Topic:impressions_topic 和 clicks_topic。
第一步:创建源表 (Source Tables)
我们需要创建两个表来分别代表曝光流和点击流。关键在于:
- 指定事件时间属性 (
event_time)。 - 定义 Watermark 策略,告诉 Flink 如何处理乱序数据。
sql
-- 设置 Flink SQL 的执行环境
SET 'table.exec.source.idle-timeout' = '1s'; -- 确保 watermark 能在没有新数据时也能推进
CREATE TABLE impressions (
request_id STRING,
ad_id STRING,
user_id STRING,
impression_time TIMESTAMP(3), -- 事件时间戳,精确到毫秒
-- 定义 'impression_time' 为事件时间属性
WATERMARK FOR impression_time AS impression_time - INTERVAL '3' SECOND -- 允许3秒的乱序
) WITH (
'connector' = 'kafka',
'topic' = 'impressions_topic',
'properties.bootstrap.servers' = 'kafka-broker:9092',
'properties.group.id' = 'ctr-job-group',
'scan.startup.mode' = 'latest-offset',
'format' = 'json'
);
CREATE TABLE clicks (
request_id STRING,
click_time TIMESTAMP(3), -- 事件时间戳
-- 定义 'click_time' 为事件时间属性
WATERMARK FOR click_time AS click_time - INTERVAL '3' SECOND -- 允许3秒的乱序
) WITH (
'connector' = 'kafka',
'topic' = 'clicks_topic',
'properties.bootstrap.servers' = 'kafka-broker:9092',
'properties.group.id' = 'ctr-job-group',
'scan.startup.mode' = 'latest-offset',
'format' = 'json'
);
第二步:编写 Interval Join 查询
现在我们可以使用 LEFT JOIN 来关联这两个流。我们使用 LEFT JOIN 而不是 INNER JOIN 的原因是:
INNER JOIN只会输出成功关联的记录(即有曝光也有点击)。LEFT JOIN则可以输出所有曝光记录,无论它是否有关联到的点击。这样我们既能统计到点击,也能统计到总曝光,从而计算CTR。
sql
SELECT
imp.request_id,
imp.ad_id,
imp.user_id,
imp.impression_time,
clk.click_time,
-- 如果 click_time 不为 NULL,则表示有点击,记为 1,否则为 0
CASE WHEN clk.request_id IS NOT NULL THEN 1 ELSE 0 END AS is_clicked
FROM
impressions AS imp
LEFT JOIN
clicks AS clk
ON
-- 1. 关联键必须相等
imp.request_id = clk.request_id
AND
-- 2. 时间区间约束:点击时间必须在曝光时间之后,且在曝光后的5分钟之内
clk.click_time BETWEEN imp.impression_time AND imp.impression_time + INTERVAL '5' MINUTE;
查询逻辑解释:
imp.request_id = clk.request_id:这是我们的主关联键,确保是同一次广告请求。clk.click_time BETWEEN imp.impression_time AND imp.impression_time + INTERVAL '5' MINUTE:这是 Interval Join 的核心。它定义了一个动态的时间窗口:- 对于每一条曝光记录
imp,Flink 会在状态中“记住”它。 - 然后它会等待
clicks流中request_id相同,并且click_time在[impression_time, impression_time + 5分钟]这个区间内的记录。 - 如果在这个时间窗口内等到了点击,就会生成一条关联成功的数据(所有字段都有值)。
- 如果曝光事件的
impression_time加上5分钟的窗口已经过去了(由 Watermark 决定),但仍然没有等到对应的点击,Flink 就会生成一条关联失败的数据(imp的字段有值,clk的字段为NULL)。
- 对于每一条曝光记录
这完美地用一条 SQL 语句替代了 KeyedCoProcessFunction 中复杂的状态和定时器管理逻辑。
第三步:下游聚合计算CTR
上面的查询产生了一个包含关联结果的中间流。我们可以直接在这个结果之上进行窗口聚合来计算CTR。
sql
-- 创建一个视图,方便后续使用
CREATE VIEW joined_stream AS
SELECT
imp.request_id,
imp.ad_id,
imp.user_id,
imp.impression_time,
-- 如果 click_time 不为 NULL,则表示有点击,记为 1,否则为 0
CASE WHEN clk.request_id IS NOT NULL THEN 1 ELSE 0 END AS is_clicked
FROM
impressions AS imp
LEFT JOIN
clicks AS clk
ON
imp.request_id = clk.request_id
AND
clk.click_time BETWEEN imp.impression_time AND imp.impression_time + INTERVAL '5' MINUTE;
-- 聚合计算每分钟的CTR
SELECT
ad_id,
-- 使用 TUMBLE 窗口函数定义1分钟的滚动窗口
TUMBLE_START(impression_time, INTERVAL '1' MINUTE) AS window_start,
COUNT(*) AS impression_count, -- 总曝光数
SUM(is_clicked) AS click_count, -- 总点击数
CAST(SUM(is_clicked) AS DOUBLE) / COUNT(*) AS ctr
FROM
joined_stream
GROUP BY
ad_id,
TUMBLE(impression_time, INTERVAL '1' MINUTE);
这个聚合查询会按广告ID(ad_id)和1分钟的时间窗口进行分组,然后计算每个窗口内的总曝光、总点击和CTR。
对比 DataStream API 和 Flink SQL
| 特性 | Flink SQL (Interval Join) | DataStream API (KeyedCoProcessFunction) |
|---|---|---|
| 开发效率 | 极高。代码简洁,符合SQL思维,开发和维护成本低。 | 较低。需要手动管理状态、定时器、数据结构,代码量大。 |
| 表达能力 | 受限。对于标准的时间区间关联非常强大,但难以实现复杂的自定义逻辑(如处理超晚数据到侧输出流)。 | 极强。可以完全控制状态和时间,实现任何复杂的业务逻辑,如自定义触发器、侧输出流等。 |
| 性能优化 | Flink 的查询优化器会自动优化执行计划。 | 需要开发者手动进行优化,对开发者的经验要求高。 |
| 易用性 | 非常高。数据分析师和不懂Java/Scala的工程师也能使用。 | 较低。需要专业的流处理开发经验。 |
总结
对于“曝光点击关联”这个经典场景,纯 Flink SQL 是首选方案。它不仅能够完美实现需求,而且代码量少、可读性强、易于维护。
只有在遇到以下情况时,你才需要考虑回退到更底层的 DataStream API:
- 极端复杂的关联逻辑:例如,关联条件不是简单的时间区间,或者需要根据某些事件动态改变等待窗口。
- 需要精细的“超时”事件处理:如果你需要明确捕获“某次曝光因超时而未关联成功”这个事件,并将其发送到特定的流(如告警或监控系统),DataStream API 的
onTimer提供了最直接的实现方式。 - 需要对超晚数据进行特殊处理:使用侧输出流(Side Output)来捕获并处理那些超出关联时间窗口的“迟到”数据,SQL对此的支持有限。
在绝大多数情况下,Flink SQL 的 LEFT INTERVAL JOIN 已经足够强大和高效。