悦书阁 悦书阁
首页
学习笔记
技术文档
idea插件开发
更多
  • 分类
  • 标签
  • 归档

Felix

大道至简 知易行难
首页
学习笔记
技术文档
idea插件开发
更多
  • 分类
  • 标签
  • 归档
  • JVM

  • spring

  • 并发编程

    • 深入理解JMM和并发三大特性
    • CPU缓存一致性协议MESI
    • 深入理解synchronized
    • AQS之独占锁ReentrantLock
    • 深入理解AQS之Semaphorer&CountDownLatch
    • 深入理解AQS之CyclicBarrier
    • 深入理解AQS之ReentrantReadWriteLock
    • Collections之Map&List&Set详解
    • 阻塞队列BlockingQueue实战及其原理分析
      • 阻塞队列介绍
        • Queue接口
        • BlockingQueue接口
        • BlockingQueue方法总结
        • BlockingQueue使用例子
      • 阻塞队列特性
        • 阻塞
        • take 方法
        • put 方法
        • 是否有界
      • 应用场景
      • 常见阻塞队列
        • ArrayBlockingQueue
        • 概念整理
        • 示例
        • 源码分析
        • 入队put方法 源码分析
        • 出队take方法源码分析
        • LinkedBlockingQueue
        • 概念整理
        • 示例
        • 构造函数
        • 数据结构
        • 入队put方法 源码分析
        • 出队take方法 源码分析
        • LinkedBlockingQueue与ArrayBlockingQueue对比
        • SynchronousQueue
        • 概念整理
        • PriorityBlockingQueue
        • 概念整理
        • 示例
        • 如何构造优先级队列
        • 使用普通线性数组(无序)来表示优先级队列
        • 使用一个按顺序排列的有序向量实现优先级队列
        • 二叉堆
        • DelayQueue
        • 概念整理
        • 源码分析
        • 数据结构
        • 入队put方法
        • 出队take方法
    • 深入理解Java线程
    • Executor线程池原理与源码解读
    • 并发编程之定时任务&定时线程池
  • 消息中间件

  • 微服务

  • 三高架构

  • 学习笔记
  • 并发编程
liufei379
2022-08-18
目录

阻塞队列BlockingQueue实战及其原理分析

# 阻塞队列介绍

# Queue接口

public interface Queue<E> extends Collection<E> {
    //添加一个元素,添加成功返回true, 如果队列满了,就会抛出异常
    boolean add(E e);
    //添加一个元素,添加成功返回true, 如果队列满了,返回false
    boolean offer(E e);
    //返回并删除队首元素,队列为空则抛出异常
    E remove();
    //返回并删除队首元素,队列为空则返回null
    E poll();
    //返回队首元素,但不移除,队列为空则抛出异常
    E element();
    //获取队首元素,但不移除,队列为空则返回null
    E peek();
1
2
3
4
5
6
7
8
9
10
11
12
13

# BlockingQueue接口

BlockingQueue 继承了 Queue 接口,是队列的一种。Queue 和 BlockingQueue 都是在 Java 5 中加入的。阻塞队列(BlockingQueue)是一个在队列基础上又支持了两个附加操作的队列,常用解耦。两个附加操作:

  • 支持阻塞的插入方法put: 队列满时,队列会阻塞插入元素的线程,直到队列不满。
  • 支持阻塞的移除方法take: 队列空时,获取元素的线程会等待队列变为非空。

BlockingQueue和JDK集合包中的Queue接口兼容,同时在其基础上增加了阻塞功能。

入队:

(1)offer(E e):如果队列没满,返回true,如果队列已满,返回false(不阻塞)

(2)offer(E e, long timeout, TimeUnit unit):可以设置阻塞时间,如果队列已满,则进行阻塞。超过阻塞时间,则返回false

(3)put(E e):队列没满的时候是正常的插入,如果队列已满,则阻塞,直至队列空出位置

出队:

(1)poll():如果有数据,出队,如果没有数据,返回null (不阻塞)

(2)poll(long timeout, TimeUnit unit):可以设置阻塞时间,如果没有数据,则阻塞,超过阻塞时间,则返回null

(3)take():队列里有数据会正常取出数据并删除;但是如果队列里无数据,则阻塞,直到队列里有数据

# BlockingQueue方法总结

当队列满了无法添加元素,或者是队列空了无法移除元素时:

  • 抛出异常:add、remove、element

  • 返回结果但不抛出异常:offer、poll、peek

  • 阻塞:put、take

方法 抛出异常 返回特定值 阻塞 阻塞特定时间
入队 add(e) offer(e) put(e) offer(e, time, unit)
出队 remove() poll() take() poll(time, unit)
获取队首元素 element() peek() 不支持 不支持

# BlockingQueue使用例子

public class BlockingQueueTest {

