悦书阁 悦书阁
首页
学习笔记
技术文档
idea插件开发
更多
  • 分类
  • 标签
  • 归档

Felix

大道至简 知易行难
首页
学习笔记
技术文档
idea插件开发
更多
  • 分类
  • 标签
  • 归档
  • JVM

  • spring

  • 并发编程

    • 深入理解JMM和并发三大特性
    • CPU缓存一致性协议MESI
    • 深入理解synchronized
    • AQS之独占锁ReentrantLock
    • 深入理解AQS之Semaphorer&CountDownLatch
      • 前言
      • Semaphore介绍
        • Semaphore 常用方法
        • 限流场景实现
        • 加锁源码分析
        • nonfairTryAcquireShared 尝试获取锁
        • doAcquireSharedInterruptibly 获取锁失败了,开始 入队 阻塞
        • addWaiter 入队 和 独占锁逻辑一致
        • setHeadAndPropagate 抢救成功 设置节点信息并 尝试去唤醒下一个节点
        • shouldParkAfterFailedAcquire 抢救失败,去park挂起线程
        • 释放锁源码分析
        • releaseShared
      • CountDownLatch介绍
        • 常用方法
        • CountDownLatch应用场景
        • CountDownLatch实现原理
        • CountDownLatch与Thread.join的区别
        • CountDownLatch与CyclicBarrier的区别
        • CountDownLatch源码分析
        • 获取锁
        • acquireSharedInterruptibly 获取锁或入队
        • tryAcquireShared 获取锁或入队 阻塞 (入队 阻塞 和 Semaphore)
        • 释放锁
    • 深入理解AQS之CyclicBarrier
    • 深入理解AQS之ReentrantReadWriteLock
    • Collections之Map&List&Set详解
    • 阻塞队列BlockingQueue实战及其原理分析
    • 深入理解Java线程
    • Executor线程池原理与源码解读
    • 并发编程之定时任务&定时线程池
  • 消息中间件

  • 微服务

  • 三高架构

  • 学习笔记
  • 并发编程
liufei379
2022-07-28
目录

深入理解AQS之Semaphorer&CountDownLatch

# 前言

上文学到AQS独占锁的实现ReentrantLock,本节继续学习AQS共享锁的实现Semaphorer [ˈseməfɔːr]&CountDownLatch。

独占锁和共享锁有和区别?

  • Exclusive-独占,同时只能有一个线程抢占到锁执行,如ReentrantLock
  • Share-共享,多个线程可以同时抢占到锁执行,如Semaphore/CountDownLatch

# Semaphore介绍

  • Semaphore,俗称信号量,它是操作系统中PV操作的原语在java的实现,它也是基于AbstractQueuedSynchronizer实现的
  • Semaphore的功能非常强大,大小为1的信号量就类似于互斥锁,通过同时只能有一个线程获取信号量实现。大小为n(n>0)的信号量可以实现限流的功能,它可以实现只能有n个线程同时获取信号量。

image-20220728100234187

PV操作是操作系统一种实现进程互斥与同步的有效方法。PV操作与信号量(S)的处理相关,P表示通过的意思,V表示释放的意思。用PV操作来管理共享资源时,首先要确保PV操作自身执行的正确性。

P操作的主要动作是:

  1. S减1;
  2. 若S减1后仍大于或等于0,则进程继续执行;
  3. 若S减1后小于0,则该进程被阻塞后放入等待该信号量的等待队列中,然后转进程调度。

V操作的主要动作是:

  1. S加1;
  2. 若相加后结果大于0,则进程继续执行;
  3. 若相加后结果小于或等于0,则从该信号的等待队列中释放一个等待进程,然后再返回原进程继续执行或转进程调度。

# Semaphore 常用方法

构造方法

public Semaphore(int permits) {
    sync = new NonfairSync(permits);
}

