基于本文回答

播面 播面

文图音视,全方位拆解八股文
0
评论

在使用 SparkSQL 过程中都遇到了哪些问题,又是如何解决的?

在实际的大数据开发和数仓建设中,SparkSQL 是最核心的计算引擎之一。由于处理的数据量巨大且分布式环境复杂,通常会遇到各种性能瓶颈、稳定性问题和资源浪费。

以下整理了在实际项目中使用 SparkSQL 最常遇到的 6 大核心问题,以及对应的排查思路和解决方案


一、 数据倾斜 (Data Skew)——最经典的性能杀手

1. 现象描述

  • Spark UI 上,绝大多数 Task 几秒钟就执行完了,但有 一两个 Task 持续运行几小时(长尾效应),甚至最终导致 Executor OOM(内存溢出)。
  • 通常发生在 JOINGROUP BYDISTINCT 操作中。

2. 原因分析

分布式计算中,相同的 Key 会被 Hash 分发到同一个 Reduce Task 中。如果某个 Key 的数据量极大(例如:空值 NULL、默认值、热点商家 ID),就会导致该 Task 处理的数据量远超其他 Task。

3. 解决方案

  • 方案一:开启 Spark 3.x 自动自适应查询(AQE - Adaptive Query Execution)
    Spark 3.0+ 引入的 AQE 可以自动检测并拆分倾斜的分区(Skew Join)。
    plaintext
    spark.sql.adaptive.enabled=true
    spark.sql.adaptive.skewJoin.enabled=true
  • 方案二:过滤或转换倾斜 Key
    如果倾斜是由 NULL 或空字符串引起的,在 Join 之前过滤掉,或者将空值赋予随机值:
    sql
    SELECT * FROM log a 
    LEFT JOIN user b 
    ON COALESCE(a.user_id, CAST(RAND() * -1000 AS STRING)) = b.user_id
  • 方案三:Broadcast Join(广播连接)
    如果大表 Join 小表,将小表广播到每个 Executor,避免 Shuffle。
    sql
    -- Spark 默认是 10MB,可以适当调大
    SET spark.sql.autoBroadcastJoinThreshold = 104857600; -- 100MB
  • 方案四:加盐(Salting)两阶段聚合
    对于 GROUP BY 倾斜,在 Key 上拼接随机前缀(如 0_key, 1_key),先局部聚合,再去掉前缀进行全局聚合。

二、 Out of Memory (OOM) 内存溢出

OOM 主要分为 Driver OOMExecutor OOM

1. Driver OOM

  • 原因
    • 使用了 collect() 将海量数据拉取到了 Driver 端。
    • SQL 产生了极大的元数据信息,或者 broadcast 广播了过大的表。
  • 解决办法
    • 严禁在生产环境对大表使用 collect(),改用 take(N)limit N
    • 增加 Driver 内存:--driver-memory 8g
    • 降低广播表的阈值:spark.sql.autoBroadcastJoinThreshold

2. Executor OOM

  • 原因
    • Execution Memory 溢出:Join 或 Aggregation 过程中,缓存的数据量超出了 JVM 堆内存。
    • Overhead Memory 溢出(YARN Killed Container):堆外内存不足,通常是因为使用了 UDFOffHeap 内存或 JVM 自身开销。
  • 解决办法
    • 调整内存占比:调大 Shuffle 过程的执行内存比例:
      spark.memory.fraction (默认 0.6) 和 spark.memory.storageFraction (默认 0.5)。
    • 增加堆外内存
      spark.executor.memoryOverhead(一般设为 Executor 内存的 10%~20%)。
    • 降低单个 Executor 的并行度:减少每个 Executor 的 Core 数量(例如从 5 降到 3),这样每个 Core 分摊到的内存就会变多。

三、 小文件问题 (Small Files Problem)——HDFS 的噩梦

1. 现象描述

  • Spark 任务写 HDFS 速度越来越慢,产生数万个大小只有几 KB 的文件。
  • 后续读取该表时,启动极慢(NameNode 寻址压力大,Task 数量爆炸)。

2. 原因分析

  • 使用 INSERT INTO TABLE ... PARTITION 动态分区写入时,每个 Spark Task 都会为每个分区生成一个文件。
  • spark.sql.shuffle.partitions 设置过大(例如 2000),而实际写入的数据量很小。