    public static void main(String[] args) {

        //抛出异常
        addTest();// 入队尾元素 队列满抛出异常
        removeTest();//出队首元素 队列空抛出异常
        elementTest();//返回首元素 不移除 队列空抛出异常

        //返回数据
        offerTest(); //入队尾元素 队列空返回false
        pollTest(); //出队首元素 队列空则返回null
        peekTest(); //获取队首元素,但不移除,队列为空则返回null

        //阻塞
        putTest(); //入队尾元素 队列满阻塞
        takeTest();//出队首元素 队列空阻塞
    }

    /**
     * add 方法是往队列里添加一个元素,如果队列满了,就会抛出异常来提示队列已满。
     */
    private static void addTest() {
        BlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<Integer>(2);
        System.out.println(blockingQueue.add(1));
        System.out.println(blockingQueue.add(2));
        System.out.println(blockingQueue.add(3));
    }

    /**
     * remove 方法的作用是删除元素并返回队列的头节点,如果删除的队列是空的, remove 方法就会抛出异常。
     */
    private static void removeTest() {
        ArrayBlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<Integer>(2);
        blockingQueue.add(1);
        blockingQueue.add(2);
        System.out.println(blockingQueue.remove());
        System.out.println(blockingQueue.remove());
        System.out.println(blockingQueue.remove());
    }

    /**
     * element 方法是返回队列的头部节点,但是并不删除。如果队列为空,抛出异常
     */
    private static void elementTest() {
        ArrayBlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<Integer>(2);
        blockingQueue.element();
    }

    /**
     * offer 方法用来插入一个元素。如果添加成功会返回 true,而如果队列已经满了,返回false
     */
    private static void offerTest(){
        ArrayBlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<Integer>(2);
        System.out.println(blockingQueue.offer(1));
        System.out.println(blockingQueue.offer(2));
        System.out.println(blockingQueue.offer(3));
    }

    /**
     * poll 方法作用也是移除并返回队列的头节点。 如果队列为空,返回null
     */
    private static void pollTest() {
        ArrayBlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<Integer>(3);
        blockingQueue.offer(1);
        blockingQueue.offer(2);
        blockingQueue.offer(3);
        System.out.println(blockingQueue.poll());
        System.out.println(blockingQueue.poll());
        System.out.println(blockingQueue.poll());
        System.out.println(blockingQueue.poll());
    }

    /**
     * peek 方法返回队列的头元素但并不删除。 如果队列为空,返回null
     */
    private static void peekTest() {
        ArrayBlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<Integer>(2);
        System.out.println(blockingQueue.peek());
    }