public Semaphore(int permits, boolean fair) {
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
1
2
3
4
5
6
7
  • permits 表示许可证的数量(资源数)
  • fair 表示公平性,如果这个设为 true 的话 表示是公平锁,下次执行的线程会是等待最久的线程

常用方法

public void acquire() throws InterruptedException
public boolean tryAcquire()
public void release()
public int availablePermits()
public final int getQueueLength() 
public final boolean hasQueuedThreads()
protected void reducePermits(int reduction)
1
2
3
4
5
6
7
  • acquire() 表示阻塞并获取许可
  • tryAcquire() 方法在没有许可的情况下会立即返回 false,要获取许可的线程不会阻塞
  • release() 表示释放许可
  • int availablePermits():返回此信号量中当前可用的许可证数。
  • int getQueueLength():返回正在等待获取许可证的线程数。
  • boolean hasQueuedThreads():是否有线程正在等待获取许可证。
  • void reducePermit(int reduction):减少 reduction 个许可证
  • Collection getQueuedThreads():返回所有等待获取许可证的线程集合

# 限流场景实现

public class SemaphoneTest {

    /**
     * 实现一个同时只能处理5个请求的限流器
     */
    private static Semaphore semaphore = new Semaphore(5);

    /**
     * 定义一个线程池
     */
    private static ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 50, 60, TimeUnit.SECONDS, new LinkedBlockingDeque<>(200));

    /**
     * 模拟执行方法
     */
    public static void exec() {
        try {
            semaphore.acquire(1);
            // 模拟真实方法执行
            System.out.println("执行exec方法" );
            Thread.sleep(2000);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            semaphore.release(1);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        {
            for (; ; ) {
                Thread.sleep(100);
                // 模拟请求以10个/s的速度
                executor.execute(() -> exec());
            }
        }
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37

# 加锁源码分析

private static Semaphore semaphore = new Semaphore(5); //表示申请5个资源
Sync(int permits) {
    // 申请5个资源 设置state = 5
    setState(permits);
}
//抢占一个资源
semaphore.acquire(1); --> sync.acquireSharedInterruptibly(permits); 

// 获取锁 或者 入队
public final void acquireSharedInterruptibly(int arg)
    throws InterruptedException {
    //如果线程被中断 抛出异常
    if (Thread.interrupted())
        throw new InterruptedException();
    //tryAcquireShared(arg) 尝试抢占锁
    if (tryAcquireShared(arg) < 0)
        //尝试获取锁失败,入队 阻塞
        doAcquireSharedInterruptibly(arg);
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# nonfairTryAcquireShared 尝试获取锁
 // 非公平尝试获取锁
final int nonfairTryAcquireShared(int acquires) {
    //李二狗的经典自旋
    for (;;) {
        //获取当前剩余资源
        int available = getState();
        //减去当前线程持有的资源数
        int remaining = available - acquires;
        if (remaining < 0 ||
        	// compareAndSetState  CAS尝试获取锁,获取成功执行代码,获取失败准备 入队 阻塞
            compareAndSetState(available, remaining))
            return remaining;
    }
}


// 公平尝试获取锁
protected int tryAcquireShared(int acquires) {
    for (;;) {
        //如果校验失败,则入队
        if (hasQueuedPredecessors())
            return -1;
        //和非公平一致
        int available = getState();
        int remaining = available - acquires;
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
            return remaining;
    }
}
public final boolean hasQueuedPredecessors() {
	
    //校验说明 如果头节点=尾节点 说明还没有节点入队,要先去入队 返回false
    //如果与节点 != 尾节点 则 确认 下一个节点不为空且 线程为当前持有的线程 ,则 去尝试获取锁,反之去入队
    Node t = tail; 
    Node h = head;
    Node s;
    return h != t &&
        ((s = h.next) == null || s.thread != Thread.currentThread());
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
# doAcquireSharedInterruptibly 获取锁失败了,开始 入队 阻塞
// 入队  阻塞
private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    // 入队
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        //来了来了,又看见李二狗的经典自旋
        for (;;) {
            //获取当前节点上一个节点
            final Node p = node.predecessor();
            if (p == head) {
                // 如果上个节点 = 头节点 则再次尝试获取锁 又走到了nonfairTryAcquireShared方法
                // tryAcquireShared 我觉得我还可以抢救一下
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    //获取到锁 设置当前节点为头节点 并 将旧头节点的引用关系删除
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}


//和独占锁一致 获取当前节点上一个节点
final Node predecessor() throws NullPointerException {
    Node p = prev;
    if (p == null)
        throw new NullPointerException();
    else
        return p;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
# addWaiter 入队 和 独占锁逻辑一致
// 入队 和 独占锁一致,只是 mode = Node.SHARE(即 nextWaiter = Node.SHARE) 共享标识
private Node addWaiter(Node mode) {
    // 构建当前线程的Node 节点 mode 表示是独占还是共享
    Node node = new Node(Thread.currentThread(), mode);
    //如果第一次进来尾节点为空 则 执行enq(node) 逻辑,否则 走 if 逻辑
    Node pred = tail;
    if (pred != null) {
        //尾节点不为空,则设置当前节点.上一个节点 = 之前的尾节点
        node.prev = pred;
        //通过CAS的方式 将当前节点设置为尾节点
        if (compareAndSetTail(pred, node)) {
            // 将上一个尾节点的.下一个节点 = 当前节点
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}

//和独占锁一致 尾节点为空,则执行 enq 方法
/**
1.如果没有尾节点说明队列还未创建,则先创建一个头节点
2.因为只有一个头节点,所以 设置尾节点 = 头节点
3.有了头节点之后,设置当前节点的上一个节点 = 头节点
4.将当前节点设置成尾节点
5.将头节点的下一个节点 设置为当前节点
*/
private Node enq(final Node node) {
    //李二狗的经典自旋
    for (;;) {
        Node t = tail;
        if (t == null) {
            //尾节点为空,表示队列还未创建, 先为队列 创建一个头节点 
            // 通过cas方式设置头节点
            if (compareAndSetHead(new Node()))
                // 因为是新建队列,只能一个节点,那么头节点=尾节点
                tail = head;
            	// 然后通过自旋再次 执行 尾节点 != null
        } else {
			//尾节点不为空,则设置 当前节点的.上一个节点 = 之前的尾节点
            node.prev = t;
            //通过CAS的方式 将当前节点设置为尾节点
            if (compareAndSetTail(t, node)) {
                // 将上一个尾节点的.下一个节点 = 当前节点
                t.next = node;
                return t;
            }
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
# setHeadAndPropagate 抢救成功 设置节点信息并 尝试去唤醒下一个节点
private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; 
    //设置当前节点为头节点
    setHead(node);
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        //获取当前节点的下一个节点
        Node s = node.next;
        if (s == null || s.isShared())
            //去尝试唤醒下一个节点
            doReleaseShared();
    }
}

private void setHead(Node node) {
    //如果加锁成功,则把当前节点设置为 头节点
    head = node;
    // 把线程设置为null
    node.thread = null;
    // 把上一个节点设置为空 帮助GC 上一个节点
    node.prev = null;
}

//准备去唤醒下一个节点
private void doReleaseShared() {

    for (;;) {
        //此时head已经是当前节点
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            // waitStatus = -1 表示下一个节点可以被唤醒
            if (ws == Node.SIGNAL) {
                // 将当前节点waitStatus 通过CAS设置成0 设置成0,设置成功,去唤醒 下一个节点
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;          
                //去唤醒下一个节点
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;               
        }
        if (h == head)                
            break;
    }
}

//和独占锁一致
private void unparkSuccessor(Node node) {
    
    int ws = node.waitStatus;
    if (ws < 0)
        // 如果小于0 通过CAS 设置成0
        compareAndSetWaitStatus(node, ws, 0);
	// 获取当前节点的下一个节点
    Node s = node.next;
    //如果下一个节点为空 ,或者线程被取消,则通过尾节点找到符合的最靠近头部的节点
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        //去唤醒该节点
        LockSupport.unpark(s.thread);
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
# shouldParkAfterFailedAcquire 抢救失败,去park挂起线程
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        return true;
    if (ws > 0) {
        do {
            // 把当前节点的上一个节点指向 上一个节点的上一个,表示剔除 当前节点的上个节点
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        //将上一个节点的再上一个节点 的 下一个节点 指向 当前节点
        // 上一个节点.上一个节点.下一个节点 = 当前节点
        pred.next = node;
    } else {
        //cas 设置上一个节点 waitStatus 为-1 表示上一个节点可以被唤醒
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

private final boolean parkAndCheckInterrupt() {
    //线程挂起
    LockSupport.park(this);
    //清除中断标志,清除完中断标识,后续需要再重新复位(  selfInterrupt();) 因为后续可能还会有中断唤醒的操作
    return Thread.interrupted();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25

# 释放锁源码分析

release() ->  sync.releaseShared(1)
1
# releaseShared
public final boolean releaseShared(int arg) {
    //尝试释放共享锁
    if (tryReleaseShared(arg)) {
        //去释放锁
        doReleaseShared();
        return true;
    }
    return false;
}

protected final boolean tryReleaseShared(int releases) {
    for (;;) {
        //剩余资源
        int current = getState();
        int next = current + releases;
        //释放后资源小于剩余资源 返回异常
        if (next < current) // overflow
            throw new Error("Maximum permit count exceeded");
        // cas 成功返回true
        if (compareAndSetState(current, next))
            return true;
    }
}


//准备去唤醒下一个节点
private void doReleaseShared() {

    for (;;) {
        //此时head已经是当前节点
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            // waitStatus = -1 表示下一个节点可以被唤醒
            if (ws == Node.SIGNAL) {
                // 将当前节点waitStatus 通过CAS设置成0 设置成0,设置成功,去唤醒 下一个节点
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;          
                //去唤醒下一个节点
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;               
        }
        if (h == head)                
            break;
    }
}

//和独占锁一致
private void unparkSuccessor(Node node) {
    
    int ws = node.waitStatus;
    if (ws < 0)
        // 如果小于0 通过CAS 设置成0
        compareAndSetWaitStatus(node, ws, 0);
	// 获取当前节点的下一个节点
    Node s = node.next;
    //如果下一个节点为空 ,或者线程被取消,则通过尾节点找到符合的最靠近头部的节点
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        //去唤醒该节点
        LockSupport.unpark(s.thread);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70

# CountDownLatch介绍

CountDownLatch(闭锁)是一个同步协助类,允许一个或多个线程等待,直到其他线程完成操作集。

CountDownLatch使用给定的计数值(count)初始化。await方法会阻塞直到当前的计数值(count)由于countDown方法的调用达到0,count为0之后所有等待的线程都会被释放,并且随后对await方法的调用都会立即返回。这是一个一次性现象 —— count不会被重置。如果你需要一个重置count的版本,那么请考虑使用CyclicBarrier。

# 常用方法

// 调用 await() 方法的线程会被挂起,它会等待直到 count 值为 0 才继续执行
public void await() throws InterruptedException { };  
// 和 await() 类似,若等待 timeout 时长后,count 值还是没有变为 0,不再等待,继续执行
public boolean await(long timeout, TimeUnit unit) throws InterruptedException { };  
// 会将 count 减 1,直至为 0
1
2
3
4
5

# CountDownLatch应用场景

CountDownLatch一般用作多线程倒计时计数器,强制它们等待其他一组(CountDownLatch的初始化决定)任务执行完成。

CountDownLatch的两种使用场景:

  • 场景1:让多个线程等待
  • 场景2:让单个线程等待。

# CountDownLatch实现原理

底层基于 AbstractQueuedSynchronizer 实现,CountDownLatch 构造函数中指定的count直接赋给AQS的state;每次countDown()则都是release(1)减1,最后减到0时unpark阻塞线程;这一步是由最后一个执行countdown方法的线程执行的。

而调用await()方法时,当前线程就会判断state属性是否为0,如果为0,则继续往下执行,如果不为0,则使当前线程进入等待状态,直到某个线程将state属性置为0,其就会唤醒在await()方法中等待的线程。

# CountDownLatch与Thread.join的区别

  • CountDownLatch的作用就是允许一个或多个线程等待其他线程完成操作,看起来有点类似join() 方法,但其提供了比 join() 更加灵活的API。
  • CountDownLatch可以手动控制在n个线程里调用n次countDown()方法使计数器进行减一操作,也可以在一个线程里调用n次执行减一操作。
  • 而 join() 的实现原理是不停检查join线程是否存活,如果 join线程存活则让当前线程永远等待。所以两者之间相对来说还是CountDownLatch使用起来较为灵活

# CountDownLatch与CyclicBarrier的区别

CountDownLatch和CyclicBarrier都能够实现线程之间的等待,只不过它们侧重点不同:

  • CountDownLatch的计数器只能使用一次,而CyclicBarrier的计数器可以使用reset() 方法重置。所以CyclicBarrier能处理更为复杂的业务场景,比如如果计算发生错误,可以重置计数器,并让线程们重新执行一次
  • CyclicBarrier还提供getNumberWaiting(可以获得CyclicBarrier阻塞的线程数量)、isBroken(用来知道阻塞的线程是否被中断)等方法。
  • CountDownLatch会阻塞主线程,CyclicBarrier不会阻塞主线程,只会阻塞子线程。
  • CountDownLatch和CyclicBarrier都能够实现线程之间的等待,只不过它们侧重点不同。CountDownLatch一般用于一个或多个线程,等待其他线程执行完任务后,再执行。CyclicBarrier一般用于一组线程互相等待至某个状态,然后这一组线程再同时执行。
  • CyclicBarrier 还可以提供一个 barrierAction,合并多线程计算结果。
  • CyclicBarrier是通过ReentrantLock的"独占锁"和Conditon来实现一组线程的阻塞唤醒的,而CountDownLatch则是通过AQS的“共享锁”实现

# CountDownLatch源码分析

CountDownLatch countDownLatch = new CountDownLatch(5);; //表示申请5个资源
Sync(int permits) {
    // 申请5个资源 设置state = 5
    setState(permits);
}
1
2
3
4
5

# 获取锁

 countDownLatch.await();
1
# acquireSharedInterruptibly 获取锁或入队

// 获取锁 或者 入队
public final void acquireSharedInterruptibly(int arg)
    throws InterruptedException {
    //如果线程被中断 抛出异常
    if (Thread.interrupted())
        throw new InterruptedException();
    //tryAcquireShared(arg) 尝试抢占锁
    if (tryAcquireShared(arg) < 0)
        //尝试获取锁失败,入队 阻塞
        doAcquireSharedInterruptibly(arg);
}
1
2
3
4
5
6
7
8
9
10
11
12
# tryAcquireShared 获取锁或入队 阻塞 (入队 阻塞 和 Semaphore)
protected int tryAcquireShared(int acquires) {
    //如果state 等于0 表示获取到锁,不等于0 表示没有获取到锁 入队 阻塞 
    return (getState() == 0) ? 1 : -1;
}
1
2
3
4

# 释放锁

countDownLatch.countDown();

public void countDown() {
    sync.releaseShared(1);
}

//尝试释放锁
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        //释放锁 唤醒线程 与 Semaphorer一致
        doReleaseShared();
        return true;
    }
    return false;
}

protected boolean tryReleaseShared(int releases) {
    for (;;) {
        //获取资源数
        int c = getState();
        //如果资源为0,说明异常,返回false
        if (c == 0)
            return false;
        //资源减1
        int nextc = c-1;
        // cas 替换资源数
        if (compareAndSetState(c, nextc))
            //如果剩余资源数==0 返回true 唤醒线程
            return nextc == 0;
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
上次更新: 2026/3/11 22:17:56
AQS之独占锁ReentrantLock
深入理解AQS之CyclicBarrier

← AQS之独占锁ReentrantLock 深入理解AQS之CyclicBarrier→

最近更新
01
实现idea开发的关键步骤
10-05
02
Redis高可用架构
09-09
03
Zookeeper高可用
08-31
更多文章>
Theme by Vdoing | Copyright © 2022-2026 Felix | 粤ICP备17101757号-1
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式