概述
前文「JDK源码分析-PriorityQueue」分析了优先队列 PriorityQueue,它既不是阻塞队列,而且线程不安全。本文分析线程安全的阻塞优先队列 PriorityBlockingQueue。它的继承结构如下:
PriorityBlockingQueue 与 PriorityQueue 的内部结构类似,也是物理上由数组、逻辑上由堆结构实现的,并且使用 ReentrantLock 实现线程安全。除此之外,二者大部分操作都是类似的。
因此,有了前文的铺垫,这里相对更容易理解一些。下面分析其代码实现。
代码分析
主要成员变量
// 内部数组的默认初始化容量
构造器
// 构造器 1:使用默认的初始化容量创建一个对象
上面几个构造器都是比较简单的赋值。除此之外,还有一个用给定集合初始化的构造器,如下:
public PriorityBlockingQueue(Collection<? extends E> c) {
堆化操作 heapify 代码如下:
private void heapify() {
siftDownUsingComparator 代码如下:
private static <T> void siftDownUsingComparator(int k, T x, Object[] array,
该方法与 PriorityQueue 中的 siftDownUsingComparator 方法操作几乎完全一致,可参考前文的分析,这里不再赘述(siftDownComparable 方法亦是如此)。
入队方法:add(E), put(E), offer(E, timeout, TimeUnit), offer(E)
public boolean add(E e) {
上述三个方法内部都是通过 offer(e) 方法实现的,因此只需分析 offer(e) 方法即可:
public boolean offer(E e) {
下面分析一下扩容操作 tryGrow:
private void tryGrow(Object[] array, int oldCap) {
这个扩容方法比较有意思:它刚开始会释放锁,而后再重新获取锁。
1. 为什么刚开始要释放锁?
由于该锁是全局的,其他大部分公有(public)方法也会用到;而扩容操作又相对比较耗时,若这里不释放,则某个线程扩容时其他方法调用可能会阻塞。
2. 释放锁之后如何保证线程安全?
这就用到了成员变量 allocationSpinLock,使用了 Unsafe 类的 CAS 操作。它尝试将 allocationSpinLock 的值设置为 1,而一旦操作成功,其他线程就无法进入,直到该线程将它重置为 0. 这就保证了同一时间内只能有一个线程在扩容。
3. 在释放锁后的扩容操作中,先后可能会有多个线程扩容,也即会产生多个新容量的空数组(此时它们都未指向原先的数组 queue),如何避免老数据多次复制到新数组呢?
代码里用到了 queue == array 这个判断。
比如线程 T1 和 T2 都对原数组进行了扩容,得到了两个 newArray,在后面复制老数据时,若其中一个线程已经对 queue 重新赋值并复制后,由于 queue 已经改变,后面的线程就不会再复制一次了。
出队方法:poll(), take(), peek()
// 出队
可以看到这几个出队的操作都加了锁,内部都调用了 dequeue 方法:
private E dequeue() {
该方法与 PriorityQueue 的出队操作 poll() 类似,也不再赘述。
小结
1. PriorityBlockingQueue 是优先队列的阻塞方式实现,它与 PriorityQueue 内部结构类似,即物理结构是可变数组、逻辑结构是堆;
- PriorityBlockingQueue 内部元素不能为空,且可比较,使用 ReentrantLock 保证线程安全。
参考链接:
https://juejin.im/post/5cc258796fb9a03228616e6e
https://blog.csdn.net/codejas/article/details/89190774
相关阅读:
Stay hungry, stay foolish.
本文分享自微信公众号 - WriteOnRead(WriteOnRead)。
如有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。