基于本文回答
0
评论

给定商品销售流水表,统计每个月、每个商品大类(Category)下,销售额排名前 3 的商品 ID 及其销售额

SparkSQL 面试题:月度各品类销售 Top 3 商品统计

1. 题目背景与要求

在电商数据分析中,流水的实时与离散分析是最常见的场景。现有一张商品销售流水表 sales_records,要求编写一条 SparkSQL 语句,统计每个月每个商品大类(Category)下,销售额排名前 3 的商品 ID 及其销售额。如果销售额相同,则并列排名。


2. 示例数据

表1:商品销售流水表 (sales_records)
sales_id (流水ID) product_id (商品ID) category (商品大类) sale_amount (销售额) sale_date (销售日期)
S001 P001 Electronics 5000.00 2023-10-01
S002 P001 Electronics 3000.00 2023-10-15
S003 P002 Electronics 9000.00 2023-10-10
S004 P003 Electronics 4000.00 2023-10-12
S005 P004 Electronics 2000.00 2023-10-20
S006 P005 Clothing 1500.00 2023-10-05
S007 P006 Clothing 2500.00 2023-10-06
S008 P005 Clothing 1000.00 2023-10-25
S009 P007 Clothing 3000.00 2023-10-26
S010 P008 Clothing 800.00 2023-10-28
S011 P001 Electronics 6000.00 2023-11-01

3. 期望输出结果

sale_month (月份) category (商品大类) product_id (商品ID) total_sales (总销售额) rank (排名)
2023-10 Electronics P002 9000.00 1
2023-10 Electronics P001 8000.00 2
2023-10 Electronics P003 4000.00 3
2023-10 Clothing P007 3000.00 1
2023-10 Clothing P005 2500.00 2
2023-10 Clothing P006 2500.00 2
2023-10 Clothing P008 800.00 4
2023-11 Electronics P001 6000.00 1

(注:2023-10的Clothing大类中,P005与P006销售额相同均为2500,故并列第2,P008由于并列占用名额变为第4名,未进前3。)


4. SparkSQL 标准答案

sql
WITH product_monthly_sales AS (
    -- 1. 按照月份、大类、商品ID进行聚合,计算总销售额
    SELECT 
        DATE_FORMAT(sale_date, 'yyyy-MM') AS sale_month,
        category,
        product_id,
        SUM(sale_amount) AS total_sales
    FROM 
        sales_records
    GROUP BY 
        DATE_FORMAT(sale_date, 'yyyy-MM'),
        category,
        product_id
),
ranked_sales AS (
    -- 2. 使用窗口函数进行分组排序
    SELECT 
        sale_month,
        category,
        product_id,
        total_sales,
        DENSE_RANK() OVER (
            PARTITION BY sale_month, category 
            ORDER BY total_sales DESC
        ) AS rank
    FROM 
        product_monthly_sales
)
-- 3. 过滤出排名前3的商品
SELECT 
    sale_month,
    category,
    product_id,
    total_sales,
    rank
FROM 
    ranked_sales
WHERE 
    rank <= 3
ORDER BY 
    sale_month DESC, 
    category, 
    rank;

5. 核心考点与 SparkSQL 深度分析

面试官通过这道题,主要考察候选人在 Spark 运行机制、窗口函数选择、数据倾斜处理以及算子优化方面的功底。

① 窗口函数的选择:ROW_NUMBER vs RANK vs DENSE_RANK

这是面试必问的细节。

  • ROW_NUMBER():无脑排序,1, 2, 3, 4。若遇到销售额相同的情况,会随机分出先后,不公平。
  • RANK():并列排序会跳跃,例如 1, 2, 2, 4。如果前三名出现并列第二,第四名会直接跳到第四,导致前三名额流失。
  • DENSE_RANK():并列排序不跳跃,例如 1, 2, 2, 3。本题中要求“销售额相同则并列”,且要取前 3 名,因此 DENSE_RANK 是最标准、最符合业务直觉的选择
② 性能优化:避免“大窗口”导致的 OOM 与数据倾斜

在 Spark 运行该 SQL 时,PARTITION BY sale_month, category 会触发 Shuffle(Exchange 算子)。如果某个大类(如双十一期间的“Electronics”)数据量极其庞大,会导致该 Key 对应的数据全部拉取到某一个 Executor 的单分区中进行排序,从而引发数据倾斜甚至 OOM

  • 优化方案(两阶段聚合)
    在进入 Window 算子之前,代码第一步先进行了 GROUP BY 聚合。这一步极大地降低了传输给窗口函数的数据量。
    面试话术:“我没有直接在明细表上使用窗口函数,而是先通过 GROUP BY 降低数据粒度(降维至商品级),再进行 DENSE_RANK。这样可以大幅减少 Shuffle 的数据量,有效规避 OOM。”
③ 物理执行计划分析(Execution Plan)

Spark 对此 SQL 的解析流程如下:

  1. FileSourceScan:读取 sales_records 表。
  2. HashAggregate (Partial):在本地节点对 sale_month, category, product_id 进行局部聚合(Map 端聚合,减少网络传输)。
  3. Exchange (hashpartitioning):根据聚合 Key 进行 Shuffle 重新分区。
  4. HashAggregate (Merge):在 Reduce 端完成最终的 SUM(sale_amount) 计算。
  5. Exchange (hashpartitioning):根据 sale_month, category 进行第二次 Shuffle,确保相同月份和品类的数据进入同一个分区。
  6. Window & WindowGroupLimit:在内存中对分区内的数据进行 DENSE_RANK 计算,并直接过滤出 rank <= 3 的数据(Spark 3.x 引入的 WindowLimit 优化,无需全量排序)。
右滑查看面试常问