Flink自定义 UDF 流程
在 Apache Flink 中,自定义 UDF(User-Defined Function,用户自定义函数)是扩展 Flink SQL 和 Table API 功能的重要手段。Flink 支持多种类型的 UDF,以满足不同的数据处理需求。
以下是开发、注册和使用 Flink 自定义 UDF 的完整流程。
一、Flink UDF 的分类
在开始之前,先了解 Flink 支持的四种主要 UDF 类型:
- ScalarFunction(标量函数,UDF):一进一出。输入一行数据的零个或多个值,输出一个值(如:
substring)。 - TableFunction(表值函数,UDTF):一进多出。输入一行数据,输出任意多行数据(甚至 0 行)(如:
split拆分字符串)。 - AggregateFunction(聚合函数,UDAF):多进一出。将多行数据聚合成一个值(如:
sum,avg)。 - TableAggregateFunction(表聚合函数,UDTAF):多进多出。将多行数据聚合成多行(如:计算 Top3)。
二、开发流程(以 Java 为例)
步骤 1:添加 Maven 依赖
在你的 pom.xml 中引入 Flink Table API 相关依赖(版本需与你的 Flink 集群一致,这里以 1.17.0 为例):
xml
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge</artifactId>
<version>1.17.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.12</artifactId>
<version>1.17.0</version>
<scope>provided</scope>
</dependency>
步骤 2:编写 UDF 代码
示例 A:ScalarFunction(标量函数)
最常用。必须实现 eval 方法(注意:eval 方法必须是 public 且不能是 static,Flink 会通过反射调用)。
java
import org.apache.flink.table.functions.ScalarFunction;
// 示例:将字符串转换为大写,并拼接一个后缀
public class SubstringAndUpperFunction extends ScalarFunction {
// 必须命名为 eval
public String eval(String s, Integer beginIndex) {
if (s == null) {
return null;
}
return s.substring(beginIndex).toUpperCase();
}
}
示例 B:TableFunction(表值函数/UDTF)
需要使用 collect 方法输出多行数据。可以使用类型注解 @FunctionHint 来声明输出的字段名和类型。
java
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;
// 声明输出的行格式
@FunctionHint(output = @DataTypeHint("ROW<word STRING, length INT>"))
public class SplitFunction extends TableFunction<Row> {
// 输入一个字符串,按空格切分,输出单词和长度
public void eval(String str) {
if (str == null) {
return;
}
for (String s : str.split(" ")) {
// 使用 collect 收集输出结果
collect(Row.of(s, s.length()));
}
}
}
示例 C:AggregateFunction(聚合函数/UDAF)
需要定义一个累加器(Accumulator)来存储中间结果。
java
import org.apache.flink.table.functions.AggregateFunction;
// 累加器结构
public class WeightedAvgAccumulator {
public long sum = 0;
public int count = 0;
}
// 示例:计算加权平均值
public class WeightedAvg extends AggregateFunction<Long, WeightedAvgAccumulator> {
@Override
public Long getValue(WeightedAvgAccumulator accumulator) {
if (accumulator.count == 0) {
return null;
} else {
return accumulator.sum / accumulator.count;
}
}
@Override
public WeightedAvgAccumulator createAccumulator() {
return new WeightedAvgAccumulator();
}
// 必须实现 accumulate 方法
public void accumulate(WeightedAvgAccumulator accumulator, Long iValue, Integer iWeight) {
accumulator.sum += iValue * iWeight;
accumulator.count += iWeight;
}
}
步骤 3:注册 UDF
在 Flink 任务中,你需要将写好的类注册到 TableEnvironment 中,并指定一个函数名。
在 Java/Scala 代码中注册:
java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.newInstance().build());
// 1. 注册标量函数 (UDF)
tEnv.createTemporarySystemFunction("my_upper", SubstringAndUpperFunction.class);
// 2. 注册表值函数 (UDTF)
tEnv.createTemporarySystemFunction("my_split", SplitFunction.class);
// 3. 注册聚合函数 (UDAF)
tEnv.createTemporarySystemFunction("weighted_avg", WeightedAvg.class);
在 Flink SQL CLI / 外部 SQL 脚本中注册:
如果你已经把 UDF 打包成 Jar 包,可以使用 SQL 语句注册:
sql
-- 1. 添加 JAR 包
ADD JAR 'hdfs:///path/to/your/flink-udf.jar'; -- 或者本地路径 file:///...
-- 2. 创建函数
CREATE TEMPORARY SYSTEM FUNCTION my_upper AS 'com.yourcompany.udf.SubstringAndUpperFunction';
CREATE TEMPORARY SYSTEM FUNCTION my_split AS 'com.yourcompany.udf.SplitFunction';
步骤 4:在 SQL 中使用 UDF
注册成功后,就可以像使用 Flink 内置函数一样使用它们了。
sql
-- 使用 ScalarFunction (my_upper)
SELECT my_upper(name, 2) FROM UserTable;
-- 使用 TableFunction (my_split) -> 需要用 LEFT JOIN LATERAL ... ON TRUE
SELECT
t.id,
f.word,
f.length
FROM
MyTable t,
LATERAL TABLE(my_split(t.sentence)) AS f(word, length);
-- 使用 AggregateFunction (weighted_avg)
SELECT
categoryId,
weighted_avg(price, weight)
FROM
GoodsTable
GROUP BY
categoryId;
三、生产实践中的注意事项
- 打包(Shade):打包 UDF 项目时,务必将 Flink 依赖(
flink-table-api-*等)设置为<scope>provided</scope>,避免把 Flink 的核心包打入 UDF JAR 中,导致类冲突。 - 类型推导(Type Extraction):Flink 1.11 之后引入了全新的类型系统。如果你的 UDF 涉及复杂的泛型、自定义 POJO,建议使用
@DataTypeHint或@FunctionHint注解显式声明输入输出类型,否则 Flink 可能会报ValidationException。 - 富函数(RichFunction)特性:Flink 的 UDF 类可以重写
open()和close()方法。如果你需要在 UDF 中连接外部数据库(如 Redis、HBase 进行数据关联),可以在open()方法中初始化连接池,在close()中关闭。