基于本文回答
0
评论

什么是两阶段聚合(局部聚合 + 全局聚合)?它如何解决 Spark聚合类的数据倾斜?

知识点图片

两阶段聚合(Two-Stage Aggregation),通常被称为“加盐局部聚合 + 去盐全局聚合”,是大数据处理(如 Spark、MapReduce)中解决聚合类数据倾斜(Aggregation Data Skew)最有效、最通用的方案之一。

简单来说,它的核心思想是:将原本要汇聚到一个节点处理的“超级大Key”,先拆分成多份分散到不同节点做预处理,最后再合并结果。


1. 为什么会有数据倾斜?(背景)

在 Spark 中执行 reduceByKeygroupByKey 或 SQL 的 GROUP BY 时,Spark 会根据 Key 的 Hash 值进行 Shuffle(洗牌),将相同的 Key 拉取到同一个 Task(分区)中进行处理。

  • 正常情况:Key 分布均匀,每个 Task 处理的数据量差不多。
  • 倾斜情况:某个 Key(例如 "北京" 或 "Null")的数据量特别大(比如有 1000 万条),而其他 Key 只有几百条。
    • 结果:处理 "北京" 的那个 Task 会非常慢,甚至因为内存溢出(OOM)而崩溃,导致整个作业卡死。

2. 什么是两阶段聚合?

为了解决上述问题,我们将一次大的聚合操作拆分为两个阶段:

  1. 第一阶段(局部聚合 / Local Aggregation): 给 Key 加上随机前缀(加盐),将原本集中的数据打散,进行初步聚合。
  2. 第二阶段(全局聚合 / Global Aggregation): 去掉随机前缀(去盐),将第一阶段的结果再次聚合,得到最终结果。

3. 它如何解决数据倾斜?(详细原理解析)

假设我们有一个海量的点击日志,其中 Key 是 City,我们要统计每个城市的点击量。其中 City = "Beijing" 发生了严重倾斜。

❌ 原始做法(会导致倾斜)

所有 Beijing 的记录都会被 Hash 到同一个 Reducer Task。

  • Task A: 处理 1000 万条 "Beijing" -> 崩溃/极慢
  • Task B: 处理 1000 条 "Shanghai" -> 瞬间完成

✅ 两阶段聚合做法

Step 1: 加盐(Salting)
在 Map 阶段,我们给每个 Key 加上一个随机数前缀(比如 0-9)。

  • Beijing 变成了 0_Beijing, 1_Beijing, ..., 9_Beijing
  • 现在,原本巨大的 Beijing 组被拆成了 10 个不同的小组。

Step 2: 第一阶段聚合(局部聚合)
对加上前缀的 Key 进行 reduceByKeyGROUP BY
由于 Key 变了(Hash 值也变了),这些数据会被分散到不同的 Task 中去。

  • Task A 处理 0_Beijing (100万条) -> 结果:0_Beijing: 100万
  • Task B 处理 1_Beijing (100万条) -> 结果:1_Beijing: 100万
  • ...
  • 效果:原本压死一个节点的压力,现在被分摊到了 10 个节点上,每个节点都能轻松处理。

Step 3: 去盐(Unsalting)
将第一阶段输出结果的 Key 中的随机前缀去掉。

  • 0_Beijing -> Beijing
  • 1_Beijing -> Beijing
  • 此时,数据量已经大幅减少(从 1000 万条原始记录变成了 10 条中间结果)。

Step 4: 第二阶段聚合(全局聚合)
对还原后的 Key 再次进行 reduceByKeyGROUP BY

  • 此时所有 Beijing 的中间结果(共 10 条)会被汇聚到一个 Task。
  • 计算:100万 + 100万 + ... = 1000万。
  • 效果:虽然最后还是汇聚到一个节点,但处理的数据量极小,瞬间即可完成。

4. 代码示例 (Spark RDD)

假设 rdd(String, Int) 类型,Key 倾斜严重。

plaintext
import scala.util.Random

// 定义随机盐的范围,比如分成 10 份
val saltCount = 10

// --- 第一阶段:加盐 + 局部聚合 ---
val localAggRDD = rdd.map { case (key, value) =>
  // 1. 加盐:在 Key 前面加上 0-9 的随机前缀
  val randomPrefix = Random.nextInt(saltCount)
  val saltedKey = s"${randomPrefix}_${key}"
  (saltedKey, value)
}.reduceByKey(_ + _) // 2. 局部聚合:分散压力

// --- 第二阶段:去盐 + 全局聚合 ---
val finalAggRDD = localAggRDD.map { case (saltedKey, value) =>
  // 3. 去盐:去掉前缀,还原 Key
  val originalKey = saltedKey.split("_")(1)
  (originalKey, value)
}.reduceByKey(_ + _) // 4. 全局聚合:汇总最终结果

// 打印结果
finalAggRDD.collect().foreach(println)

5. 适用场景与局限性

适用场景:

  • 聚合类算子groupByKey, reduceByKey, aggregateByKey 等。
  • SQL 场景GROUP BY 语句。
  • 大 Key 问题:某一个或几个 Key 数据量特别大,远超其他 Key。

局限性:

  • 不适用于 Join:这种方法专门用于聚合。如果是 Join 操作导致的倾斜,需要使用“加盐广播”或“双重加盐”等其他策略。
  • 不适用于所有 Key 都很大的情况:如果所有 Key 数据量都很大且分布均匀,加盐只会增加 Shuffle 的开销(多了一次 Shuffle),不能提升性能。
  • 非累加性操作:对于 CountSumMaxMin 很有效。但对于 Count Distinct(去重统计)或者求 Median(中位数),这种方法实现起来会非常复杂,因为局部聚合无法直接得出全局的去重结果。

总结

两阶段聚合通过“分而治之”的策略解决数据倾斜:

  1. 加盐把大 Key 伪装成多个小 Key。
  2. 局部聚合利用集群并行能力处理掉大部分数据量。
  3. 去盐全局聚合轻松合并剩下的少量结果。
右滑查看面试常问