基于本文回答

播面 播面

刷题像听歌,多听自然懂
0
评论

Aggregation(聚合)合并引擎支持哪些聚合函数?它在读取时是如何对多条相同主键的增量记录进行指标聚合的?

知识点图片

在 Apache Paimon 的官方文档中,Aggregation(聚合)合并引擎是一种极具弹性的主键表(Primary Key Table)合并机制。它允许用户在主键相同的情况下,不对数据进行简单的覆盖(Deduplicate),而是对各个非主键字段分别应用特定的聚合函数,实现存储层的实时预聚合

以下是根据 Apache Paimon master 最新官方文档整理的详细解答。


一、 Aggregation 合并引擎支持的聚合函数

当表配置为 'merge-engine' = 'aggregation' 时,主键之外的每一个值字段都可以通过 fields.<field-name>.aggregate-function 属性指定聚合函数;如果不指定,默认使用 last_non_null_value

Paimon master 文档中列出的支持聚合函数如下:

1. 基础数值与比较聚合

  • sum:累加求和。
    • 支持类型DECIMAL, TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT, DOUBLE
  • product:乘积计算。
    • 支持类型DECIMAL, TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT, DOUBLE
  • count:计数统计(通常可将条件转为 1 或 0 之后通过 SUM 间接统计,也支持直接计数)。
  • max:保留最大值。
    • 支持类型CHAR, VARCHAR, DECIMAL, TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT, DOUBLE, DATE, TIME, TIMESTAMP, TIMESTAMP_LTZ
  • min:保留最小值。
    • 支持类型:同 max

2. 位置与时间序聚合

  • last_value:总是用最新输入的值覆盖旧值(哪怕最新值是 NULL)。
    • 支持类型:所有数据类型。
  • last_non_null_value:用最新的非空(Non-null)值覆盖旧值。
    • 支持类型:所有数据类型。
  • first_value:保留第一次导入的值,后续输入被忽略。
  • first_non_null_value:保留第一次导入的非空值。

3. 文本与逻辑聚合

  • listagg:将多行字符串拼接成一个字符串。
    • 支持类型STRING
    • 相关配置:可通过 fields.<field-name>.list-agg-delimiter 自定义分隔符(默认是 ,),以及通过 fields.<field-name>.distinct=true 进行去重。
  • bool_and:对布尔值字段进行逻辑与(AND)操作。
    • 支持类型BOOLEAN
  • bool_or:对布尔值字段进行逻辑或(OR)操作。
    • 支持类型BOOLEAN

4. 高级与复杂数据结构聚合

  • collect:将多行数据收集并构造成一个 Array(数组)。
    • 支持类型ARRAY
    • 相关配置:支持设置 fields.<field-name>.distinct=true 自动进行数组元素去重。
  • merge_map:合并多个 Map 键值对。
    • 支持类型MAP
  • merge_map_with_keytime:基于时间戳对 Map 元素进行Key 级别的局部更新
    • 支持类型MAP<key_type, ROW<value_field, ts_field>>
    • 工作逻辑:每个 Key 均带有时间戳,合并时仅保留该 Key 下时间戳最新的 Value;如果最新 Value 为 NULL,则该 Key 会被从 Map 中移除。
  • nested_update:将多行记录收集为一个嵌套的 Array 行(类似于嵌套表,Nested Table)。
    • 支持类型ARRAY
    • 工作逻辑:必须通过 fields.<field-name>.nested-key 指定嵌套主键,并基于该主键进行去重和字段更新。
  • nested_partial_update:功能类似于 nested_update,但它支持在行内进行部分列更新(Partial Update)

5. 基数估算(Cardinality Sketches)

Paimon 整合了 Apache DataSketches 库,支持在存储层直接进行去重基数估算:

  • hll_sketch:使用 HyperLogLog 算法对多条序列化的 Sketch 对象进行合并聚合(适用于高精度的近似去重,节省空间)。
    • 支持类型VARBINARY
  • theta_sketch:使用 Theta Sketch 算法合并 Sketch,除了去重外,还支持集合的交、并、差集计算。
    • 支持类型VARBINARY

6. 位图聚合(Bitmap)

  • rbm32 & rbm64:合并已序列化的 32 位或 64 位 RoaringBitmap。
    • 工作原理:将输入的二进制数据(VARBINARY)反序列化为 RoaringBitmap 对象,在内存中进行按位或(Bitwise OR)操作,再序列化写回。非常适合海量 ID 级去重和集合求并集场景。

