CyclicBarrier(循环栅栏)的工作原理是什么?其典型使用场景有哪些?
CyclicBarrier(循环栅栏) 是 Java 并发包 (java.util.concurrent) 中的一个非常重要的同步辅助类。它的字面意思是“可循环使用的栅栏”。
简单来说,它的作用是:让一组线程到达一个同步点(栅栏)时被阻塞,直到最后一个线程到达该同步点,栅栏才会打开,所有被阻塞的线程才能继续往下执行。
一、 CyclicBarrier 的工作原理
CyclicBarrier 的底层是基于 ReentrantLock 和 Condition 实现的。
1. 核心属性
parties(参与者数量):初始化时设定的需要等待的线程总数。count(计数器):当前还需要等待的线程数量。初始值等于parties。barrierCommand(栅栏任务):一个可选的Runnable任务。当所有线程都到达栅栏时(即count == 0时),会优先执行这个任务,然后再唤醒所有等待的线程。Generation(代):内部类,用于表示当前栅栏的“代”或“轮次”。因为 CyclicBarrier 是可循环使用的,每当栅栏被打破(所有线程通过)并重置后,就会生成新的一代。
2. 工作流程(await() 方法执行过程)
当一个线程调用 await() 方法时,内部会发生如下过程:
- 获取锁:线程首先获取
ReentrantLock独占锁。 - 计数器减 1:将
count的值减 1。 - 判断计数器状态:
- 如果
count != 0:说明还有线程没到达,当前线程调用Condition.await()进入阻塞状态,并释放锁。 - 如果
count == 0:说明所有线程都已经到达了同步点。- 如果构造时传入了
barrierCommand,则由当前最后一个到达的线程同步执行该任务。 - 调用
Condition.signalAll()唤醒所有在栅栏处等待的线程。 - 重置状态(Cyclic的体现):将
count恢复为parties,并更新Generation(开启下一轮),然后释放锁。
- 如果构造时传入了
- 如果
- 异常处理(栅栏破损机制):如果在等待过程中,某个线程被中断 (
interrupted) 或超时,当前“代”的栅栏就会被标记为“破损”(Broken)。此时,所有正在等待的线程都会被唤醒,并抛出BrokenBarrierException异常。
二、 典型使用场景
CyclicBarrier 非常适合 “分而治之”然后再“汇总结果” 的场景,或者需要多个线程步调一致地执行多阶段任务的场景。
1. 多线程计算数据,最后合并计算结果
- 场景描述:假设有一个庞大的 Excel 文件,包含 4 个 Sheet 的银行流水。需要计算每个 Sheet 的日均总流水,最后再汇总这 4 个 Sheet 的总和。
- 使用方式:开启 4 个线程分别计算 4 个 Sheet。创建一个
parties = 4的 CyclicBarrier,并传入一个汇总之和的barrierCommand。4 个线程计算完自己的部分后调用await()。当 4 个线程全算完时,自动触发汇总任务,合并最终结果。
2. 模拟并发测试(发令枪)
- 场景描述:在性能测试中,我们需要模拟 100 个用户同时发起请求来测试接口的并发抗压能力。
- 使用方式:创建 100 个线程,每个线程在发起 HTTP 请求前先调用
CyclicBarrier.await()。这样前 99 个线程都会被阻塞等待,直到第 100 个线程到达时,栅栏瞬间打开,100 个请求同时发出,达到瞬时高并发的效果。
3. 游戏加载/多阶段任务同步
- 场景描述:多人联机游戏(如《英雄联盟》或《王者荣耀》),需要等待所有 10 名玩家的进度条都加载到 100% 后,游戏才能同时开始。
- 使用方式:10 个线程代表 10 个玩家的加载过程。加载完成后调用
await()等待。由于 CyclicBarrier 是可循环的,这 10 个线程还可以继续用于下一局游戏的加载同步。
三、 代码示例(合并计算结果场景)
java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierDemo {
// 保存每个线程的计算结果
private static ConcurrentHashMap<String, Integer> resultMap = new ConcurrentHashMap<>();
public static void main(String[] args) {
// 创建 CyclicBarrier,parties 为 3,并指定所有线程到达后执行的合并任务
CyclicBarrier barrier = new CyclicBarrier(3, () -> {
int total = 0;
for (int value : resultMap.values()) {
total += value;
}
System.out.println(Thread.currentThread().getName() + " 执行合并任务,最终总结果为: " + total);
});
// 模拟 3 个线程分别处理不同的数据
for (int i = 1; i <= 3; i++) {
final int threadNum = i;
new Thread(() -> {
try {
System.out.println("线程 " + threadNum + " 开始计算...");
Thread.sleep((long) (Math.random() * 2000)); // 模拟计算耗时
resultMap.put("线程" + threadNum, threadNum * 10);
System.out.println("线程 " + threadNum + " 计算完成,等待其他线程...");
// 到达栅栏,等待其他线程
barrier.await();
System.out.println("线程 " + threadNum + " 越过栅栏,继续后续工作...");
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}, "Thread-" + i).start();
}
}
}
四、 常见面试延伸:与 CountDownLatch 的区别
| 特性 | CyclicBarrier (循环栅栏) | CountDownLatch (倒计时器) |
|---|---|---|
| 核心机制 | 基于 ReentrantLock + Condition | 基于 AQS 的共享锁机制 |
| 作用对象 | 线程之间互相等待,直到所有线程到达 | 一个/多个线程等待其他线程完成某些操作 |
| 可复用性 | 可循环使用(计数器归零后自动重置) | 一次性(计数器归零后无法重置) |
| 计数操作 | 调用 await() 既减计数又阻塞等待 |
调用 countDown() 减计数但不阻塞,调用 await() 阻塞等待 |
| 额外任务 | 支持传入 Runnable,在栅栏打开时优先执行 |
不支持 |