Java Stream filter 操作的底层原理是什么?
Java Stream 的 filter 操作是其函数式编程核心特性的重要组成部分。要深入理解其底层原理,我们需要从 数据结构、惰性求值(Lazy Evaluation)、双向链表构建、以及核心的 Sink 机制 这几个维度来剖析。
一句话总结:filter 本质上并不立即过滤数据,而是构建了一个“操作管道(Pipeline)”。当终结操作触发时,数据通过一个单向流动的“水管(Sink)”,在 filter 对应的节点进行条件判定,决定是否将数据传递给下一个节点。
下面是详细的底层原理剖析:
1. 核心架构:双向链表与管道(Pipeline)
Stream 的底层是用一个双向链表来表示整个流式操作的。
每一个 Stream 操作(如 filter、map 等)都会产生一个新的 Stream 阶段(Stage)对象,这些对象连接在一起构成了一个管道。
在 JDK 中,主要由 AbstractPipeline 及其子类来实现:
- Head:代表数据源(如
Collection.stream()产生的Head节点)。 - StatelessOp:无状态操作(如
filter、map),继承自ReferencePipeline。 - StatefulOp:有状态操作(如
sorted、distinct),需要获取到所有数据才能进行下一步。
当你写下:
list.stream() // 1. Head 节点
.filter(x -> x > 2) // 2. StatelessOp 节点 (Filter)
.map(x -> x * 2) // 3. StatelessOp 节点 (Map)
.collect(Collectors.toList()); // 4. Terminal Op (终结操作)
在内存中,会构建出一个链表结构:Head <---> FilterOp <---> MapOp
2. 惰性求值(Lazy Evaluation)的实现
当你调用 .filter(x -> x > 2) 时,没有任何数据被遍历或过滤。
filter 的源码实现非常简单,它只是创建并返回了一个新的 StatelessOp 对象:
// ReferencePipeline.java 中的 filter 实现
@Override
public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
Objects.requireNonNull(predicate);
// 返回一个无状态的流操作阶段
return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
StreamOpFlags.NOT_SIZED) {
@Override
Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
// 这是核心:包装下一个操作的 Sink
return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
@Override
public void begin(long size) {
// filter 之后,元素个数可能变少,所以 size 变为 -1 (未知)
downstream.begin(-1);
}
@Override
public void accept(P_OUT u) {
// 条件判定:如果符合 predicate 条件,才传递给下一个 Sink
if (predicate.test(u)) {
downstream.accept(u);
}
}
};
}
};
}
3. 核心机制:Sink 接口(水管模型)
Sink(水槽/接收器)是 Stream 执行引擎的核心。每个 Stream 阶段都会将其操作逻辑封装在一个 Sink 对象中。
Sink 接口有四个核心方法:
begin(long size):通知 Sink 开始消费数据,告知数据大小(若无法确定则为 -1)。accept(T t):最核心的方法。消费一个数据。end():通知 Sink 数据消费完毕。cancellationRequested():是否可以提前结束(用于findFirst、limit等短路操作)。
filter 的 Sink 逻辑
正如上面源码所示,filter 产生的 Sink 在 accept 方法中实现了过滤逻辑:
@Override
public void accept(P_OUT u) {
if (predicate.test(u)) { // 1. 执行 predicate 判定
downstream.accept(u); // 2. 只有通过了,才调用下游(downstream)的 accept
}
}
如果不满足条件,downstream.accept(u) 就不会被调用,这个元素就被“过滤”掉了。
4. 终结操作触发的“逆向包装”与“正向执行”
只有当终结操作(如 collect、forEach)被调用时,整个流才会开始执行。执行过程分为两个阶段:
阶段一:逆向包装(Wrap Sinks)
从最后一个终结操作开始,逆向调用每个阶段的 opWrapSink 方法,将 Sink 相互包装起来。
假设我们的链条是:Source -> Filter -> Map -> Collector
- 首先创建
Collector的Sink(最下游)。 Map节点把Collector Sink包装成Map Sink。Filter节点把Map Sink包装成Filter Sink。
最终,我们得到了一个嵌套的 Sink 链表:FilterSink -> 内部持有 MapSink -> 内部持有 CollectorSink。
阶段二:正向执行(Push Data)
一旦 Sink 链条构建完成,Stream 就会获取数据源的 Spliterator(迭代器),开始正向推送数据:
- 调用
FilterSink.begin()-> 触发MapSink.begin()-> 触发CollectorSink.begin()。 Spliterator遍历数据,对每个元素调用FilterSink.accept(item)。- 数据流向:
- 元素
item传给FilterSink.accept。 Filter进行断言:- 如果不满足,直接丢弃(不调用下游)。
- 如果满足,调用
MapSink.accept。
MapSink进行转换,然后调用CollectorSink.accept。CollectorSink收集数据。
- 元素
- 调用各级的
end()方法,结束流程。
5. 性能优化:循环合并(Loop Fusion)与无状态
Stream 的这种底层设计带来了一个极大的性能优势:循环合并(Loop Fusion)。
在传统的命令式代码中,如果你写了两个循环:
// 产生了中间集合,遍历了两次
List<Integer> temp = new ArrayList<>();
for(int x : list) { if(x > 2) temp.add(x); }
for(int x : temp) { System.out.println(x * 2); }
而在 Stream 中,因为有 Sink 链的存在,每个元素在一次遍历中就会完整走完 Filter -> Map -> Consume 的整套流程。
数据源 ──(item)──> [FilterSink] ──(通过)──> [MapSink] ──> [CollectorSink]
这相当于:
for(int x : list) {
if(x > 2) { // Filter
int mapped = x * 2; // Map
collector.add(mapped); // Collect
}
}
没有产生任何中间集合,只有一次循环。
总结
Java Stream filter 的底层原理可以概括为:
- 声明阶段:利用双向链表结构记录操作,不触发实际计算(惰性)。
- 准备阶段:终结操作触发时,从后往前,将每个操作的
Sink包装成一个嵌套的Sink链。 - 执行阶段:迭代数据源,数据在
Sink链中“正向流动”。filter对应的Sink拦截不满足条件的数据,只放行满足条件的数据到下游,从而实现高效的单次遍历过滤。