    /**
     * put 方法的作用是插入元素。如果队列已满就无法继续插入,阻塞插入线程,直至队列空出位置
     */
    private static void putTest(){
        BlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<Integer>(2);
        try {
            blockingQueue.put(1);
            blockingQueue.put(2);
            blockingQueue.put(3);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    /**
     * take 方法的作用是获取并移除队列的头结点。如果执队列里无数据,则阻塞,直到队列里有数据
     */
    private static void takeTest(){
        BlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<Integer>(2);
        try {
            blockingQueue.take();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107

# 阻塞队列特性

# 阻塞

阻塞队列区别于其他类型的队列的最主要的特点就是“阻塞”这两个字,所以下面重点介绍阻塞功能:阻塞功能使得生产者和消费者两端的能力得以平衡,当有任何一端速度过快时,阻塞队列便会把过快的速度给降下来。实现阻塞最重要的两个方法是 take 方法和 put 方法。

# take 方法

take 方法的功能是获取并移除队列的头结点,通常在队列里有数据的时候是可以正常移除的。可是一旦执行 take 方法的时候,队列里无数据,则阻塞,直到队列里有数据。一旦队列里有数据了,就会立刻解除阻塞状态,并且取到数据。过程如图所示:

image-20220818113623110

# put 方法

put 方法插入元素时,如果队列没有满,那就和普通的插入一样是正常的插入,但是如果队列已满,那么就无法继续插入,则阻塞,直到队列里有了空闲空间。如果后续队列有了空闲空间,比如消费者消费了一个元素,那么此时队列就会解除阻塞状态,并把需要添加的数据添加到队列中。过程如图所示:

image-20220818114920874

# 是否有界

阻塞队列还有一个非常重要的属性,那就是容量的大小,分为有界和无界两种。无界队列意味着里面可以容纳非常多的元素,例如 LinkedBlockingQueue 的上限是 Integer.MAX_VALUE,是非常大的一个数,可以近似认为是无限容量,因为我们几乎无法把这个容量装满。但是有的阻塞队列是有界的,例如 ArrayBlockingQueue 如果容量满了,也不会扩容,所以一旦满了就无法再往里放数据了。

# 应用场景

BlockingQueue 是线程安全的,我们在很多场景下都可以利用线程安全的队列来优雅地解决我们业务自身的线程安全问题。比如说,使用生产者/消费者模式的时候,我们生产者只需要往队列里添加元素,而消费者只需要从队列里取出它们就可以了

image-20220818140814696

因为阻塞队列是线程安全的,所以生产者和消费者都可以是多线程的,不会发生线程安全问题。生产者/消费者直接使用线程安全的队列就可以,而不需要自己去考虑更多的线程安全问题。这也就意味着,考虑锁等线程安全问题的重任从“你”转移到了“队列”上,降低了我们开发的难度和工作量。

同时,队列它还能起到一个隔离的作用。比如说我们开发一个银行转账的程序,那么生产者线程不需要关心具体的转账逻辑,只需要把转账任务,如账户和金额等信息放到队列中就可以,而不需要去关心银行这个类如何实现具体的转账业务。而作为银行这个类来讲,它会去从队列里取出来将要执行的具体的任务,再去通过自己的各种方法来完成本次转账。这样就实现了具体任务与执行任务类之间的解耦,任务被放在了阻塞队列中,而负责放任务的线程是无法直接访问到我们银行具体实现转账操作的对象的,实现了隔离,提高了安全性。

# 常见阻塞队列

BlockingQueue 接口的实现类都被放在了 juc 包中,它们的区别主要体现在存储结构上或对元素操作上的不同,但是对于take与put操作的原理,却是类似的。

队列 描述
ArrayBlockingQueue 基于数组结构实现的一个有界阻塞队列
LinkedBlockingQueue 基于链表结构实现的一个无界阻塞队列,指定容量为有界阻塞队列
PriorityBlockingQueue 支持按优先级排序的无界阻塞队列
DelayQueue 基于优先级队列(PriorityBlockingQueue)实现的无界阻塞队列
SynchronousQueue 不存储元素的阻塞队列
LinkedTransferQueue 基于链表结构实现的一个无界阻塞队列
LinkedBlockingDeque 基于链表结构实现的一个双端阻塞队列

# ArrayBlockingQueue

# 概念整理

ArrayBlockingQueue是最典型的有界阻塞队列,其内部是用数组存储元素的,初始化时需要指定容量大小,利用 ReentrantLock 实现线程安全。

在生产者-消费者模型中使用时,如果生产速度和消费速度基本匹配的情况下,使用ArrayBlockingQueue是个不错选择;当如果生产速度远远大于消费速度,则会导致队列填满,大量生产线程被阻塞。

使用独占锁ReentrantLock实现线程安全,入队和出队操作使用同一个锁对象,也就是只能有一个线程可以进行入队或者出队操作;这也就意味着生产者和消费者无法并行操作,在高并发场景下会成为性能瓶颈。

  • 有界阻塞队列,先进先出,存取相互排斥
  • 数据结构:静态数组
    • 容量固定必须指定长度,没有扩容机制
    • 没有元素的位置也占用空间,被 null 占位
  • 锁:ReentrantLock 存取是同一把锁,操作的是同一个数组对象,存取互相排斥(高并发场景有性能问题)
  • 阻塞对象
    • notEmpty 出队:队列count=0,无元素可取时,阻塞在该对象上(即空时阻塞)
    • notFull 入队:队列count=length,放不进去元素时,阻塞在该对象上(即满时阻塞)
  • 入队 从队首开始添加元素,记录putIndex(到队尾时设置为0) 唤醒notEmpty
  • 出队 从队首开始取出元素,记录takeIndex(到队尾时设置为0)唤醒notFull
  • 两个指针都是从队首向队尾移动,保证队列的先进先出原则

# 示例

@Slf4j
public class ArrayBlockingQueueDemo {

    public static void main(String[] args) throws Exception {
        BlockingQueue queue = new ArrayBlockingQueue(2);
		
        //如果只开启生产者线程,则当i == 1时 阻塞
        new Thread(()->{
            for (int i = 0; i < 100; i++) {
                try {
                    queue.put(i);
                    log.info("put:"+i);
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();  //开启生产者线程
        
         //如果只开消费者线程,则当i == 1时 阻塞
        new Thread(()->{

            for (int i = 0; i < 100; i++) {
                try {
                    log.info("take:"+queue.take().toString());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();  //开启消费者线程
        //当2个都开启时,则交替执行
    }
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34

# 源码分析

利用了Lock锁的Condition通知机制进行阻塞控制。 一把锁,两个条件

//数据元素数组
final Object[] items;
//下一个待取出元素索引
int takeIndex;
//下一个待添加元素索引
int putIndex;
//元素个数
int count;
//内部锁
final ReentrantLock lock;
//消费者
private final Condition notEmpty;
//生产者
private final Condition notFull;  

public ArrayBlockingQueue(int capacity) {
    this(capacity, false);
}
public ArrayBlockingQueue(int capacity, boolean fair) {
    ...
    lock = new ReentrantLock(fair); //公平,非公平
    notEmpty = lock.newCondition();
    notFull =  lock.newCondition();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# 入队put方法 源码分析
//put可中断
public void put(E e) throws InterruptedException {
	//非空校验 如果put(null) 则空指针异常
    checkNotNull(e);
    // 和take同意把锁
    final ReentrantLock lock = this.lock;
    //加锁,如果线程中断抛出异常 
    lock.lockInterruptibly();
    try {
       //阻塞队列已满,则将生产者挂起,等待消费者唤醒
       //设计注意点: 用while不用if是为了防止虚假唤醒(即如果已经有thread A 和 thread B 2个线程阻塞在这里,当被唤醒时,如果是if,则2个对象都会进入数组,但是实际数组只够存放一个对象。导致其他非空数组对象被覆盖,但是用while,则唤醒之后 thread A 所持有的对象入队,再去唤醒thread B,如果此时count == items.length,theread B 将再次阻塞) 
        while (count == items.length)
            notFull.await(); //队列满了,使用notFull等待(生产者阻塞)
        // 入队
        enqueue(e);
    } finally {
        lock.unlock(); // 唤醒消费者线程
    }
}
    
private void enqueue(E x) {
    final Object[] items = this.items;
    //入队   使用的putIndex
    items[putIndex] = x;
    if (++putIndex == items.length) 
        putIndex = 0;  //设计的精髓: 环形数组,putIndex指针到数组尽头了,返回头部
    count++;
    //notEmpty条件队列转同步队列,准备唤醒消费者线程,因为入队了一个元素,肯定不为空了
    notEmpty.signal();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# 出队take方法源码分析
//take 可中断
public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    //加锁,如果线程中断抛出异常 
    lock.lockInterruptibly();
    try {
       //如果队列为空,则消费者挂起 等待生产者唤醒
        while (count == 0)
            notEmpty.await();
        //出队
        return dequeue();
    } finally {
        lock.unlock();// 唤醒生产者线程
    }
}
private E dequeue() {
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    E x = (E) items[takeIndex]; //取出takeIndex位置的元素
    items[takeIndex] = null;
    if (++takeIndex == items.length)
        takeIndex = 0; //设计的精髓: 环形数组,takeIndex 指针到数组尽头了,返回头部
    count--;
    if (itrs != null)
        itrs.elementDequeued();
    //notFull条件队列转同步队列,准备唤醒生产者线程,此时队列有空位
    notFull.signal();
    return x;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29

# LinkedBlockingQueue

# 概念整理

​ LinkedBlockingQueue是一个基于链表实现的阻塞队列,默认情况下,该阻塞队列的大小为Integer.MAX_VALUE,由于这个数值特别大,所以 LinkedBlockingQueue 也被称作无界队列,代表它几乎没有界限,队列可以随着元素的添加而动态增长,但是如果没有剩余内存,则队列将抛出OOM错误。所以为了避免队列过大造成机器负载或者内存爆满的情况出现,我们在使用的时候建议手动传一个队列的大小。

​ LinkedBlockingQueue内部由单链表实现,只能从head取元素,从tail添加元素。LinkedBlockingQueue采用两把锁的锁分离技术实现入队出队互不阻塞,添加元素和获取元素都有独立的锁,也就是说LinkedBlockingQueue是读写分离的,读写操作可以并行执行。

  • 无界阻塞队列,可以指定容量,默认为 Integer.MAX_VALUE,先进先出,存取互不干扰
  • 数据结构:链表 内部类 Node 存储元素
  • 锁分离:存取互不干扰,存取操作的是不同的Node对象
    • takeLock 取Node节点保证前驱后继不会乱
    • putLock 存Node节点保证前驱后继不会乱
  • 阻塞对象
    • notEmpty 出队:队列count=0,无元素可取时,阻塞在该对象上
    • notFull 入队:队列count=capacity,放不进去元素时,阻塞在该对象上
  • 入队: 队尾入队,由last指针记录
  • 出队: 队首出队,由head指针记录

# 示例

参照ArrayBlockingQueue示例

# 构造函数

//指定队列的大小创建有界队列
BlockingQueue<Integer> boundedQueue = new LinkedBlockingQueue<>(100);
public LinkedBlockingQueue(int capacity) {
    if (capacity <= 0) throw new IllegalArgumentException();
    this.capacity = capacity;
    // 初始化head和last指针为空值节点
    last = head = new Node<E>(null);
}

//无界队列
BlockingQueue<Integer> unboundedQueue = new LinkedBlockingQueue<>();
public LinkedBlockingQueue() {
    // 如果没传容量,就使用最大int值初始化其容量
    this(Integer.MAX_VALUE);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

# 数据结构

// 容量,指定容量就是有界队列
private final int capacity;
// 元素数量
private final AtomicInteger count = new AtomicInteger();
// 链表头  本身是不存储任何元素的,初始化时item指向null
transient Node<E> head;
// 链表尾
private transient Node<E> last;
// take锁   锁分离,提高效率
private final ReentrantLock takeLock = new ReentrantLock();
// notEmpty条件
// 当队列无元素时,take锁会阻塞在notEmpty条件上,等待其它线程唤醒
private final Condition notEmpty = takeLock.newCondition();
// put锁
private final ReentrantLock putLock = new ReentrantLock();
// notFull条件
// 当队列满了时,put锁会会阻塞在notFull上,等待其它线程唤醒
private final Condition notFull = putLock.newCondition();

//典型的单链表结构
static class Node<E> {
    E item;  //存储元素
    Node<E> next;  //后继节点    单链表结构
    Node(E x) { item = x; }
} 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25

# 入队put方法 源码分析

public void put(E e) throws InterruptedException {    
    // 不允许null元素
    if (e == null) throw new NullPointerException();
    int c = -1;
    // 新建一个节点
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    // 使用put锁加锁
    putLock.lockInterruptibly();
    try {
        // 如果队列满了,就阻塞在notFull上等待被其它线程唤醒(阻塞生产者线程)
        while (count.get() == capacity) {
            notFull.await();
        }  
        // 队列不满,就入队
        enqueue(node);
        c = count.getAndIncrement();// 队列长度加1,返回原值
        // 如果现队列长度小于容量,notFull条件队列转同步队列,准备唤醒一个阻塞在notFull条件上的线程(可以继续入队) 
        // 这里为啥要唤醒一下呢?
        // 因为可能有很多线程阻塞在notFull这个条件上,而取元素时只有取之前队列是满的才会唤醒notFull,此处不用等到取元素时才唤醒
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        putLock.unlock(); // 真正唤醒生产者线程
    }  
    // c = count.getAndIncrement(); 返回的是原值,如果原值是0,现在加了一个元素后立即唤醒阻塞在notEmpty上的线程去take
    if (c == 0)
        signalNotEmpty();
}
private void enqueue(Node<E> node) { 
    // 直接加到last后面,last指向入队元素
    last = last.next = node;
}    
private void signalNotEmpty() {
    final ReentrantLock takeLock = this.takeLock; 
    takeLock.lock();// 加take锁
    try {  
        notEmpty.signal();// notEmpty条件队列转同步队列,准备唤醒阻塞在notEmpty上的线程
    } finally {
        takeLock.unlock();  // 真正唤醒消费者线程
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42

# 出队take方法 源码分析

public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    // 使用takeLock加锁
    takeLock.lockInterruptibly();
    try {
        // 如果队列无元素,则阻塞在notEmpty条件上(消费者线程阻塞)
        while (count.get() == 0) {
            notEmpty.await();
        }
        // 否则,出队
        x = dequeue();
        c = count.getAndDecrement();//长度-1,返回原值
        if (c > 1)// 如果取之前队列长度大于1,notEmpty条件队列转同步队列,准备唤醒阻塞在notEmpty上的线程,原因与入队同理
            notEmpty.signal();
    } finally {
        takeLock.unlock(); // 真正唤醒消费者线程
    }
    // 为什么队列是满的才唤醒阻塞在notFull上的线程呢?
    // 因为唤醒是需要加putLock的,这是为了减少锁的次数,所以,这里索性在放完元素就检测一下,未满就唤醒其它notFull上的线程,
    // 这也是锁分离带来的代价
    // 如果取之前队列长度等于容量(已满),则唤醒阻塞在notFull的线程
    // c = count.getAndDecrement(); 返回原值,如果原长度 = 数组长度,现在取了一个表示又有空位了,立即唤醒阻塞在notFull上的线程去put
    if (c == capacity)
        signalNotFull();
    return x;
}
private E dequeue() {
     // head节点本身是不存储任何元素的
    // 这里把head删除,并把head下一个节点作为新的值
    // 并把其值置空,返回原来的值
    Node<E> h = head;
    Node<E> first = h.next;
    h.next = h; // 方便GC
    head = first;
    E x = first.item;
    first.item = null;
    return x;
}
private void signalNotFull() {
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        notFull.signal();// notFull条件队列转同步队列,准备唤醒阻塞在notFull上的线程
    } finally {
        putLock.unlock(); // 解锁,这才会真正的唤醒生产者线程
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
# LinkedBlockingQueue与ArrayBlockingQueue对比

LinkedBlockingQueue的入队和出队是两把锁,存取元素互不干扰。ArrayBlockingQueue则是使用的同一把锁,存取元素时相互排斥。所以LinkedBlockingQueue性能会更好一些。

LinkedBlockingQueue是一个阻塞队列,内部由两个ReentrantLock来实现出入队列的线程安全,由各自的Condition对象的await和signal来实现等待和唤醒功能。它和ArrayBlockingQueue的不同点在于:

  • 队列大小有所不同,ArrayBlockingQueue是有界的初始化必须指定大小,而LinkedBlockingQueue可以是有界的也可以是无界的(Integer.MAX_VALUE),对于后者而言,当添加速度大于移除速度时,在无界的情况下,可能会造成内存溢出等问题。
  • 数据存储容器不同,ArrayBlockingQueue采用的是数组作为数据存储容器,而LinkedBlockingQueue采用的则是以Node节点作为连接对象的链表。
  • 由于ArrayBlockingQueue采用的是数组的存储容器,因此在插入或删除元素时不会产生或销毁任何额外的对象实例,而LinkedBlockingQueue则会生成一个额外的Node对象。这可能在长时间内需要高效并发地处理大批量数据的时,对于GC可能存在较大影响。
  • 两者的实现队列添加或移除的锁不一样,ArrayBlockingQueue实现的队列中的锁是没有分离的,即添加操作和移除操作采用的同一个ReenterLock锁,而LinkedBlockingQueue实现的队列中的锁是分离的,其添加采用的是putLock,移除采用的则是takeLock,这样能大大提高队列的吞吐量,也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能。

# SynchronousQueue

# 概念整理

SynchronousQueue是一个没有数据缓冲的BlockingQueue,生产者线程对其的插入操作put必须等待消费者的移除操作take。

image-20220819115034300

如图所示,SynchronousQueue 最大的不同之处在于,它的容量为 0,所以没有一个地方来暂存元素,导致每次取数据都要先阻塞,直到有数据被放入;同理,每次放数据的时候也会阻塞,直到有消费者来取。

需要注意的是,SynchronousQueue 的容量不是 1 而是 0,因为 SynchronousQueue 不需要去持有元素,它所做的就是直接传递(direct handoff)。由于每当需要传递的时候,SynchronousQueue 会把元素直接从生产者传给消费者,在此期间并不需要做存储,所以如果运用得当,它的效率是很高的。

提示

生产者或者消费者必须有一方是阻塞的,否则消费者将无法获取数据

  • 是一个没有数据缓冲的BlockingQueue,容量为0,它不会为队列中元素维护存储空间,它只是多个线程之间数据交换的媒介。
  • 数据结构:链表(Node)
  • 锁:CAS+自旋(无锁)
  • 存取调用同一个方法:transfer()
    • put、offer 为生产者,携带了数据 e,为 Data 模式,设置到 SNode或QNode 属性中
    • take、poll 为消费者,不携帯数据,为 Request 模式,设置到 SNode或QNode属性中
  • 过程
    • 线程访问阻塞队列,先判断队尾节点或者栈顶节点的 Node 与当前入队模式是否相同
    • 相同则构造节点 Node 入队,并阻塞当前线程,元素 e 和线程赋值给 Node 属性
    • 不同则将元素 e(不为 null) 返回给取数据线程,队首或栈顶线程被唤醒,出队
  • 公平模式
    • TransferQueue 队尾匹配(判断模式),队头出队,先进先出
  • 非公平模式(默认策略)
    • TransferStack 栈顶匹配,栈顶出栈,后进先出
  • 应用场景
    • SynchronousQueue非常适合传递性场景做交换工作,生产者的线程和消费者的线程同步传递某些信息、事件或者任务。
    • SynchronousQueue的一个使用场景是在线程池里。如果我们不确定来自生产者请求数量,但是这些请求需要很快的处理掉,那么配合SynchronousQueue为每个生产者请求分配一个消费线程是处理效率最高的办法。Executors.newCachedThreadPool()就使用了SynchronousQueue,这个线程池根据需要(新任务到来时)创建新的线程,如果有空闲线程则会重复使用,线程空闲了60秒后会被回收。

# PriorityBlockingQueue

# 概念整理

PriorityBlockingQueue是一个无界的基于数组的优先级阻塞队列,数组的默认长度是11,虽然指定了数组的长度,但是可以无限的扩充,直到资源消耗尽为止,每次出队都返回优先级别最高的或者最低的元素。默认情况下元素采用自然顺序升序排序,当然我们也可以通过构造函数来指定Comparator来对元素进行排序。需要注意的是PriorityBlockingQueue不能保证同优先级元素的顺序。

优先级队列PriorityQueue: 队列中每个元素都有一个优先级,出队的时候,优先级最高的先出。

  • 一个支持优先级排序的无界阻塞队列 优先级高的先出队,优先级低的后出队
  • 数据结构:数组+二叉堆 默认容量11,可指定初始容量,会自动扩容,最大容量是(Integer.MAX_VALUE - 8)
  • 锁:ReentrantLock 存取是同一把锁
  • 阻塞对象:NotEmpty 出队,队列为空时阻塞
  • 入队不阻塞,永远返回成功,无界
    • 根据比较器进行堆化(排序)自下而上
      • 传入比较器对象就按照比较器的顺序排序
      • 如果比较器为 null,则按照自然顺序排序
  • 出队
    • 优先级最高的元素在堆顶(弹出堆顶元素) 弹出前比较两个子节点再进行堆化(自上而下)
  • 应用场景
    • 业务办理排队叫号,VIP客户插队
    • 电商抢购活动,会员级别高的用户优先抢购到商品

# 示例

//创建优先级阻塞队列  Comparator为null,自然排序
PriorityBlockingQueue<Integer> queue=new PriorityBlockingQueue<Integer>(5);
//自定义Comparator
PriorityBlockingQueue queue=new PriorityBlockingQueue<Integer>(
        5, new Comparator<Integer>() {
    @Override
    public int compare(Integer o1, Integer o2) {
        return o2-o1;
    }
1
2
3
4
5
6
7
8
9

# 如何构造优先级队列

# 使用普通线性数组(无序)来表示优先级队列

image-20220819153450817

  • 执行插入操作时,直接将元素插入到数组末端,需要的成本为O(1),
  • 获取优先级最高元素,我们需要遍历整个线性队列,匹配出优先级最高元素,需要的成本为o(n)
  • 删除优先级最高元素,我们需要两个步骤,第一找出优先级最高元素,第二步删除优先级最高元素,然后将后面的元素依次迁移,填补空缺,需要的成本为O(n)+O(n)=O(n)
# 使用一个按顺序排列的有序向量实现优先级队列

image-20220819153500209

  • 获取优先级最高元素,O(1)
  • 删除优先级最高元素,O(1)
  • 插入一个元素,需要两个步骤,第一步我们需要找出要插的位置,这里我们可以使用二分查找,成本为O(logn),第二步是插入元素之后,将其所有后继进行后移操作,成本为O(n),所有总成本为O(logn)+O(n)=O(n)
# 二叉堆

完全二叉树:除了最后一行,其他行都满的二叉树,而且最后一行所有叶子节点都从左向右开始排序。

二叉堆:完全二叉树的基础上,加以一定的条件约束的一种特殊的二叉树。根据约束条件的不同,二叉堆又可以分为两个类型:

大顶堆和小顶堆。

  • 大顶堆(最大堆):父结点的键值总是大于或等于任何一个子节点的键值;
  • 小顶堆(最小堆):父结点的键值总是小于或等于任何一个子节点的键值。

​ image-20220819153519225

# DelayQueue

# 概念整理

​ DelayQueue 是一个支持延时获取元素的阻塞队列, 内部采用优先队列 PriorityQueue 存储元素,同时元素必须实现 Delayed 接口;在创建元素时可以指定多久才可以从队列中获取当前元素,只有在延迟期满时才能从队列中提取元素。延迟队列的特点是:不是先进先出,而是会按照延迟时间的长短来排序,下一个即将执行的任务会排到队列的最前面。

  • 一个使用优先级队列实现的无界阻塞队列
  • 数据结构:PriorityQueue 与PriorityBlockingQueue类似,不过没有阻塞功能
  • 锁:ReentrantLock
  • 阻塞对象:Condition available
  • 入队:不阻塞,无界队列,与优先级队列入队相同,available
  • 出队 为空时阻塞
    • 检查堆顶元素过期时间 小于等于0则出队
    • 大于0,说明没过期,则阻塞
      • 判断leader线程是否为空(为了保证优先级)
      • 不为空(已有线程阻塞),直接阻塞
      • 为空,则将当前线程置为leader,并按照过期时间进行阻塞
  • 应用场景
    • 商城订单超时关闭 下单之后如果三十分钟之内没有付款就自动取消订单
    • 短信通知功能 下单成功后60s之后给用户发送短信通知。

​ 它是无界队列,放入的元素必须实现 Delayed 接口,而 Delayed 接口又继承了 Comparable 接口,所以自然就拥有了比较和排序的能力,代码如下:

public interface Delayed extends Comparable<Delayed> {
    //getDelay 方法返回的是“还剩下多长的延迟时间才会被执行”,
    //如果返回 0 或者负数则代表任务已过期。
    //元素会根据延迟时间的长短被放到队列的不同位置,越靠近队列头代表越早过期。	
    long getDelay(TimeUnit unit);
}
1
2
3
4
5
6

# 源码分析

# 数据结构
//用于保证队列操作的线程安全
private final transient ReentrantLock lock = new ReentrantLock();
// 优先级队列,存储元素,用于保证延迟低的优先执行
private final PriorityQueue<E> q = new PriorityQueue<E>();
// 用于标记当前是否有线程在排队(仅用于取元素时) leader 指向的是第一个从队列获取元素阻塞的线程
private Thread leader = null;
// 条件,用于表示现在是否有可取的元素   当新元素到达,或新线程可能需要成为leader时被通知
private final Condition available = lock.newCondition();

public DelayQueue() {}
public DelayQueue(Collection<? extends E> c) {
    this.addAll(c);
1
2
3
4
5
6
7
8
9
10
11
12
# 入队put方法
public void put(E e) {
    offer(e);
}
public boolean offer(E e) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // 入队
        q.offer(e);
        if (q.peek() == e) {
            // 若入队的元素位于队列头部,说明当前元素延迟最小
            // 将 leader 置空
            leader = null;
            // available条件队列转同步队列,准备唤醒阻塞在available上的线程
            available.signal();
        }
        return true;
    } finally {
        lock.unlock(); // 解锁,真正唤醒阻塞的线程
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# 出队take方法
public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            E first = q.peek();// 取出堆顶元素( 最早过期的元素,但是不弹出对象)   
            if (first == null)// 如果堆顶元素为空,说明队列中还没有元素,直接阻塞等待
                available.await();//当前线程无限期等待,直到被唤醒,并且释放锁。
            else {
                long delay = first.getDelay(NANOSECONDS);// 堆顶元素的到期时间             
                if (delay <= 0)// 如果小于0说明已到期,直接调用poll()方法弹出堆顶元素
                    return q.poll();
                
                // 如果delay大于0 ,则下面要阻塞了
                // 将first置为空方便gc
                first = null; 
                // 如果有线程争抢的Leader线程,则进行无限期等待。
                if (leader != null)
                    available.await();
                else {
                    // 如果leader为null,把当前线程赋值给它
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        // 等待剩余等待时间
                        available.awaitNanos(delay);
                    } finally {
                        // 如果leader还是当前线程就把它置为空,让其它线程有机会获取元素
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        // 成功出队后,如果leader为空且堆顶还有元素,就唤醒下一个等待的线程
        if (leader == null && q.peek() != null)
            // available条件队列转同步队列,准备唤醒阻塞在available上的线程
            available.signal();
        // 解锁,真正唤醒阻塞的线程
        lock.unlock();
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
  1. 当获取元素时,先获取到锁对象。
  2. 获取最早过期的元素,但是并不从队列中弹出元素。
  3. 最早过期元素是否为空,如果为空则直接让当前线程无限期等待状态,并且让出当前锁对象。
  4. 如果最早过期的元素不为空
  5. 获取最早过期元素的剩余过期时间,如果已经过期则直接返回当前元素
  6. 如果没有过期,也就是说剩余时间还存在,则先获取Leader对象,如果Leader已经有线程在处理,则当前线程进行无限期等待,如果Leader为空,则首先将Leader设置为当前线程,并且让当前线程等待剩余时间。
  7. 最后将Leader线程设置为空
  8. 如果Leader已经为空,并且队列有内容则唤醒一个等待的队列。
上次更新: 2026/3/11 22:17:56
Collections之Map&List&Set详解
深入理解Java线程

← Collections之Map&List&Set详解 深入理解Java线程→

最近更新
01
实现idea开发的关键步骤
10-05
02
Redis高可用架构
09-09
03
Zookeeper高可用
08-31
更多文章>
Theme by Vdoing | Copyright © 2022-2026 Felix | 粤ICP备17101757号-1
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式