悦书阁 悦书阁
首页
学习笔记
技术文档
idea插件开发
更多
  • 分类
  • 标签
  • 归档

Felix

大道至简 知易行难
首页
学习笔记
技术文档
idea插件开发
更多
  • 分类
  • 标签
  • 归档
  • JVM

  • spring

  • 并发编程

    • 深入理解JMM和并发三大特性
    • CPU缓存一致性协议MESI
    • 深入理解synchronized
    • AQS之独占锁ReentrantLock
    • 深入理解AQS之Semaphorer&CountDownLatch
    • 深入理解AQS之CyclicBarrier
      • 前言
      • CyclicBarrier介绍
        • CyclicBarrier的使用
        • 构造方法
        • 重要方法
        • CyclicBarrier与CountDownLatch的区别
      • 源码分析
        • 构造函数
        • 执行 await()
        • trip.await() 入队 阻塞
        • addConditionWaiter 添加条件等待队列
        • acquireQueued 被唤醒去获取锁
        • fullyRelease(node) 释放锁
        • nextGeneration 唤醒阻塞线程 开启下一代
    • 深入理解AQS之ReentrantReadWriteLock
    • Collections之Map&List&Set详解
    • 阻塞队列BlockingQueue实战及其原理分析
    • 深入理解Java线程
    • Executor线程池原理与源码解读
    • 并发编程之定时任务&定时线程池
  • 消息中间件

  • 微服务

  • 三高架构

  • 学习笔记
  • 并发编程
liufei379
2022-07-29
目录

深入理解AQS之CyclicBarrier

# 前言

上文学习了AQS的实现Semaphorer [ˈseməfɔːr]&CountDownLatch,本节继续学习CyclicBarrier

  • Semaphorer 和 CountDownLatch 的实现都是基于AQS的共享锁
  • CyclicBarrier 的实现是通过 ReentrantLock + 条件队列来实现的。

# CyclicBarrier介绍

回环栅栏(循环屏障),通过它可以实现让一组线程等待至某个状态(屏障点)之后再全部同时执行。叫做回环是因为当所有等待线程都被释放以后,CyclicBarrier可以被重用。

image-20220729165321004

# CyclicBarrier的使用

# 构造方法
 // parties表示屏障拦截的线程数量,每个线程调用 await 方法告诉 CyclicBarrier 我已经到达了屏障,然后当前线程被阻塞。
 public CyclicBarrier(int parties)
 // 用于在线程到达屏障时,优先执行 barrierAction,方便处理更复杂的业务场景(该线程的执行时机是在到达屏障之后再执行)
1
2
3
# 重要方法
//屏障 指定数量的线程全部调用await()方法时,这些线程不再阻塞
// BrokenBarrierException 表示栅栏已经被破坏,破坏的原因可能是其中一个线程 await() 时被中断或者超时
public int await() throws InterruptedException, BrokenBarrierException
public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException
//循环通过reset()方法可以进行重置
1
2
3
4
5
# CyclicBarrier与CountDownLatch的区别

查看深入理解AQS之Semaphore&CountDownLatch (opens new window)

# 源码分析

# 构造函数

//5 表示要达到5个资源 才可以释放,表示达到条件后 要先执行 () -> System.out.println("执行")  然后再执行wait之后的方法
ClicBarrier cyclicBarrier = new CyclicBarrier(5, () -> System.out.println("执行"));

public CyclicBarrier(int parties, Runnable barrierAction) {
    if (parties <= 0) throw new IllegalArgumentException();
    //复位节点
    this.parties = parties;
    //计数器 每次await减一,当计数器为0时 唤醒线程 执行后续代码
    this.count = parties;
    //Runnable 唤醒是先执行Runnable 方法 再执行await之后的代码
    this.barrierCommand = barrierAction;
}
1
2
3
4
5
6
7
8
9
10
11
12

# 执行 await()

private int dowait(boolean timed, long nanos)
    throws InterruptedException, BrokenBarrierException,
