Flink自定义UDF的开发流程
在Apache Flink中,自定义函数(User-Defined Functions, 简称 UDF)是扩展 Flink SQL 和 Table API 能力的重要手段。
Flink 提供了四种主要类型的自定义函数:
- Scalar Function (UDF):标量函数,一对一(输入一行,输出一行)。
- Table Function (UDTF):表值函数,一对多(输入一行,输出多行/多列)。
- Aggregate Function (UDAF):聚合函数,多对一(输入多行,聚合成一行)。
- Table Aggregate Function (UDTAF):表聚合函数,多对多(输入多行,聚合成多行多列)。
下面是完整的 Flink 自定义 UDF 开发流程(以 Java 为例,基于 Flink 1.11+ 之后的现代 API):
第一步:引入 Maven 依赖
在你的 pom.xml 中引入 Flink Table 相关的依赖。注意,作用域通常设置为 provided,因为 Flink 集群已经包含了这些包。
xml
<dependencies>
<!-- Flink Table API (Java) -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java</artifactId>
<version>1.17.0</version> <!-- 替换为你的 Flink 版本 -->
<scope>provided</scope>
</dependency>
<!-- 如果需要在本地 IDE 运行测试,可能还需要引入 planner -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.12</artifactId>
<version>1.17.0</version>
<scope>provided</scope>
</dependency>
</dependencies>
第二步:编写 UDF 代码
1. 开发标量函数 (Scalar Function) - 一对一
继承 org.apache.flink.table.functions.ScalarFunction。
注意: 核心方法必须命名为 eval,并且是 public。该方法通过反射调用,所以不需要 @Override。
java
import org.apache.flink.table.functions.ScalarFunction;
// 示例:将字符串转换为大写并添加前缀
public class UpperPrefixUDF extends ScalarFunction {
// 核心逻辑方法,必须叫 eval
public String eval(String str, String prefix) {
if (str == null) {
return null;
}
return prefix + "_" + str.toUpperCase();
}
// 可选:可以重载 eval 方法
public String eval(String str) {
return eval(str, "DEFAULT");
}
}
2. 开发表值函数 (Table Function) - 一对多
继承 org.apache.flink.table.functions.TableFunction<T>,泛型 T 是输出的类型(可以是基础类型、Row 或 POJO)。
java
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;
// 示例:按照指定分隔符拆分字符串
// 指定输出的列名和类型
@FunctionHint(output = @DataTypeHint("ROW<word STRING, length INT>"))
public class SplitUDTF extends TableFunction<Row> {
public void eval(String str, String separator) {
if (str != null && !str.isEmpty()) {
String[] words = str.split(separator);
for (String word : words) {
// 使用 collect() 方法发送每一行数据
collect(Row.of(word, word.length()));
}
}
}
}
3. 开发聚合函数 (Aggregate Function) - 多对一
继承 org.apache.flink.table.functions.AggregateFunction<T, ACC>。T 是最终返回结果类型,ACC 是中间状态(累加器)类型。
java
import org.apache.flink.table.functions.AggregateFunction;
// 1. 定义累加器 (Accumulator)
public class AvgAccumulator {
public long sum = 0;
public int count = 0;
}
// 2. 定义聚合函数:求平均值 (返回 Double,累加器为 AvgAccumulator)
public class MyAvgUDAF extends AggregateFunction<Double, AvgAccumulator> {
// 初始化累加器
@Override
public AvgAccumulator createAccumulator() {
return new AvgAccumulator();
}
// 获取最终结果
@Override
public Double getValue(AvgAccumulator acc) {
if (acc.count == 0) {
return null;
} else {
return (double) acc.sum / acc.count;
}
}
// 核心处理逻辑:每来一条数据如何更新累加器(必须叫 accumulate)
public void accumulate(AvgAccumulator acc, Long value) {
if (value != null) {
acc.sum += value;
acc.count += 1;
}
}
// 可选:用于 Session Window 或 批处理中的状态合并
public void merge(AvgAccumulator acc, Iterable<AvgAccumulator> it) {
for (AvgAccumulator a : it) {
acc.count += a.count;
acc.sum += a.sum;
}
}
}
第三步:高级特性与生命周期 (可选)
如果你需要在函数执行前初始化资源(如建立数据库连接、加载外部字典),或者返回复杂的类型,可以使用以下特性:
- 生命周期方法:重写
open()和close()方法。
java
@Override
public void open(FunctionContext context) throws Exception {
super.open(context);
// 初始化外部连接,例如 Redis 或 MySQL
}
@Override
public void close() throws Exception {
// 释放资源
super.close();
}
- 类型提示 (Type Hints):Flink 基于反射推断类型,当推断失败(如复杂的
Row或Map)时,需使用@DataTypeHint显式指定。
第四步:注册与使用 UDF
写完 UDF 后,需要在 Flink 程序中注册并使用它。可以通过代码注册,也可以通过 SQL 客户端注册。
方式一:在 DataStream / Table API 代码中注册
java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
public class UDFDemo {
public static void main(String[] args) throws Exception {
EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
TableEnvironment tEnv = TableEnvironment.create(settings);
// 1. 注册 Scalar Function
tEnv.createTemporarySystemFunction("upper_prefix", UpperPrefixUDF.class);
// 2. 注册 Table Function
tEnv.createTemporarySystemFunction("split_udtf", SplitUDTF.class);
// 3. 注册 Aggregate Function
tEnv.createTemporarySystemFunction("my_avg", MyAvgUDAF.class);
// 使用示例:
// 建表...
tEnv.executeSql("CREATE TABLE source_table (id INT, name STRING, score BIGINT) WITH (...)");
// 使用 UDF
tEnv.executeSql("SELECT upper_prefix(name, 'VIP') FROM source_table").print();
// 使用 UDTF (需要使用 LATERAL TABLE)
tEnv.executeSql("SELECT id, name, word, length FROM source_table, LATERAL TABLE(split_udtf(name, ','))").print();
// 使用 UDAF
tEnv.executeSql("SELECT id, my_avg(score) FROM source_table GROUP BY id").print();
}
}
方式二:在 Flink SQL Client 中使用(推荐用于纯 SQL 作业)
- 将你的 UDF 代码打包成 JAR 文件(例如
my-udfs-1.0.jar)。 - 将 JAR 上传到 Flink 服务器。
- 启动 Flink SQL Client,执行以下命令:
sql
-- 加载 jar 包
ADD JAR '/path/to/my-udfs-1.0.jar';
-- 创建函数 (TEMPORARY 表示临时函数,也可以不加创建永久函数存储在 Hive Metastore)
CREATE TEMPORARY FUNCTION upper_prefix AS 'com.yourcompany.flink.udf.UpperPrefixUDF';
-- 使用函数
SELECT upper_prefix(name, 'VIP') FROM my_table;
总结流程图
- 确定需求 -> 确定使用 UDF(标量)、UDTF(表值) 还是 UDAF(聚合)。
- 引入依赖 ->
flink-table-api-java。 - 继承基类 ->
ScalarFunction/TableFunction/AggregateFunction。 - 实现逻辑 -> 编写
eval()或accumulate()方法。 - 处理类型 -> 如果返回复杂结构,加上
@FunctionHint或@DataTypeHint。 - 打包编译 ->
mvn clean package。 - 注册运行 -> 通过
createTemporarySystemFunction或 SQLCREATE FUNCTION注册并调用。