基于本文回答

播面 播面

文图音视,全方位拆解八股文
0
评论

讲讲Doris 的 Pipeline 执行引擎的工作原理

Apache Doris 的 Pipeline 执行引擎(在 2.0 版本及以后成为默认引擎)是其核心的查询执行技术之一。它的引入是为了解决传统火山模型(Volcano Model)在多核 CPU 时代面临的瓶颈。

下面我将从背景背景、核心概念、工作原理、调度机制以及优势这几个方面,详细为你剖析 Doris Pipeline 执行引擎的工作原理。


一、 背景:为什么要从“火山模型”转向“Pipeline”?

在旧的火山模型中:

  1. 线程绑定:Doris 为每个查询的每个 Fragment(执行片段)分配独立的线程。
  2. 高并发瓶颈:当并发查询量很高时,系统会创建成百上千个线程。这导致极大的 CPU 上下文切换开销和内存开销。
  3. 阻塞问题:如果某个算子(如等待网络数据或 I/O)阻塞,对应的线程就会被挂起,无法释放给其他任务使用,导致 CPU 资源浪费。

Pipeline 引擎的设计目标就是:解耦“执行线程”与“查询任务”,采用类似“协程”的思想,用固定的线程池来驱动所有的查询任务,实现极致的 CPU 利用率。


二、 Pipeline 引擎的核心概念

为了理解它的工作原理,首先要了解它重新定义的几个核心概念:

  1. Operator(算子)
    • 数据的最小处理单元。分为三类:
      • Source Operator:数据源头(如 Scan、Exchange Receiver)。
      • Sink Operator:数据终点(如 Result Sink、Exchange Sender)。
      • Transform Operator:数据转换(如 Filter、Project)。
  2. Pipeline(管道)
    • 由一系列非阻塞的 Operator 串联而成的链条(例如:Scan -> Filter -> Project -> Hash Join Build)。
    • Pipeline 边界的划分标准:遇到阻塞算子(Pipeline Breaker,如 Hash Build、Sort、Aggregation)时,Pipeline 就会被切断。因为这些算子必须等待所有数据到达才能继续。
  3. PipelineTask(执行任务)
    • Pipeline 是一个逻辑结构,而 PipelineTask 是其实际的执行实例(类似于 Spark 的 Task)。
    • 数据的并行度(DOP)决定了一个 Pipeline 会被拆分成多少个 PipelineTask 实例并行运行。

三、 Pipeline 执行引擎的工作原理

Doris Pipeline 的核心工作原理可以概括为:“数据驱动 + 异步阻塞 + 协同调度”

1. 查询计划的切分(Pipeline Decomposition)

当 Frontend (FE) 产生查询计划后,Backend (BE) 的 Pipeline 引擎会将其切分成多个 Pipeline。

  • 示例:一个简单的 Join 查询:Select * from A join B on A.id = B.id
  • 切分结果
    • Pipeline 1:读取 B 表(Scan) -> 建立哈希表(Hash Join Build Sink)。
    • Pipeline 2:读取 A 表(Scan) -> 探测哈希表(Hash Join Probe) -> 输出结果(Result Sink)。
    • 注:Pipeline 2 必须等待 Pipeline 1 的 Hash Table 构建完成后才能开始 Probe。

2. 数据驱动与非阻塞执行(Data-Driven & Non-blocking)

在执行时,每个 PipelineTask 的执行状态由数据是否准备好决定。

  • 传统的 Volcano 模型是用 next() 强行拉取数据,不关心是否阻塞。
  • Pipeline 模型的 Operator 提供了三个核心接口:
    • is_blocked():判断当前算子是否处于阻塞状态(例如:Buffer 满了、网络数据没到、Hash Table 没建好)。
    • get_block():获取数据(Source)。
    • sink():消费数据(Sink)。

3. 任务状态机

一个 PipelineTask 在生命周期内会在以下几种状态中切换:

  • Runnable(就绪):数据和资源已准备好,等待 CPU 调度执行。
  • Blocked(阻塞):等待某种事件(如磁盘 I/O 完成、网络数据到达、下游缓冲区有空余)。
  • Running(运行中):正在 CPU 上执行。
  • Finished(完成):执行结束。

四、 核心调度机制(The Scheduler)

Doris Pipeline 引擎内部有一个全局的调度器(Scheduler),通常包含一个固定大小的线程池(线程数通常等于 CPU 物理核心数)。

调度器的核心循环(Loop)如下:

plaintext
                    +------------------------+
                    |      Ready Queue       | <----------+
                    | (Runnable PipelineTask)|            |
                    +------------------------+            |
                                |                         |
                         [Pop Task]                       |
                                |                         |
                                v                         |
                    +------------------------+            |
                    |    Worker Thread       |            |
                    | (Execute Task Chunk)   |            |
                    +------------------------+            |
                                |                         |
                     [Time Slice/Block/Done]              |
                                |                         |
         +----------------------+--------------------+    |
         | (Blocked)            | (Finished)         | (Yield)
         v                      v                    v    |
+------------------+     +------------+      +------------+
|   Blocked Queue  |     | Destructor |      | Re-queue   |
+------------------+     +------------+      +------------+
         |
  [Event Triggered
  (e.g., IO Ready)]
         |
         +------------------------------------------------+
  1. 从 Ready 队列获取任务:工作线程(Worker Thread)从全局的 Ready Queue 中获取一个就绪的 PipelineTask
  2. 时间片轮转(Time Slicing)
    • 为了防止一个大查询霸占 CPU 导致小查询饿死,Doris 引入了时间片机制(通常为 10~20ms)。
    • Task 运行完一个时间片后,如果还没运行完,会主动出让(Yield)CPU,重新放回 Ready Queue 尾部,等待下一次调度。
  3. 异步阻塞处理
    • 如果 Task 在执行过程中遇到了阻塞(例如 Exchange 算子没收到网络数据),Task 会主动返回,状态变为 Blocked,并被放入 Blocked Queue
    • 工作线程不会被阻塞,它会立即去 Ready Queue 取下一个任务执行。
  4. 唤醒机制
    • 有一个独立的后台线程(通常利用 Linux 的 Epoll 或 Check 线程)监控这些 Blocked 状态的 Task。
    • 一旦它等待的事件发生了(如数据包到达),该 Task 就会被唤醒,重新移入 Ready Queue

五、 Doris Pipeline 引擎的优势

  1. 极高的 CPU 利用率与极低的上下文切换
    • 线程数固定,避免了上千个线程竞争 CPU 带来的 Context Switch 损耗。
  2. 优秀的混合负载(HTAP)与多租户资源隔离
    • 配合 Workload Group(资源组),调度器可以精细控制不同用户的 Task 占用的 CPU 时间片比例。例如:给报表查询 80% 的 CPU 时间,给慢查询 20% 的 CPU 时间。
  3. 防止大查询“饿死”小查询
    • 引入时间片调度,大查询的 Task 会被强制暂停并让出 CPU,保证高并发小查询的低延迟。
  4. 内存控制更精准
    • 因为数据是按 Block(块)流式拉取的,且可以根据下游的消费速度反压(Backpressure)上游,避免了传统模型中一次性加载过多数据导致 OOM 的问题。

总结

Apache Doris 的 Pipeline 执行引擎,本质上是将关系型代数的物理执行计划,转化为一个由事件驱动的、非阻塞的、支持时间片轮转的协程调度系统。它通过固定的 CPU 线程池驱动海量的数据管道流动,是 Doris 承载高并发、低延迟实时分析(RT-OLAP)的核心基石。

00:00
00:00