基于本文回答

播面 播面

文图音视,全方位拆解八股文
0
评论

Spark的AQE(Adaptive Query Execution,自适应查询执行)

知识点图片

Spark AQE (Adaptive Query Execution,自适应查询执行) 是 Spark 3.0 引入的一项革命性功能(在 Spark 3.2 中默认开启),旨在解决传统静态优化器(Static Optimizer)无法解决的性能瓶颈。

简单来说,AQE 允许 Spark 在运行时(Runtime)根据实际的统计数据动态调整执行计划,而不是仅仅依赖于执行前的静态统计估算。

以下是关于 AQE 的深度解析,包括其核心功能、工作原理和配置方法。


1. 为什么需要 AQE?

在 AQE 出现之前,Spark 使用 Catalyst 优化器生成物理执行计划。这是一个基于成本的优化器(CBO),它依赖于表及其列的统计信息(如行数、最大值、最小值等)。

传统 CBO 的痛点:

  • 统计信息过时或缺失: 如果数据变化快,统计信息可能不准确。
  • 中间结果难以估算: 经过复杂的过滤(Filter)和聚合(Aggregation)后,Spark 很难准确预测中间数据集的大小。
  • 黑盒执行: 一旦计划生成并开始执行,即使发现某个阶段的数据量远超预期,Spark 也只能硬着头皮按原计划跑完,导致性能低下或 OOM。

AQE 的解决方案:
AQE 在每个 Stage(阶段)执行完毕后,利用真实的运行时统计数据,重新审视并优化剩余的逻辑计划。


2. AQE 的三大核心功能

AQE 主要通过以下三种机制来优化查询:

(1) 动态合并 Shuffle 分区 (Dynamically Coalescing Shuffle Partitions)

  • 问题: 设置 spark.sql.shuffle.partitions 是个难题。
    • 设得太小:单个分区数据量过大,导致内存溢出(OOM)或 GC 频繁。
    • 设得太大:产生大量微小分区,导致调度开销大、I/O 效率低(大量小文件)。
  • AQE 优化:
    • 你可以将初始 Shuffle 分区数设得比较大(例如 2000)。
    • 在 Shuffle Map 阶段结束后,AQE 会查看每个分区的实际数据量。
    • 如果发现连续的多个分区数据量都很小,AQE 会自动将它们合并(Coalesce)成一个适度大小的分区。
  • 结果: 既避免了 OOM,又避免了小文件和调度开销。

(2) 动态切换 Join 策略 (Dynamically Switching Join Strategies)

  • 问题: 假设你有两张表 Join,其中一张表原本很大,但在经过 Filter 过滤后变得很小(例如只有 10MB)。
    • 静态优化器可能因为原始表很大,而选择了 SortMergeJoin(需要 Shuffle 和排序,慢)。
  • AQE 优化:
    • AQE 在运行时发现过滤后的中间结果实际上小于广播阈值(默认 10MB)。
    • 它会动态将原本计划的 SortMergeJoin 更改为 BroadcastHashJoin
  • 结果: 避免了 Shuffle,大幅提升 Join 性能。

(3) 动态优化数据倾斜 Join (Dynamically Optimizing Skew Joins)

  • 问题: 数据倾斜(Data Skew)是分布式计算的噩梦。某个 Key 的数据量远超其他 Key,导致某个 Task 运行时间极长(长尾效应),拖慢整个作业。
  • AQE 优化:
    • AQE 会自动检测 Shuffle 数据中的倾斜分区。
    • 它将那个巨大的倾斜分区拆分成多个小的子任务(Splits)。
    • 同时,将 Join 另一侧对应的数据进行复制(Replicate),以适应拆分后的任务。
  • 结果: 消除了长尾任务,作业整体运行时间显著缩短。

3. AQE 的工作流程

AQE 的执行过程是一个“执行 -> 观察 -> 调整”的循环:

  1. 执行 Stage: Spark 开始执行查询的叶子节点 Stage(通常是读取数据)。
  2. 收集统计信息: 当 Map 阶段结束,Shuffle 数据写出后,Spark 获取真实的统计数据(如分区大小、行数)。
  3. 重新规划: AQE 控制器介入,根据这些数据检查是否满足优化规则(如是否倾斜、是否可广播)。
  4. 更新计划: 如果满足条件,修改剩余的逻辑计划和物理计划。
  5. 继续执行: 按照新计划执行下一个 Stage。

4. 如何配置 AQE

在 Spark 3.2+ 版本中,AQE 默认开启。在 Spark 3.0 和 3.1 中需要手动开启。

基础开关

plaintext
# 开启 AQE (Spark 3.2+ 默认为 true)
spark.sql.adaptive.enabled = true

核心参数调优

1. Shuffle 分区合并相关:

plaintext
# 开启分区合并
spark.sql.adaptive.coalescePartitions.enabled = true

# 合并后的目标分区大小 (默认 64MB,建议与 HDFS block size 相当或略小)
spark.sql.adaptive.advisoryPartitionSizeInBytes = 64m

# 初始 Shuffle 分区数 (建议设大一点,让 AQE 有合并的空间,如 1000 或 2000)
spark.sql.shuffle.partitions = 2000 

2. Join 策略切换相关:

plaintext
# 开启本地 Shuffle 读取(配合 Broadcast Join 使用)
spark.sql.adaptive.localShuffleReader.enabled = true

# 广播阈值 (如果运行时数据小于此值,切换为 Broadcast Join)
spark.sql.autoBroadcastJoinThreshold = 10m

3. 数据倾斜优化相关:

plaintext
# 开启倾斜 Join 优化
spark.sql.adaptive.skewJoin.enabled = true

# 定义什么是“倾斜”:
# 1. 分区大小大于此阈值 (默认 256MB)
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes = 256m
# 2. 且分区大小是中位数分区的 N 倍 (默认 5 倍)
spark.sql.adaptive.skewJoin.skewedPartitionFactor = 5

5. AQE 的局限性与注意事项

尽管 AQE 非常强大,但它不是万能的:

  1. 只对 Spark SQL/DataFrame/Dataset 有效: 直接操作 RDD 的代码无法享受 AQE 优化。
  2. 增加了一点点开销: 重新规划需要时间,但在大多数情况下,节省的执行时间远超这些开销。
  3. UI 变化: 在 Spark UI 中,你会看到执行计划在运行时发生变化(例如多出了 AQEShuffleRead 节点),这可能会让初学者在调试时感到困惑。
  4. 初始分区数不能太小: 如果 spark.sql.shuffle.partitions 初始值设得太小,AQE 只能合并不能拆分(除非触发了倾斜优化),所以建议在使用 AQE 时调大初始分区数。

总结

Spark AQE 是现代 Spark 调优的“自动驾驶”系统。 它通过利用运行时数据,自动解决了最令人头疼的小文件问题、Join 策略选择错误和数据倾斜问题。对于大多数生产环境作业,保持 AQE 开启是最佳实践。

00:00
00:00