基于本文回答

播面 播面

文图音视,全方位拆解八股文
0
评论

Flink遇到数据倾斜(Data Skew)该如何处理?(Key 散列、两阶段聚合等)

知识点图片

Flink 中的数据倾斜(Data Skew)通常表现为部分 Subtask 处理的数据量远超其他 Subtask,导致整个作业的进度被这几个“热点”拖慢,出现反压(Backpressure)、Checkpoint 超时甚至 OOM(内存溢出)。

处理数据倾斜通常需要结合业务场景(是聚合还是 Join?)和API 类型(DataStream 还是 SQL/Table)来选择方案。以下是系统性的处理策略:


一、 诊断:如何确认发生了数据倾斜?

在动手解决前,先通过 Flink Web UI 确认:

  1. Subtask 层面:查看某个算子(Operator)的各个 Subtask 的 Records ReceivedRecords Sent。如果有的几千万,有的只有几百,那就是倾斜。
  2. 反压情况:查看 Backpressure 面板,倾斜节点的下游通常反压为 High,而其他并行度正常。
  3. Checkpoint:Checkpoint 时间变长,且经常因为个别 Subtask 未完成而失败。

二、 核心解决方案

1. 过滤异常数据(最简单的方案)

场景:很多时候倾斜是因为 Key 为 null、空字符串或某些无意义的默认值(如 "undefined")过多。
方案

  • keyBy 之前进行 filter,过滤掉这些无意义的 Key。
  • 如果这些数据不能丢弃,可以将这些 Key 替换为随机值(如 null_random_suffix),让它们分散到不同的 Subtask 处理(仅适用于不需要对这些 Key 做精确聚合的场景)。

2. Rebalance / Shuffle(解决数据源分布不均)

场景:数据源(如 Kafka Partition)本身数据不均,或者上游算子处理逻辑导致输出不均,但还没有进行 keyBy
方案

  • DataStream API: 调用 .rebalance()。这会使用 Round-Robin(轮询)方式将数据均匀分发给下游。
  • 注意:这只能解决分区倾斜,无法解决由 keyBy 导致的Key 倾斜

3. 两阶段聚合(Two-Phase Aggregation)

场景:普通的聚合操作(Count, Sum, Max 等),某个 Key 特别多。
原理:类似 MapReduce 的 Combiner 思想。

  1. 预聚合(Local Aggregation):在 Shuffle 前先在本地做一次聚合。
  2. 最终聚合(Global Aggregation):将预聚合的结果 Shuffle 到下游做最终聚合。

实现方式

  • Flink SQL / Table API(推荐):
    开启 MiniBatch 和 LocalGlobal 优化参数。Flink 会自动探测并进行两阶段聚合。
    sql
    -- 开启 MiniBatch(攒一批数据再处理,减少状态访问)
    SET 'table.exec.mini-batch.enabled' = 'true';
    SET 'table.exec.mini-batch.allow-latency' = '1s'; -- 允许最大延迟
    SET 'table.exec.mini-batch.size' = '1000';
    -- 开启 LocalGlobal 优化
    SET 'table.optimizer.agg-phase-strategy' = 'TWO_PHASE';
  • DataStream API
    没有直接的参数,需要自己写代码。通常是在 Key 上加随机前缀 -> 聚合 -> 去掉前缀 -> 再次聚合(参考下文的“加盐”)。

4. Key 加盐(Salting)/ 散列化

场景:两阶段聚合无法满足(如求 Distinct Count),或者需要手动控制倾斜。
原理:将原本集中的 Key 打散。

  1. Phase 1:给倾斜的 Key 加上随机后缀(例如 0-9),将其拆分成 10 个不同的 Key,分散到不同 Subtask。
  2. Phase 2:对打散后的结果进行聚合。
  3. Phase 3:去掉后缀,还原原本的 Key,再次 keyBy 进行最终聚合。

代码逻辑示例

java
// 1. 加盐:Key + "_" + Random(0-9)
stream.map(data -> {
    String newKey = data.getKey() + "_" + new Random().nextInt(10);
    return new KeyedData(newKey, data.getValue());
})
.keyBy(KeyedData::getNewKey)
.window(...)
.reduce(...) // 第一次聚合

// 2. 去盐:还原 Key
.map(result -> {
    String originalKey = result.getNewKey().split("_")[0];
    return new KeyedData(originalKey, result.getValue());
})
.keyBy(KeyedData::getKey)
.window(...)
.reduce(...) // 第二次最终聚合

5. 广播 Join(Broadcast Join)

场景大表 Join 小表,但大表中有数据倾斜(某个 Key 特别多)。
原理

  • 不使用常规的 keyBy (Hash Shuffle) 进行 Join。
  • 小表广播(Broadcast)到大表所在的所有 Subtask。
  • 大表的数据不需要移动(避免了 Shuffle 和倾斜),直接在本地与广播过来的小表数据进行 Join。

实现方式

  • Flink SQL:通常会自动优化。如果没优化,可以使用 Hint 强制:
    sql
    SELECT /*+ BROADCAST(small_table) */ *
    FROM large_table JOIN small_table ON ...
  • DataStream API:使用 BroadcastState

6. 采样 + 拆分 Join(处理大表 Join 大表)

场景大表 Join 大表,且其中一张表(表 A)有热点 Key。
方案

  1. 识别热点:通过采样或监控,找出表 A 中的热点 Key。
  2. 数据拆分
    • 将表 A 拆分为“热点数据”和“普通数据”两个流。
    • 将表 B 也拆分为“与热点 Key 关联的数据”和“普通数据”。
  3. 分别处理
    • 普通部分:正常 Join。
    • 热点部分:对表 A 的热点 Key 加盐(打散),对表 B 的对应 Key 膨胀/复制(Replication,例如表 A 加盐 1-10,表 B 的这条数据就复制 10 份,后缀分别为 1-10)。
  4. 合并结果:将两部分 Join 的结果 Union 起来。

7. 自定义分区器(Custom Partitioner)

场景:业务逻辑特殊,知道哪些 Key 会倾斜,且可以通过特定规则分散。
方案
使用 .partitionCustom(Partitioner, KeySelector)
在自定义的 Partitioner 中,将热点 Key 单独路由到一组特定的 Subtask,或者将其打散,而将非热点 Key 路由到其他 Subtask。
注:这种方法比较硬核,维护成本高,通常不如加盐法通用。


三、 总结建议

场景 优先策略 备注
Key 为 Null/异常值 Filter / 替换 最低成本,最先检查。
普通聚合 (Count/Sum) Table API: 开启 MiniBatch/LocalGlobal 框架自动优化,无需改代码逻辑。
DataStream 聚合 两阶段聚合 (加盐) 手动实现 MapReduce 思想。
大表 Join 小表 Broadcast Join 避免 Shuffle。
大表 Join 大表 热点 Key 加盐 + 另一侧膨胀 成本较高,针对性解决。
Distinct Count (去重) Split Distinct (SQL) / BitMap / HyperLogLog 去重是重灾区,SQL 中可开启 table.optimizer.distinct-agg.split.enabled

面试回答话术总结:
"处理 Flink 数据倾斜,首先要定位是 Key 分布不均导致的。针对不同场景有不同解法:如果是脏数据(如大量 Null)直接过滤;如果是普通聚合,SQL 层面开启 LocalGlobal 做两阶段聚合,DataStream 层面手动加盐再去盐;如果是 Join 场景,大表 Join 小表用 Broadcast Join,大表 Join 大表则需要对热点 Key 进行加盐处理并对另一侧表进行扩容(膨胀)适配。"

00:00
00:00