AQS之独占锁ReentrantLock
# 什么是管程
管程是指管理共享变量以及对共享变量的操作过程,让它们支持并发

- 管程中的入口等待队列 即 同步等待队列(获取锁有关)
- 管程中的条件等待队列:解决线程的同步问题 实现 阻塞唤醒机制
- JVM层面对管层的实现是 synchronized : Monitor
- JDK层面的实现是 AQS
- synchronized 加锁解锁 需要从用户态切换到内核态 耗性能
- synchronized 无法手动解锁
# 什么是AQS
java.util.concurrent包中的大多数同步器实现都是围绕着共同的基础行为,比如等待队列、条件队列、独占获取、共享获取等,而这些行为的抽象就是基于AbstractQueuedSynchronizer(简称AQS)实现的,AQS是一个抽象同步框架,可以用来实现一个依赖状态的同步器。
# AQS具备的特性
- 阻塞等待队列
- 共享/独占
- 公平/非公平
- 可重入
- 允许中断
# AQS内部维护属性volatile int state
state表示资源的可用状态
- 通过CAS方式确认是否能否拿到锁,如果拿到锁返回true 设置state = 1
State三种访问方式:
getState()
setState()
compareAndSetState()
# AQS定义两种资源共享方式
- Exclusive-独占,只有一个线程能执行,如ReentrantLock
- Share-共享,多个线程可以同时执行,如Semaphore/CountDownLatch
# AQS定义两种队列
- 同步等待队列: 主要用于维护获取锁失败时入队的线程
- 条件等待队列: 调用await()的时候会释放锁,然后线程会加入到条件队列,调用signal()唤醒的时候会把条件队列中的线程节点移动到同步队列中,等待再次获得锁
# AQS 定义了5个队列中节点状态:
- 值为0,初始化状态,表示当前节点在sync队列中,等待着获取锁。
- CANCELLED,值为1,表示当前的线程被取消;
- SIGNAL,值为-1,表示当前节点的后继节点包含的线程需要运行,也就是unpark;
- CONDITION,值为-2,表示当前节点在等待condition,也就是在condition队列中;
- PROPAGATE,值为-3,表示当前场景下后续的acquireShared能够得以执行;
不同的自定义同步器竞争共享资源的方式也不同。自定义同步器在实现时只需要实现共享资源state的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS已经在顶层实现好了。自定义同步器实现时主要实现以下几种方法:
- isHeldExclusively():该线程是否正在独占资源。只有用到condition才需要去实现它。
- tryAcquire(int):独占方式。尝试获取资源,成功则返回true,失败则返回false。
- tryRelease(int):独占方式。尝试释放资源,成功则返回true,失败则返回false。
- tryAcquireShared(int):共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
- tryReleaseShared(int):共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false。
# 同步等待队列
AQS当中的同步等待队列一种基于双向链表数据结构的队列,是FIFO先进先出线程等待队列,Java中的CLH队列是原CLH队列的一个变种**,线程由原自旋机制改为阻塞机制**。
AQS 依赖CLH同步队列来完成同步状态的管理:
当前线程如果获取同步状态失败时,AQS则会将当前线程已经等待状态等信息构造成一个节点(Node)并将其加入到CLH同步队列,同时会阻塞当前线程
当同步状态释放时,会把首节点唤醒(公平锁),使其再次尝试获取同步状态。
通过signal或signalAll将条件队列中的节点转移到同步队列。(由条件队列转化为同步队列)

