Spark 作业(Job)、阶段(Stage)和任务(Task)是如何划分的?
在 Apache Spark 中,Job(作业)、Stage(阶段) 和 Task(任务) 是三个核心的执行概念,它们之间存在层级关系。
简单来说,划分规则如下:
- Job 由 Action(行动算子) 触发。
- Stage 由 Shuffle(宽依赖) 划分。
- Task 由 Partition(分区数) 决定。
层级关系:Application -> Job -> Stage -> Task
1. Job(作业)的划分
划分依据:Action 算子
- 定义:一个 Job 包含了一系列为了完成某个计算目标而执行的转换操作。
- 如何划分:
- Spark 应用程序在遇到 Transformation(转换算子,如 map, filter) 时是懒执行(Lazy Evaluation)的,不会立即计算。
- 只有当应用程序遇到一个 Action(行动算子,如 collect, count, saveAsTextFile, take) 时,SparkContext 才会向 DAGScheduler 提交一个 Job。
- 结论:代码中有多少个 Action 算子,就会产生多少个 Job。
注意:有些特殊情况,例如
checkPoint操作或者某些隐式的 Action 可能会触发额外的 Job。
2. Stage(阶段)的划分
划分依据:DAG 中的宽依赖(Shuffle Dependency)
这是 Spark 调度中最关键的一步。DAGScheduler 负责将一个 Job 切分成多个 Stage。
依赖关系(Dependencies):
- 窄依赖 (Narrow Dependency):父 RDD 的一个分区只被子 RDD 的一个分区使用(或一对一)。
- 例子:
map,filter,union。 - 特点:数据不需要跨节点传输,可以进行流水线(Pipeline)优化。
- 例子:
- 宽依赖 (Wide Dependency / Shuffle):父 RDD 的一个分区被子 RDD 的多个分区使用。
- 例子:
reduceByKey,groupByKey,join(通常情况),repartition。 - 特点:涉及数据在不同节点间的混洗(Shuffle),必须等父 RDD 的所有分区计算完,才能开始计算子 RDD。
- 例子:
- 窄依赖 (Narrow Dependency):父 RDD 的一个分区只被子 RDD 的一个分区使用(或一对一)。
如何划分:
- DAGScheduler 从 Job 的最后一个 RDD 开始,从后往前回溯 DAG(有向无环图)。
- 如果遇到窄依赖,就将当前 RDD 加入到当前的 Stage 中。
- 如果遇到宽依赖(即 Shuffle),就切断当前 Stage,生成一个新的 Stage(父 Stage)。
- 结论:Stage 的数量 = Shuffle 的次数 + 1(ResultStage)。
Stage 的类型:
- ShuffleMapStage:非最终阶段,其任务结果是为下一个 Stage 的 Shuffle 准备数据(写磁盘)。
- ResultStage:最终阶段,直接计算出 Action 的结果。
3. Task(任务)的划分
划分依据:RDD 的 Partition(分区)数量
- 定义:Task 是 Spark 中最小的执行单元,它被发送到 Executor 上执行。
- 如何划分:
- 在一个 Stage 内部,RDD 的转换逻辑是一样的,唯一的区别是处理的数据不同。
- Spark 会为 Stage 中 RDD 的每一个 Partition(分区) 创建一个 Task。
- 结论:一个 Stage 中 Task 的数量 = 该 Stage 中最后一个 RDD 的 Partition 数量。
例如:如果一个 Stage 的 RDD 有 100 个分区,那么这个 Stage 就会生成 100 个 Task 并行执行(受限于集群的 Core 数量)。
总结与示例
假设有如下代码:
plaintext
val lines = sc.textFile("hdfs://...") // RDD A (假设 2 个分区)
val words = lines.flatMap(_.split(" ")) // RDD B (窄依赖)
val pairs = words.map(w => (w, 1)) // RDD C (窄依赖)
val counts = pairs.reduceByKey(_ + _) // RDD D (宽依赖/Shuffle) -> 假设默认 2 个分区
counts.collect() // Action
划分流程分析:
Job 划分:
- 遇到
collect()是一个 Action。 - 产生 1 个 Job。
- 遇到
Stage 划分:
- DAGScheduler 从 RDD D (
reduceByKey的结果) 往前看。 - RDD C 到 RDD D 是
reduceByKey,属于宽依赖(Shuffle)。 - 切分:
- Stage 0 (ShuffleMapStage):包含 RDD A -> RDD B -> RDD C。这些都是窄依赖,可以流水线执行。
- Stage 1 (ResultStage):包含 RDD D。
- DAGScheduler 从 RDD D (
Task 划分:
- Stage 0:RDD A 有 2 个分区,所以 Stage 0 产生 2 个 Task。
- Stage 1:RDD D 有 2 个分区,所以 Stage 1 产生 2 个 Task。
- 总计:这个 Job 总共包含 4 个 Task。
一图胜千言
plaintext
[ Application ]
|
| (遇到 Action)
v
[ Job ]
|
| (遇到 Shuffle / 宽依赖)
v
[ Stage ] -----> [ Stage ]
| |
| (根据 Partition) |
v v
[Task] [Task] ... [Task] [Task] ...
右滑查看面试常问