悦书阁 悦书阁
首页
学习笔记
技术文档
idea插件开发
更多
  • 分类
  • 标签
  • 归档

Felix

大道至简 知易行难
首页
学习笔记
技术文档
idea插件开发
更多
  • 分类
  • 标签
  • 归档
  • JVM

  • spring

  • 并发编程

    • 深入理解JMM和并发三大特性
    • CPU缓存一致性协议MESI
    • 深入理解synchronized
    • AQS之独占锁ReentrantLock
    • 深入理解AQS之Semaphorer&CountDownLatch
    • 深入理解AQS之CyclicBarrier
    • 深入理解AQS之ReentrantReadWriteLock
      • 前言
      • 读写锁介绍
      • ReentrantReadWriteLock的使用
        • ReentrantReadWriteLock结构
        • 读写状态的设计
        • HoldCounter 计数器
        • 读多写少场景(缓存类)
        • 锁降级场景
        • 为什么要锁降级?
      • 源码分析
        • 写锁源码分析(独占锁)
        • 加锁
        • 解锁源码
        • 写锁加锁流程图
        • 写锁解锁流程图
        • 读锁源码分析(共享锁)
        • 读锁加锁
        • 读锁解锁
        • 读锁加锁流程
        • 读锁解锁流程
    • Collections之Map&List&Set详解
    • 阻塞队列BlockingQueue实战及其原理分析
    • 深入理解Java线程
    • Executor线程池原理与源码解读
    • 并发编程之定时任务&定时线程池
  • 消息中间件

  • 微服务

  • 三高架构

  • 学习笔记
  • 并发编程
liufei379
2022-08-03
目录

深入理解AQS之ReentrantReadWriteLock

# 前言

前几节我们学习了ReentrantLock、Semaphorer&CountDownLatch

  • ReentrantLock 实现了独占锁(互斥锁)
  • Semaphorer&CountDownLatch 实现了共享锁

本章节我们将学习ReentrantReadWriteLock 读写锁,读锁实现共享锁,写锁实现独占锁

# 读写锁介绍

  • 应用场景:读多写少(注册中心nacos)
  • 读读可并发,读写/写读/写写互斥,所以读写锁能够提供比排它锁更好的并发性和吞吐量。
  • 线程进入读锁的前提条件
    • 没有其他线程的写锁
    • 没有写请求或者有写请求,但调用线程和持有锁的线程是同一个。
  • 三个特性:
    • 公平选择性:支持非公平(默认)和公平的锁获取方式,吞吐量还是非公平优于公平。
    • 可重入:读锁和写锁都支持线程重入。以读写线程为例:读线程获取读锁后,能够再次获取读锁。写线程在获取写锁之后能够再次获取写锁,同时也可以获取读锁。
    • 锁降级:同一个线程遵循获取写锁 --> 获取读锁 --> 释放写锁 --> 释放读锁 的方式,写锁能够降级成为读锁。
      • 如果先持有读锁 再去持有写锁 ,写锁会入队 park 从而导致线程阻塞死锁
  • 避免线程饥饿
    • 读锁获取都需要校验s.isShared() ,如果不是共享说明有写锁,那么入队等待死锁执行,避免无法写入,导致线程饥饿。

# ReentrantReadWriteLock的使用

# ReentrantReadWriteLock结构

image-20220803112758744 image-20220803112819061

# 读写状态的设计

在 ReentrantLock 中,使用 Sync ( 实际是 AQS )的 int 类型的 state 来表示同步状态,表示锁被一个线程重复获取的次数。但是,读写锁 ReentrantReadWriteLock 内部维护着一对读写锁,如果要用一个变量维护多种状态,需要采用“按位切割使用”的方式来维护这个变量,将其切分为两部分:高16为表示读,低16为表示写。

分割之后,读写锁是如何迅速确定读锁和写锁的状态呢?通过位运算。假如当前同步状态为S,那么:

  • 写状态,等于 S & 0x0000FFFF(将高 16 位全部抹去)。 当写状态加1,等于S+1.
  • 读状态,等于 S >>> 16 (无符号补 0 右移 16 位)。当读状态加1,等于S+(1<<16),也就是S+0x00010000

根据状态的划分能得出一个推论:S不等于0时,当写状态(S&0x0000FFFF)等于0时,则读状态(S>>>16)大于0,即读锁已被获取。

