悦书阁 悦书阁
首页
学习笔记
技术文档
idea插件开发
更多
  • 分类
  • 标签
  • 归档

Felix

大道至简 知易行难
首页
学习笔记
技术文档
idea插件开发
更多
  • 分类
  • 标签
  • 归档
  • JVM

  • spring

  • 并发编程

    • 深入理解JMM和并发三大特性
    • CPU缓存一致性协议MESI
    • 深入理解synchronized
    • AQS之独占锁ReentrantLock
      • 什么是管程
      • 什么是AQS
        • AQS具备的特性
        • AQS内部维护属性volatile int state
        • AQS定义两种资源共享方式
        • AQS定义两种队列
        • AQS 定义了5个队列中节点状态:
      • 同步等待队列
      • 条件等待队列
      • ReentrantLock详解
        • ReentrantLock特点
        • synchronized和ReentrantLock的区别
      • ReentrantLock源码剖析
        • 前置知识
        • 模拟场景
        • 对象获取
        • ReentrantLock公平锁和非公平锁的区别
        • 加锁源码
        • cas 失败 acquire逻辑
        • tryAcquire 再次尝试获取锁
        • addWaiter 再次获取锁失败,加入队列
        • 当前节点入队后执行 acquireQueued
        • 总结:
        • 解锁源码
        • release
        • tryRelease(arg)
        • unparkSuccessor
        • 总结
    • 深入理解AQS之Semaphorer&CountDownLatch
    • 深入理解AQS之CyclicBarrier
    • 深入理解AQS之ReentrantReadWriteLock
    • Collections之Map&List&Set详解
    • 阻塞队列BlockingQueue实战及其原理分析
    • 深入理解Java线程
    • Executor线程池原理与源码解读
    • 并发编程之定时任务&定时线程池
  • 消息中间件

  • 微服务

  • 三高架构

  • 学习笔记
  • 并发编程
liufei379
2022-07-21
目录

AQS之独占锁ReentrantLock

# 什么是管程

管程是指管理共享变量以及对共享变量的操作过程,让它们支持并发

image-20220721145615316

  • 管程中的入口等待队列 即 同步等待队列(获取锁有关)
  • 管程中的条件等待队列:解决线程的同步问题 实现 阻塞唤醒机制
  • JVM层面对管层的实现是 synchronized : Monitor
  • JDK层面的实现是 AQS
    • synchronized 加锁解锁 需要从用户态切换到内核态 耗性能
    • synchronized 无法手动解锁

# 什么是AQS

java.util.concurrent包中的大多数同步器实现都是围绕着共同的基础行为,比如等待队列、条件队列、独占获取、共享获取等,而这些行为的抽象就是基于AbstractQueuedSynchronizer(简称AQS)实现的,AQS是一个抽象同步框架,可以用来实现一个依赖状态的同步器。

# AQS具备的特性

  • 阻塞等待队列
  • 共享/独占
  • 公平/非公平
  • 可重入
  • 允许中断

# AQS内部维护属性volatile int state

  • state表示资源的可用状态

    • 通过CAS方式确认是否能否拿到锁,如果拿到锁返回true 设置state = 1
  • State三种访问方式:

    • getState()

    • setState()

    • compareAndSetState()

# AQS定义两种资源共享方式

  • Exclusive-独占,只有一个线程能执行,如ReentrantLock
  • Share-共享,多个线程可以同时执行,如Semaphore/CountDownLatch

# AQS定义两种队列

  • 同步等待队列: 主要用于维护获取锁失败时入队的线程
  • 条件等待队列: 调用await()的时候会释放锁,然后线程会加入到条件队列,调用signal()唤醒的时候会把条件队列中的线程节点移动到同步队列中,等待再次获得锁

# AQS 定义了5个队列中节点状态:

  1. 值为0,初始化状态,表示当前节点在sync队列中,等待着获取锁。
  2. CANCELLED,值为1,表示当前的线程被取消;
  3. SIGNAL,值为-1,表示当前节点的后继节点包含的线程需要运行,也就是unpark;
  4. CONDITION,值为-2,表示当前节点在等待condition,也就是在condition队列中;
  5. PROPAGATE,值为-3,表示当前场景下后续的acquireShared能够得以执行;

