基于本文回答

播面 播面

文图音视,全方位拆解八股文
0
评论

Flink自定义UDF的开发流程

知识点图片

在Apache Flink中,自定义函数(User-Defined Functions, 简称 UDF)是扩展 Flink SQL 和 Table API 能力的重要手段。

Flink 提供了四种主要类型的自定义函数:

  1. Scalar Function (UDF):标量函数,一对一(输入一行,输出一行)。
  2. Table Function (UDTF):表值函数,一对多(输入一行,输出多行/多列)。
  3. Aggregate Function (UDAF):聚合函数,多对一(输入多行,聚合成一行)。
  4. Table Aggregate Function (UDTAF):表聚合函数,多对多(输入多行,聚合成多行多列)。

下面是完整的 Flink 自定义 UDF 开发流程(以 Java 为例,基于 Flink 1.11+ 之后的现代 API):


第一步:引入 Maven 依赖

在你的 pom.xml 中引入 Flink Table 相关的依赖。注意,作用域通常设置为 provided,因为 Flink 集群已经包含了这些包。

xml
<dependencies>
    <!-- Flink Table API (Java) -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java</artifactId>
        <version>1.17.0</version> <!-- 替换为你的 Flink 版本 -->
        <scope>provided</scope>
    </dependency>
    
    <!-- 如果需要在本地 IDE 运行测试,可能还需要引入 planner -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner_2.12</artifactId>
        <version>1.17.0</version>
        <scope>provided</scope>
    </dependency>
</dependencies>

第二步:编写 UDF 代码

1. 开发标量函数 (Scalar Function) - 一对一

继承 org.apache.flink.table.functions.ScalarFunction
注意: 核心方法必须命名为 eval,并且是 public。该方法通过反射调用,所以不需要 @Override

java
import org.apache.flink.table.functions.ScalarFunction;

// 示例:将字符串转换为大写并添加前缀
public class UpperPrefixUDF extends ScalarFunction {

    // 核心逻辑方法,必须叫 eval
    public String eval(String str, String prefix) {
        if (str == null) {
            return null;
        }
        return prefix + "_" + str.toUpperCase();
    }
    
    // 可选:可以重载 eval 方法
    public String eval(String str) {
        return eval(str, "DEFAULT");
    }
}

2. 开发表值函数 (Table Function) - 一对多

继承 org.apache.flink.table.functions.TableFunction<T>,泛型 T 是输出的类型(可以是基础类型、Row 或 POJO)。

java
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;

// 示例:按照指定分隔符拆分字符串
// 指定输出的列名和类型
@FunctionHint(output = @DataTypeHint("ROW<word STRING, length INT>"))
public class SplitUDTF extends TableFunction<Row> {

    public void eval(String str, String separator) {
        if (str != null && !str.isEmpty()) {
            String[] words = str.split(separator);
            for (String word : words) {
                // 使用 collect() 方法发送每一行数据
                collect(Row.of(word, word.length()));
            }
        }
    }
}

3. 开发聚合函数 (Aggregate Function) - 多对一

继承 org.apache.flink.table.functions.AggregateFunction<T, ACC>T 是最终返回结果类型,ACC 是中间状态(累加器)类型。

java
import org.apache.flink.table.functions.AggregateFunction;

// 1. 定义累加器 (Accumulator)
public class AvgAccumulator {
    public long sum = 0;
    public int count = 0;
}

// 2. 定义聚合函数:求平均值 (返回 Double,累加器为 AvgAccumulator)
public class MyAvgUDAF extends AggregateFunction<Double, AvgAccumulator> {

    // 初始化累加器
    @Override
    public AvgAccumulator createAccumulator() {
        return new AvgAccumulator();
    }

    // 获取最终结果
    @Override
    public Double getValue(AvgAccumulator acc) {
        if (acc.count == 0) {
            return null;
        } else {
            return (double) acc.sum / acc.count;
        }
    }