image-20220803113007290

代码实现:java.util.concurrent.locks.ReentrantReadWriteLock.Sync

image-20220803113036700

  • exclusiveCount(int c) 静态方法,获得持有写状态的锁的次数。
  • sharedCount(int c) 静态方法,获得持有读状态的锁的线程数量。不同于写锁,读锁可以同时被多个线程持有。而每个线程持有的读锁支持重入的特性,所以需要对每个线程持有的读锁的数量单独计数,这就需要用到 HoldCounter 计数器
# HoldCounter 计数器

读锁的内在机制其实就是一个共享锁。一次共享锁的操作就相当于对HoldCounter 计数器的操作。获取共享锁,则该计数器 + 1,释放共享锁,该计数器 - 1。只有当线程获取共享锁后才能对共享锁进行释放、重入操作。image-20220803113128926

通过 ThreadLocalHoldCounter 类,HoldCounter 与线程进行绑定。HoldCounter 是绑定线程的一个计数器,而 ThreadLocalHoldCounter 则是线程绑定的 ThreadLocal。

  • HoldCounter是用来记录读锁重入数的对象
  • ThreadLocalHoldCounter是ThreadLocal变量,用来存放不是第一个获取读锁的线程的其他线程的读锁重入数对象

# 读多写少场景(缓存类)

public class Cache {
    static Map<String, Object> map = new HashMap<String, Object>();
    static ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
    static Lock r = rwl.readLock();
    static Lock w = rwl.writeLock();

    public static final Object get(String key) {
        r.lock();
        try {
            return map.get(key);
        } finally {
            r.unlock();
        }
    }

    public static final Object put(String key, Object value) {
        w.lock();
        try {
            return map.put(key, value);
        } finally {
            w.unlock();
        }
    }