TimeoutException {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        //代标识 满足同一个阶段唤醒条件的线程为一代
        final Generation g = generation;

        if (g.broken)
            throw new BrokenBarrierException();

        if (Thread.interrupted()) {
            //线程中断,唤醒所有线程, 条件等待队列转同步等待队列
            breakBarrier();
            throw new InterruptedException();
        }

        int index = --count;
        if (index == 0) {
            boolean ranAction = false;
            try {
                //执行 Runnable  run()
                final Runnable command = barrierCommand;
                if (command != null)
                    command.run();
                ranAction = true;
                //唤醒线程 开启下一代
                nextGeneration();
                return 0;
            } finally {
                if (!ranAction)
                    // 程序异常 唤醒线程 条件等待队列转同步等待队列
                    breakBarrier();
            }
        }

        //李二狗自旋
        for (;;) {
            try {
                if (!timed)
                    //入队 阻塞(条件阻塞队列)
                    trip.await();
                else if (nanos > 0L)
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) {
                if (g == generation && ! g.broken) {
                    breakBarrier();
                    throw ie;
                } else {
                    Thread.currentThread().interrupt();
                }
            }

            if (g.broken)
                throw new BrokenBarrierException();

            if (g != generation)
                return index;

            if (timed && nanos <= 0L) {
                breakBarrier();
                throw new TimeoutException();
            }
        }
    } finally {
        //解锁
        lock.unlock();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70

# trip.await() 入队 阻塞

public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    //添加条件等待队列
    Node node = addConditionWaiter();
    //释放ReentrantLock锁 不释放的话其他线程无法进入
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    //是否是同步等待队列 因为这里是条件队列,所以返回false 进入循环 阻塞
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    //acquireQueued 被唤醒去获取锁
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23

# addConditionWaiter 添加条件等待队列

private Node addConditionWaiter() {
    Node t = lastWaiter;
    if (t != null && t.waitStatus != Node.CONDITION) {
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    //创建队列 Node.CONDITION = -2 表示条件等待队列
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    if (t == null)
        //如果尾节点为空,表示没有队列, 设置头节点为当前节点
        firstWaiter = node;
    else
        //如果尾节点不为空 设置尾节点的下一个节点为当前节点 
        //同步等待队列是 t.next 条件等待队列是 t.nextWaiter
        t.nextWaiter = node;
    //把当前节点设置为尾节点
    lastWaiter = node;
    return node;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# acquireQueued 被唤醒去获取锁
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21

# fullyRelease(node) 释放锁

final int fullyRelease(Node node) {
    boolean failed = true;
    try {
        int savedState = getState();
        //reentrantLock 释放锁逻辑
        if (release(savedState)) {
            failed = false;
            return savedState;
        } else {
            throw new IllegalMonitorStateException();
        }
    } finally {
        if (failed)
            node.waitStatus = Node.CANCELLED;
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

# nextGeneration 唤醒阻塞线程 开启下一代

private void nextGeneration() {
    // 唤醒阻塞线程
    trip.signalAll();
    //重置计数器
    count = parties;
    //创建下一代
    generation = new Generation();
}
//唤醒线程
public final void signalAll() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        //从首节点开始唤醒
        doSignalAll(first);
}

private void doSignalAll(Node first) {
    //清空引用 帮助GC
    lastWaiter = firstWaiter = null;
    do {
        //获取下一个节点
        Node next = first.nextWaiter;
        //清空当前节点的下一个节点 帮助GC
        first.nextWaiter = null;
        //开始唤醒当前节点
        transferForSignal(first);
        first = next;
    } while (first != null);
}


final boolean transferForSignal(Node node) {
	//设置状态为0
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;
	//将条件队列转到同步等待队列
    Node p = enq(node);
    int ws = p.waitStatus;
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        //线程中断 或者CAS waitStatus = -1 失败,则唤醒线程
        LockSupport.unpark(node.thread);
    return true;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
上次更新: 2026/3/11 22:17:56
深入理解AQS之Semaphorer&CountDownLatch
深入理解AQS之ReentrantReadWriteLock

← 深入理解AQS之Semaphorer&CountDownLatch 深入理解AQS之ReentrantReadWriteLock→

最近更新
01
实现idea开发的关键步骤
10-05
02
Redis高可用架构
09-09
03
Zookeeper高可用
08-31
更多文章>
Theme by Vdoing | Copyright © 2022-2026 Felix | 粤ICP备17101757号-1
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式