基于本文回答
0
评论

Flink自定义 UDF 流程

在 Apache Flink 中,自定义 UDF(User-Defined Function,用户自定义函数)是扩展 Flink SQL 和 Table API 功能的重要手段。Flink 支持多种类型的 UDF,以满足不同的数据处理需求。

以下是开发、注册和使用 Flink 自定义 UDF 的完整流程。


一、Flink UDF 的分类

在开始之前,先了解 Flink 支持的四种主要 UDF 类型:

  1. ScalarFunction(标量函数,UDF):一进一出。输入一行数据的零个或多个值,输出一个值(如:substring)。
  2. TableFunction(表值函数,UDTF):一进多出。输入一行数据,输出任意多行数据(甚至 0 行)(如:split 拆分字符串)。
  3. AggregateFunction(聚合函数,UDAF):多进一出。将多行数据聚合成一个值(如:sum, avg)。
  4. TableAggregateFunction(表聚合函数,UDTAF):多进多出。将多行数据聚合成多行(如:计算 Top3)。

二、开发流程(以 Java 为例)

步骤 1:添加 Maven 依赖

在你的 pom.xml 中引入 Flink Table API 相关依赖(版本需与你的 Flink 集群一致,这里以 1.17.0 为例):

xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge</artifactId>
    <version>1.17.0</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner_2.12</artifactId>
    <version>1.17.0</version>
    <scope>provided</scope>
</dependency>

步骤 2:编写 UDF 代码

示例 A:ScalarFunction(标量函数)

最常用。必须实现 eval 方法(注意:eval 方法必须是 public 且不能是 static,Flink 会通过反射调用)。

java
import org.apache.flink.table.functions.ScalarFunction;

// 示例:将字符串转换为大写,并拼接一个后缀
public class SubstringAndUpperFunction extends ScalarFunction {
    
    // 必须命名为 eval
    public String eval(String s, Integer beginIndex) {
        if (s == null) {
            return null;
        }
        return s.substring(beginIndex).toUpperCase();
    }
}
示例 B:TableFunction(表值函数/UDTF)

需要使用 collect 方法输出多行数据。可以使用类型注解 @FunctionHint 来声明输出的字段名和类型。

java
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;

// 声明输出的行格式
@FunctionHint(output = @DataTypeHint("ROW<word STRING, length INT>"))
public class SplitFunction extends TableFunction<Row> {

    // 输入一个字符串,按空格切分,输出单词和长度
    public void eval(String str) {
        if (str == null) {
            return;
        }
        for (String s : str.split(" ")) {
            // 使用 collect 收集输出结果
            collect(Row.of(s, s.length()));
        }
    }
}
示例 C:AggregateFunction(聚合函数/UDAF)

需要定义一个累加器(Accumulator)来存储中间结果。

java
import org.apache.flink.table.functions.AggregateFunction;

// 累加器结构
public class WeightedAvgAccumulator {
    public long sum = 0;
    public int count = 0;
}

// 示例:计算加权平均值
public class WeightedAvg extends AggregateFunction<Long, WeightedAvgAccumulator> {

    @Override
    public Long getValue(WeightedAvgAccumulator accumulator) {
        if (accumulator.count == 0) {
            return null;
        } else {
            return accumulator.sum / accumulator.count;
        }
    }

    @Override
    public WeightedAvgAccumulator createAccumulator() {
        return new WeightedAvgAccumulator();
    }

    // 必须实现 accumulate 方法
    public void accumulate(WeightedAvgAccumulator accumulator, Long iValue, Integer iWeight) {
        accumulator.sum += iValue * iWeight;
        accumulator.count += iWeight;
    }
}

步骤 3:注册 UDF

在 Flink 任务中,你需要将写好的类注册到 TableEnvironment 中,并指定一个函数名。

在 Java/Scala 代码中注册:
java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.newInstance().build());

// 1. 注册标量函数 (UDF)
tEnv.createTemporarySystemFunction("my_upper", SubstringAndUpperFunction.class);

// 2. 注册表值函数 (UDTF)
tEnv.createTemporarySystemFunction("my_split", SplitFunction.class);

// 3. 注册聚合函数 (UDAF)
tEnv.createTemporarySystemFunction("weighted_avg", WeightedAvg.class);
在 Flink SQL CLI / 外部 SQL 脚本中注册:

如果你已经把 UDF 打包成 Jar 包,可以使用 SQL 语句注册:

sql
-- 1. 添加 JAR 包
ADD JAR 'hdfs:///path/to/your/flink-udf.jar'; -- 或者本地路径 file:///...

-- 2. 创建函数
CREATE TEMPORARY SYSTEM FUNCTION my_upper AS 'com.yourcompany.udf.SubstringAndUpperFunction';
CREATE TEMPORARY SYSTEM FUNCTION my_split AS 'com.yourcompany.udf.SplitFunction';

步骤 4:在 SQL 中使用 UDF

注册成功后,就可以像使用 Flink 内置函数一样使用它们了。

sql
-- 使用 ScalarFunction (my_upper)
SELECT my_upper(name, 2) FROM UserTable;

-- 使用 TableFunction (my_split) -> 需要用 LEFT JOIN LATERAL ... ON TRUE
SELECT 
    t.id, 
    f.word, 
    f.length 
FROM 
    MyTable t,
    LATERAL TABLE(my_split(t.sentence)) AS f(word, length);

-- 使用 AggregateFunction (weighted_avg)
SELECT 
    categoryId, 
    weighted_avg(price, weight) 
FROM 
    GoodsTable 
GROUP BY 
    categoryId;

三、生产实践中的注意事项

  1. 打包(Shade):打包 UDF 项目时,务必将 Flink 依赖(flink-table-api-* 等)设置为 <scope>provided</scope>,避免把 Flink 的核心包打入 UDF JAR 中,导致类冲突。
  2. 类型推导(Type Extraction):Flink 1.11 之后引入了全新的类型系统。如果你的 UDF 涉及复杂的泛型、自定义 POJO,建议使用 @DataTypeHint@FunctionHint 注解显式声明输入输出类型,否则 Flink 可能会报 ValidationException
  3. 富函数(RichFunction)特性:Flink 的 UDF 类可以重写 open()close() 方法。如果你需要在 UDF 中连接外部数据库(如 Redis、HBase 进行数据关联),可以在 open() 方法中初始化连接池,在 close() 中关闭。
右滑查看面试常问