Spark 数据倾斜的常见解决方案有哪些?
Spark 数据倾斜(Data Skew)是大数据处理中最常见的问题之一。它的典型表现是:绝大多数 Task 很快执行完,但个别 Task 执行极慢(卡在 99%),或者直接报 OOM(内存溢出)。
解决数据倾斜通常没有“银弹”,需要根据具体的业务场景(是 Join 还是 Aggregation,是 Key 分布不均还是 Null 值过多)选择合适的方案。
以下是 Spark 数据倾斜的 7 种常见解决方案,按从易到难、从通用到特定的顺序排列:
1. 利用 Spark 3.x AQE (Adaptive Query Execution) —— 最推荐(躺平方案)
如果你使用的是 Spark 3.0 及以上版本,这是首选方案。
- 原理:Spark 在运行时收集统计信息,自动检测倾斜的 Partition,并将其拆分成多个小的 Partition 进行处理。
- 适用场景:几乎所有 SQL/DataFrame 场景。
- 配置:plaintext
spark.sql.adaptive.enabled=true spark.sql.adaptive.skewJoin.enabled=true spark.sql.adaptive.skewJoin.skewedPartitionFactor=5 # 默认5,即分区大小是中位数的5倍视为倾斜 spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes=256MB # 默认256MB
2. 调整并行度 (Increase Parallelism)
- 原理:增加 Shuffle Read Task 的数量,让每个 Task 处理的数据量变少。如果倾斜是因为某个 Key 的数据量刚好比其他 Key 多一点点,拆分后可能就不倾斜了。
- 适用场景:倾斜程度不严重,或者数据量整体很大但 Key 分布相对均匀的情况。
- 操作:
- SQL/DataFrame: 设置
spark.sql.shuffle.partitions(默认 200,可调大到 1000-2000)。 - RDD: 在算子中指定参数,如
groupByKey(1000)。
- SQL/DataFrame: 设置
- 局限性:如果某个 Key 极其大(例如由几百万条数据),即使增加了并行度,这个 Key 还是会被分到一个 Task 中,无法根本解决问题。
3. 广播 Join (Broadcast Join / Map-side Join)
- 原理:将小表广播到所有 Executor 的内存中,在 Map 端进行 Join,避免了 Shuffle。没有 Shuffle 就没有倾斜。
- 适用场景:大表 Join 小表。
- 操作:
- 增加自动广播阈值:
spark.sql.autoBroadcastJoinThreshold(默认 10MB,内存够可调大到 100MB 或更高)。 - 强制广播 Hint:sql
SELECT /*+ BROADCAST(b) */ * FROM big_table a JOIN small_table b ON a.id = b.id
- 增加自动广播阈值:
- 局限性:只适用于有一张表较小能放进内存的场景。
4. 过滤或特殊处理异常数据 (Null 值/空值)
- 原理:很多时候倾斜是因为 Null 值或空字符串过多,且这些数据在业务上可能不需要 Join 或聚合。
- 适用场景:倾斜是由 Null、空值或无意义的 Key 导致的。
- 操作:
- 方案 A(直接过滤):如果业务允许,直接
filter(col("key").isNotNull)。 - 方案 B(随机值转换):如果需要保留 Null 数据(比如 Left Join),可以将 Null 转换成随机值(如
null_prefix_1,null_prefix_2),让它们分散到不同 Task,Join 不上自然就保留下来了。
- 方案 A(直接过滤):如果业务允许,直接
5. 两阶段聚合 (Two-stage Aggregation)
- 原理:将原本的一个巨大的 Group By 拆分成两个步骤。
- 局部聚合:给 Key 加上随机前缀(如
0_Key,1_Key),对打散后的 Key 进行聚合。 - 全局聚合:去掉前缀,对原本的 Key 进行最终聚合。
- 局部聚合:给 Key 加上随机前缀(如
- 适用场景:聚合操作(GroupBy)导致的数据倾斜。
- 代码逻辑:plaintext
// 伪代码 // 第一阶段:加盐 + 聚合 val rdd1 = rdd.map { case (key, value) => val salt = new Random().nextInt(10) (salt + "_" + key, value) }.reduceByKey(_ + _) // 第二阶段:去盐 + 聚合 val rdd2 = rdd1.map { case (key, value) => (key.split("_")(1), value) }.reduceByKey(_ + _)
6. 加盐 (Salting) —— 解决大表 Join 大表
这是解决大表 Join 大表且其中一个表有热点 Key 的终极方案。
- 原理:
- 倾斜侧(大表A):将倾斜的 Key 加上 1~N 的随机后缀(打散)。
- 非倾斜侧(大表B):将对应的 Key 膨胀 N 倍(复制 N 份,后缀分别为 1~N)。
- 这样原本的一个超级大 Join 就被拆分成了 N 个小 Join。
- 适用场景:两张大表 Join,且无法使用广播 Join。
- 操作:
- 通常只针对导致倾斜的特定 Key 进行加盐,否则数据量膨胀太大。
- 代码复杂度较高,需要手动扩容数据。
7. 采样扩容 (Sampling & Splitting)
- 原理:将发生倾斜的 Key 单独拎出来处理,普通的 Key 走普通流程,最后 Union 结果。
- 适用场景:只有极少数几个 Key 导致倾斜。
- 操作步骤:
- 采样:
sample算子找出 Top N 的热点 Key。 - 拆分:
- 数据集 A = 过滤出热点 Key 的数据。
- 数据集 B = 过滤掉热点 Key 的数据。
- 处理:
- 对数据集 A 使用“加盐”或“广播”(如果拆出来后变小了)进行特殊处理。
- 对数据集 B 进行普通 Join。
- 合并:
union两个结果集。
- 采样:
总结:决策路径
- 先看版本:是 Spark 3.x 吗?是的话先开 AQE。
- 看操作类型:
- 如果是 Join:
- 有一张小表? -> 广播 Join。
- 有 Null 值? -> 过滤或转随机值。
- 两张大表? -> 加盐 (Salting) 或 单独拆分热点 Key。
- 如果是 GroupBy (聚合):
- -> 两阶段聚合 (加随机前缀 -> 聚合 -> 去前缀 -> 聚合)。
- 如果是 Join:
- 通用尝试:适当调大
spark.sql.shuffle.partitions。