基于本文回答

播面 播面

文图音视,全方位拆解八股文
0
评论

使用 Flink SQL 时,对比使用 DISTINCT 关键字去重和使用 ROW_NUMBER() 筛选第一条去重(Deduplication),它们在状态保留策略和性能上有什么区别?

知识点图片

在 Flink SQL 中,数据去重是一个非常常见的需求。虽然 DISTINCTROW_NUMBER() OVER(...) WHERE rn = 1(保留第一条)在批处理中可能达到类似的效果,但在流处理(Streaming)中,由于底层实现和优化机制完全不同,它们在状态保留策略性能上有着显著的区别。

总的来说,Flink 官方强烈推荐使用 ROW_NUMBER() 语法进行去重(Deduplication),因为它经过了专门的算子优化。

以下是它们在状态保留策略和性能上的详细对比:


一、 状态保留策略(State Retention)对比

在流处理中,为了识别重复数据,Flink 必须在 State(状态)中记住以前见过的数据。如何清理这些状态是两者的核心差异之一。

1. DISTINCT 的状态保留

  • 清理机制完全依赖 State TTL(状态生存时间)
  • 原理DISTINCT 本质上是一个聚合(Group Aggregation)操作。Flink 无法预知未来的数据流中是否还会出现相同的数据,因此它不能主动清理状态
  • 缺点:你必须配置 table.exec.state.ttl
    • 如果 TTL 设置得太短,晚到的重复数据会被当成新数据输出,导致去重不彻底
    • 如果 TTL 设置得太长,State 会持续膨胀,可能导致 OOM 或磁盘被打满。

2. ROW_NUMBER()(保留第一条)的状态保留

Flink SQL 优化器能够识别 ROW_NUMBER() = 1 这种特定的 Top-N 语法,并将其优化为专门的 Deduplication 算子。它的状态清理取决于你按什么时间属性排序(ORDER BY):

  • 按处理时间(Processing Time / Proctime)排序:
    • 清理机制依赖 State TTL
    • 与 DISTINCT 类似,处理时间没有事件时间推进的概念,必须依靠配置的 TTL 来清理状态。
  • 按事件时间(Event Time / Rowtime)排序:
    • 清理机制依赖 Watermark(水位线)自动清理
    • 这是最推荐的方式。当 Watermark 越过当前数据的 Event Time + Allowed Lateness 时,Flink 知道不会再有更早的数据到来了,会自动、精准地清理该时间窗口内的状态
    • 优点:不需要依赖全局的 State TTL,状态管理更加安全、可控。

二、 性能(Performance)对比

ROW_NUMBER() 经过 Flink 专门的 Deduplication 优化后,在性能上对 DISTINCT 形成了碾压优势。

1. 状态大小(State Size)

  • DISTINCT:状态中需要保存去重字段的完整值。如果是 SELECT DISTINCT *,则需要把整行数据作为 Key 存入状态,状态体积非常大。
  • ROW_NUMBER() (FirstRow):由于是保留第一条,Flink 只需要知道“这个 Key 是否已经来过”。因此,在底层实现中,Flink 只需要对每个 Partition Key 存储一个极小的标记(例如一个 Boolean 值或空的 ValueState)。状态体积被压缩到了极致。

2. 产生的消息流类型(Changelog Stream)

这是影响下游算子性能的关键因素。

  • DISTINCT:通常会产生 撤回流(Retract Stream)。如果上游是 CDC 等包含 Update 的数据源,DISTINCT 会向下游发送大量的 -U+U(或 -D, +I)消息。下游算子处理撤回消息的代价是非常高的。
  • ROW_NUMBER() (FirstRow):因为业务逻辑是“只保留到达的第一条数据”,所以一旦某个 Key 的第一条数据被输出后,后续相同的 Key 到来会被直接丢弃。因此,它只会向下游发送 INSERT (+I) 消息,绝对不会产生撤回消息(Update/Delete)。这极大减轻了下游算子(如 Sink)的压力。

3. Checkpoint 开销

  • DISTINCT:由于状态体积大,Checkpoint 时需要持久化到 HDFS/OSS 的数据量大,耗时较长,容易造成反压。
  • ROW_NUMBER():状态仅为一个轻量级标记,State 极小,Checkpoint 速度极快,对业务处理几乎无影响。

三、 总结与建议

维度 DISTINCT ROW_NUMBER() WHERE rn = 1 (Deduplication)
底层算子 Group Aggregation Deduplication (专门优化)
状态存储内容 去重的全量字段 仅一个 Boolean/Empty 标记 (极小)
状态清理机制 强依赖 State TTL Proctime依赖TTL / EventTime依赖Watermark自动清理
向下游发送的消息 可能包含 Update/Retract,下游压力大 仅有 INSERT (+I),下游压力极小
Checkpoint 压力 极小
适用场景 简单的全量数据去重统计 (流处理中不推荐) 流处理中去重的标准/最佳实践

代码示例(最佳实践)

如果你想按 user_id 去重,只保留用户的第一条行为记录:

❌ 不推荐写法(DISTINCT)

sql
-- 性能差,状态大
SELECT DISTINCT user_id, user_name, behavior
FROM user_log;

✅ 推荐写法(ROW_NUMBER 基于处理时间)

sql
-- 性能极好,状态极小,只发 INSERT
SELECT user_id, user_name, behavior
FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY proctime ASC) as rn
  FROM user_log
) WHERE rn = 1;

结论:在 Flink SQL 流处理中进行去重,请永远优先考虑使用 ROW_NUMBER() OVER(...) WHERE rn = 1 语法。

00:00
00:00