几种类型的BlockingQueue
ArrayBlockingQueue :一个由数组结构组成的有界阻塞队列。
LinkedBlockingQueue :一个由链表结构组成的有界阻塞队列。
PriorityBlockingQueue :一个支持优先级排序的无界阻塞队列。
DelayQueue:一个使用优先级队列实现的无界阻塞队列。
SynchronousQueue:一个不存储元素的阻塞队列。
LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。
LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。
ArrayBlockingQueue是一个用数组实现的有界阻塞队列。此队列按照先进先出(FIFO)的原则对元素进行排序。默认情况下不保证访问者公平的访问队列,所谓公平访问队列是指阻塞的所有生产者线程或消费者线程,当队列可用时,可以按照阻塞的先后顺序访问队列,即先阻塞的生产者线程,可以先往队列里插入元素,先阻塞的消费者线程,可以先从队列里获取元素。通常情况下为了保证公平性会降低吞吐量。
LinkedBlockingQueue是一个用链表实现的有界阻塞队列。此队列的默认和最大长度为Integer.MAX_VALUE。此队列按照先进先出的原则对元素进行排序。
PriorityBlockingQueue是一个支持优先级的无界队列。默认情况下元素采取自然顺序排列,也可以通过比较器comparator来指定元素的排序规则。元素按照升序排列。
DelayQueue是一个支持延时获取元素的无界阻塞队列。队列使用PriorityQueue来实现。队列中的元素必须实现Delayed接口,在创建元素时可以指定多久才能从队列中获取当前元素。只有在延迟期满时才能从队列中提取元素。我们可以将DelayQueue运用在以下应用场景:
- 缓存系统的设计:可以用DelayQueue保存缓存元素的有效期,使用一个线程循环查询DelayQueue,一旦能从DelayQueue中获取元素时,表示缓存有效期到了。
- 定时任务调度。使用DelayQueue保存当天将会执行的任务和执行时间,一旦从DelayQueue中获取到任务就开始执行,从比如TimerQueue就是使用DelayQueue实现的。
关于BlockingQueue的drainTo方法
int drainTo(Collection<? super E> c, int maxElements)
- 从此队列中移除给定数量的可用元素,并将这些元素添加到给定 collection 中。
- 在向 collection c 中添加元素没有成功时,可能导致在抛出相关异常时,元素会同时在两个 collection 中出现,或者在其中一个 collection 中出现,也可能在两个 collection 中都不出现。
- 如果试图将一个队列放入自身队列中,则会导致 IllegalArgumentException 异常。
- 如果正在进行此操作时,正在修改指定的 collection,则此操作行为是不确定的。
使用技巧
//先从queue中获取一个对象,如果没有对象线程自动阻塞
FixERForFixOutgoingValue firstValue = stkRequestQueue.take();
list.add(firstValue);
//如果获取到对象了,则一次性获取剩下的全部对象
stkRequestQueue.drainTo(list);
BlockingQueue 构建生产消费者模式:
package lands.concurrent;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
public class BlockingQueueTest {
/**
* @param args
*/
public static void main(String[] args) {
//BlockingQueue q = new ArrayBlockingQueue(100); //在构造时需要指定容量
BlockingQueue q = new LinkedBlockingQueue(); //在构造时默认没有上限,但也可以选择指定最大上限
BlockingQueueProducer p1 = new BlockingQueueProducer(q);
BlockingQueueProducer p2 = new BlockingQueueProducer(q);
new Thread(p1).start();
new Thread(p2).start();
BlockingQueueConsumer c1 = new BlockingQueueConsumer(q);
BlockingQueueConsumer c2 = new BlockingQueueConsumer(q);
new Thread(c1).start();
new Thread(c2).start();
}
}
class BlockingQueueProducer implements Runnable {
private final BlockingQueue queue;
BlockingQueueProducer(BlockingQueue q) {
queue = q;
}
public void run() {
try {
int i = 0;
while (true) {
i++;
queue.put(produce(i));
//System.out.println("remainingCapacity:" + queue.remainingCapacity());
}
} catch (InterruptedException ex) {
ex.printStackTrace();
}
}
String produce(int i) {
//create your wanted object
return i + "";
}
}
class BlockingQueueConsumer implements Runnable {
private final BlockingQueue queue;
BlockingQueueConsumer(BlockingQueue q) {
queue = q;
}
public void run() {
try {
while (true) {
consume(queue.take());
}
} catch (InterruptedException ex) {
ex.printStackTrace();
}
}
void consume(Object x) {
//use a object in queue
System.out.println(x);
}
}