基于本文回答
0
评论

ApplicationMaster 内部是如何计算所需资源,并向 ResourceManager 提出资源申请的?

知识点图片

在 Apache YARN 架构中,ApplicationMaster (AM) 是每个应用程序特有的实例(例如 MapReduce、Spark、Flink 都有自己实现的 AM)。AM 负责管理应用程序的生命周期,其中最核心的任务就是计算所需资源,并向 ResourceManager (RM) 申请资源

这个过程可以分为两个主要阶段:资源计算阶段(由特定计算框架决定)和资源申请阶段(通过 YARN 的标准协议进行)。

以下是详细的工作机制:


第一阶段:AM 内部如何计算所需资源?

YARN 作为一个通用的资源管理平台,它本身不关心应用需要多少资源。计算资源的逻辑完全由具体的计算框架(如 Spark、MapReduce)在 AM 内部实现。 AM 主要从两个维度计算资源:数量(Size)位置(Locality)

1. 资源数量的计算 (How much)

不同的框架计算规则不同:

  • MapReduce AM (MRAppMaster):
    • Map 阶段: AM 会读取 HDFS 上的数据切片(Input Splits)信息。通常一个 Split 对应一个 Map Task。如果数据被切分成了 100 个 Split,AM 就知道需要 100 个 Map Container。每个 Container 的内存和 CPU 则由用户配置项(如 mapreduce.map.memory.mbmapreduce.map.cpu.vcores)决定。
    • Reduce 阶段: 需求量取决于用户设置的 Reduce Task 数量(如 mapreduce.job.reduces),每个 Reduce Container 的大小同样由用户配置决定。
  • Spark AM (在 YARN-Cluster 模式下的 Driver):
    • 静态分配: 如果用户在提交任务时指定了 --num-executors 10--executor-memory 4G--executor-cores 2,AM 就会直接将这些固定配置转换为资源请求。
    • 动态分配 (Dynamic Allocation): AM 会监控当前待处理的任务积压情况。如果 Task 很多而 Executor 不够,AM 会动态增加请求的 Container 数量;如果 Executor 空闲时间超过阈值,AM 会释放资源。
  • Flink AM:
    • 取决于作业的并行度(Parallelism)以及每个 TaskManager 配置的 Slot 数量。AM 会计算出需要启动多少个 TaskManager 才能满足总并行度需求。

2. 数据本地性计算 (Where)

“移动计算比移动数据更划算”。AM 在计算需求时,不仅要算“要多大”,还要算“在哪里”。

  • AM 会解析输入数据的元数据(如 HDFS Block 的位置信息),找出数据所在的具体节点(Node)机架(Rack)
  • AM 会将这些位置偏好作为标签附带在资源请求中,期望 RM 尽量在数据所在的机器上分配资源。

第二阶段:AM 如何向 RM 提出资源申请?

当 AM 计算好所需的资源后,它会通过 YARN 提供的客户端(通常是 AMRMClient 或异步的 AMRMClientAsync)与 RM 进行通信。

1. 核心数据结构:ResourceRequest

AM 向 RM 发送的不是一句简单的话,而是一个个精密的 ResourceRequest 对象。一个典型的 ResourceRequest 包含以下关键字段:

  • Priority (优先级): 告诉 RM 哪些请求更紧急。例如在 MapReduce 中,Map 任务的优先级高于 Reduce 任务,因为 Reduce 依赖 Map 的输出;AM 自身的优先级最高。
  • ResourceName (资源位置期望): 指示期望在哪个位置分配。它可以是:
    • 特定节点名 (如 node1.hadoop.com,表示 Node Local)
    • 特定机架名 (如 /default-rack,表示 Rack Local)
    • 星号 * (表示 ANY,集群中任意节点均可)
  • Capability (资源量): 单个 Container 需要的资源维度,通常包含内存(MB)和 CPU(vCores),有时也包含 GPU 等扩展资源(如 <memory: 2048, vCores: 2>)。
  • NumContainers (数量): 满足上述条件的 Container 需要几个。
  • RelaxLocality (是否放宽本地性): 如果设为 true,当 RM 在指定节点找不到资源时,允许 RM 降级到机架甚至任意节点去分配。

举例:AM 想要在 node1 上要 1 个 2GB/1Core 的容器,它会发出三个 Request (这是 YARN 的经典三级请求机制):

  1. ResourceName: node1, NumContainers: 1 (请求具体节点)
  2. ResourceName: /rack1, NumContainers: 1 (备用方案:同机架)
  3. ResourceName: *, NumContainers: 1 (兜底方案:任意节点)

2. 申请机制:心跳协议 (Heartbeat)

AM 并不是发送一次请求就干等,资源申请是通过 RPC 心跳机制(具体方法是 ApplicationMasterProtocol.allocate())周期性完成的。

  1. 心跳周期: AM 默认每隔一小段时间(通常是 1 秒左右)向 RM 发送一次 allocate 请求。
  2. 增量更新: AM 不需要每次都发送所有的请求。它只需把新增加的请求、或者要取消的请求(比如某个 Task 失败了不需要了,或者动态分配缩容了)放在 allocate 方法的参数中发送给 RM。
  3. 获取结果: allocate 是一个双向通信机制。AM 在这个心跳中不仅发送请求,RM 也会在心跳的返回值(AllocateResponse)中告诉 AM:
    • 哪些 Container 已经为你分配好了(包含 Container ID、节点信息、授权 Token)。
    • 集群的状态更新。
    • 是否有 Container 被 RM 强制抢占(Preemption)或回收了。

3. 拿到资源后的动作

当 AM 在心跳响应中收到了 RM 分配的 Container 列表后:

  1. AM 会与这些 Container 所在的机器上的 NodeManager (NM) 取得联系。
  2. AM 通过 ContainerManagementProtocol.startContainers() 告诉 NM:“这是 RM 给我的授权(Token),请帮我启动这个进程(比如 Spark Executor 或 MR MapTask),并执行以下启动命令”。

总结流程图

plaintext
[AM 内部计算]
1. 分析作业数据 (HDFS Splits / 并行度)
2. 计算资源量 (需要 N 个 Container,每个 M 内存/C 个 CPU)
3. 计算本地性 (找出数据所在的 Node 和 Rack)
         |
         v
[构建请求]
4. 生成 ResourceRequest 对象 (包含优先级、位置、容量、数量)
         |
         v
[与 RM 交互]
5. 通过 AMRMClient 的 allocate() 心跳方法发送请求
6. RM 内部的 Scheduler (Capacity/Fair) 根据队列余量进行调度分配
7. RM 在下一次 allocate() 心跳响应中返回已分配的 Container
         |
         v
[启动任务]
8. AM 拿着分配到的 Container 信息,连接对应的 NodeManager 启动实际任务进程。
右滑查看面试常问