深入理解AQS之CyclicBarrier
# 前言
上文学习了AQS的实现Semaphorer [ˈseməfɔːr]&CountDownLatch,本节继续学习CyclicBarrier
- Semaphorer 和 CountDownLatch 的实现都是基于AQS的共享锁
- CyclicBarrier 的实现是通过 ReentrantLock + 条件队列来实现的。
# CyclicBarrier介绍
回环栅栏(循环屏障),通过它可以实现让一组线程等待至某个状态(屏障点)之后再全部同时执行。叫做回环是因为当所有等待线程都被释放以后,CyclicBarrier可以被重用。

# CyclicBarrier的使用
# 构造方法
// parties表示屏障拦截的线程数量,每个线程调用 await 方法告诉 CyclicBarrier 我已经到达了屏障,然后当前线程被阻塞。
public CyclicBarrier(int parties)
// 用于在线程到达屏障时,优先执行 barrierAction,方便处理更复杂的业务场景(该线程的执行时机是在到达屏障之后再执行)
1
2
3
2
3
# 重要方法
//屏障 指定数量的线程全部调用await()方法时,这些线程不再阻塞
// BrokenBarrierException 表示栅栏已经被破坏,破坏的原因可能是其中一个线程 await() 时被中断或者超时
public int await() throws InterruptedException, BrokenBarrierException
public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException
//循环通过reset()方法可以进行重置
1
2
3
4
5
2
3
4
5
# CyclicBarrier与CountDownLatch的区别
查看深入理解AQS之Semaphore&CountDownLatch (opens new window)
# 源码分析
# 构造函数
//5 表示要达到5个资源 才可以释放,表示达到条件后 要先执行 () -> System.out.println("执行") 然后再执行wait之后的方法
ClicBarrier cyclicBarrier = new CyclicBarrier(5, () -> System.out.println("执行"));
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
//复位节点
this.parties = parties;
//计数器 每次await减一,当计数器为0时 唤醒线程 执行后续代码
this.count = parties;
//Runnable 唤醒是先执行Runnable 方法 再执行await之后的代码
this.barrierCommand = barrierAction;
}
1
2
3
4
5
6
7
8
9
10
11
12
2
3
4
5
6
7
8
9
10
11
12
# 执行 await()
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
//代标识 满足同一个阶段唤醒条件的线程为一代
final Generation g = generation;
if (g.broken)
throw new BrokenBarrierException();
if (Thread.interrupted()) {
//线程中断,唤醒所有线程, 条件等待队列转同步等待队列
breakBarrier();
throw new InterruptedException();
}
int index = --count;
if (index == 0) {
boolean ranAction = false;
try {
//执行 Runnable run()
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
//唤醒线程 开启下一代
nextGeneration();
return 0;
} finally {
if (!ranAction)
// 程序异常 唤醒线程 条件等待队列转同步等待队列
breakBarrier();
}
}
//李二狗自旋
for (;;) {
try {
if (!timed)
//入队 阻塞(条件阻塞队列)
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
Thread.currentThread().interrupt();
}
}
if (g.broken)
throw new BrokenBarrierException();
if (g != generation)
return index;
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
//解锁
lock.unlock();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
# trip.await() 入队 阻塞
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
//添加条件等待队列
Node node = addConditionWaiter();
//释放ReentrantLock锁 不释放的话其他线程无法进入
int savedState = fullyRelease(node);
int interruptMode = 0;
//是否是同步等待队列 因为这里是条件队列,所以返回false 进入循环 阻塞
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
//acquireQueued 被唤醒去获取锁
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# addConditionWaiter 添加条件等待队列
private Node addConditionWaiter() {
Node t = lastWaiter;
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
//创建队列 Node.CONDITION = -2 表示条件等待队列
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
//如果尾节点为空,表示没有队列, 设置头节点为当前节点
firstWaiter = node;
else
//如果尾节点不为空 设置尾节点的下一个节点为当前节点
//同步等待队列是 t.next 条件等待队列是 t.nextWaiter
t.nextWaiter = node;
//把当前节点设置为尾节点
lastWaiter = node;
return node;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# acquireQueued 被唤醒去获取锁
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# fullyRelease(node) 释放锁
final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState();
//reentrantLock 释放锁逻辑
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# nextGeneration 唤醒阻塞线程 开启下一代
private void nextGeneration() {
// 唤醒阻塞线程
trip.signalAll();
//重置计数器
count = parties;
//创建下一代
generation = new Generation();
}
//唤醒线程
public final void signalAll() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
//从首节点开始唤醒
doSignalAll(first);
}
private void doSignalAll(Node first) {
//清空引用 帮助GC
lastWaiter = firstWaiter = null;
do {
//获取下一个节点
Node next = first.nextWaiter;
//清空当前节点的下一个节点 帮助GC
first.nextWaiter = null;
//开始唤醒当前节点
transferForSignal(first);
first = next;
} while (first != null);
}
final boolean transferForSignal(Node node) {
//设置状态为0
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
//将条件队列转到同步等待队列
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
//线程中断 或者CAS waitStatus = -1 失败,则唤醒线程
LockSupport.unpark(node.thread);
return true;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
上次更新: 2026/3/11 22:17:56