3. 解决方案

  • 方案一:利用 AQE 自动合并小文件(推荐)
    Spark 3.x 提供了自动合并 Shuffle 分区的功能:
    plaintext
    spark.sql.adaptive.enabled=true
    spark.sql.adaptive.coalescePartitions.enabled=true
    spark.sql.adaptive.advisoryPartitionSizeInBytes=134217728 # 期望文件大小 128MB
  • 方案二:写入前手动重分区(Repartition/Coalesce)
    在写入 HDFS 之前,使用 repartition(会引入 Shuffle)或 coalesce(不引入 Shuffle,但并行度降低)限制文件数量。
    sql
    -- Spark SQL Hint 写法
    INSERT INTO TABLE target_table SELECT /*+ REPARTITION(10) */ * FROM source_table;
  • 方案三:Distribute By
    在 SQL 末尾加上 DISTRIBUTE BY partition_column,使相同分区的数据进入同一个 Task,从而减少每个分区产生的文件数。

四、 Spill to Disk (内存溢出到磁盘) 导致任务极慢

1. 现象描述

Spark UI 的 Stage 详情页中,看到 Spill (Memory)Spill (Disk) 数值非常大(几十 GB 甚至上百 GB)。任务运行极慢,磁盘 I/O 飙升。

2. 原因分析

在进行 Sort、Join 或 Aggregation 时,Executor 内存不够用了,Spark 只能将内存中的数据临时序列化并写到本地磁盘,使用时再读回。磁盘 I/O 比内存慢两个数量级

3. 解决方案

  • 增加 Executor 内存:直接增大 --executor-memory
  • 增加 Shuffle 分区数:提高 spark.sql.shuffle.partitions(默认 200)。如果分区数太少,每个分区的数据量太大,容易发生 Spill。通常建议每个分区处理的数据量在 100MB~200MB 之间
  • 减少单个 Executor 的 Core 数量
    如果 executor-memory 是 12G,executor-cores 是 6。可以改成 2 个 Executor,每个 memory 6G,cores 3,降低单机并发压力。

五、 PySpark UDF (用户自定义函数) 性能极其低下

1. 现象描述

在 SparkSQL 中使用了 Python 编写的 UDF,任务运行速度呈现断崖式下跌,且 Executor 内存占用极高。

2. 原因分析

  • Spark JVM 引擎在处理 PySpark UDF 时,需要将数据从 JVM 序列化,通过 Socket 发送给 Python Worker 进程,Python 处理完后再反序列化送回 JVM。
  • 这种行级(Row-by-row)的数据跨进程传输和序列化开销极其巨大。

3. 解决方案

  • 方案一:尽量使用 Spark SQL 内置函数。Spark SQL 内置函数(如 concatsubstring 等)经过了 Catalyst 优化器和 Code Generation 的极致优化。
  • 方案二:使用 Vectorized UDF (Pandas UDF)
    如果必须用 Python,使用基于 Apache Arrow 的 Pandas UDF,它允许成批(Batch)传输数据,大大降低序列化开销。
    python
    import pandas as pd
    from pyspark.sql.functions import pandas_udf
    
    @pandas_udf("double")
    def predict_udf(v1: pd.Series, v2: pd.Series) -> pd.Series:
        return v1 + v2
  • 方案三:用 Scala/Java 编写 UDF。打包成 Jar 包注册到 Spark 中,避免跨语言 IPC 序列化。

六、 笛卡尔积 (Cartesian Product) 导致任务卡死

1. 现象描述

执行一个多表关联的 SQL 时,任务进度长期卡在 0% 甚至直接报 Detected implicit cartesian product... 错误。

2. 原因分析

  • SQL 的 JOIN 条件缺失,或者写错了(例如 ON a.id = b.id 误写成 ON a.id = a.id)。
  • 非等值 Join(如 a.val > b.val),Spark 无法使用 Hash Join,被迫退化为 Nested Loop Join(笛卡尔积)。

3. 解决方案

  • 严格审查 SQL:确保所有 Join 都有明确且正确的等值关联条件。
  • 小表广播:如果必须进行非等值 Join,且其中一张表较小,强制使用广播:
    sql
    SELECT /*+ BROADCAST(b) */ * FROM table_a a JOIN table_b b ON a.val > b.val
  • 配置预防:不要轻易将 spark.sql.crossJoin.enabled 设为 true,让 Spark 在检测到隐式笛卡尔积时报错拦截,防止脏 SQL 耗尽集群资源。

总结:SparkSQL 性能调优黄金法则

在遇到问题时,可以按照以下顺序进行日常排查:

  1. 看 Spark UI:找最慢的 Stage,看 Task 的分布,判断是否存在数据倾斜Spill
  2. 开 AQE:Spark 3.x 项目中,无脑开启 spark.sql.adaptive.enabled=true
  3. 看 Execution Plan (Explain):看是否成功应用了 Broadcast Join,是否有不必要的 Shuffle。
  4. 管好分区数:根据输入数据量,合理计算并设置 spark.sql.shuffle.partitions
00:00
00:00