基于本文回答
0
评论

在 Doris 执行大表与小表关联查询时,Runtime Filter 是如何实现动态过滤、减少数据扫描和网络传输的?

在 Apache Doris 中,Runtime Filter(运行时过滤) 是一种极其高效的动态查询优化技术。它专为大表与小表关联(Join)的场景设计,通过在查询运行时动态生成过滤条件,并将其下推到数据扫描阶段,从而在源头减少数据扫描量降低网络传输开销

以下是 Runtime Filter 实现动态过滤、减少扫描和网络传输的具体原理和工作机制:


一/ 核心工作流程:从 Build 端到 Probe 端

在 Doris 执行大表(Probe 表)与小表(Build 表)的 Hash Join 时,Runtime Filter 的工作流程可以分为四个阶段:

plaintext
[小表 Scan] -> [构建 Hash Table] -> 生成 Runtime Filter (Min/Max, Bloom Filter 等)
                                         |
                                         v (RPC 传输/合并)
[大表 Scan] <- [应用 Runtime Filter 过滤数据] <- 接收 Filter
     |
     v (数据量大幅减少)
[Hash Join]
  1. Build 端(小表)生成过滤器:
    Doris 首先扫描小表并构建 Hash Table。在此过程中,引擎会收集 Join Key(关联列)的数据特征,动态生成一个过滤器(如 Bloom Filter、Min/Max 等)。
  2. Filter 传输与合并(Global Runtime Filter):
    如果是在分布式场景下,多个 BE(Backend)节点上生成了局部的 Filter,Doris 会将它们汇聚到 Coordinator 节点进行合并,生成一个全局的 Runtime Filter,然后再分发给大表所在的各个 Scan 节点。
  3. Probe 端(大表)应用过滤:
    大表的 Scan 节点在开始读取数据前(或读取过程中),接收到这个 Runtime Filter,并将其应用到数据读取流程中。
  4. 执行 Join:
    大表过滤后仅剩的少数匹配行被发送到 Join 节点,与小表的 Hash Table 进行关联。

二/ 如何实现“减少数据扫描”(降低 I/O)

Runtime Filter 生成后,会尽可能深地下推到 Doris 的存储引擎(Storage Engine)中,在不同的粒度上进行数据过滤,从而避免无效的磁盘 I/O:

  1. Segment/Page 级别过滤(索引加速):
    • Min/Max Filter:如果大表的数据是按 Join Key 排序或局部有序的,Min/Max Filter 可以直接利用 Doris 的 Zone Map 索引。如果一个 Segment(数据段)或 Page 的范围与 Min/Max 无交集,整块数据将被直接跳过,完全不进行磁盘读取。
  2. 行级过滤(Row-level Filtering):
    • Bloom Filter / In Filter:如果无法进行块级跳过,Doris 会在读取数据 Page 时,对每一行(或一批行)的 Join Key 列利用 Bloom Filter 或 In Filter 进行判定。不命中的行直接丢弃,不进行后续的反序列化和列填充。
  3. 延迟物化(Late Materialization):
    • Doris 支持列式存储。如果 Runtime Filter 判定某一行被过滤掉,那么这一行对应的其他列(非 Join Key 列)就完全不需要从磁盘读取,极大地节省了 I/O 带宽。

三/ 如何实现“减少网络传输”(降低 Network Traffic)

在分布式 Join 中(如 Shuffle Join),网络传输往往是最大的瓶颈。Runtime Filter 通过“在 Shuffle 前过滤”的机制,大幅降低网络开销:

  1. 过滤发生在 Shuffle 之前:
    在没有 Runtime Filter 时,大表的所有数据必须先通过网络进行 Shuffle(重分布),分发到各个 Join 节点。
    开启 Runtime Filter 后,大表在各自的 BE 节点扫描时就已经完成了过滤。只有真正可能匹配的行才会通过网络发送到 Join 节点。
  2. 广播与合并优化:
    • 对于 Broadcast Join:Runtime Filter 在本地生成后,直接作用于本地的大表 Scan,不产生额外的网络合并开销(Local Runtime Filter)。
    • 对于 Shuffle Join:虽然需要通过 RPC 将各个节点的 Filter 发送到合并节点,但 Runtime Filter 本身体积很小(例如一个 Bloom Filter 只有几百 KB 到几 MB),用极小的网络开销(传输 Filter)换取了极大的网络节省(免去了 G 级、T 级大表数据的 Shuffle)。

四/ Runtime Filter 的三种主要类型及其适用场景

Doris 支持多种类型的 Runtime Filter,根据数据特征自动或手动选择:

过滤器类型 原理 优点 缺点 / 适用场景
Min/Max 记录小表的最小值和最大值。 超轻量,生成和传输代价极低。 仅适用于大表数据有序,能利用 Zone Map 过滤的场景。
In Filter 将小表的所有 Join Key 放入一个 Set 集合中。 100% 精确,过滤效率极高。 仅适用于小表数据量非常小(默认限制 1024 行以内)的场景。
Bloom Filter 使用位数组和哈希函数表示集合。 空间效率高,适合中等偏大范围的数据过滤。 存在一定的误判率(False Positive),计算稍重。
Bitmap 利用 Bitmap 进行精确过滤。 针对特定 ID 列、字典编码列过滤极快。 仅适用于整型且基数在一定范围内的列。

五/ Doris 中的关键配置与调优

在 Doris 中,Runtime Filter 是默认开启的,但可以通过以下参数进行微调:

  • runtime_filter_type: 指定使用哪种过滤器。例如:SET runtime_filter_type = "BLOOM_FILTER,MIN_MAX,IN";
  • runtime_filter_mode:
    • LOCAL:只在同一个 BE 节点内生效(适用于 Colocate Join 或 Broadcast Join)。
    • GLOBAL(默认):在整个集群生效,涉及网络合并分发(适用于 Shuffle Join)。
  • runtime_filter_wait_time_ms: 大表 Scan 节点等待 Runtime Filter 生成的超时时间(默认 1000ms)。如果小表太大,生成 Filter 时间过长,大表不会无限期等待,而是直接开始扫描,以防查询卡死。

总结

Doris 的 Runtime Filter 是将 “计算下推” 思想发挥到极致的体现。它通过在小表端收集特征,在物理执行的早期(大表扫描阶段)就把不匹配的数据“干掉”,实现了 I/O 阶段、网络阶段、CPU 阶段的三重减负,是大表关联查询能够达到亚秒级响应的关键技术之一。

右滑查看面试常问