不同的自定义同步器竞争共享资源的方式也不同。自定义同步器在实现时只需要实现共享资源state的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS已经在顶层实现好了。自定义同步器实现时主要实现以下几种方法:

  • isHeldExclusively():该线程是否正在独占资源。只有用到condition才需要去实现它。
  • tryAcquire(int):独占方式。尝试获取资源,成功则返回true,失败则返回false。
  • tryRelease(int):独占方式。尝试释放资源,成功则返回true,失败则返回false。
  • tryAcquireShared(int):共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
  • tryReleaseShared(int):共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false。

# 同步等待队列

AQS当中的同步等待队列一种基于双向链表数据结构的队列,是FIFO先进先出线程等待队列,Java中的CLH队列是原CLH队列的一个变种**,线程由原自旋机制改为阻塞机制**。

AQS 依赖CLH同步队列来完成同步状态的管理:

  • 当前线程如果获取同步状态失败时,AQS则会将当前线程已经等待状态等信息构造成一个节点(Node)并将其加入到CLH同步队列,同时会阻塞当前线程

  • 当同步状态释放时,会把首节点唤醒(公平锁),使其再次尝试获取同步状态。

  • 通过signal或signalAll将条件队列中的节点转移到同步队列。(由条件队列转化为同步队列)

    image-20220721162947124

# 条件等待队列

AQS中条件队列是使用单向列表保存的,用nextWaiter来连接,

  • 调用Condition#await方法会释放当前持有的锁,然后阻塞当前线程,同时向Condition队列尾部添加一个节点,所以调用Condition#await方法的时候必须持有锁。
  • 调用Condition#signal方法会将Condition队列的首节点移动到阻塞队列尾部,然后唤醒因调用Condition#await方法而阻塞的线程(唤醒之后这个线程就可以去竞争锁了),所以调用Condition#signal方法的时候必须持有锁,持有锁的线程唤醒被因调用Condition#await方法而阻塞的线程。

# ReentrantLock详解

ReentrantLock是一种基于AQS框架的应用实现,是JDK中的一种线程并发访问的同步手段,它的功能类似于synchronized是一种互斥锁,可以保证线程安全。

# ReentrantLock特点

  • 可中断
  • 可以设置超时时间
  • 可以设置为公平锁
  • 支持多个条件变量
  • 与 synchronized 一样,都支持可重入

# synchronized和ReentrantLock的区别

  • synchronized是JVM层次的锁实现,ReentrantLock是JDK层次的锁实现;
  • synchronized的锁状态是无法在代码中直接判断的,但是ReentrantLock可以通过ReentrantLock#isLocked判断;
  • synchronized是非公平锁,ReentrantLock是可以是公平也可以是非公平的;
  • synchronized是不可以被中断的,而ReentrantLock#lockInterruptibly方法是可以被中断的;
  • 在发生异常时synchronized会自动释放锁,而ReentrantLock需要开发者在finally块中显示释放锁;
  • ReentrantLock获取锁的形式有多种:如立即返回是否成功的tryLock(),以及等待指定时长的获取,更加灵活;
  • synchronized在特定的情况下对于已经在等待的线程是后来的线程先获得锁(回顾一下sychronized的唤醒策略),而ReentrantLock对于已经在等待的线程是先来的线程先获得锁;

# ReentrantLock源码剖析

