在使用 SparkSQL 过程中都遇到了哪些问题,又是如何解决的?
在实际的大数据开发和数仓建设中,SparkSQL 是最核心的计算引擎之一。由于处理的数据量巨大且分布式环境复杂,通常会遇到各种性能瓶颈、稳定性问题和资源浪费。
以下整理了在实际项目中使用 SparkSQL 最常遇到的 6 大核心问题,以及对应的排查思路和解决方案。
一、 数据倾斜 (Data Skew)——最经典的性能杀手
1. 现象描述
- Spark UI 上,绝大多数 Task 几秒钟就执行完了,但有 一两个 Task 持续运行几小时(长尾效应),甚至最终导致 Executor OOM(内存溢出)。
- 通常发生在
JOIN、GROUP BY或DISTINCT操作中。
2. 原因分析
分布式计算中,相同的 Key 会被 Hash 分发到同一个 Reduce Task 中。如果某个 Key 的数据量极大(例如:空值 NULL、默认值、热点商家 ID),就会导致该 Task 处理的数据量远超其他 Task。
3. 解决方案
- 方案一:开启 Spark 3.x 自动自适应查询(AQE - Adaptive Query Execution)
Spark 3.0+ 引入的 AQE 可以自动检测并拆分倾斜的分区(Skew Join)。plaintextspark.sql.adaptive.enabled=true spark.sql.adaptive.skewJoin.enabled=true - 方案二:过滤或转换倾斜 Key
如果倾斜是由NULL或空字符串引起的,在 Join 之前过滤掉,或者将空值赋予随机值:sqlSELECT * FROM log a LEFT JOIN user b ON COALESCE(a.user_id, CAST(RAND() * -1000 AS STRING)) = b.user_id - 方案三:Broadcast Join(广播连接)
如果大表 Join 小表,将小表广播到每个 Executor,避免 Shuffle。sql-- Spark 默认是 10MB,可以适当调大 SET spark.sql.autoBroadcastJoinThreshold = 104857600; -- 100MB - 方案四:加盐(Salting)两阶段聚合
对于GROUP BY倾斜,在 Key 上拼接随机前缀(如0_key,1_key),先局部聚合,再去掉前缀进行全局聚合。
二、 Out of Memory (OOM) 内存溢出
OOM 主要分为 Driver OOM 和 Executor OOM。
1. Driver OOM
- 原因:
- 使用了
collect()将海量数据拉取到了 Driver 端。 - SQL 产生了极大的元数据信息,或者
broadcast广播了过大的表。
- 使用了
- 解决办法:
- 严禁在生产环境对大表使用
collect(),改用take(N)或limit N。 - 增加 Driver 内存:
--driver-memory 8g。 - 降低广播表的阈值:
spark.sql.autoBroadcastJoinThreshold。
- 严禁在生产环境对大表使用
2. Executor OOM
- 原因:
- Execution Memory 溢出:Join 或 Aggregation 过程中,缓存的数据量超出了 JVM 堆内存。
- Overhead Memory 溢出(YARN Killed Container):堆外内存不足,通常是因为使用了
UDF、OffHeap内存或 JVM 自身开销。
- 解决办法:
- 调整内存占比:调大 Shuffle 过程的执行内存比例:
spark.memory.fraction(默认 0.6) 和spark.memory.storageFraction(默认 0.5)。 - 增加堆外内存:
spark.executor.memoryOverhead(一般设为 Executor 内存的 10%~20%)。 - 降低单个 Executor 的并行度:减少每个 Executor 的 Core 数量(例如从 5 降到 3),这样每个 Core 分摊到的内存就会变多。
- 调整内存占比:调大 Shuffle 过程的执行内存比例:
三、 小文件问题 (Small Files Problem)——HDFS 的噩梦
1. 现象描述
- Spark 任务写 HDFS 速度越来越慢,产生数万个大小只有几 KB 的文件。
- 后续读取该表时,启动极慢(NameNode 寻址压力大,Task 数量爆炸)。
2. 原因分析
- 使用
INSERT INTO TABLE ... PARTITION动态分区写入时,每个 Spark Task 都会为每个分区生成一个文件。 spark.sql.shuffle.partitions设置过大(例如 2000),而实际写入的数据量很小。
3. 解决方案
- 方案一:利用 AQE 自动合并小文件(推荐)
Spark 3.x 提供了自动合并 Shuffle 分区的功能:plaintextspark.sql.adaptive.enabled=true spark.sql.adaptive.coalescePartitions.enabled=true spark.sql.adaptive.advisoryPartitionSizeInBytes=134217728 # 期望文件大小 128MB - 方案二:写入前手动重分区(Repartition/Coalesce)
在写入 HDFS 之前,使用repartition(会引入 Shuffle)或coalesce(不引入 Shuffle,但并行度降低)限制文件数量。sql-- Spark SQL Hint 写法 INSERT INTO TABLE target_table SELECT /*+ REPARTITION(10) */ * FROM source_table; - 方案三:Distribute By
在 SQL 末尾加上DISTRIBUTE BY partition_column,使相同分区的数据进入同一个 Task,从而减少每个分区产生的文件数。
四、 Spill to Disk (内存溢出到磁盘) 导致任务极慢
1. 现象描述
Spark UI 的 Stage 详情页中,看到 Spill (Memory) 和 Spill (Disk) 数值非常大(几十 GB 甚至上百 GB)。任务运行极慢,磁盘 I/O 飙升。
2. 原因分析
在进行 Sort、Join 或 Aggregation 时,Executor 内存不够用了,Spark 只能将内存中的数据临时序列化并写到本地磁盘,使用时再读回。磁盘 I/O 比内存慢两个数量级。
3. 解决方案
- 增加 Executor 内存:直接增大
--executor-memory。 - 增加 Shuffle 分区数:提高
spark.sql.shuffle.partitions(默认 200)。如果分区数太少,每个分区的数据量太大,容易发生 Spill。通常建议每个分区处理的数据量在 100MB~200MB 之间。 - 减少单个 Executor 的 Core 数量:
如果executor-memory是 12G,executor-cores是 6。可以改成 2 个 Executor,每个memory6G,cores3,降低单机并发压力。
五、 PySpark UDF (用户自定义函数) 性能极其低下
1. 现象描述
在 SparkSQL 中使用了 Python 编写的 UDF,任务运行速度呈现断崖式下跌,且 Executor 内存占用极高。
2. 原因分析
- Spark JVM 引擎在处理 PySpark UDF 时,需要将数据从 JVM 序列化,通过 Socket 发送给 Python Worker 进程,Python 处理完后再反序列化送回 JVM。
- 这种行级(Row-by-row)的数据跨进程传输和序列化开销极其巨大。
3. 解决方案
- 方案一:尽量使用 Spark SQL 内置函数。Spark SQL 内置函数(如
concat、substring等)经过了 Catalyst 优化器和 Code Generation 的极致优化。 - 方案二:使用 Vectorized UDF (Pandas UDF)。
如果必须用 Python,使用基于 Apache Arrow 的 Pandas UDF,它允许成批(Batch)传输数据,大大降低序列化开销。pythonimport pandas as pd from pyspark.sql.functions import pandas_udf @pandas_udf("double") def predict_udf(v1: pd.Series, v2: pd.Series) -> pd.Series: return v1 + v2 - 方案三:用 Scala/Java 编写 UDF。打包成 Jar 包注册到 Spark 中,避免跨语言 IPC 序列化。
六、 笛卡尔积 (Cartesian Product) 导致任务卡死
1. 现象描述
执行一个多表关联的 SQL 时,任务进度长期卡在 0% 甚至直接报 Detected implicit cartesian product... 错误。
2. 原因分析
- SQL 的
JOIN条件缺失,或者写错了(例如ON a.id = b.id误写成ON a.id = a.id)。 - 非等值 Join(如
a.val > b.val),Spark 无法使用 Hash Join,被迫退化为 Nested Loop Join(笛卡尔积)。
3. 解决方案
- 严格审查 SQL:确保所有 Join 都有明确且正确的等值关联条件。
- 小表广播:如果必须进行非等值 Join,且其中一张表较小,强制使用广播:sql
SELECT /*+ BROADCAST(b) */ * FROM table_a a JOIN table_b b ON a.val > b.val - 配置预防:不要轻易将
spark.sql.crossJoin.enabled设为true,让 Spark 在检测到隐式笛卡尔积时报错拦截,防止脏 SQL 耗尽集群资源。
总结:SparkSQL 性能调优黄金法则
在遇到问题时,可以按照以下顺序进行日常排查:
- 看 Spark UI:找最慢的 Stage,看 Task 的分布,判断是否存在数据倾斜或 Spill。
- 开 AQE:Spark 3.x 项目中,无脑开启
spark.sql.adaptive.enabled=true。 - 看 Execution Plan (Explain):看是否成功应用了 Broadcast Join,是否有不必要的 Shuffle。
- 管好分区数:根据输入数据量,合理计算并设置
spark.sql.shuffle.partitions。