    // 核心处理逻辑:每来一条数据如何更新累加器(必须叫 accumulate)
    public void accumulate(AvgAccumulator acc, Long value) {
        if (value != null) {
            acc.sum += value;
            acc.count += 1;
        }
    }

    // 可选:用于 Session Window 或 批处理中的状态合并
    public void merge(AvgAccumulator acc, Iterable<AvgAccumulator> it) {
        for (AvgAccumulator a : it) {
            acc.count += a.count;
            acc.sum += a.sum;
        }
    }
}

第三步:高级特性与生命周期 (可选)

如果你需要在函数执行前初始化资源(如建立数据库连接、加载外部字典),或者返回复杂的类型,可以使用以下特性:

  1. 生命周期方法:重写 open()close() 方法。
java
@Override
public void open(FunctionContext context) throws Exception {
    super.open(context);
    // 初始化外部连接,例如 Redis 或 MySQL
}

@Override
public void close() throws Exception {
    // 释放资源
    super.close();
}
  1. 类型提示 (Type Hints):Flink 基于反射推断类型,当推断失败(如复杂的 RowMap)时,需使用 @DataTypeHint 显式指定。

第四步:注册与使用 UDF

写完 UDF 后,需要在 Flink 程序中注册并使用它。可以通过代码注册,也可以通过 SQL 客户端注册。

方式一:在 DataStream / Table API 代码中注册

java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class UDFDemo {
    public static void main(String[] args) throws Exception {
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
        TableEnvironment tEnv = TableEnvironment.create(settings);

        // 1. 注册 Scalar Function
        tEnv.createTemporarySystemFunction("upper_prefix", UpperPrefixUDF.class);
        
        // 2. 注册 Table Function
        tEnv.createTemporarySystemFunction("split_udtf", SplitUDTF.class);
        
        // 3. 注册 Aggregate Function
        tEnv.createTemporarySystemFunction("my_avg", MyAvgUDAF.class);

        // 使用示例:
        // 建表...
        tEnv.executeSql("CREATE TABLE source_table (id INT, name STRING, score BIGINT) WITH (...)");
        
        // 使用 UDF
        tEnv.executeSql("SELECT upper_prefix(name, 'VIP') FROM source_table").print();
        
        // 使用 UDTF (需要使用 LATERAL TABLE)
        tEnv.executeSql("SELECT id, name, word, length FROM source_table, LATERAL TABLE(split_udtf(name, ','))").print();
        
        // 使用 UDAF
        tEnv.executeSql("SELECT id, my_avg(score) FROM source_table GROUP BY id").print();
    }
}

方式二:在 Flink SQL Client 中使用(推荐用于纯 SQL 作业)

  1. 将你的 UDF 代码打包成 JAR 文件(例如 my-udfs-1.0.jar)。
  2. 将 JAR 上传到 Flink 服务器。
  3. 启动 Flink SQL Client,执行以下命令:
sql
-- 加载 jar 包
ADD JAR '/path/to/my-udfs-1.0.jar';

-- 创建函数 (TEMPORARY 表示临时函数,也可以不加创建永久函数存储在 Hive Metastore)
CREATE TEMPORARY FUNCTION upper_prefix AS 'com.yourcompany.flink.udf.UpperPrefixUDF';

-- 使用函数
SELECT upper_prefix(name, 'VIP') FROM my_table;

总结流程图

  1. 确定需求 -> 确定使用 UDF(标量)、UDTF(表值) 还是 UDAF(聚合)。
  2. 引入依赖 -> flink-table-api-java
  3. 继承基类 -> ScalarFunction / TableFunction / AggregateFunction
  4. 实现逻辑 -> 编写 eval()accumulate() 方法。
  5. 处理类型 -> 如果返回复杂结构,加上 @FunctionHint@DataTypeHint
  6. 打包编译 -> mvn clean package
  7. 注册运行 -> 通过 createTemporarySystemFunction 或 SQL CREATE FUNCTION 注册并调用。
00:00
00:00