基于本文回答

播面 播面

文图音视,全方位拆解八股文
0
评论

Spark 有哪几种 Shuffle 实现机制?

知识点图片

Apache Spark 的 Shuffle 实现机制经历了多次演进。从宏观的历史演变来看,主要分为 Hash Shuffle (已废弃) 和 Sort Shuffle (当前主流) 两大类。

但在目前的 Spark 版本(2.0 及以后)中,实际上只有 Sort Shuffle 这一种架构,但它内部根据不同的条件,会智能选择三种不同的写入策略(Writer Modes)

以下是详细的分类和原理解析:


一、 历史演变分类

1. Hash Shuffle (已废弃)

这是 Spark 早期(1.2版本之前)的默认机制。

  • 普通机制 (V1): 每个 Map Task 会为每个 Reduce Task 生成一个单独的文件。
    • 问题: 产生的文件数量是 M×RM \times R (Map任务数 ×\times Reduce任务数)。如果 Mapper 和 Reducer 各有 1000 个,就会产生 100 万个文件,导致文件描述符(FD)耗尽和大量的随机磁盘 I/O。
  • 合并机制 (Consolidate Files, V2): 优化了上述问题,让同一个 Executor 中的不同 Map Task 共用一组输出文件。
    • 现状: 由于内存消耗大且难以处理大规模数据,在 Spark 2.0 中已被彻底移除

2. Sort Shuffle (当前标准)

从 Spark 1.2 开始引入,Spark 2.0 后成为唯一标准。

  • 核心逻辑: 每个 Map Task 最终只产生一个数据文件一个索引文件
  • 过程: 数据先写入内存缓冲,内存满后进行排序(按 Partition ID 排序,部分模式下按 Key 排序),然后溢写(Spill)到磁盘,最后将多个溢写文件合并(Merge)成一个大文件。
  • 优势: 极大减少了小文件数量,将随机 I/O 转换为顺序 I/O。

二、 Sort Shuffle 内部的三种实现模式 (重点)

在现代 Spark (2.x, 3.x) 中,Sort Shuffle Manager 会根据任务的具体情况(如是否有聚合、分区数量、序列化方式等),自动选择以下三种 Handle(处理句柄/写入器)之一:

1. Bypass Merge Sort Shuffle (绕过合并排序模式)

  • 触发条件:
    1. 没有 Map 端聚合操作(如 groupByKey, sortByKey 等)。
    2. Shuffle 的分区数(Reducer 数量)少于阈值(默认 200,由 spark.shuffle.sort.bypassMergeThreshold 控制)。
  • 原理:
    • 它虽然属于 Sort Shuffle Manager,但实际上不进行排序
    • 它像早期的 Hash Shuffle 一样,为每个 Partition 开辟一个临时文件进行写入。
    • 但在最后,它会将这些临时文件合并成一个最终的数据文件,并创建索引文件。
  • 优点: 省去了排序的 CPU 开销(因为不需要排序),在分区数少时性能极高。

2. Serialized / Unsafe Shuffle (序列化/Tungsten 模式)

  • 触发条件:
    1. 序列化器支持 Relocation(如 KryoSerializer,或者 Spark SQL 自带的序列化器)。
    2. 没有 Map 端聚合。
    3. 分区数少于 1677721616777216 (2242^{24})。
  • 原理:
    • 这是 Spark Tungsten (钨丝计划) 优化的一部分。
    • 它直接在二进制数据(Binary Data)上操作,使用堆外内存(Off-heap)或页式内存管理。
    • 排序时只对指针和 Partition ID 进行排序,不反序列化数据。
  • 优点: 极大地减少了 Java 对象创建(GC 开销小),内存利用率极高,CPU 缓存命中率高。这是处理 Spark SQL 和 DataFrame 时最常用的模式。

3. Base Shuffle (普通排序模式)

  • 触发条件:
    • 上述两种模式都不满足时(例如:需要 Map 端聚合,或者使用了不支持 Relocation 的自定义对象)。
  • 原理:
    • 这是最传统的 Sort Shuffle 实现。
    • 使用 ExternalSorter
    • 数据以 Java 对象形式存在于内存中(Map<Key, Value> 结构)。
    • 在溢写前需要对 Key 进行排序(如果需要聚合)。
  • 特点: 功能最全(支持聚合),但内存和 GC 开销相对较大。

三、 总结对比表

机制/模式 状态 核心特点 适用场景
Hash Shuffle 已移除 不排序,文件数爆炸 小规模数据 (旧版本)
Sort Shuffle 默认 产生 1 Data + 1 Index 文件 所有场景
   ├─ Bypass 内部模式 不排序,直接写临时文件后合并 分区少 (<200)、无 Map 端聚合
   ├─ Serialized 内部模式 二进制排序,Tungsten 优化 Spark SQL, DataFrame, Kryo 序列化
   └─ Base 内部模式 对象排序,支持聚合 RDD 复杂操作,有 Map 端聚合

四、 Spark 3.0+ 的新特性补充

虽然底层写入机制依然是上述三种,但 Spark 3.0+ 引入了新的特性影响 Shuffle 的行为:

  1. AQE (Adaptive Query Execution): 可以在运行时动态合并 Shuffle 分区(Coalesce Partitions)。虽然写入时还是产生那么多分区,但在读取阶段会将小的分区合并,解决数据倾斜和小文件读取问题。
  2. Push-based Shuffle (RSS 相关): 针对 YARN 部署环境,Spark 3.2+ 引入了 Push-based Shuffle,允许 Mapper 主动将数据推送到远程的 Shuffle Service 进行预合并,进一步优化大规模 Shuffle 的随机读问题。
00:00
00:00