Aggregation(聚合)合并引擎支持哪些聚合函数?它在读取时是如何对多条相同主键的增量记录进行指标聚合的?
在 Apache Paimon 的官方文档中,Aggregation(聚合)合并引擎是一种极具弹性的主键表(Primary Key Table)合并机制。它允许用户在主键相同的情况下,不对数据进行简单的覆盖(Deduplicate),而是对各个非主键字段分别应用特定的聚合函数,实现存储层的实时预聚合。
以下是根据 Apache Paimon master 最新官方文档整理的详细解答。
一、 Aggregation 合并引擎支持的聚合函数
当表配置为 'merge-engine' = 'aggregation' 时,主键之外的每一个值字段都可以通过 fields.<field-name>.aggregate-function 属性指定聚合函数;如果不指定,默认使用 last_non_null_value。
Paimon master 文档中列出的支持聚合函数如下:
1. 基础数值与比较聚合
sum:累加求和。- 支持类型:
DECIMAL,TINYINT,SMALLINT,INTEGER,BIGINT,FLOAT,DOUBLE。
- 支持类型:
product:乘积计算。- 支持类型:
DECIMAL,TINYINT,SMALLINT,INTEGER,BIGINT,FLOAT,DOUBLE。
- 支持类型:
count:计数统计(通常可将条件转为 1 或 0 之后通过 SUM 间接统计,也支持直接计数)。max:保留最大值。- 支持类型:
CHAR,VARCHAR,DECIMAL,TINYINT,SMALLINT,INTEGER,BIGINT,FLOAT,DOUBLE,DATE,TIME,TIMESTAMP,TIMESTAMP_LTZ。
- 支持类型:
min:保留最小值。- 支持类型:同
max。
- 支持类型:同
2. 位置与时间序聚合
last_value:总是用最新输入的值覆盖旧值(哪怕最新值是NULL)。- 支持类型:所有数据类型。
last_non_null_value:用最新的非空(Non-null)值覆盖旧值。- 支持类型:所有数据类型。
first_value:保留第一次导入的值,后续输入被忽略。first_non_null_value:保留第一次导入的非空值。
3. 文本与逻辑聚合
listagg:将多行字符串拼接成一个字符串。- 支持类型:
STRING。 - 相关配置:可通过
fields.<field-name>.list-agg-delimiter自定义分隔符(默认是,),以及通过fields.<field-name>.distinct=true进行去重。
- 支持类型:
bool_and:对布尔值字段进行逻辑与(AND)操作。- 支持类型:
BOOLEAN。
- 支持类型:
bool_or:对布尔值字段进行逻辑或(OR)操作。- 支持类型:
BOOLEAN。
- 支持类型:
4. 高级与复杂数据结构聚合
collect:将多行数据收集并构造成一个 Array(数组)。- 支持类型:
ARRAY。 - 相关配置:支持设置
fields.<field-name>.distinct=true自动进行数组元素去重。
- 支持类型:
merge_map:合并多个 Map 键值对。- 支持类型:
MAP。
- 支持类型:
merge_map_with_keytime:基于时间戳对 Map 元素进行Key 级别的局部更新。- 支持类型:
MAP<key_type, ROW<value_field, ts_field>>。 - 工作逻辑:每个 Key 均带有时间戳,合并时仅保留该 Key 下时间戳最新的 Value;如果最新 Value 为
NULL,则该 Key 会被从 Map 中移除。
- 支持类型:
nested_update:将多行记录收集为一个嵌套的 Array 行(类似于嵌套表,Nested Table)。- 支持类型:
ARRAY。 - 工作逻辑:必须通过
fields.<field-name>.nested-key指定嵌套主键,并基于该主键进行去重和字段更新。
- 支持类型:
nested_partial_update:功能类似于nested_update,但它支持在行内进行部分列更新(Partial Update)。
5. 基数估算(Cardinality Sketches)
Paimon 整合了 Apache DataSketches 库,支持在存储层直接进行去重基数估算:
hll_sketch:使用 HyperLogLog 算法对多条序列化的 Sketch 对象进行合并聚合(适用于高精度的近似去重,节省空间)。- 支持类型:
VARBINARY。
- 支持类型:
theta_sketch:使用 Theta Sketch 算法合并 Sketch,除了去重外,还支持集合的交、并、差集计算。- 支持类型:
VARBINARY。
- 支持类型:
6. 位图聚合(Bitmap)
rbm32&rbm64:合并已序列化的 32 位或 64 位 RoaringBitmap。- 工作原理:将输入的二进制数据(
VARBINARY)反序列化为 RoaringBitmap 对象,在内存中进行按位或(Bitwise OR)操作,再序列化写回。非常适合海量 ID 级去重和集合求并集场景。
- 工作原理:将输入的二进制数据(
二、 读取时是如何对多条相同主键的增量记录进行指标聚合的?
在 Apache Paimon 中,主键表在后台是以 LSM-Tree(日志结构合并树) 的形式组织文件并进行持久化存储的。在读取时(通常是批量查询或 Merge On Read 模式的流查询),Paimon 通过以下步骤对相同主键的增量数据执行指标聚合:
1. 触发读时合并(Merge On Read - MOR)
由于 LSM-Tree 的追加写特性,当多批增量数据写入同一个 Bucket 时,它们会被先缓存在内存中(MemTable),随后溢写为按主键有序的段文件(SSTables / Sorted Runs)。此时,对于同一个主键,在不同的段文件或不同的 LSM 层级中会同时存在多条未合并的“历史快照/增量变化记录”。
在查询(Batch/Streaming)时,Paimon 的 Reader 会执行 MOR(Merge On Read) 逻辑,加载这些包含 overlapping(重叠)主键范围的所有数据文件。
2. 多路归并排序与 Sequence 定序
为了确保聚合逻辑的顺序性(比如判断何为 last_value),Reader 在读取文件时,利用 Heap(堆)对多个有序文件进行多路归并排序(Multi-way Merge Sort)。
- 定序依据:在归并期间,如果遇到主键相同的记录,Reader 会严格根据这些记录的 Sequence Number(通常由写入时的 Flink 算子自增序列或用户指定的
sequence.field字段来决定)对它们进行从旧到新的排序。
3. 逐个字段、由旧到新地执行 Aggregation
在对相同主键的增量数据排好序后,Paimon 的 Aggregation 引擎的逻辑生成器(Merger)开始在内存中流式处理该主键的这一组增量链:
- 初始化:读取第一条记录(最旧的一条),作为初始聚合状态(State)。
- 字段级逐一聚合:接着读取下一条增量记录(按 Sequence 排序),Reader 会遍历该记录的非主键列:
- 如果某一列配置了
'sum',就将当前记录该列的值累加到 State 对应的列上。 - 如果某一列配置了
'max',则通过比较,在 State 中保留两者的最大值。 - 如果没有配置聚合函数,则默认对该列执行
last_non_null_value(即:若新纪录该列不为空,则用新值覆盖 State 中的值;若新纪录为空,则保留 State 里的旧值)。
- 如果某一列配置了
- 输出结果:当把该主键的所有增量记录全部流式合并/聚合完毕后,整个归并器最终会只输出一条统一的、代表完全聚合后状态的 Record 给下游计算引擎(Flink / Spark / StarRocks 等)。
4. 特殊消息处理:撤回(Retraction)支持
在实时流处理中,可能会收到来自上游的撤回消息(如 Flink 的 UPDATE_BEFORE 或 DELETE)。Paimon 的 Aggregation 引擎在读时合并时,也对撤回消息进行了适配:
- 支持撤回的函数:
sum、product、count、collect、merge_map、nested_update、last_value和last_non_null_value支持撤回逻辑(如sum遇到撤回会自动递减)。 - 不支持撤回的函数:其余函数(如
max/min/rbm32等)在读时默认不支持直接撤回(如果一定要忽略撤回错误,可配置'fields.${field_name}.ignore-retract' = 'true')。
5. 读时聚合的物理性能保障
由于每一次读取都要在内存中做高强度的多路归并和函数累加,这在数据量极大时会影响查询性能。Paimon 结合了两种机制来缓解此问题:
- 后台异步 Compaction(合并压缩):Paimon 会在后台启动异步 Compaction 任务,将这些磁盘上的多路小文件读取出来,在后台执行一轮完全相同的
Aggregation合并,最后写回成单路、完全聚合好的高层大文件。这样可以大大减少下一次查询时需要在线(On-the-fly)聚合的数据条数。 - 写入端局部预聚合(Local Merging):如果遇到主键热点严重倾斜,用户可以配置
local-merge-buffer-size。它能在数据还没有写入 Bucket(进行 Shuffle 排序)前,先在 Flink Writer 内存中进行一轮预聚合,大幅降低最终落地到 LSM 文件中的重复主键行数,从而极大提升后续读端合并的效率。