ApplicationMaster 内部是如何计算所需资源,并向 ResourceManager 提出资源申请的?
在 Apache YARN 架构中,ApplicationMaster (AM) 是每个应用程序特有的实例(例如 MapReduce、Spark、Flink 都有自己实现的 AM)。AM 负责管理应用程序的生命周期,其中最核心的任务就是计算所需资源,并向 ResourceManager (RM) 申请资源。
这个过程可以分为两个主要阶段:资源计算阶段(由特定计算框架决定)和资源申请阶段(通过 YARN 的标准协议进行)。
以下是详细的工作机制:
第一阶段:AM 内部如何计算所需资源?
YARN 作为一个通用的资源管理平台,它本身不关心应用需要多少资源。计算资源的逻辑完全由具体的计算框架(如 Spark、MapReduce)在 AM 内部实现。 AM 主要从两个维度计算资源:数量(Size)和位置(Locality)。
1. 资源数量的计算 (How much)
不同的框架计算规则不同:
- MapReduce AM (MRAppMaster):
- Map 阶段: AM 会读取 HDFS 上的数据切片(Input Splits)信息。通常一个 Split 对应一个 Map Task。如果数据被切分成了 100 个 Split,AM 就知道需要 100 个 Map Container。每个 Container 的内存和 CPU 则由用户配置项(如
mapreduce.map.memory.mb和mapreduce.map.cpu.vcores)决定。 - Reduce 阶段: 需求量取决于用户设置的 Reduce Task 数量(如
mapreduce.job.reduces),每个 Reduce Container 的大小同样由用户配置决定。
- Map 阶段: AM 会读取 HDFS 上的数据切片(Input Splits)信息。通常一个 Split 对应一个 Map Task。如果数据被切分成了 100 个 Split,AM 就知道需要 100 个 Map Container。每个 Container 的内存和 CPU 则由用户配置项(如
- Spark AM (在 YARN-Cluster 模式下的 Driver):
- 静态分配: 如果用户在提交任务时指定了
--num-executors 10、--executor-memory 4G、--executor-cores 2,AM 就会直接将这些固定配置转换为资源请求。 - 动态分配 (Dynamic Allocation): AM 会监控当前待处理的任务积压情况。如果 Task 很多而 Executor 不够,AM 会动态增加请求的 Container 数量;如果 Executor 空闲时间超过阈值,AM 会释放资源。
- 静态分配: 如果用户在提交任务时指定了
- Flink AM:
- 取决于作业的并行度(Parallelism)以及每个 TaskManager 配置的 Slot 数量。AM 会计算出需要启动多少个 TaskManager 才能满足总并行度需求。
2. 数据本地性计算 (Where)
“移动计算比移动数据更划算”。AM 在计算需求时,不仅要算“要多大”,还要算“在哪里”。
- AM 会解析输入数据的元数据(如 HDFS Block 的位置信息),找出数据所在的具体节点(Node)和机架(Rack)。
- AM 会将这些位置偏好作为标签附带在资源请求中,期望 RM 尽量在数据所在的机器上分配资源。
第二阶段:AM 如何向 RM 提出资源申请?
当 AM 计算好所需的资源后,它会通过 YARN 提供的客户端(通常是 AMRMClient 或异步的 AMRMClientAsync)与 RM 进行通信。
1. 核心数据结构:ResourceRequest
AM 向 RM 发送的不是一句简单的话,而是一个个精密的 ResourceRequest 对象。一个典型的 ResourceRequest 包含以下关键字段:
- Priority (优先级): 告诉 RM 哪些请求更紧急。例如在 MapReduce 中,Map 任务的优先级高于 Reduce 任务,因为 Reduce 依赖 Map 的输出;AM 自身的优先级最高。
- ResourceName (资源位置期望): 指示期望在哪个位置分配。它可以是:
- 特定节点名 (如
node1.hadoop.com,表示 Node Local) - 特定机架名 (如
/default-rack,表示 Rack Local) - 星号
*(表示 ANY,集群中任意节点均可)
- 特定节点名 (如
- Capability (资源量): 单个 Container 需要的资源维度,通常包含内存(MB)和 CPU(vCores),有时也包含 GPU 等扩展资源(如
<memory: 2048, vCores: 2>)。 - NumContainers (数量): 满足上述条件的 Container 需要几个。
- RelaxLocality (是否放宽本地性): 如果设为 true,当 RM 在指定节点找不到资源时,允许 RM 降级到机架甚至任意节点去分配。
举例:AM 想要在 node1 上要 1 个 2GB/1Core 的容器,它会发出三个 Request (这是 YARN 的经典三级请求机制):
ResourceName: node1,NumContainers: 1(请求具体节点)ResourceName: /rack1,NumContainers: 1(备用方案:同机架)ResourceName: *,NumContainers: 1(兜底方案:任意节点)
2. 申请机制:心跳协议 (Heartbeat)
AM 并不是发送一次请求就干等,资源申请是通过 RPC 心跳机制(具体方法是 ApplicationMasterProtocol.allocate())周期性完成的。
- 心跳周期: AM 默认每隔一小段时间(通常是 1 秒左右)向 RM 发送一次
allocate请求。 - 增量更新: AM 不需要每次都发送所有的请求。它只需把新增加的请求、或者要取消的请求(比如某个 Task 失败了不需要了,或者动态分配缩容了)放在
allocate方法的参数中发送给 RM。 - 获取结果:
allocate是一个双向通信机制。AM 在这个心跳中不仅发送请求,RM 也会在心跳的返回值(AllocateResponse)中告诉 AM:- 哪些 Container 已经为你分配好了(包含 Container ID、节点信息、授权 Token)。
- 集群的状态更新。
- 是否有 Container 被 RM 强制抢占(Preemption)或回收了。
3. 拿到资源后的动作
当 AM 在心跳响应中收到了 RM 分配的 Container 列表后:
- AM 会与这些 Container 所在的机器上的 NodeManager (NM) 取得联系。
- AM 通过
ContainerManagementProtocol.startContainers()告诉 NM:“这是 RM 给我的授权(Token),请帮我启动这个进程(比如 Spark Executor 或 MR MapTask),并执行以下启动命令”。
总结流程图
[AM 内部计算]
1. 分析作业数据 (HDFS Splits / 并行度)
2. 计算资源量 (需要 N 个 Container,每个 M 内存/C 个 CPU)
3. 计算本地性 (找出数据所在的 Node 和 Rack)
|
v
[构建请求]
4. 生成 ResourceRequest 对象 (包含优先级、位置、容量、数量)
|
v
[与 RM 交互]
5. 通过 AMRMClient 的 allocate() 心跳方法发送请求
6. RM 内部的 Scheduler (Capacity/Fair) 根据队列余量进行调度分配
7. RM 在下一次 allocate() 心跳响应中返回已分配的 Container
|
v
[启动任务]
8. AM 拿着分配到的 Container 信息,连接对应的 NodeManager 启动实际任务进程。