基于本文回答

播面 播面

文图音视,全方位拆解八股文
0
评论

当 Doris FE 收到客户端的 SQL 请求后,生成分布式物理执行计划的具体流程是怎样的?

在 Apache Doris 中,FE(Frontend)负责接收客户端的 SQL 请求,并将其转换为后端 BE(Backend)能够执行的分布式物理执行计划

随着 Doris 2.0 版本的发布,Doris 引入了全新的现代查询优化器 Nereids(基于 Cascades 架构)。以下是以最新架构为基础,Doris FE 将一段 SQL 转化为分布式物理执行计划的具体全流程:


第一阶段:解析与语义分析(Parse & Analyze)

1. 词法与语法分析(Parsing)

  • 动作:FE 接收到 MySQL 协议发来的 SQL 字符串后,首先经过 Parser(Nereids 中使用 ANTLR4)。
  • 结果:将 SQL 文本解析为一棵抽象语法树(AST - Abstract Syntax Tree)。此时它只是纯粹的语法结构,没有任何数据库元数据关联。

2. 语义分析(Binding / Analyzer)

  • 动作:将 AST 与 Doris 的元数据(Catalog)进行绑定(Bind)。
    • 检查表名、列名是否存在。
    • 检查用户权限。
    • 类型推导(如 INTDOUBLE 相加,结果推导为 DOUBLE)。
    • 展开 SELECT *,解析函数签名等。
  • 结果:生成一棵未优化的逻辑计划树(Unbound Logical Plan -> Bound Logical Plan)

第二阶段:查询优化(Optimization)

这是最核心的环节,Nereids 优化器会将逻辑计划转化为最优的单机物理计划。

1. 基于规则的优化(RBO - Rule-Based Optimization / Rewrite)

  • 动作:应用一系列启发式规则对逻辑计划进行等价改写。
  • 常见规则
    • 谓词下推(Predicate Pushdown):将 WHERE 条件尽可能推到靠近数据源的 Scan 节点。
    • 列裁剪(Column Pruning):去掉上层不需要的列,减少读取和传输。
    • 常量折叠(Constant Folding):如将 1 + 1 直接替换为 2
    • 子查询解嵌套(Subquery Unnesting):将复杂的子查询转化为 Join 操作。

2. 基于代价的优化(CBO - Cost-Based Optimization)

  • 动作:基于统计信息(如表的行数、列的 NDV、Min/Max、Null 比例等),估算不同执行计划的代价(CPU、内存、网络 IO),选择 Cost 最小的计划。

  • 核心决策

    • Join Reorder:决定多表 Join 的顺序(如小表先 Join,大表后 Join)。
    • Join 策略选择:决定是采用 Broadcast Join(广播小表)还是 Shuffle Join(按 Hash 重分布两表)。
    • 物化视图选择(Materialized View Selection):判断查询是否能命中已建立的异步或同步物化视图/Rollup,并改写计划直接读取视图。
  • 结果:经过 CBO 后,生成最优的单机物理计划树(Physical Plan Tree)。树上的节点变成了具体的执行算子,如 PhysicalOlapScanPhysicalHashJoinPhysicalHashAggregate 等。


第三阶段:生成分布式物理执行计划(Distributed Planning)

Doris 采用 MPP(大规模并行处理)架构。单机物理计划无法直接在多台机器上并行执行,必须将其切分为多个分布式执行片段(PlanFragment)

1. 切分 PlanFragment(插入 Exchange 节点)
FE 会自底向上遍历单机物理计划树。只要遇到需要跨节点进行网络数据传输的操作,就会在树上“切一刀”,插入 ExchangeNode(包含 DataStreamSinkDataStreamRecv)。

  • 切分规则示例
    • Shuffle Join:左右两表的数据需要根据 Join Key 进行 Hash 取模打散到不同节点。此时会在 Join 节点下方切分,生成新的 Fragment。
    • 两阶段聚合(Two-Phase Aggregation):如 GROUP BY,第一阶段在 Scan 节点所在机器做本地预聚合(Update),第二阶段需要根据 Group By Key Shuffle 到特定节点做全局聚合(Merge)。这中间也会切分出新的 Fragment。
    • 全局排序(ORDER BY):本地排序后,需要汇总到一个节点进行归并排序,也会产生切分。

2. 构建 Fragment 树
经过切分后,物理计划变成了一棵 PlanFragment 树

  • Fragment 内部:完全在同一个 BE 节点上的同一个线程内执行(Pipeline 引擎),没有任何网络交互
  • Fragment 之间:通过 Sender(数据发送端)和 Receiver(数据接收端)进行网络数据流转。

第四阶段:实例调度与分发(Scheduling & Dispatching)

生成 PlanFragment 仅仅是规划,FE 的 Coordinator 模块还需要决定这些 Fragment 跑在哪些具体的 BE 节点上,并生成执行实例(FragmentInstance)

1. 确定数据分布与并发度

  • Scan Fragment(最底层):FE 会查询元数据,找到目标表所涉及的 Tablet(分片)存放在哪些 BE 节点上(考虑副本健康度)。然后根据 parallel_fragment_exec_instance_num(并行度参数),在这些 BE 上生成多个 Scan 实例。
  • 上层 Fragment:根据下层数据的传输方式(Hash 分布、随机分布或单点分布)以及集群当前的负载情况,决定分配到哪些 BE 节点执行。

2. 分发给 BE 执行

  • FE 将计算好的 FragmentInstance 序列化,通过 RPC(Thrift/BRPC)并行发送给指定的 BE 节点。
  • BE 收到任务后,交由底层的 Pipeline 执行引擎开始真正的数据计算。

总结:数据流转全景图

可以用下面这个简化的流程图来记忆:

plaintext
SQL 请求 (Client)
  │
  ▼
[ Parser ] ---> 1. AST (抽象语法树)
  │
  ▼
[ Analyzer ] -> 2. Bound Logical Plan (绑定元数据的逻辑计划)
  │
  ▼
[ RBO ] ------> 3. Optimized Logical Plan (启发式改写后的计划)
  │
  ▼
[ CBO ] ------> 4. Single-node Physical Plan (评估代价后的单机物理算子树)
  │
  ▼
[ Planner ] --> 5. Distributed PlanFragments (按网络传输边界切分的执行片段)
  │
  ▼
[ Scheduler ]-> 6. Fragment Instances (绑定到具体 BE 节点的执行实例)
  │
  ▼
 RPC 分发至各个 BE 节点开始执行
00:00
00:00