基于本文回答

播面 播面

文图音视,全方位拆解八股文
0
评论

业务不仅要求输出窗口聚合的最终结果,还要求如果在窗口计算完毕后、但仍在 AllowedLateness 范围内有新数据到达时,能够发出更新(Retract/Update)记录。应该怎么做?

知识点图片

在流处理引擎(如 Apache Flink)中,处理带有 AllowedLateness(允许延迟)的窗口时,默认情况下,Flink DataStream API 遇到延迟数据会触发窗口的重新计算,并 再次输出一条新的结果(Append模式),而不是自动发出“撤回(Retract)”和“更新(Update)”记录。

如果业务明确要求输出带有撤回(Retract)和更新(Update)标识的变更流(Changelog Stream),我们需要通过自定义逻辑来实现。

以下是三种常见的解决方案,按推荐程度和适用场景排序:


方法一:DataStream API + 自定义状态(最精准、最契合需求)

如果你的下游需要明确的 [-U, +U](更新前、更新后)指令,你可以通过在 ProcessWindowFunction 中引入状态(State)来记录上一次触发时输出的结果。

核心原理:

  1. 窗口闭合(Watermark 过线)时,触发第一次计算,输出 [+I](插入)。
  2. 将此次结果保存到该窗口的 ValueState 中。
  3. AllowedLateness 期间,如果有迟到数据到达,窗口再次触发。
  4. ValueState 取出旧结果,向下游发出 [-U](撤回旧数据)。
  5. 发出重新计算后的新结果 [+U](输出新数据),并更新 ValueState
  6. AllowedLateness 时间到达,窗口彻底销毁,Flink 会自动清理与该窗口绑定的 ValueState,不会造成状态泄露。

代码示例(Java):

java
// 假设数据结构为 Tuple3<String(Key), Long(Timestamp), Long(Value)>
DataStream<Tuple3<String, Long, Long>> stream = ...;

stream.keyBy(data -> data.f0)
    .window(TumblingEventTimeWindows.of(Time.minutes(1)))
    .allowedLateness(Time.minutes(5)) // 允许 5 分钟迟到
    // 使用 AggregateFunction 提前增量聚合,ProcessWindowFunction 负责处理输出和状态
    .aggregate(new MyAggregateFunction(), new RetractProcessWindowFunction())
    .print();

// 自定义 ProcessWindowFunction 生成 Retract 流
public class RetractProcessWindowFunction 
    extends ProcessWindowFunction<Long, Row, String, TimeWindow> {

    // 用于保存上一次输出的结果。注意:这里的 State 是 Window 级别的,会自动随窗口销毁而清理!
    private ValueState<Long> lastResultState;

    @Override
    public void open(Configuration parameters) throws Exception {
        ValueStateDescriptor<Long> descriptor = new ValueStateDescriptor<>(
                "last-result", Long.class);
        lastResultState = getRuntimeContext().getState(descriptor);
    }

    @Override
    public void process(String key, Context context, Iterable<Long> elements, Collector<Row> out) throws Exception {
        // 1. 获取当前窗口的最新聚合结果 (因为前面接了 AggregateFunction,这里 elements 只有一条记录)
        Long currentResult = elements.iterator().next();
        
        // 2. 获取上一次输出的结果
        Long lastResult = lastResultState.value();

        if (lastResult == null) {
            // 第一次输出(窗口正常闭合时)
            Row insertRow = Row.withRowKind(RowKind.INSERT); // +I
            insertRow.addField(key);
            insertRow.addField(context.window().getEnd());
            insertRow.addField(currentResult);
            out.collect(insertRow);
        } else {
            // 迟到数据触发的更新
            if (!lastResult.equals(currentResult)) { // 如果结果变了才输出
                // 发出撤回(Retract)记录
                Row retractRow = Row.withRowKind(RowKind.UPDATE_BEFORE); // -U
                retractRow.addField(key);
                retractRow.addField(context.window().getEnd());
                retractRow.addField(lastResult);
                out.collect(retractRow);

                // 发出更新后的记录
                Row updateRow = Row.withRowKind(RowKind.UPDATE_AFTER); // +U
                updateRow.addField(key);
                updateRow.addField(context.window().getEnd());
                updateRow.addField(currentResult);
                out.collect(updateRow);
            }
        }

        // 3. 更新状态
        lastResultState.update(currentResult);
    }
}

方法二:利用下游存储的 Upsert 特性(最实用、工程中最常用)

在实际工程中,我们通常不要求 Flink 强行发出 Retract 流,而是利用下游数据库(如 MySQL、Redis、Elasticsearch、HBase 等)的 Upsert(幂等更新) 能力。

核心原理:

  1. Flink DataStream 默认行为是:迟到数据到达后,直接输出一条最新的计算结果(Append 模式)。
  2. 我们只需要在输出的结果中,包含能够唯一确定该窗口的主键(Primary Key):通常是 Key + WindowEnd
  3. 当数据写入下游数据库时,执行 INSERT ... ON DUPLICATE KEY UPDATE(MySQL)或者直接 PUT(Redis/HBase/ES)。

效果:

  • 窗口正常闭合时:写入 Key="A", Window="10:00", Value=100
  • 迟到数据到达时:Flink 重新计算得出 105,直接输出并覆盖数据库中对应的记录,数据库最终变为 Value=105

优点: 不需要维护额外的 State,不增加 Flink 的计算负担,实现极其简单。


方法三:使用 Flink SQL 的常规 Group By + 开启 MiniBatch(SQL方案)

如果你希望使用 Flink SQL,并且必须输出 Retract 流。需要注意:Flink SQL 的 Window TVF(TUMBLE/HOP 等)目前不支持发射迟到数据的更新(它们是 Append-only 的)。

为了实现类似窗口且带有撤回流的效果,可以通过时间戳取整将其转化为普通的 GROUP BY 聚合,普通聚合在 Flink SQL 中天生就是产生 Retract/Update 流的动态表(Dynamic Table)。

核心原理:
将事件时间向下取整作为“窗口键”,当作普通的 Group By 来做。

SQL 示例:

sql
-- 配置允许数据晚到(State TTL,替代 AllowedLateness 的作用)
SET 'table.exec.state.ttl' = '1 h';

SELECT 
    user_id,
    -- 将 event_time 向下取整到分钟级别,模拟 1 分钟的滚动窗口
    TUMBLE_START(event_time, INTERVAL '1' MINUTE) as window_start,
    COUNT(1) as pv
FROM source_table
GROUP BY 
    user_id,
    TUMBLE_START(event_time, INTERVAL '1' MINUTE);

注意:这不是官方标准的 Window TVF 语法,而是传统的分组聚合。这种写法下,每来一条数据(包括正常数据和迟到数据),Flink 都会输出 -U+U。为了避免更新过于频繁,通常会配合开启 MiniBatch(微批处理)来减少输出量。

总结建议

  1. 如果下游是关系型数据库或 KV 存储:强烈建议使用 方法二(Upsert 模式)。只需要把 Key + 窗口结束时间 作为联合主键即可,这是业内最通用、性能最高的做法。
  2. 如果下游是 Kafka,且消费端严格依赖 [-U, +U] 格式(例如 Debezium 格式):使用 方法一(ProcessWindowFunction + ValueState),这是在 DataStream API 中完美解决此问题的标准范式。
00:00
00:00