二、 读取时是如何对多条相同主键的增量记录进行指标聚合的?

在 Apache Paimon 中,主键表在后台是以 LSM-Tree(日志结构合并树) 的形式组织文件并进行持久化存储的。在读取时(通常是批量查询或 Merge On Read 模式的流查询),Paimon 通过以下步骤对相同主键的增量数据执行指标聚合:

1. 触发读时合并(Merge On Read - MOR)

由于 LSM-Tree 的追加写特性,当多批增量数据写入同一个 Bucket 时,它们会被先缓存在内存中(MemTable),随后溢写为按主键有序的段文件(SSTables / Sorted Runs)。此时,对于同一个主键,在不同的段文件或不同的 LSM 层级中会同时存在多条未合并的“历史快照/增量变化记录”。
在查询(Batch/Streaming)时,Paimon 的 Reader 会执行 MOR(Merge On Read) 逻辑,加载这些包含 overlapping(重叠)主键范围的所有数据文件。

2. 多路归并排序与 Sequence 定序

为了确保聚合逻辑的顺序性(比如判断何为 last_value),Reader 在读取文件时,利用 Heap(堆)对多个有序文件进行多路归并排序(Multi-way Merge Sort)

  • 定序依据:在归并期间,如果遇到主键相同的记录,Reader 会严格根据这些记录的 Sequence Number(通常由写入时的 Flink 算子自增序列或用户指定的 sequence.field 字段来决定)对它们进行从旧到新的排序。

3. 逐个字段、由旧到新地执行 Aggregation

在对相同主键的增量数据排好序后,Paimon 的 Aggregation 引擎的逻辑生成器(Merger)开始在内存中流式处理该主键的这一组增量链:

  1. 初始化:读取第一条记录(最旧的一条),作为初始聚合状态(State)。
  2. 字段级逐一聚合:接着读取下一条增量记录(按 Sequence 排序),Reader 会遍历该记录的非主键列:
    • 如果某一列配置了 'sum',就将当前记录该列的值累加到 State 对应的列上。
    • 如果某一列配置了 'max',则通过比较,在 State 中保留两者的最大值。
    • 如果没有配置聚合函数,则默认对该列执行 last_non_null_value(即:若新纪录该列不为空,则用新值覆盖 State 中的值;若新纪录为空,则保留 State 里的旧值)。
  3. 输出结果:当把该主键的所有增量记录全部流式合并/聚合完毕后,整个归并器最终会只输出一条统一的、代表完全聚合后状态的 Record 给下游计算引擎(Flink / Spark / StarRocks 等)。

4. 特殊消息处理:撤回(Retraction)支持

在实时流处理中,可能会收到来自上游的撤回消息(如 Flink 的 UPDATE_BEFOREDELETE)。Paimon 的 Aggregation 引擎在读时合并时,也对撤回消息进行了适配:

  • 支持撤回的函数sumproductcountcollectmerge_mapnested_updatelast_valuelast_non_null_value 支持撤回逻辑(如 sum 遇到撤回会自动递减)。
  • 不支持撤回的函数:其余函数(如 max / min / rbm32 等)在读时默认不支持直接撤回(如果一定要忽略撤回错误,可配置 'fields.${field_name}.ignore-retract' = 'true')。

5. 读时聚合的物理性能保障

由于每一次读取都要在内存中做高强度的多路归并和函数累加,这在数据量极大时会影响查询性能。Paimon 结合了两种机制来缓解此问题:

  • 后台异步 Compaction(合并压缩):Paimon 会在后台启动异步 Compaction 任务,将这些磁盘上的多路小文件读取出来,在后台执行一轮完全相同的 Aggregation 合并,最后写回成单路、完全聚合好的高层大文件。这样可以大大减少下一次查询时需要在线(On-the-fly)聚合的数据条数。
  • 写入端局部预聚合(Local Merging):如果遇到主键热点严重倾斜,用户可以配置 local-merge-buffer-size。它能在数据还没有写入 Bucket(进行 Shuffle 排序)前,先在 Flink Writer 内存中进行一轮预聚合,大幅降低最终落地到 LSM 文件中的重复主键行数,从而极大提升后续读端合并的效率。
00:00
00:00