基于本文回答
0
评论

Flink的Keyed State(键控状态)和 Operator State(算子状态)

知识点图片

Apache Flink 中的状态(State)是其流处理能力的核心,用于记忆过去的数据以影响未来的处理结果。Flink 将状态主要分为两大类:Keyed State(键控状态)Operator State(算子状态)

这两者的核心区别在于状态的作用域(Scope)以及在并行度发生变化(扩缩容)时状态如何重新分配

以下是详细的对比和解析:


1. Keyed State (键控状态)

这是 Flink 中最常用的状态类型。

  • 适用场景:只能用在 KeyedStream 上,即在调用 keyBy() 算子之后。
  • 作用域
    • 状态是根据 Key 来隔离的。
    • 对于每一个 Key,都有一个独立的状态实例。
    • 例如:如果你在统计每个用户的点击量,Key 是 userId,那么 user_A 的计数器和 user_B 的计数器是完全隔离的。
  • 数据结构:Flink 提供了丰富的接口来操作 Keyed State:
    • ValueState<T>: 保存单个值(如计数器)。
    • ListState<T>: 保存一个列表。
    • MapState<K, V>: 保存键值对映射。
    • ReducingState<T>: 保存聚合后的结果(通过 reduce 函数更新)。
    • AggregatingState<IN, OUT>: 保存聚合结果(比 Reducing 更通用)。
  • 扩缩容(Rescaling)机制
    • Flink 使用 Key Groups(键组) 机制。
    • 所有的 Key 被哈希分配到固定的 Key Groups 中(默认最大并行度通常为 128 或更大)。
    • 当并行度改变时,Flink 会将 Key Groups 重新分配给新的 Task。状态随着 Key Group 走,因此特定的 Key 总是能访问到它之前的状态。

2. Operator State (算子状态)

Operator State 与 Key 无关,它绑定到算子的并行子任务(Subtask)

  • 适用场景:可以用于任何算子(Source, Sink, Map 等),不需要 keyBy()。常用于 Source 连接器(保存 Kafka Offset)或 Sink 连接器(缓冲写入数据)。
  • 作用域
    • 状态绑定到当前的并行子任务(Task 实例)
    • 如果一个算子的并行度是 4,那么就有 4 个独立的 Operator State 实例。
    • 子任务内部的所有数据共享这一个状态。
  • 数据结构
    • ListState<T>: 列表状态。
    • BroadcastState<K, V>: 广播状态(一种特殊的 Operator State,用于将配置广播到所有 Task)。
  • 扩缩容(Rescaling)机制
    • 当并行度发生变化时,Operator State 需要定义“如何分配”状态数据。主要有三种模式:
      1. ListState (Even-split / Round-robin): 将所有并发的状态合并成一个大列表,然后均匀分配给新的并发任务(最常用)。
      2. UnionListState: 将所有并发的状态合并,然后把完整的列表广播给每一个新的并发任务(每个任务都收到全量数据)。
      3. BroadcastState: 总是复制给所有下游任务。

3. 核心对比总结表

特性 Keyed State (键控状态) Operator State (算子状态)
前提条件 必须在 keyBy() 之后使用 可以在任何算子中使用
绑定对象 绑定到 Key 绑定到 Operator Subtask (并行子任务)
访问方式 通过 RuntimeContext 访问 (getState, getMapState 等) 通常通过实现 CheckpointedFunction 接口访问
数据结构 Value, List, Map, Reducing, Aggregating ListState, BroadcastState
扩缩容行为 基于 Key Group 自动重新分片,Key 的状态跟随 Key 走 逻辑重分配 (均匀拆分、全量广播等)
典型用例 聚合窗口、去重、基于 Key 的模式匹配 Kafka Source (Offset)、文件 Sink、广播配置
状态大小 通常较大(Key 的数量可能是一亿级) 通常较小(仅与并行度相关)

4. 代码层面的区别

Keyed State 示例

通常在 RichFunction 中使用:

java
public class CountWindow extends RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {

    // 1. 定义状态句柄
    private transient ValueState<Integer> countState;

    @Override
    public void open(Configuration parameters) {
        // 2. 初始化状态描述符
        ValueStateDescriptor<Integer> descriptor = new ValueStateDescriptor<>(
                "count-state", // 状态名称
                Integer.class); // 状态类型
        countState = getRuntimeContext().getState(descriptor);
    }

    @Override
    public void flatMap(Tuple2<String, Integer> value, Collector<Tuple2<String, Integer>> out) throws Exception {
        // 3. 访问状态(自动根据当前处理数据的 Key 获取对应值)
        Integer currentCount = countState.value();
        if (currentCount == null) {
            currentCount = 0;
        }
        currentCount += 1;
        
        // 4. 更新状态
        countState.update(currentCount);
        
        out.collect(Tuple2.of(value.f0, currentCount));
    }
}

Operator State 示例

通常实现 CheckpointedFunction 接口:

java
public class BufferingSink implements SinkFunction<String>, CheckpointedFunction {

    private final int threshold;
    private List<String> bufferedElements; // 内存中的临时缓冲
    
    // 1. 定义状态句柄
    private transient ListState<String> checkpointedState;

    public BufferingSink(int threshold) {
        this.threshold = threshold;
        this.bufferedElements = new ArrayList<>();
    }

    @Override
    public void invoke(String value, Context context) {
        bufferedElements.add(value);
        if (bufferedElements.size() == threshold) {
            // 发送到外部系统...
            bufferedElements.clear();
        }
    }

    // 2. 制作快照时调用 (Snapshot)
    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        checkpointedState.clear();
        // 将当前内存数据放入状态中保存
        for (String element : bufferedElements) {
            checkpointedState.add(element);
        }
    }

    // 3. 恢复状态时调用 (Restore)
    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        ListStateDescriptor<String> descriptor = new ListStateDescriptor<>(
                "buffered-elements",
                String.class);
        
        // 获取 Operator State
        checkpointedState = context.getOperatorStateStore().getListState(descriptor);

        // 如果是故障恢复,从状态中加载数据回内存
        if (context.isRestored()) {
            for (String element : checkpointedState.get()) {
                bufferedElements.add(element);
            }
        }
    }
}

总结

  • 如果你需要处理每个用户、每个设备、每个订单的数据,请使用 Keyed State
  • 如果你需要处理Kafka Offset、全局配置、或者与特定 Key 无关的缓冲数据,请使用 Operator State
右滑查看面试常问