基于本文回答

播面 播面

文图音视,全方位拆解八股文
0
评论

Spark 作业(Job)、阶段(Stage)和任务(Task)是如何划分的?

知识点图片

在 Apache Spark 中,Job(作业)Stage(阶段)Task(任务) 是三个核心的执行概念,它们之间存在层级关系。

简单来说,划分规则如下:

  1. JobAction(行动算子) 触发。
  2. StageShuffle(宽依赖) 划分。
  3. TaskPartition(分区数) 决定。

层级关系:Application -> Job -> Stage -> Task


1. Job(作业)的划分

划分依据:Action 算子

  • 定义:一个 Job 包含了一系列为了完成某个计算目标而执行的转换操作。
  • 如何划分
    • Spark 应用程序在遇到 Transformation(转换算子,如 map, filter) 时是懒执行(Lazy Evaluation)的,不会立即计算。
    • 只有当应用程序遇到一个 Action(行动算子,如 collect, count, saveAsTextFile, take) 时,SparkContext 才会向 DAGScheduler 提交一个 Job。
    • 结论:代码中有多少个 Action 算子,就会产生多少个 Job。

注意:有些特殊情况,例如 checkPoint 操作或者某些隐式的 Action 可能会触发额外的 Job。


2. Stage(阶段)的划分

划分依据:DAG 中的宽依赖(Shuffle Dependency)

这是 Spark 调度中最关键的一步。DAGScheduler 负责将一个 Job 切分成多个 Stage。

  • 依赖关系(Dependencies)

    • 窄依赖 (Narrow Dependency):父 RDD 的一个分区只被子 RDD 的一个分区使用(或一对一)。
      • 例子map, filter, union
      • 特点:数据不需要跨节点传输,可以进行流水线(Pipeline)优化。
    • 宽依赖 (Wide Dependency / Shuffle):父 RDD 的一个分区被子 RDD 的多个分区使用。
      • 例子reduceByKey, groupByKey, join (通常情况), repartition
      • 特点:涉及数据在不同节点间的混洗(Shuffle),必须等父 RDD 的所有分区计算完,才能开始计算子 RDD。
  • 如何划分

    1. DAGScheduler 从 Job 的最后一个 RDD 开始,从后往前回溯 DAG(有向无环图)。
    2. 如果遇到窄依赖,就将当前 RDD 加入到当前的 Stage 中。
    3. 如果遇到宽依赖(即 Shuffle),就切断当前 Stage,生成一个新的 Stage(父 Stage)。
    4. 结论:Stage 的数量 = Shuffle 的次数 + 1(ResultStage)。
  • Stage 的类型

    • ShuffleMapStage:非最终阶段,其任务结果是为下一个 Stage 的 Shuffle 准备数据(写磁盘)。
    • ResultStage:最终阶段,直接计算出 Action 的结果。

3. Task(任务)的划分

划分依据:RDD 的 Partition(分区)数量

  • 定义:Task 是 Spark 中最小的执行单元,它被发送到 Executor 上执行。
  • 如何划分
    • 在一个 Stage 内部,RDD 的转换逻辑是一样的,唯一的区别是处理的数据不同。
    • Spark 会为 Stage 中 RDD 的每一个 Partition(分区) 创建一个 Task。
    • 结论:一个 Stage 中 Task 的数量 = 该 Stage 中最后一个 RDD 的 Partition 数量。

例如:如果一个 Stage 的 RDD 有 100 个分区,那么这个 Stage 就会生成 100 个 Task 并行执行(受限于集群的 Core 数量)。


总结与示例

假设有如下代码:

plaintext
val lines = sc.textFile("hdfs://...")   // RDD A (假设 2 个分区)
val words = lines.flatMap(_.split(" ")) // RDD B (窄依赖)
val pairs = words.map(w => (w, 1))      // RDD C (窄依赖)
val counts = pairs.reduceByKey(_ + _)   // RDD D (宽依赖/Shuffle) -> 假设默认 2 个分区
counts.collect()                        // Action

划分流程分析:

  1. Job 划分

    • 遇到 collect() 是一个 Action。
    • 产生 1 个 Job
  2. Stage 划分

    • DAGScheduler 从 RDD D (reduceByKey 的结果) 往前看。
    • RDD C 到 RDD D 是 reduceByKey,属于宽依赖(Shuffle)。
    • 切分
      • Stage 0 (ShuffleMapStage):包含 RDD A -> RDD B -> RDD C。这些都是窄依赖,可以流水线执行。
      • Stage 1 (ResultStage):包含 RDD D。
  3. Task 划分

    • Stage 0:RDD A 有 2 个分区,所以 Stage 0 产生 2 个 Task
    • Stage 1:RDD D 有 2 个分区,所以 Stage 1 产生 2 个 Task
    • 总计:这个 Job 总共包含 4 个 Task。

一图胜千言

plaintext
[ Application ]
       |
       | (遇到 Action)
       v
    [ Job ]
       |
       | (遇到 Shuffle / 宽依赖)
       v
   [ Stage ]   ----->   [ Stage ]
       |                   |
       | (根据 Partition)   |
       v                   v
 [Task] [Task] ...   [Task] [Task] ...
00:00
00:00