DelayQueue主要用于放置实现了Delay接口的对象,其中的对象只能在其时刻到期时才能从队列中取走。这种队列是有序的,即队头的延迟到期时间最短。如果没有任何延迟到期,那就不会有任何头元素,并且poll()将返回null(正因为这样,你不能将null放置到这种队列中)
下面是一个示例,其中的Delayed对象自身就是任务,而DelayedTaskConsumer将最“紧急”的任务从队列中取出来,然后运行它:
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import static java.util.concurrent.TimeUnit.*;
class DelayedTask implements Runnable, Delayed {
private static int counter = 0;
protected static List<DelayedTask> sequence = new ArrayList<>();
private final int id = counter++;
private final int delayTime;
private final long triggerTime;
public DelayedTask(int delayInMillis) {
delayTime = delayInMillis;
triggerTime = System.nanoTime() + NANOSECONDS.convert(delayTime, MILLISECONDS);
sequence.add(this);
}
@Override
public int compareTo(Delayed o) {
DelayedTask that = (DelayedTask)o;
if (triggerTime < that.triggerTime) return -1;
if (triggerTime > that.triggerTime) return 1;
return 0;
}
/**
* 剩余的延迟时间
*/
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(triggerTime - System.nanoTime(), NANOSECONDS);
}
@Override
public void run() {
System.out.println(this + " ");
}
@Override
public String toString() {
return String.format("[%1$-4d]", delayTime) + " Task " + id;
}
public static class EndSentinel extends DelayedTask {
private ExecutorService exec;
public EndSentinel(int delay, ExecutorService exec) {
super(delay);
this.exec = exec;
}
@Override
public void run() {
System.out.println(this + " calling shutDownNow()");
exec.shutdownNow();
}
}
}
class DelayedTaskConsumer implements Runnable {
private DelayQueue<DelayedTask> tasks;
public DelayedTaskConsumer(DelayQueue<DelayedTask> tasks) {
this.tasks = tasks;
}
@Override
public void run() {
try {
while(!Thread.interrupted()) {
tasks.take().run();//run tasks with current thread.
}
} catch (InterruptedException e) {
// TODO: handle exception
}
System.out.println("Finished DelaytedTaskConsumer.");
}
}
public class DelayQueueDemo {
public static void main(String[] args) {
int maxDelayTime = 5000;//milliseconds
Random random = new Random(47);
ExecutorService exec = Executors.newCachedThreadPool();
DelayQueue<DelayedTask> queue = new DelayQueue<>();
//填充10个休眠时间随机的任务
for (int i = 0; i < 10; i++) {
queue.put(new DelayedTask(random.nextInt(maxDelayTime)));
}
//结束的哨兵任务,延迟时间最长。这样当执行到这个任务的时候,就是最后一个任务了,可以关闭exec
queue.add(new DelayedTask.EndSentinel(maxDelayTime, exec));
exec.execute(new DelayedTaskConsumer(queue));
}
}
执行结果:
[200 ] Task 7
[429 ] Task 5
[555 ] Task 1
[961 ] Task 4
[1207] Task 9
[1693] Task 2
[1861] Task 3
[4258] Task 0
[4522] Task 8
[4868] Task 6
[5000] Task 10 calling shutDownNow()
Finished DelaytedTaskConsumer.
DelayedTask包含一个称为sequence的List
Delayed接口有一个方法名为getDelay(),它可以用来告知延迟到期还有多长时间,或者延迟在多长时间之前已经到期。这个方法将强制我们去使用TimeUnit类,因为这就是参数类型。这会产生一个非常方便的类,因为你可以很容易地转换单位而无需做任何声明。例如,delayTime的值是以毫秒为单位的,但是System.nanoTime()产生的时间则是以纳秒为单位的。你可以转换delayTime的值,方法是声明它的单位以及你希望以什么单位来表示,就像下面这样:
NANOSECONDS.convert(delayTime, MILLISECONDS);
为了排序,Delayed接口还继承了Comparable接口,因此必须实现compareTo()方法,使其可以产生合理的比较。toString()则提供了输出格式化,而嵌套的EndSentinel类提供了一种关闭所有事物的途径,具体做法是将其放置为队列的最后一个元素。
注意,因为DelayedTaskConsumer自身是一个任务,所以它有自己的Thread,它可以使用这个线程来运行从队列中获取的所有任务。由于任务是按照队列优先级的顺序来执行的,因此在本例中不需要启动任何单独的线程来运行DelayedTask。