给定商品销售流水表,统计每个月、每个商品大类(Category)下,销售额排名前 3 的商品 ID 及其销售额
SparkSQL 面试题:月度各品类销售 Top 3 商品统计
1. 题目背景与要求
在电商数据分析中,流水的实时与离散分析是最常见的场景。现有一张商品销售流水表 sales_records,要求编写一条 SparkSQL 语句,统计每个月、每个商品大类(Category)下,销售额排名前 3 的商品 ID 及其销售额。如果销售额相同,则并列排名。
2. 示例数据
表1:商品销售流水表 (sales_records)
| sales_id (流水ID) | product_id (商品ID) | category (商品大类) | sale_amount (销售额) | sale_date (销售日期) |
|---|---|---|---|---|
| S001 | P001 | Electronics | 5000.00 | 2023-10-01 |
| S002 | P001 | Electronics | 3000.00 | 2023-10-15 |
| S003 | P002 | Electronics | 9000.00 | 2023-10-10 |
| S004 | P003 | Electronics | 4000.00 | 2023-10-12 |
| S005 | P004 | Electronics | 2000.00 | 2023-10-20 |
| S006 | P005 | Clothing | 1500.00 | 2023-10-05 |
| S007 | P006 | Clothing | 2500.00 | 2023-10-06 |
| S008 | P005 | Clothing | 1000.00 | 2023-10-25 |
| S009 | P007 | Clothing | 3000.00 | 2023-10-26 |
| S010 | P008 | Clothing | 800.00 | 2023-10-28 |
| S011 | P001 | Electronics | 6000.00 | 2023-11-01 |
3. 期望输出结果
| sale_month (月份) | category (商品大类) | product_id (商品ID) | total_sales (总销售额) | rank (排名) |
|---|---|---|---|---|
| 2023-10 | Electronics | P002 | 9000.00 | 1 |
| 2023-10 | Electronics | P001 | 8000.00 | 2 |
| 2023-10 | Electronics | P003 | 4000.00 | 3 |
| 2023-10 | Clothing | P007 | 3000.00 | 1 |
| 2023-10 | Clothing | P005 | 2500.00 | 2 |
| 2023-10 | Clothing | P006 | 2500.00 | 2 |
| 2023-10 | Clothing | P008 | 800.00 | 4 |
| 2023-11 | Electronics | P001 | 6000.00 | 1 |
(注:2023-10的Clothing大类中,P005与P006销售额相同均为2500,故并列第2,P008由于并列占用名额变为第4名,未进前3。)
4. SparkSQL 标准答案
sql
WITH product_monthly_sales AS (
-- 1. 按照月份、大类、商品ID进行聚合,计算总销售额
SELECT
DATE_FORMAT(sale_date, 'yyyy-MM') AS sale_month,
category,
product_id,
SUM(sale_amount) AS total_sales
FROM
sales_records
GROUP BY
DATE_FORMAT(sale_date, 'yyyy-MM'),
category,
product_id
),
ranked_sales AS (
-- 2. 使用窗口函数进行分组排序
SELECT
sale_month,
category,
product_id,
total_sales,
DENSE_RANK() OVER (
PARTITION BY sale_month, category
ORDER BY total_sales DESC
) AS rank
FROM
product_monthly_sales
)
-- 3. 过滤出排名前3的商品
SELECT
sale_month,
category,
product_id,
total_sales,
rank
FROM
ranked_sales
WHERE
rank <= 3
ORDER BY
sale_month DESC,
category,
rank;
5. 核心考点与 SparkSQL 深度分析
面试官通过这道题,主要考察候选人在 Spark 运行机制、窗口函数选择、数据倾斜处理以及算子优化方面的功底。
① 窗口函数的选择:ROW_NUMBER vs RANK vs DENSE_RANK
这是面试必问的细节。
ROW_NUMBER():无脑排序,1, 2, 3, 4。若遇到销售额相同的情况,会随机分出先后,不公平。RANK():并列排序会跳跃,例如 1, 2, 2, 4。如果前三名出现并列第二,第四名会直接跳到第四,导致前三名额流失。DENSE_RANK():并列排序不跳跃,例如 1, 2, 2, 3。本题中要求“销售额相同则并列”,且要取前 3 名,因此DENSE_RANK是最标准、最符合业务直觉的选择。
② 性能优化:避免“大窗口”导致的 OOM 与数据倾斜
在 Spark 运行该 SQL 时,PARTITION BY sale_month, category 会触发 Shuffle(Exchange 算子)。如果某个大类(如双十一期间的“Electronics”)数据量极其庞大,会导致该 Key 对应的数据全部拉取到某一个 Executor 的单分区中进行排序,从而引发数据倾斜甚至 OOM。
- 优化方案(两阶段聚合):
在进入 Window 算子之前,代码第一步先进行了GROUP BY聚合。这一步极大地降低了传输给窗口函数的数据量。
面试话术:“我没有直接在明细表上使用窗口函数,而是先通过GROUP BY降低数据粒度(降维至商品级),再进行DENSE_RANK。这样可以大幅减少 Shuffle 的数据量,有效规避 OOM。”
③ 物理执行计划分析(Execution Plan)
Spark 对此 SQL 的解析流程如下:
- FileSourceScan:读取
sales_records表。 - HashAggregate (Partial):在本地节点对
sale_month, category, product_id进行局部聚合(Map 端聚合,减少网络传输)。 - Exchange (hashpartitioning):根据聚合 Key 进行 Shuffle 重新分区。
- HashAggregate (Merge):在 Reduce 端完成最终的
SUM(sale_amount)计算。 - Exchange (hashpartitioning):根据
sale_month, category进行第二次 Shuffle,确保相同月份和品类的数据进入同一个分区。 - Window & WindowGroupLimit:在内存中对分区内的数据进行
DENSE_RANK计算,并直接过滤出rank <= 3的数据(Spark 3.x 引入的 WindowLimit 优化,无需全量排序)。