PriorityBlockingQueue是一个基于优先级堆的无界的并发安全的优先级队列(FIFO),队列的元素按照其自然顺序进行排序,或者根据构造队列时提供的 Comparator 进行排序,具体取决于所使用的构造方法。
实现原理
PriorityBlockingQueue通过使用堆这种数据结构实现将队列中的元素按照某种排序规则进行排序,从而改变先进先出的队列顺序,提供开发者改变队列中元素的顺序的能力。队列中的元素必须是可比较的,即实现Comparable接口,或者在构建函数时提供可对队列元素进行比较的Comparator对象。
结构介绍
PriorityBlockingQueue通过内部组合PriorityQueue的方式实现优先级队列(private final PriorityQueueq;),另外在外层通过ReentrantLock实现线程安全,同时通过Condition实现阻塞唤醒。
常用方法介绍
PriorityBlockingQueue继承自AbstractQueue,以及实现了BlockingQueue接口,是一个阻塞队列,主要方法:offer(E)/poll()/poll(long, TimeUnit)/take()/remove(Object)。下面我们结合源码堆这些方法进行深入分析。
offer(E)
入队操作。此处虽然PriorityBlockingQueue是阻塞队列,但是其并没有阻塞的入队方法,因为该队列是无界的,所以入队是不会阻塞的。
offer()方法正如在结构介绍中提到的通过组合的方式,通过外部加锁内部直接调用PriorityQueue的offer()方法。所以主要的工作在PriorityQueue内部。
入队时通过调用ReentrantLock.lock()进行加锁,然后调用PriorityQueue.offer()方法进行入队操作,最后通过Condition.signal()唤醒等待其上的线程。PriorityQueue.offer()方法将元素插入到队列的最后,然后自上而下调整堆。
poll()和poll(long, TimeUnit),take()
出队操作。poll(long, TimeUnit)是poll()的阻塞版本,同时take()是无限阻塞版poll()(即无期限阻塞,直到获取到数据),通过Condition.awaitNanos()实现阻塞。三者实现主要逻辑相同,只是在等待时不同,这里主要介绍poll(long, TimeUnit)。
具体的出队操作依然是调用PriorityQueue.poll()。
基本操作原理见代码注释,同时可参考博客:数据结构系列——堆进行详细的了解。
remove(Object)
删除其实并不是常用的方法,主要是堆在删除时还是有点值得介绍的。这里我们直接看PriorityQueue.remove()方法。
当删除堆中的一个元素时,将堆的最后一个元素移动到被删除的位置,然后将最后一个位置值为NULL,当把最后一个元素移动到堆中的某个位置时,这时首先需要从该位置开始自上而下的调整堆,如果该位置的元素在调整时发生变化,即堆有变化,则说明该元素是大于其子节点的,那么该节点就不可能小于其上的父节点(因为堆的结构是传递性的,即子节点小于父节点,其孙子节点同时小于其父节点),所以就不需要再网上调整了;但是如果未发生变化,则说明该位置的节点小于其子节点,那么就无法保证其一定比父节点大,所以需要从该节点开始自上而下的调整堆。调整堆的方法是入队和出队时都有介绍,这里就不介绍了。
以上即PriorityBlockingQueue常用的一些方法,另外一些peek(),迭代等方法就不介绍了,毕竟不涉及堆的改变。
使用场景
PriorityBlockingQueue与普通阻塞队列的不同之处就是在于其支持对队列中的元素进行比较,而决定出队的顺序,所以可以使用PriorityBlockingQueue实现高优先级的线程优先执行。