Spark的AQE(Adaptive Query Execution,自适应查询执行)
Spark AQE (Adaptive Query Execution,自适应查询执行) 是 Spark 3.0 引入的一项革命性功能(在 Spark 3.2 中默认开启),旨在解决传统静态优化器(Static Optimizer)无法解决的性能瓶颈。
简单来说,AQE 允许 Spark 在运行时(Runtime)根据实际的统计数据动态调整执行计划,而不是仅仅依赖于执行前的静态统计估算。
以下是关于 AQE 的深度解析,包括其核心功能、工作原理和配置方法。
1. 为什么需要 AQE?
在 AQE 出现之前,Spark 使用 Catalyst 优化器生成物理执行计划。这是一个基于成本的优化器(CBO),它依赖于表及其列的统计信息(如行数、最大值、最小值等)。
传统 CBO 的痛点:
- 统计信息过时或缺失: 如果数据变化快,统计信息可能不准确。
- 中间结果难以估算: 经过复杂的过滤(Filter)和聚合(Aggregation)后,Spark 很难准确预测中间数据集的大小。
- 黑盒执行: 一旦计划生成并开始执行,即使发现某个阶段的数据量远超预期,Spark 也只能硬着头皮按原计划跑完,导致性能低下或 OOM。
AQE 的解决方案:
AQE 在每个 Stage(阶段)执行完毕后,利用真实的运行时统计数据,重新审视并优化剩余的逻辑计划。
2. AQE 的三大核心功能
AQE 主要通过以下三种机制来优化查询:
(1) 动态合并 Shuffle 分区 (Dynamically Coalescing Shuffle Partitions)
- 问题: 设置
spark.sql.shuffle.partitions是个难题。- 设得太小:单个分区数据量过大,导致内存溢出(OOM)或 GC 频繁。
- 设得太大:产生大量微小分区,导致调度开销大、I/O 效率低(大量小文件)。
- AQE 优化:
- 你可以将初始 Shuffle 分区数设得比较大(例如 2000)。
- 在 Shuffle Map 阶段结束后,AQE 会查看每个分区的实际数据量。
- 如果发现连续的多个分区数据量都很小,AQE 会自动将它们合并(Coalesce)成一个适度大小的分区。
- 结果: 既避免了 OOM,又避免了小文件和调度开销。
(2) 动态切换 Join 策略 (Dynamically Switching Join Strategies)
- 问题: 假设你有两张表 Join,其中一张表原本很大,但在经过 Filter 过滤后变得很小(例如只有 10MB)。
- 静态优化器可能因为原始表很大,而选择了 SortMergeJoin(需要 Shuffle 和排序,慢)。
- AQE 优化:
- AQE 在运行时发现过滤后的中间结果实际上小于广播阈值(默认 10MB)。
- 它会动态将原本计划的 SortMergeJoin 更改为 BroadcastHashJoin。
- 结果: 避免了 Shuffle,大幅提升 Join 性能。
(3) 动态优化数据倾斜 Join (Dynamically Optimizing Skew Joins)
- 问题: 数据倾斜(Data Skew)是分布式计算的噩梦。某个 Key 的数据量远超其他 Key,导致某个 Task 运行时间极长(长尾效应),拖慢整个作业。
- AQE 优化:
- AQE 会自动检测 Shuffle 数据中的倾斜分区。
- 它将那个巨大的倾斜分区拆分成多个小的子任务(Splits)。
- 同时,将 Join 另一侧对应的数据进行复制(Replicate),以适应拆分后的任务。
- 结果: 消除了长尾任务,作业整体运行时间显著缩短。
3. AQE 的工作流程
AQE 的执行过程是一个“执行 -> 观察 -> 调整”的循环:
- 执行 Stage: Spark 开始执行查询的叶子节点 Stage(通常是读取数据)。
- 收集统计信息: 当 Map 阶段结束,Shuffle 数据写出后,Spark 获取真实的统计数据(如分区大小、行数)。
- 重新规划: AQE 控制器介入,根据这些数据检查是否满足优化规则(如是否倾斜、是否可广播)。
- 更新计划: 如果满足条件,修改剩余的逻辑计划和物理计划。
- 继续执行: 按照新计划执行下一个 Stage。
4. 如何配置 AQE
在 Spark 3.2+ 版本中,AQE 默认开启。在 Spark 3.0 和 3.1 中需要手动开启。
基础开关
plaintext
# 开启 AQE (Spark 3.2+ 默认为 true)
spark.sql.adaptive.enabled = true
核心参数调优
1. Shuffle 分区合并相关:
plaintext
# 开启分区合并
spark.sql.adaptive.coalescePartitions.enabled = true
# 合并后的目标分区大小 (默认 64MB,建议与 HDFS block size 相当或略小)
spark.sql.adaptive.advisoryPartitionSizeInBytes = 64m
# 初始 Shuffle 分区数 (建议设大一点,让 AQE 有合并的空间,如 1000 或 2000)
spark.sql.shuffle.partitions = 2000
2. Join 策略切换相关:
plaintext
# 开启本地 Shuffle 读取(配合 Broadcast Join 使用)
spark.sql.adaptive.localShuffleReader.enabled = true
# 广播阈值 (如果运行时数据小于此值,切换为 Broadcast Join)
spark.sql.autoBroadcastJoinThreshold = 10m
3. 数据倾斜优化相关:
plaintext
# 开启倾斜 Join 优化
spark.sql.adaptive.skewJoin.enabled = true
# 定义什么是“倾斜”:
# 1. 分区大小大于此阈值 (默认 256MB)
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes = 256m
# 2. 且分区大小是中位数分区的 N 倍 (默认 5 倍)
spark.sql.adaptive.skewJoin.skewedPartitionFactor = 5
5. AQE 的局限性与注意事项
尽管 AQE 非常强大,但它不是万能的:
- 只对 Spark SQL/DataFrame/Dataset 有效: 直接操作 RDD 的代码无法享受 AQE 优化。
- 增加了一点点开销: 重新规划需要时间,但在大多数情况下,节省的执行时间远超这些开销。
- UI 变化: 在 Spark UI 中,你会看到执行计划在运行时发生变化(例如多出了
AQEShuffleRead节点),这可能会让初学者在调试时感到困惑。 - 初始分区数不能太小: 如果
spark.sql.shuffle.partitions初始值设得太小,AQE 只能合并不能拆分(除非触发了倾斜优化),所以建议在使用 AQE 时调大初始分区数。
总结
Spark AQE 是现代 Spark 调优的“自动驾驶”系统。 它通过利用运行时数据,自动解决了最令人头疼的小文件问题、Join 策略选择错误和数据倾斜问题。对于大多数生产环境作业,保持 AQE 开启是最佳实践。