讲讲Doris 的 Pipeline 执行引擎的工作原理
Apache Doris 的 Pipeline 执行引擎(在 2.0 版本及以后成为默认引擎)是其核心的查询执行技术之一。它的引入是为了解决传统火山模型(Volcano Model)在多核 CPU 时代面临的瓶颈。
下面我将从背景背景、核心概念、工作原理、调度机制以及优势这几个方面,详细为你剖析 Doris Pipeline 执行引擎的工作原理。
一、 背景:为什么要从“火山模型”转向“Pipeline”?
在旧的火山模型中:
- 线程绑定:Doris 为每个查询的每个 Fragment(执行片段)分配独立的线程。
- 高并发瓶颈:当并发查询量很高时,系统会创建成百上千个线程。这导致极大的 CPU 上下文切换开销和内存开销。
- 阻塞问题:如果某个算子(如等待网络数据或 I/O)阻塞,对应的线程就会被挂起,无法释放给其他任务使用,导致 CPU 资源浪费。
Pipeline 引擎的设计目标就是:解耦“执行线程”与“查询任务”,采用类似“协程”的思想,用固定的线程池来驱动所有的查询任务,实现极致的 CPU 利用率。
二、 Pipeline 引擎的核心概念
为了理解它的工作原理,首先要了解它重新定义的几个核心概念:
- Operator(算子)
- 数据的最小处理单元。分为三类:
- Source Operator:数据源头(如 Scan、Exchange Receiver)。
- Sink Operator:数据终点(如 Result Sink、Exchange Sender)。
- Transform Operator:数据转换(如 Filter、Project)。
- 数据的最小处理单元。分为三类:
- Pipeline(管道)
- 由一系列非阻塞的 Operator 串联而成的链条(例如:
Scan -> Filter -> Project -> Hash Join Build)。 - Pipeline 边界的划分标准:遇到阻塞算子(Pipeline Breaker,如 Hash Build、Sort、Aggregation)时,Pipeline 就会被切断。因为这些算子必须等待所有数据到达才能继续。
- 由一系列非阻塞的 Operator 串联而成的链条(例如:
- PipelineTask(执行任务)
- Pipeline 是一个逻辑结构,而 PipelineTask 是其实际的执行实例(类似于 Spark 的 Task)。
- 数据的并行度(DOP)决定了一个 Pipeline 会被拆分成多少个 PipelineTask 实例并行运行。
三、 Pipeline 执行引擎的工作原理
Doris Pipeline 的核心工作原理可以概括为:“数据驱动 + 异步阻塞 + 协同调度”。
1. 查询计划的切分(Pipeline Decomposition)
当 Frontend (FE) 产生查询计划后,Backend (BE) 的 Pipeline 引擎会将其切分成多个 Pipeline。
- 示例:一个简单的 Join 查询:
Select * from A join B on A.id = B.id - 切分结果:
- Pipeline 1:读取 B 表(Scan) -> 建立哈希表(Hash Join Build Sink)。
- Pipeline 2:读取 A 表(Scan) -> 探测哈希表(Hash Join Probe) -> 输出结果(Result Sink)。
- 注:Pipeline 2 必须等待 Pipeline 1 的 Hash Table 构建完成后才能开始 Probe。
2. 数据驱动与非阻塞执行(Data-Driven & Non-blocking)
在执行时,每个 PipelineTask 的执行状态由数据是否准备好决定。
- 传统的 Volcano 模型是用
next()强行拉取数据,不关心是否阻塞。 - Pipeline 模型的 Operator 提供了三个核心接口:
is_blocked():判断当前算子是否处于阻塞状态(例如:Buffer 满了、网络数据没到、Hash Table 没建好)。get_block():获取数据(Source)。sink():消费数据(Sink)。
3. 任务状态机
一个 PipelineTask 在生命周期内会在以下几种状态中切换:
- Runnable(就绪):数据和资源已准备好,等待 CPU 调度执行。
- Blocked(阻塞):等待某种事件(如磁盘 I/O 完成、网络数据到达、下游缓冲区有空余)。
- Running(运行中):正在 CPU 上执行。
- Finished(完成):执行结束。
四、 核心调度机制(The Scheduler)
Doris Pipeline 引擎内部有一个全局的调度器(Scheduler),通常包含一个固定大小的线程池(线程数通常等于 CPU 物理核心数)。
调度器的核心循环(Loop)如下:
plaintext
+------------------------+
| Ready Queue | <----------+
| (Runnable PipelineTask)| |
+------------------------+ |
| |
[Pop Task] |
| |
v |
+------------------------+ |
| Worker Thread | |
| (Execute Task Chunk) | |
+------------------------+ |
| |
[Time Slice/Block/Done] |
| |
+----------------------+--------------------+ |
| (Blocked) | (Finished) | (Yield)
v v v |
+------------------+ +------------+ +------------+
| Blocked Queue | | Destructor | | Re-queue |
+------------------+ +------------+ +------------+
|
[Event Triggered
(e.g., IO Ready)]
|
+------------------------------------------------+
- 从 Ready 队列获取任务:工作线程(Worker Thread)从全局的
Ready Queue中获取一个就绪的PipelineTask。 - 时间片轮转(Time Slicing):
- 为了防止一个大查询霸占 CPU 导致小查询饿死,Doris 引入了时间片机制(通常为 10~20ms)。
- Task 运行完一个时间片后,如果还没运行完,会主动出让(Yield)CPU,重新放回
Ready Queue尾部,等待下一次调度。
- 异步阻塞处理:
- 如果 Task 在执行过程中遇到了阻塞(例如 Exchange 算子没收到网络数据),Task 会主动返回,状态变为
Blocked,并被放入Blocked Queue。 - 工作线程不会被阻塞,它会立即去
Ready Queue取下一个任务执行。
- 如果 Task 在执行过程中遇到了阻塞(例如 Exchange 算子没收到网络数据),Task 会主动返回,状态变为
- 唤醒机制:
- 有一个独立的后台线程(通常利用 Linux 的 Epoll 或 Check 线程)监控这些 Blocked 状态的 Task。
- 一旦它等待的事件发生了(如数据包到达),该 Task 就会被唤醒,重新移入
Ready Queue。
五、 Doris Pipeline 引擎的优势
- 极高的 CPU 利用率与极低的上下文切换:
- 线程数固定,避免了上千个线程竞争 CPU 带来的 Context Switch 损耗。
- 优秀的混合负载(HTAP)与多租户资源隔离:
- 配合 Workload Group(资源组),调度器可以精细控制不同用户的 Task 占用的 CPU 时间片比例。例如:给报表查询 80% 的 CPU 时间,给慢查询 20% 的 CPU 时间。
- 防止大查询“饿死”小查询:
- 引入时间片调度,大查询的 Task 会被强制暂停并让出 CPU,保证高并发小查询的低延迟。
- 内存控制更精准:
- 因为数据是按 Block(块)流式拉取的,且可以根据下游的消费速度反压(Backpressure)上游,避免了传统模型中一次性加载过多数据导致 OOM 的问题。
总结
Apache Doris 的 Pipeline 执行引擎,本质上是将关系型代数的物理执行计划,转化为一个由事件驱动的、非阻塞的、支持时间片轮转的协程调度系统。它通过固定的 CPU 线程池驱动海量的数据管道流动,是 Doris 承载高并发、低延迟实时分析(RT-OLAP)的核心基石。
右滑查看面试常问