# 条件等待队列
AQS中条件队列是使用单向列表保存的,用nextWaiter来连接,
- 调用Condition#await方法会释放当前持有的锁,然后阻塞当前线程,同时向Condition队列尾部添加一个节点,所以调用Condition#await方法的时候必须持有锁。
- 调用Condition#signal方法会将Condition队列的首节点移动到阻塞队列尾部,然后唤醒因调用Condition#await方法而阻塞的线程(唤醒之后这个线程就可以去竞争锁了),所以调用Condition#signal方法的时候必须持有锁,持有锁的线程唤醒被因调用Condition#await方法而阻塞的线程。
# ReentrantLock详解
ReentrantLock是一种基于AQS框架的应用实现,是JDK中的一种线程并发访问的同步手段,它的功能类似于synchronized是一种互斥锁,可以保证线程安全。
# ReentrantLock特点
- 可中断
- 可以设置超时时间
- 可以设置为公平锁
- 支持多个条件变量
- 与 synchronized 一样,都支持可重入
# synchronized和ReentrantLock的区别
- synchronized是JVM层次的锁实现,ReentrantLock是JDK层次的锁实现;
- synchronized的锁状态是无法在代码中直接判断的,但是ReentrantLock可以通过ReentrantLock#isLocked判断;
- synchronized是非公平锁,ReentrantLock是可以是公平也可以是非公平的;
- synchronized是不可以被中断的,而ReentrantLock#lockInterruptibly方法是可以被中断的;
- 在发生异常时synchronized会自动释放锁,而ReentrantLock需要开发者在finally块中显示释放锁;
- ReentrantLock获取锁的形式有多种:如立即返回是否成功的tryLock(),以及等待指定时长的获取,更加灵活;
- synchronized在特定的情况下对于已经在等待的线程是后来的线程先获得锁(回顾一下sychronized的唤醒策略),而ReentrantLock对于已经在等待的线程是先来的线程先获得锁;
# ReentrantLock源码剖析
# 前置知识
static final class Node {
/** 表示线程已取消 */
static final int CANCELLED = 1;
/**表示后继线程需要 被唤醒 */
static final int SIGNAL = -1;
/**表示等待满足条件 条件队列*/
static final int CONDITION = -2;
/** 后续补充 */
static final int PROPAGATE = -3;
2
3
4
5
6
7
8
9
# 模拟场景
public class ReentrantLockDemo {
private static int sum = 0;
private static Lock lock = new ReentrantLock();
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 3; i++) {
Thread thread = new Thread(()->{
//加锁
lock.lock();
try {
for (int j = 0; j < 10000; j++) {
sum++;
}
} finally {
// 解锁
lock.unlock();
}
});
thread.start();
}
Thread.sleep(2000);
System.out.println(sum);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
在lock.lock() 和 lock.unlock() 断点,然后右键断点选择Thread模式,即可查看多线程的执行过程

左下脚可以进行线程的切换

# 对象获取
Lock lock = new ReentrantLock(); //获取锁对象 非公平
Lock lock = new ReentrantLock(true); //如果构造函数参数传入true 则是公平锁,如果传入false则是 非公平锁
//代码规范
//加锁
lock.lock();
try {
//临界区
} finally {
// 解锁
lock.unlock();
}
2
3
4
5
6
7
8
9
10
11
12
# ReentrantLock公平锁和非公平锁的区别
- 非公平锁在未抢到锁的时候,会先通过CAS尝试获取锁,CAS失败后才会入队
- 公平锁在未抢到锁的时候,则直接入队
# 加锁源码
final void lock() {
//如果compareAndSetState(0, 1) 返回true 表示获取锁成功
if (compareAndSetState(0, 1))
//设置当前线程独占 如当前线程是Thread0 那么 exclusiveOwnerThread = Thread0
setExclusiveOwnerThread(Thread.currentThread());
else
//cas 失败
acquire(1);
}
2
3
4
5
6
7
8
9
# cas 失败 acquire逻辑
public final void acquire(int arg) {
//tryAcquire(arg)先再次尝试获取锁,如果还失败,则acquireQueued(addWaiter(Node.EXCLUSIVE), arg) 执行加入队列逻辑
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
//如果是中断唤醒,那么需要进行复位,因为后续代码可能又需要中断
selfInterrupt();
}
2
3
4
5
6
7
# tryAcquire 再次尝试获取锁
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
//是否其他线程已释放锁
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
//重入锁判断 当前线程 = 持有线程
else if (current == getExclusiveOwnerThread()) {
//如果重入,则state +1
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# addWaiter 再次获取锁失败,加入队列
private Node addWaiter(Node mode) {
//构建node节点 mode 表示是独占 还是共享
Node node = new Node(Thread.currentThread(), mode);
//设置尾节点
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
//入参是当前线程创建的节点
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) {
//没人排队,则新建空对象节点
if (compareAndSetHead(new Node()))
// 将头尾都指向了这个初始化的Node 然后再次进入循环
// 下次进入循环就有了头节点
tail = head;
} else {
//将当前线程的上一个节点指向 之前的最后一个节点
node.prev = t;
// 基于CAS的方式,将tail节点设置为当前节点 即 把当前节点设置为节点
if (compareAndSetTail(t, node)) {
//然后把之前尾节点的下一个节点指向当前节点(也就是现在的尾节点)
t.next = node;
return t;
}
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# 当前节点入队后执行 acquireQueued
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
//获取当前节点的上一个节点
final Node p = node.predecessor();
//上个节点是否头节点,是否获取到锁
if (p == head && tryAcquire(arg)) {
//设置当前节点为头节点
setHead(node);
//清空父节点的下一个节点 即清除引诱用关系,让父节点可以被回收
p.next = null; // help GC
failed = false;
return interrupted;
}
//获取锁失败,则考虑去park
//shouldParkAfterFailedAcquire(p, node) 1.设置上一个节点的waitStatus = -1
if (shouldParkAfterFailedAcquire(p, node) &&
//基于Unsafe类的park方法,线程挂起 阻塞当前线程 park 可以识别中断标记 可以被唤醒
parkAndCheckInterrupt()) // 针对fail属性,这里是唯一可能出现异常的地方,JVM内部出现问题时,可以这么理解,fianlly代码块中的内容,执行的几率约等于0~
interrupted = true;
}
} finally {
if (failed)
//程序异常出队
cancelAcquire(node);
}
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
do {
// 把当前节点的上一个节点指向 上一个节点的上一个,表示剔除 当前节点的上个节点
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
//将上一个节点的再上一个节点 的 下一个节点 指向 当前节点
// 上一个节点.上一个节点.下一个节点 = 当前节点
pred.next = node;
} else {
//cas 设置上一个节点 waitStatus 为-1 表示上一个节点可以被唤醒
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
private final boolean parkAndCheckInterrupt() {
//线程挂起
LockSupport.park(this);
//清除中断标志,清除完中断标识,后续需要再重新复位( selfInterrupt();) 因为后续可能还会有中断唤醒的操作
return Thread.interrupted();
}
//程序执行异常 执行出队逻辑
private void cancelAcquire(Node node) {
if (node == null)
return;
//清空线程信息
node.thread = null;
// 获取上一个节点
Node pred = node.prev;
while (pred.waitStatus > 0)
//往上寻找非失效节点 当前节点的上一个节点赋值为未失效节点
node.prev = pred = pred.prev;
// 获取上一个节点的下一个 节点
Node predNext = pred.next;
// 将当前节点设置为失效节点 状态1
node.waitStatus = Node.CANCELLED;
//如果当前节点是尾节点,将尾节点设置为最近的有效节点(如果当前节点为尾节点的操作)
if (node == tail && compareAndSetTail(node, pred)) {
// 然后再尾节点的下一个节点设置为null
compareAndSetNext(pred, predNext, null);
} else {
// 中间节点操作
// 如果上一个节点不是头节点
int ws;
if (pred != head &&
//获取上一节点状态,是不是有效
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);// 尝试将pred的前驱节点的next指向当前节点的next(必须是有效的next节点)
} else {
// 头结点,唤醒后继节点
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
private void setHead(Node node) {
//如果加锁成功,则把当前节点设置为 头节点
head = node;
// 把线程设置为null
node.thread = null;
// 把上一个节点设置为空 帮助GC 上一个节点
node.prev = null;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
# 总结:
- ReentrantLock 的实现分为 公平 和 非公平的实现,源码差异是公平锁直接入队、阻塞;非公平锁先通过CAS尝试获取锁,获取失败再入队阻塞。
- 入队之前会再次尝试获取锁,并校验是否存在锁重入,尝试加锁失败执行第3点
- 入队时,如果已经存在尾节点,则直接在队尾入队,如果不存在(尾节点不存在说明没有队列),先创建头节点,再入队
- 入队之后,会判断上个节点是否头节点,如果是的话,再次尝试加锁。加锁成功则 将当前节点设置为头节点,并清空上一个头节点的指针,帮助GC,如果失败则实现第5步
- 先设置上一个节点 waitStatus = -1 表示当前节点可以被上一个节点唤醒,然后park阻塞
# 解锁源码
LockSupport.unpark(s.thread);唤醒下一个节点所在的线程
public void unlock() {
//state - 1 因为存在重入锁的情况
sync.release(1);
}
2
3
4
# release
public final boolean release(int arg) {
//尝试解锁
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
//如果头节点状态不为0,则unpark唤醒其他线程
unparkSuccessor(h);
return true;
}
return false;
}
2
3
4
5
6
7
8
9
10
11
# tryRelease(arg)
protected final boolean tryRelease(int releases) {
//当前数量-1 可能存在重入情况
int c = getState() - releases;
//独占线程是否是当前线程,如果不是抛出异常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
//如果 c==0 表示 当前线程的锁都已经释放,情况独占线程
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# unparkSuccessor
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
//头节点的等待状态是否小于0, 是的话改成0
compareAndSetWaitStatus(node, ws, 0);
//获取下一个节点
Node s = node.next;
if (s == null || s.waitStatus > 0) {
// 如果下一个节点为null 或者 线程的状态被取消, 则从尾节点开始往上找未取消的节点
s = null;
//因为当前节点为空,没办法s.next 会空指针 所以只能从tail 往前找
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
//如果下一个节点不会空,则唤醒下一个节点
LockSupport.unpark(s.thread);
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# 总结
- 解锁之前 校验state -1 是否等于0,如果不是说明存在锁重入,再次等待解锁,如果是,则进入解锁流程。
- 解锁时,设置exclusiveOwnerThread = null 释放线程持有标识
- 确认waitStatus是否不等于0,如果是则唤醒后续节点。存在2种情况
- waitStatus<0,直接取unpark下一个节点
- 下一个节点为空或者waitStatus 大于0 ,则通过尾节点向上找到最靠近头部的有效节点去unpark