Flink遇到数据倾斜(Data Skew)该如何处理?(Key 散列、两阶段聚合等)
Flink 中的数据倾斜(Data Skew)通常表现为部分 Subtask 处理的数据量远超其他 Subtask,导致整个作业的进度被这几个“热点”拖慢,出现反压(Backpressure)、Checkpoint 超时甚至 OOM(内存溢出)。
处理数据倾斜通常需要结合业务场景(是聚合还是 Join?)和API 类型(DataStream 还是 SQL/Table)来选择方案。以下是系统性的处理策略:
一、 诊断:如何确认发生了数据倾斜?
在动手解决前,先通过 Flink Web UI 确认:
- Subtask 层面:查看某个算子(Operator)的各个 Subtask 的
Records Received或Records Sent。如果有的几千万,有的只有几百,那就是倾斜。 - 反压情况:查看 Backpressure 面板,倾斜节点的下游通常反压为 High,而其他并行度正常。
- Checkpoint:Checkpoint 时间变长,且经常因为个别 Subtask 未完成而失败。
二、 核心解决方案
1. 过滤异常数据(最简单的方案)
场景:很多时候倾斜是因为 Key 为 null、空字符串或某些无意义的默认值(如 "undefined")过多。
方案:
- 在
keyBy之前进行filter,过滤掉这些无意义的 Key。 - 如果这些数据不能丢弃,可以将这些 Key 替换为随机值(如
null_random_suffix),让它们分散到不同的 Subtask 处理(仅适用于不需要对这些 Key 做精确聚合的场景)。
2. Rebalance / Shuffle(解决数据源分布不均)
场景:数据源(如 Kafka Partition)本身数据不均,或者上游算子处理逻辑导致输出不均,但还没有进行 keyBy。
方案:
- DataStream API: 调用
.rebalance()。这会使用 Round-Robin(轮询)方式将数据均匀分发给下游。 - 注意:这只能解决分区倾斜,无法解决由
keyBy导致的Key 倾斜。
3. 两阶段聚合(Two-Phase Aggregation)
场景:普通的聚合操作(Count, Sum, Max 等),某个 Key 特别多。
原理:类似 MapReduce 的 Combiner 思想。
- 预聚合(Local Aggregation):在 Shuffle 前先在本地做一次聚合。
- 最终聚合(Global Aggregation):将预聚合的结果 Shuffle 到下游做最终聚合。
实现方式:
- Flink SQL / Table API(推荐):
开启 MiniBatch 和 LocalGlobal 优化参数。Flink 会自动探测并进行两阶段聚合。sql-- 开启 MiniBatch(攒一批数据再处理,减少状态访问) SET 'table.exec.mini-batch.enabled' = 'true'; SET 'table.exec.mini-batch.allow-latency' = '1s'; -- 允许最大延迟 SET 'table.exec.mini-batch.size' = '1000'; -- 开启 LocalGlobal 优化 SET 'table.optimizer.agg-phase-strategy' = 'TWO_PHASE'; - DataStream API:
没有直接的参数,需要自己写代码。通常是在 Key 上加随机前缀 -> 聚合 -> 去掉前缀 -> 再次聚合(参考下文的“加盐”)。
4. Key 加盐(Salting)/ 散列化
场景:两阶段聚合无法满足(如求 Distinct Count),或者需要手动控制倾斜。
原理:将原本集中的 Key 打散。
- Phase 1:给倾斜的 Key 加上随机后缀(例如 0-9),将其拆分成 10 个不同的 Key,分散到不同 Subtask。
- Phase 2:对打散后的结果进行聚合。
- Phase 3:去掉后缀,还原原本的 Key,再次
keyBy进行最终聚合。
代码逻辑示例:
// 1. 加盐:Key + "_" + Random(0-9)
stream.map(data -> {
String newKey = data.getKey() + "_" + new Random().nextInt(10);
return new KeyedData(newKey, data.getValue());
})
.keyBy(KeyedData::getNewKey)
.window(...)
.reduce(...) // 第一次聚合
// 2. 去盐:还原 Key
.map(result -> {
String originalKey = result.getNewKey().split("_")[0];
return new KeyedData(originalKey, result.getValue());
})
.keyBy(KeyedData::getKey)
.window(...)
.reduce(...) // 第二次最终聚合
5. 广播 Join(Broadcast Join)
场景:大表 Join 小表,但大表中有数据倾斜(某个 Key 特别多)。
原理:
- 不使用常规的
keyBy(Hash Shuffle) 进行 Join。 - 将小表广播(Broadcast)到大表所在的所有 Subtask。
- 大表的数据不需要移动(避免了 Shuffle 和倾斜),直接在本地与广播过来的小表数据进行 Join。
实现方式:
- Flink SQL:通常会自动优化。如果没优化,可以使用 Hint 强制:sql
SELECT /*+ BROADCAST(small_table) */ * FROM large_table JOIN small_table ON ... - DataStream API:使用
BroadcastState。
6. 采样 + 拆分 Join(处理大表 Join 大表)
场景:大表 Join 大表,且其中一张表(表 A)有热点 Key。
方案:
- 识别热点:通过采样或监控,找出表 A 中的热点 Key。
- 数据拆分:
- 将表 A 拆分为“热点数据”和“普通数据”两个流。
- 将表 B 也拆分为“与热点 Key 关联的数据”和“普通数据”。
- 分别处理:
- 普通部分:正常 Join。
- 热点部分:对表 A 的热点 Key 加盐(打散),对表 B 的对应 Key 膨胀/复制(Replication,例如表 A 加盐 1-10,表 B 的这条数据就复制 10 份,后缀分别为 1-10)。
- 合并结果:将两部分 Join 的结果 Union 起来。
7. 自定义分区器(Custom Partitioner)
场景:业务逻辑特殊,知道哪些 Key 会倾斜,且可以通过特定规则分散。
方案:
使用 .partitionCustom(Partitioner, KeySelector)。
在自定义的 Partitioner 中,将热点 Key 单独路由到一组特定的 Subtask,或者将其打散,而将非热点 Key 路由到其他 Subtask。
注:这种方法比较硬核,维护成本高,通常不如加盐法通用。
三、 总结建议
| 场景 | 优先策略 | 备注 |
|---|---|---|
| Key 为 Null/异常值 | Filter / 替换 | 最低成本,最先检查。 |
| 普通聚合 (Count/Sum) | Table API: 开启 MiniBatch/LocalGlobal | 框架自动优化,无需改代码逻辑。 |
| DataStream 聚合 | 两阶段聚合 (加盐) | 手动实现 MapReduce 思想。 |
| 大表 Join 小表 | Broadcast Join | 避免 Shuffle。 |
| 大表 Join 大表 | 热点 Key 加盐 + 另一侧膨胀 | 成本较高,针对性解决。 |
| Distinct Count (去重) | Split Distinct (SQL) / BitMap / HyperLogLog | 去重是重灾区,SQL 中可开启 table.optimizer.distinct-agg.split.enabled。 |
面试回答话术总结:
"处理 Flink 数据倾斜,首先要定位是 Key 分布不均导致的。针对不同场景有不同解法:如果是脏数据(如大量 Null)直接过滤;如果是普通聚合,SQL 层面开启 LocalGlobal 做两阶段聚合,DataStream 层面手动加盐再去盐;如果是 Join 场景,大表 Join 小表用 Broadcast Join,大表 Join 大表则需要对热点 Key 进行加盐处理并对另一侧表进行扩容(膨胀)适配。"