上游 MySQL 中经常有物理删除(DELETE)操作。下游实时流消费 Paimon 时,部分消费者不希望接收到物理删除信号,而另一部分则必须同步删除。在 Paimon 中,如何通过表属性或 SQL Hint 配置,优雅地在写入端和读取端过滤或传播删除消息?
在 Apache Paimon 的实时流式处理中,处理上游(如 MySQL CDC)的物理删除(DELETE)信号是一个非常典型的场景。
由于下游不同消费端对删除信号的需求存在冲突(一部分需要同步删除,另一部分希望保留历史数据、忽略删除),最优雅、最灵活的架构设计通常是“写入端保留全量(包含 DELETE 信号),读取端按需过滤/传播”。这样既能保证 Paimon 作为单一可信数据源(Single Source of Truth)的完整性,又赋予了不同消费者自主选择的权利。
以下是具体的配置方法、SQL Hint 示例以及底层原理。
一、 最佳实践:读取端(Source)按需分流过滤
在这种模式下,写入 Paimon 时不进行任何过滤,保留物理删除信号。当下游消费者通过 Flink 流式消费该 Paimon 表时,通过 SQL Hint (OPTIONS) 动态决定是否过滤删除信号。
1. 消费者 A:需要同步删除(默认行为)
对于需要同步删除的消费者,无需做额外配置,直接流式读取即可。Paimon 默认会向下游传播 -D (DELETE) 信号。
-- Flink SQL 运行在 streaming 模式下
SET 'execution.runtime-mode' = 'streaming';
-- 直接消费,默认会接收到 INSERT (+I)、UPDATE_AFTER (+U) 和 DELETE (-D) 信号
SELECT * FROM paimon_catalog.my_db.my_table;
2. 消费者 B:需要过滤/忽略删除信号
对于不希望受到物理删除影响的消费者,可以使用 SQL Hint 动态将 ignore-delete 设置为 true。
-- 使用 OPTIONS Hint 动态忽略删除信号
SELECT * FROM paimon_catalog.my_db.my_table
/*+ OPTIONS('ignore-delete' = 'true') */;
- 底层机制:当在读取端设置
ignore-delete' = 'true'时,Paimon 的 Scan 算子(读端)在扫描 Changelog 或数据文件时,会直接在内存中拦截并过滤掉所有-D(DELETE) 和-U(UPDATE_BEFORE) 类型的消息,只向下游发送+I(INSERT) 和+U(UPDATE_AFTER) 消息。
二、 写入端(Sink)过滤与合并配置
在某些特殊场景中(例如:为了节省存储空间、构建一个绝对不含删除的历史备份表,或者使用了不支持删除的 Merge Engine),您可能希望在写入 Paimon 时就直接将删除信号过滤掉。
这可以通过在 建表 DDL (Table Properties) 中或 写入 SQL Hint (OPTIONS) 中进行配置。
1. Deduplicate 引擎(默认主键表引擎)下的写入过滤
默认情况下,Deduplicate 引擎收到 -D 消息后会删除对应主键的整行数据。如果希望写入端直接忽略删除操作:
- 方法 A:在建表 DDL 中永久配置sql
CREATE TABLE paimon_catalog.my_db.my_table ( id INT PRIMARY KEY NOT ENFORCED, name STRING, price DOUBLE ) WITH ( 'merge-engine' = 'deduplicate', 'ignore-delete' = 'true' -- 写入时直接忽略物理删除 ); - 方法 B:在 INSERT 写入时通过 SQL Hint 动态配置sql
INSERT INTO paimon_catalog.my_db.my_table /*+ OPTIONS('ignore-delete' = 'true') */ -- 动态 Hint 过滤 SELECT id, name, price FROM mysql_cdc_source;
2. Partial Update(部分更新)引擎下的写入过滤
在多流拼接场景中,partial-update 引擎默认是不支持接受 -D (DELETE) 信号的,如果直接输入删除信号会抛出异常或导致状态混乱。对此,Paimon 提供了以下三种优雅的写入端应对方案(通常三选一):
- 方案 A:忽略删除信号(保持原样)
直接配置ignore-delete。上游物理删除时,Paimon 中的整行数据不受任何影响。sqlCREATE TABLE partial_update_table ( ... ) WITH ( 'merge-engine' = 'partial-update', 'ignore-delete' = 'true' ); - 方案 B:物理删除时级联删除整行
如果希望当核心主表收到-D消息时,将 Paimon 里合并好的整行数据也物理删除,可以配置partial-update.remove-record-on-delete(部分版本中简写为remove-record-on-delete)。sqlCREATE TABLE partial_update_table ( ... ) WITH ( 'merge-engine' = 'partial-update', 'partial-update.remove-record-on-delete' = 'true' ); - 方案 C:Sequence Group(序列组)内的局部撤回
配置序列组,只撤回/清空该数据流对应的特定列,而保留其他流写入的列。
三、 常用核心属性参考表
为了更好地在实际生产中组合使用,下表汇总了与删除信号处理相关的核心表属性:
| 配置项 | 默认值 | 适用阶段 | 说明说明 |
|---|---|---|---|
ignore-delete |
false |
写入端 / 读取端 | 是否忽略 -D(删除)消息。若用于读端,则流式读取时过滤删除信号。 |
ignore-update-before |
false |
读取端 | 是否过滤 -U (UPDATE_BEFORE) 消息。常用于配合 ignore-delete 将 Changelog 转换成 Append-only 纯增量流下发。 |
partial-update.remove-record-on-delete |
false |
写入端 | 在 Partial Update 引擎下,收到 -D 消息时是否删除整行。 |
aggregation.remove-record-on-delete |
false |
写入端 | 在 Aggregation 引擎下,收到 -D 消息时是否将整行数据清空。 |