统计快递员扫码后开门,但由于格口传感器故障导致“投递失败”并当即更换格口投递的事件,找出异常频次最高的前 50 个格口 ID
SparkSQL 面试题:快递投递异常格口分析
1. 题目背景与业务逻辑
在智能快递柜业务中,快递员的投递流程通常为:扫码 -> 箱门打开 -> 放入快递 -> 格口传感器感应到物体并闭门 -> 投递成功。
如果由于“格口传感器故障”,会导致快递员虽然放入了快递并关门,但系统判定为“投递失败”。此时,快递员通常会在极短时间内(例如 5 分钟内)在同一个快递柜上重新扫码,申请另一个格口进行投递。
本题要求通过 SparkSQL 分析投递流水日志,找出“因传感器故障导致投递失败,并当即(5分钟内)在同柜机更换格口重新投递”的异常事件,并统计出异常频次最高的前 50 个原始故障格口 ID。
2. 样例数据
表 1:投递流水表 t_delivery_log
记录快递员在快递柜上的每一次投递操作属性及状态。
| 字段名 | 类型 | 说明 |
|---|---|---|
log_id |
String | 日志唯一ID |
operator_id |
String | 快递员ID |
terminal_id |
String | 快递柜终端ID |
box_id |
String | 格口ID |
action_time |
Timestamp | 操作时间 |
status |
String | 投递状态:FAIL_SENSOR (传感器故障失败), SUCCESS (成功), FAIL_OTHER (其他失败) |
t_delivery_log 运行数据示例:
| log_id | operator_id | terminal_id | box_id | action_time | status |
|---|---|---|---|---|---|
| L01 | OP1001 | TERM_A | BOX_01 | 2023-10-24 10:00:00 | FAIL_SENSOR |
| L02 | OP1001 | TERM_A | BOX_02 | 2023-10-24 10:02:30 | SUCCESS |
| L03 | OP1002 | TERM_A | BOX_03 | 2023-10-24 10:05:00 | FAIL_SENSOR |
| L04 | OP1002 | TERM_A | BOX_03 | 2023-10-24 10:15:00 | SUCCESS |
| L05 | OP1003 | TERM_B | BOX_10 | 2023-10-24 11:00:00 | FAIL_SENSOR |
| L06 | OP1003 | TERM_B | BOX_11 | 2023-10-24 11:01:15 | SUCCESS |
| L07 | OP1004 | TERM_C | BOX_20 | 2023-10-24 12:00:00 | FAIL_SENSOR |
| L08 | OP1005 | TERM_C | BOX_21 | 2023-10-24 12:02:00 | SUCCESS |
业务场景解析(基于示例数据):
- 事件 1(满足条件):
OP1001在TERM_A投递BOX_01传感器故障失败(10:00:00),在 2.5分钟后(10:02:30)在同柜机更换BOX_02投递成功。这算一次BOX_01的异常事件。 - 事件 2(不满足条件):
OP1002在TERM_A投递BOX_03失败(10:05:00),虽然 10:15:00 投递成功,但时间间隔为 10分钟(超过5分钟阈值),不计入。 - 事件 3(满足条件):
OP1003在TERM_B投递BOX_10失败(11:00:00),在 1.25分钟后(11:01:15)在同柜机更换BOX_11投递成功。这算一次BOX_10的异常事件。 - 事件 4(不满足条件):
OP1004投递失败,虽然 2 分钟后有成功记录,但重试的是OP1005(非同一快递员),不计入。
3. 核心 SQL 实现
sql
WITH sorted_delivery AS (
SELECT
log_id,
operator_id,
terminal_id,
box_id,
action_time,
status,
-- 获取同快递员在同柜机上下一次投递的时间
LEAD(action_time) OVER (
PARTITION BY operator_id, terminal_id
ORDER BY action_time
) AS next_action_time,
-- 获取同快递员在同柜机上下一次投递的格口ID
LEAD(box_id) OVER (
PARTITION BY operator_id, terminal_id
ORDER BY action_time
) AS next_box_id
FROM t_delivery_log
)
SELECT
box_id AS fault_box_id,
COUNT(1) AS abnormal_count
FROM sorted_delivery
WHERE status = 'FAIL_SENSOR' -- 当前格口因传感器故障导致投递失败
AND next_box_id != box_id -- 快递员更换了新的格口
AND next_action_time IS NOT NULL -- 存在下一次投递行为
-- 限制下一次投递发生在该事件后的 5 分钟(300秒)内
AND CAST(next_action_time AS LONG) - CAST(action_time AS LONG) <= 300
GROUP BY box_id
ORDER BY abnormal_count DESC
LIMIT 50;
4. SparkSQL 深度解析与面试应对指南
在面试中,仅仅写出 SQL 是不够的。面试官通常会针对这段 SQL 进行追问,以考察你的 Spark 底层优化和架构理解。
解析一:窗口函数 LEAD 的物理执行与数据倾斜
- 面试官追问:“你使用了窗口函数
PARTITION BY operator_id, terminal_id,如果某些大热点柜机(如丰巢大站)或者极度活跃的快递员数据量超大,导致 Shuffle 倾斜怎么办?” - 应对技巧:
- 倾斜原因:Window 函数在 Spark 中会触发 Shuffle (HashPartitioning)。相同 Key 的数据会被分发到同一个 Executor 上进行排序。
- 解决方案 (两阶段聚合/加盐):如果遇到极端倾斜,可以对
operator_id和terminal_id进行“加盐”打散(例如将 Key 改为concat(terminal_id, '_', cast(rand()*10 as int))),但由于窗口函数跨行读取,加盐会破坏时序邻近性。 - 更优方案(过滤前置):在执行窗口函数前,先过滤掉无关状态。如果数据量巨大,可以先过滤出“活跃快递员”或“近几分钟有失败记录的柜机”,极大缩小参与 Window 的数据集规模。
解析二:时间差计算的性能细节
- 面试官追问:“你在 SQL 中使用
CAST(... AS LONG)来计算时间差,Spark 内部是如何处理的?” - 应对技巧:
- 在 Spark SQL 中,将
Timestamp转为LONG会将其转化为 Unix 时间戳(秒数),做减法非常高效。 - 也可以使用 Spark 内置函数
unix_timestamp(next_action_time) - unix_timestamp(action_time)。 - 需要注意时区问题:
unix_timestamp依赖当前 Session 的时区配置,而在生产环境(分布式集群)中,确保所有节点时区一致(如统一设为GMT+8)是防范数据对不上的核心细节。
- 在 Spark SQL 中,将
解析三:Spark 任务执行计划优化(Catalyst 优化器)
- 面试官追问:“Spark Catalyst 优化器会对你写的这个 SQL 做出哪些优化?”
- 应对技巧:
- 断言下推 (Predicate Pushdown):虽然
status = 'FAIL_SENSOR'是在最外层WHERE子句中,但因为涉及到窗口函数,Spark 无法直接将status过滤条件完全下推到数据源。因为窗口函数计算需要完整的时间序列链条(必须先保留所有记录排序,才能算出next_box_id)。 - 物理执行图:该 SQL 的物理执行计划中会包含
FileScan->Exchange(Shuffle)->Sort->Window->Filter( status = 'FAIL_SENSOR' ...)->HashAggregate。 - 主动优化建议:明确向面试官指出,如果业务上可以接受,我们可以先过滤出“5分钟内有成功投递”的快递员白名单,用
LEFT SEMI JOIN先过滤基础表,再做 Window,能节省大量的 Shuffle 磁盘和网络 I/O。
- 断言下推 (Predicate Pushdown):虽然