    public static final void clear() {
        w.lock();
        try {
            map.clear();
        } finally {
            w.unlock();
        }
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
  • Cache组合一个非线程安全的HashMap作为缓存的实现,同时使用读写锁的读锁和写锁来保证Cache是线程安全的
  • 在读操作get(String key)方法中,需要获取读锁,这使得并发访问该方法时不会被阻塞
  • 写操作put(String key,Object value)方法和clear()方法,在更新 HashMap时必须提前获取写锁,当获取写锁后,其他线程对于读锁和写锁的获取均被阻塞,而 只有写锁被释放之后,其他读写操作才能继续
  • Cache使用读写锁提升读操作的并发性,也保证每次写操作对所有的读写操作的可见性,同时简化了编程方式

# 锁降级场景

final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
final Lock r = rwl.readLock();
final Lock w = rwl.writeLock();

try {
    w.lock();
    log.info("获取写锁");
    try {
        r.lock();
        log.info("获取读锁");
    } finally {
        w.unlock();
        log.info("释放写锁");
    }
} catch (Exception e) {
    e.printStackTrace();
} finally {
    r.unlock();
    log.info("释放读锁");
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# 为什么要锁降级?

保证数据的可见性

# 源码分析

# 写锁源码分析(独占锁)

# 加锁
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        //加锁失败 入队 阻塞 
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

protected final boolean tryAcquire(int acquires) {
    //当前线程
    Thread current = Thread.currentThread();
    //获取state状态   存在读锁或者写锁,状态就不为0
    int c = getState();
    //获取写锁的重入数
    int w = exclusiveCount(c);
    //当前同步状态state != 0,说明已经有其他线程获取了读锁或写锁
    if (c != 0) {
        // c!=0 && w==0 表示存在读锁
        // 当前存在读锁或者写锁已经被其他写线程获取,则写锁获取失败
        if (w == 0 || current != getExclusiveOwnerThread())
            return false;
        // 超出最大范围  65535
        if (w + exclusiveCount(acquires) > MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        //锁重入
        setState(c + acquires);
        return true;
    }
    // writerShouldBlock有公平与非公平的实现, 非公平返回false,会尝试通过cas加锁 如果公平则判断是否有队列 hasQueuedPredecessors,如果是则去入队阻塞
    //c==0 写锁未被任何线程获取,当前线程是否阻塞或者cas尝试获取锁 如果失败 入队 阻塞
    if (writerShouldBlock() ||
        !compareAndSetState(c, c + acquires))
        return false;

    //设置写锁为当前线程所有
    setExclusiveOwnerThread(current);
    return true;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# 解锁源码
protected final boolean tryRelease(int releases) {
    //若锁的持有者不是当前线程,抛出异常
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    int nextc = getState() - releases;
    //当前写状态是否为0,为0则释放写锁(存在重入情况)
    boolean free = exclusiveCount(nextc) == 0;
    if (free)
        //设置持有线程为空
        setExclusiveOwnerThread(null);
    setState(nextc);
    return free;
1
2
3
4
5
6
7
8
9
10
11
12
# 写锁加锁流程图

image-20220803114015863

# 写锁解锁流程图

image-20220803114257899

# 读锁源码分析(共享锁)

# 读锁加锁
protected final int tryAcquireShared(int unused) {
    Thread current = Thread.currentThread();
    int c = getState();
    // 如果写锁已经被获取并且获取写锁的线程不是当前线程,当前线程获取读锁失败返回-1   判断锁降级
    if (exclusiveCount(c) != 0 &&
        getExclusiveOwnerThread() != current)
        return -1;
    //计算出读锁的数量
    int r = sharedCount(c);
    /**
    * 读锁是否阻塞    readerShouldBlock()公平与非公平的实现
    * r < MAX_COUNT: 持有读锁的线程小于最大数(65535)
    *  compareAndSetState(c, c + SHARED_UNIT) cas设置获取读锁线程的数量
    */
    if (!readerShouldBlock() &&
        r < MAX_COUNT &&
        compareAndSetState(c, c + SHARED_UNIT)) {   //当前线程获取读锁
        
        if (r == 0) {  //设置第一个获取读锁的线程
            firstReader = current; 
            firstReaderHoldCount = 1;  //设置第一个获取读锁线程的重入数
        } else if (firstReader == current) { // 表示第一个获取读锁的线程重入
            firstReaderHoldCount++;
        } else { // 非第一个获取读锁的线程
            HoldCounter rh = cachedHoldCounter;
            if (rh == null || rh.tid != getThreadId(current))
                cachedHoldCounter = rh = readHolds.get();
            else if (rh.count == 0)
                readHolds.set(rh);
            rh.count++;  //记录其他获取读锁的线程的重入次数
        }
        return 1;
    }
    
    // 尝试通过自旋的方式获取读锁,实现了重入逻辑
    return fullTryAcquireShared(current);
}


final boolean readerShouldBlock() {
    return apparentlyFirstQueuedIsExclusive();
}    

final boolean apparentlyFirstQueuedIsExclusive() {
    Node h, s;
    return (h = head) != null &&
        (s = h.next)  != null &&
        //如果不是共享 说明存在写锁,则读锁入队 避免饥饿
        !s.isShared()         && 
        s.thread != null;
}
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
# 读锁解锁
protected final boolean tryReleaseShared(int unused) {
    Thread current = Thread.currentThread();
    //如果当前线程是第一个获取读锁的线程
    if (firstReader == current) {
        // assert firstReaderHoldCount > 0;
        if (firstReaderHoldCount == 1)
            firstReader = null;
        else
            firstReaderHoldCount--; //重入次数减1
    } else {  //不是第一个获取读锁的线程
        HoldCounter rh = cachedHoldCounter;  
        if (rh == null || rh.tid != getThreadId(current))
            rh = readHolds.get();
        int count = rh.count;
        if (count <= 1) {
            readHolds.remove();
            if (count <= 0)
                throw unmatchedUnlockException();
        }
        --rh.count;  //重入次数减1
    }
    for (;;) {  //cas更新同步状态
        int c = getState();
        int nextc = c - SHARED_UNIT;
        if (compareAndSetState(c, nextc))
            return nextc == 0;
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

# 读锁加锁流程

image-20220803134817326

# 读锁解锁流程

image-20220803134740202

上次更新: 2026/3/11 22:17:56
深入理解AQS之CyclicBarrier
Collections之Map&List&Set详解

← 深入理解AQS之CyclicBarrier Collections之Map&List&Set详解→

最近更新
01
实现idea开发的关键步骤
10-05
02
Redis高可用架构
09-09
03
Zookeeper高可用
08-31
更多文章>
Theme by Vdoing | Copyright © 2022-2026 Felix | 粤ICP备17101757号-1
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式