什么是两阶段聚合(局部聚合 + 全局聚合)?它如何解决 Spark聚合类的数据倾斜?
两阶段聚合(Two-Stage Aggregation),通常被称为“加盐局部聚合 + 去盐全局聚合”,是大数据处理(如 Spark、MapReduce)中解决聚合类数据倾斜(Aggregation Data Skew)最有效、最通用的方案之一。
简单来说,它的核心思想是:将原本要汇聚到一个节点处理的“超级大Key”,先拆分成多份分散到不同节点做预处理,最后再合并结果。
1. 为什么会有数据倾斜?(背景)
在 Spark 中执行 reduceByKey、groupByKey 或 SQL 的 GROUP BY 时,Spark 会根据 Key 的 Hash 值进行 Shuffle(洗牌),将相同的 Key 拉取到同一个 Task(分区)中进行处理。
- 正常情况:Key 分布均匀,每个 Task 处理的数据量差不多。
- 倾斜情况:某个 Key(例如 "北京" 或 "Null")的数据量特别大(比如有 1000 万条),而其他 Key 只有几百条。
- 结果:处理 "北京" 的那个 Task 会非常慢,甚至因为内存溢出(OOM)而崩溃,导致整个作业卡死。
2. 什么是两阶段聚合?
为了解决上述问题,我们将一次大的聚合操作拆分为两个阶段:
- 第一阶段(局部聚合 / Local Aggregation): 给 Key 加上随机前缀(加盐),将原本集中的数据打散,进行初步聚合。
- 第二阶段(全局聚合 / Global Aggregation): 去掉随机前缀(去盐),将第一阶段的结果再次聚合,得到最终结果。
3. 它如何解决数据倾斜?(详细原理解析)
假设我们有一个海量的点击日志,其中 Key 是 City,我们要统计每个城市的点击量。其中 City = "Beijing" 发生了严重倾斜。
❌ 原始做法(会导致倾斜)
所有 Beijing 的记录都会被 Hash 到同一个 Reducer Task。
- Task A: 处理 1000 万条 "Beijing" -> 崩溃/极慢
- Task B: 处理 1000 条 "Shanghai" -> 瞬间完成
✅ 两阶段聚合做法
Step 1: 加盐(Salting)
在 Map 阶段,我们给每个 Key 加上一个随机数前缀(比如 0-9)。
Beijing变成了0_Beijing,1_Beijing, ...,9_Beijing。- 现在,原本巨大的
Beijing组被拆成了 10 个不同的小组。
Step 2: 第一阶段聚合(局部聚合)
对加上前缀的 Key 进行 reduceByKey 或 GROUP BY。
由于 Key 变了(Hash 值也变了),这些数据会被分散到不同的 Task 中去。
- Task A 处理
0_Beijing(100万条) -> 结果:0_Beijing: 100万 - Task B 处理
1_Beijing(100万条) -> 结果:1_Beijing: 100万 - ...
- 效果:原本压死一个节点的压力,现在被分摊到了 10 个节点上,每个节点都能轻松处理。
Step 3: 去盐(Unsalting)
将第一阶段输出结果的 Key 中的随机前缀去掉。
0_Beijing->Beijing1_Beijing->Beijing- 此时,数据量已经大幅减少(从 1000 万条原始记录变成了 10 条中间结果)。
Step 4: 第二阶段聚合(全局聚合)
对还原后的 Key 再次进行 reduceByKey 或 GROUP BY。
- 此时所有
Beijing的中间结果(共 10 条)会被汇聚到一个 Task。 - 计算:100万 + 100万 + ... = 1000万。
- 效果:虽然最后还是汇聚到一个节点,但处理的数据量极小,瞬间即可完成。
4. 代码示例 (Spark RDD)
假设 rdd 是 (String, Int) 类型,Key 倾斜严重。
import scala.util.Random
// 定义随机盐的范围,比如分成 10 份
val saltCount = 10
// --- 第一阶段:加盐 + 局部聚合 ---
val localAggRDD = rdd.map { case (key, value) =>
// 1. 加盐:在 Key 前面加上 0-9 的随机前缀
val randomPrefix = Random.nextInt(saltCount)
val saltedKey = s"${randomPrefix}_${key}"
(saltedKey, value)
}.reduceByKey(_ + _) // 2. 局部聚合:分散压力
// --- 第二阶段:去盐 + 全局聚合 ---
val finalAggRDD = localAggRDD.map { case (saltedKey, value) =>
// 3. 去盐:去掉前缀,还原 Key
val originalKey = saltedKey.split("_")(1)
(originalKey, value)
}.reduceByKey(_ + _) // 4. 全局聚合:汇总最终结果
// 打印结果
finalAggRDD.collect().foreach(println)
5. 适用场景与局限性
适用场景:
- 聚合类算子:
groupByKey,reduceByKey,aggregateByKey等。 - SQL 场景:
GROUP BY语句。 - 大 Key 问题:某一个或几个 Key 数据量特别大,远超其他 Key。
局限性:
- 不适用于 Join:这种方法专门用于聚合。如果是 Join 操作导致的倾斜,需要使用“加盐广播”或“双重加盐”等其他策略。
- 不适用于所有 Key 都很大的情况:如果所有 Key 数据量都很大且分布均匀,加盐只会增加 Shuffle 的开销(多了一次 Shuffle),不能提升性能。
- 非累加性操作:对于
Count、Sum、Max、Min很有效。但对于Count Distinct(去重统计)或者求Median(中位数),这种方法实现起来会非常复杂,因为局部聚合无法直接得出全局的去重结果。
总结
两阶段聚合通过“分而治之”的策略解决数据倾斜:
- 加盐把大 Key 伪装成多个小 Key。
- 局部聚合利用集群并行能力处理掉大部分数据量。
- 去盐全局聚合轻松合并剩下的少量结果。