# 前置知识

    static final class Node {
        /** 表示线程已取消 */
        static final int CANCELLED =  1;
        /**表示后继线程需要 被唤醒 */
        static final int SIGNAL = -1;
        /**表示等待满足条件 条件队列*/
        static final int CONDITION = -2;
        /** 后续补充 */
        static final int PROPAGATE = -3;
1
2
3
4
5
6
7
8
9

# 模拟场景

public class ReentrantLockDemo {

    private static  int sum = 0;
    private static Lock lock = new ReentrantLock();
    public static void main(String[] args) throws InterruptedException {

        for (int i = 0; i < 3; i++) {
            Thread thread = new Thread(()->{
                //加锁
                lock.lock();
                try {
                    for (int j = 0; j < 10000; j++) {
                        sum++;
                    }
                } finally {
                    // 解锁
                    lock.unlock();
                }
            });
            thread.start();
        }
        Thread.sleep(2000);
        System.out.println(sum);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25

在lock.lock() 和 lock.unlock() 断点,然后右键断点选择Thread模式,即可查看多线程的执行过程

image-20220721171626210

左下脚可以进行线程的切换

image-20220721172202468

# 对象获取

Lock lock = new ReentrantLock(); //获取锁对象 非公平
Lock lock = new ReentrantLock(true); //如果构造函数参数传入true 则是公平锁,如果传入false则是 非公平锁

//代码规范
//加锁    
lock.lock(); 
try {  
    //临界区 
} finally { 
    // 解锁 
    lock.unlock();  
}
1
2
3
4
5
6
7
8
9
10
11
12

# ReentrantLock公平锁和非公平锁的区别

  • 非公平锁在未抢到锁的时候,会先通过CAS尝试获取锁,CAS失败后才会入队
  • 公平锁在未抢到锁的时候,则直接入队

# 加锁源码

final void lock() {
    //如果compareAndSetState(0, 1) 返回true 表示获取锁成功
    if (compareAndSetState(0, 1))
        //设置当前线程独占  如当前线程是Thread0 那么 exclusiveOwnerThread = Thread0
        setExclusiveOwnerThread(Thread.currentThread());
    else
        //cas 失败
        acquire(1);
}
1
2
3
4
5
6
7
8
9
# cas 失败 acquire逻辑
public final void acquire(int arg) {
    //tryAcquire(arg)先再次尝试获取锁,如果还失败,则acquireQueued(addWaiter(Node.EXCLUSIVE), arg) 执行加入队列逻辑
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        //如果是中断唤醒,那么需要进行复位,因为后续代码可能又需要中断
        selfInterrupt();
}
1
2
3
4
5
6
7
# tryAcquire 再次尝试获取锁
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    //是否其他线程已释放锁
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    //重入锁判断 当前线程 = 持有线程
    else if (current == getExclusiveOwnerThread()) {
        //如果重入,则state +1
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# addWaiter 再次获取锁失败,加入队列
private Node addWaiter(Node mode) {
    //构建node节点  mode 表示是独占 还是共享  
    Node node = new Node(Thread.currentThread(), mode);
    //设置尾节点
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}

//入参是当前线程创建的节点
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) {
            //没人排队,则新建空对象节点
            if (compareAndSetHead(new Node()))
                // 将头尾都指向了这个初始化的Node 然后再次进入循环 
                // 下次进入循环就有了头节点
                tail = head;
        } else {
			//将当前线程的上一个节点指向 之前的最后一个节点
            node.prev = t;
            // 基于CAS的方式,将tail节点设置为当前节点 即 把当前节点设置为节点
            if (compareAndSetTail(t, node)) {
                //然后把之前尾节点的下一个节点指向当前节点(也就是现在的尾节点)
                t.next = node;
                return t;
            }
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# 当前节点入队后执行 acquireQueued
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            //获取当前节点的上一个节点
            final Node p = node.predecessor();
            //上个节点是否头节点,是否获取到锁
            if (p == head && tryAcquire(arg)) {
                //设置当前节点为头节点
                setHead(node);
                //清空父节点的下一个节点  即清除引诱用关系,让父节点可以被回收
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            //获取锁失败,则考虑去park
            //shouldParkAfterFailedAcquire(p, node)  1.设置上一个节点的waitStatus = -1 
            if (shouldParkAfterFailedAcquire(p, node) &&
                //基于Unsafe类的park方法,线程挂起 阻塞当前线程  park 可以识别中断标记 可以被唤醒
                parkAndCheckInterrupt()) // 针对fail属性,这里是唯一可能出现异常的地方,JVM内部出现问题时,可以这么理解,fianlly代码块中的内容,执行的几率约等于0~
                interrupted = true;
        }
    } finally {
        if (failed)
            //程序异常出队
            cancelAcquire(node);
    }
}


private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        return true;
    if (ws > 0) {
        do {
            // 把当前节点的上一个节点指向 上一个节点的上一个,表示剔除 当前节点的上个节点
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        //将上一个节点的再上一个节点 的 下一个节点 指向 当前节点
        // 上一个节点.上一个节点.下一个节点 = 当前节点
        pred.next = node;
    } else {
        //cas 设置上一个节点 waitStatus 为-1 表示上一个节点可以被唤醒
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

private final boolean parkAndCheckInterrupt() {
    //线程挂起
    LockSupport.park(this);
    //清除中断标志,清除完中断标识,后续需要再重新复位(  selfInterrupt();) 因为后续可能还会有中断唤醒的操作
    return Thread.interrupted();
}

//程序执行异常 执行出队逻辑
private void cancelAcquire(Node node) {
    
    if (node == null)
        return;

    //清空线程信息
    node.thread = null;

    // 获取上一个节点
    Node pred = node.prev;
    while (pred.waitStatus > 0)
        //往上寻找非失效节点 当前节点的上一个节点赋值为未失效节点
        node.prev = pred = pred.prev;
		
    // 获取上一个节点的下一个 节点
    Node predNext = pred.next;
	
    // 将当前节点设置为失效节点 状态1
    node.waitStatus = Node.CANCELLED;
	
    //如果当前节点是尾节点,将尾节点设置为最近的有效节点(如果当前节点为尾节点的操作)
    if (node == tail && compareAndSetTail(node, pred)) {
        // 然后再尾节点的下一个节点设置为null
        compareAndSetNext(pred, predNext, null);
    } else {
		// 中间节点操作
		// 如果上一个节点不是头节点
        int ws;
        if (pred != head &&
            //获取上一节点状态,是不是有效
            ((ws = pred.waitStatus) == Node.SIGNAL ||
             (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
            pred.thread != null) {
            Node next = node.next;
            if (next != null && next.waitStatus <= 0)
                compareAndSetNext(pred, predNext, next);// 尝试将pred的前驱节点的next指向当前节点的next(必须是有效的next节点)
        } else {
            // 头结点,唤醒后继节点
            unparkSuccessor(node);
        }

        node.next = node; // help GC
    }
}


private void setHead(Node node) {
    //如果加锁成功,则把当前节点设置为 头节点
    head = node;
    // 把线程设置为null
    node.thread = null;
    // 把上一个节点设置为空 帮助GC 上一个节点
    node.prev = null;
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
# 总结:
  1. ReentrantLock 的实现分为 公平 和 非公平的实现,源码差异是公平锁直接入队、阻塞;非公平锁先通过CAS尝试获取锁,获取失败再入队阻塞。
  2. 入队之前会再次尝试获取锁,并校验是否存在锁重入,尝试加锁失败执行第3点
  3. 入队时,如果已经存在尾节点,则直接在队尾入队,如果不存在(尾节点不存在说明没有队列),先创建头节点,再入队
  4. 入队之后,会判断上个节点是否头节点,如果是的话,再次尝试加锁。加锁成功则 将当前节点设置为头节点,并清空上一个头节点的指针,帮助GC,如果失败则实现第5步
  5. 先设置上一个节点 waitStatus = -1 表示当前节点可以被上一个节点唤醒,然后park阻塞

# 解锁源码

LockSupport.unpark(s.thread);唤醒下一个节点所在的线程

public void unlock() {
    //state - 1 因为存在重入锁的情况
	sync.release(1);
}
1
2
3
4
# release
public final boolean release(int arg) {
    //尝试解锁
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
        	//如果头节点状态不为0,则unpark唤醒其他线程
            unparkSuccessor(h);
        return true;
    }
    return false;
}
1
2
3
4
5
6
7
8
9
10
11
# tryRelease(arg)
protected final boolean tryRelease(int releases) {
    //当前数量-1 可能存在重入情况
    int c = getState() - releases;
    //独占线程是否是当前线程,如果不是抛出异常
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        //如果 c==0 表示 当前线程的锁都已经释放,情况独占线程
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# unparkSuccessor
private void unparkSuccessor(Node node) {
	
    int ws = node.waitStatus;
    if (ws < 0)
        //头节点的等待状态是否小于0, 是的话改成0
        compareAndSetWaitStatus(node, ws, 0);
	//获取下一个节点
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        //  如果下一个节点为null 或者 线程的状态被取消, 则从尾节点开始往上找未取消的节点
        s = null;
        //因为当前节点为空,没办法s.next 会空指针 所以只能从tail 往前找
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        //如果下一个节点不会空,则唤醒下一个节点
        LockSupport.unpark(s.thread);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# 总结
  1. 解锁之前 校验state -1 是否等于0,如果不是说明存在锁重入,再次等待解锁,如果是,则进入解锁流程。
  2. 解锁时,设置exclusiveOwnerThread = null 释放线程持有标识
  3. 确认waitStatus是否不等于0,如果是则唤醒后续节点。存在2种情况
    1. waitStatus<0,直接取unpark下一个节点
    2. 下一个节点为空或者waitStatus 大于0 ,则通过尾节点向上找到最靠近头部的有效节点去unpark
上次更新: 2026/3/11 22:17:56
深入理解synchronized
深入理解AQS之Semaphorer&CountDownLatch

← 深入理解synchronized 深入理解AQS之Semaphorer&CountDownLatch→

最近更新
01
实现idea开发的关键步骤
10-05
02
Redis高可用架构
09-09
03
Zookeeper高可用
08-31
更多文章>
Theme by Vdoing | Copyright © 2022-2026 Felix | 粤ICP备17101757号-1
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式