对于新上架的商品,当30分钟内没有下单动作时则告警出来,使用flink如何实现?
本文讲解如何用Flink监控“事件A发生后,事件B在规定时间内未发生”的场景,并提供了KeyedProcessFunction、CEP及SQL三种实现方案。
你这个需求本质上是 “监控某个事件(商品上架),在一段时间窗口(30分钟)内如果未发生另一个事件(下单),就触发告警”。
在 Flink 里比较典型的实现方式有两种:定时器(ProcessFunction/KeyedProcessFunction) 或者 CEP(复杂事件处理)。
✅ 方案一:KeyedProcessFunction + 定时器(推荐)
思路:
- 每当有 商品上架事件 到来时,注册一个 30分钟之后的定时器。
- 如果在这30分钟内收到了 下单事件,就清除定时器,说明不需要告警。
- 如果定时器触发时还没有下单,就输出一条告警。
示例代码(Scala/Pseudo-Java):
java
public class ProductMonitor extends KeyedProcessFunction<String, Event, String> {
private transient ValueState<Boolean> hasOrdered;
@Override
public void open(Configuration parameters) {
hasOrdered = getRuntimeContext().getState(
new ValueStateDescriptor<>("hasOrdered", Boolean.class)
);
}
@Override
public void processElement(Event event, Context ctx, Collector<String> out) throws Exception {
if (event.type.equals("上架")) {
// 注册 30分钟后的定时器
long triggerTime = ctx.timerService().currentProcessingTime() + 30 * 60 * 1000L;
ctx.timerService().registerProcessingTimeTimer(triggerTime);
hasOrdered.update(false);
} else if (event.type.equals("下单")) {
// 收到下单则标记
hasOrdered.update(true);
}
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
Boolean ordered = hasOrdered.value();
if (ordered == null || !ordered) {
out.collect("商品 " + ctx.getCurrentKey() + " 上架30分钟无订单,告警!");
}
}
}
要点:
- 需要保证事件有 唯一的商品ID,所以要
keyBy(productId)。 - 这里用了 ProcessingTime(处理时间),如果是基于事件时间,还要结合 watermark。
✅ 方案二:Flink CEP
用 CEP 表达“上架后30分钟内没有下单”这种 缺失事件模式。
CEP 支持 notFollowedBy 语法,定义模式:
- 上架事件
- notFollowedBy 下单事件 within 30分钟
大致写法:
java
Pattern<Event, ?> pattern = Pattern.<Event>begin("start")
.where(e -> e.type.equals("上架"))
.notFollowedBy("order")
.where(e -> e.type.equals("下单"))
.within(Time.minutes(30));
PatternStream<Event> patternStream = CEP.pattern(stream.keyBy(e -> e.productId), pattern);
patternStream.select((pattern, timestamp) -> {
Event start = pattern.get("start").iterator().next();
return "商品 " + start.productId + " 上架30分钟无订单,告警!";
});
CEP 的优势是表达力强,但实现和调优会更复杂;对于单一“超时未发生”这种场景,ProcessFunction + Timer 更直观。
✅ 总结
- 如果只是 单一事件超时监控,推荐用 KeyedProcessFunction + Timer,简单高效。
- 如果后续可能要扩展成更复杂的 事件模式检测(比如上架 → 加购 → 未下单),可以用 CEP。
要不要我帮你写一个完整的 Flink SQL 方案(比如用 MATCH_RECOGNIZE)?这样在 SQL 层也能直接表达“30分钟内无下单”。
✅ 方案三: Flink SQL 的方案,用 MATCH_RECOGNIZE 来实现
🟢 思路
MATCH_RECOGNIZE是 Flink SQL 提供的事件流模式匹配功能,类似 CEP 的 SQL 化。我们定义一个模式:
- start:商品上架事件
- noOrder:30分钟内没有下单事件
AFTER MATCH SKIP TO NEXT ROW保证每个上架只触发一次匹配。
🟢 示例表结构
假设我们有一个事件表:
sql
CREATE TABLE events (
product_id STRING,
event_type STRING, -- 取值 "上架" 或 "下单"
ts TIMESTAMP(3),
WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'product-events',
'format' = 'json',
'properties.bootstrap.servers' = 'xxx:9092'
);
🟢 使用 MATCH_RECOGNIZE
sql
SELECT *
FROM events
MATCH_RECOGNIZE (
PARTITION BY product_id
ORDER BY ts
MEASURES
start.product_id AS product_id,
start.ts AS up_time,
CURRENT_TIMESTAMP AS alarm_time
ONE ROW PER MATCH
AFTER MATCH SKIP TO NEXT ROW
PATTERN (start NOT_FOLLOWED_BY order WITHIN INTERVAL '30' MINUTE)
DEFINE
start AS start.event_type = '上架',
order AS order.event_type = '下单'
) MR;
🟢 关键点解释
PARTITION BY product_id:每个商品独立监控。PATTERN (start NOT_FOLLOWED_BY order WITHIN INTERVAL '30' MINUTE'):匹配“上架后30分钟内没有下单”。MEASURES:定义告警时输出的字段。AFTER MATCH SKIP TO NEXT ROW:避免同一个事件重复触发告警。
🟢 输出结果示例
当商品 A123 在 10:00:00 上架,30分钟内没有下单,结果表中会出现一行:
| product_id | up_time | alarm_time |
|---|---|---|
| A123 | 2025-08-18 10:00:00 | 2025-08-18 10:30:01 |
右滑查看面试常问