wait()和notifyAll()方法以一种非常低级的方式解决了任务互操作的问题,即每次交互时都需要握手。在许多情况下,你可以瞄准更高的抽象级别,使用同步队列来解决任务协作的问题。同步队列在任何时刻都只允许一个任务插入或移除元素。在java.util.concurrent.BlockingQueue接口中提供了这个队列,这个接口有大量的标准实现。你通常可以使用LinkedBlockingQueue,它是一个无界队列,你还可以使用ArrayBlockingQueue,它具有固定的尺寸,因此你可以在它被阻塞之前,向其中放置有限数量的元素。
如果消费者任务试图从队列中获取对象,而该队列此时为空,那么这些队列还可以挂起消费者任务,并且当有更多的元素可用时回复消费者任务。阻塞队列可以解决非常大的问题,而其方式与wait()和notifyAll()相比,则要简单并可靠许多。
考虑下面这个BlockingQueue的示例,有一台机器具有三个任务:一个制作吐司,一个给吐司抹黄油,还有一个给吐司涂果酱。我们可以通过各个处理过程之间的BlockingQueue来运行这个吐司制作程序:
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
class Toast {
/**
* 吐司的状态:
* DRY: 烘干的
* BUTTERED: 涂了黄油的
* JAMMED: 涂了果酱的
* <p>吐司的状态只能由DRY->BUTTERED->JAMMED转变
*/
public enum Status {DRY, BUTTERED, JAMMED}
private Status status = Status.DRY;//默认状态为DRY
private final int id;
public Toast(int id) { this.id = id;}
public void butter() {status = Status.BUTTERED;}
public void jam() {status = Status.JAMMED;}
public Status getStatus() {return status;}
public int getId() {return id;}
public String toString() {
return "Toast id: " + id + ", status: " + status;
}
}
@SuppressWarnings("serial")
class ToastQueue extends LinkedBlockingQueue<Toast> {}
/**
* 生产吐司的任务。
*/
class Toaster implements Runnable {
private ToastQueue toastQueue;
private int count = 0;
private Random random = new Random(47);
public Toaster(ToastQueue queue) {
this.toastQueue = queue;
}
@Override
public void run() {
try {
while(!Thread.interrupted()) {
TimeUnit.MILLISECONDS.sleep(300 + random.nextInt(500));
//生产一片吐司,这些吐司是有序的
Toast toast = new Toast(count++);
System.out.println(toast);
//放到toastQueue中
toastQueue.put(toast);
}
} catch (InterruptedException e) {
System.out.println("Toaster interrupted.");
}
System.out.println("Toaster off.");
}
}
/**
* 涂黄油的任务。
*/
class Butterer implements Runnable {
private ToastQueue dryQueue;
private ToastQueue butteredQueue;
public Butterer(ToastQueue dryQueue, ToastQueue butteredQueue) {
this.dryQueue = dryQueue;
this.butteredQueue = butteredQueue;
}
@Override
public void run() {
try {
while(!Thread.interrupted()) {
//在取得下一个吐司之前会一直阻塞
Toast toast = dryQueue.take();
toast.butter();
System.out.println(toast);
butteredQueue.put(toast);
}
} catch (InterruptedException e) {
System.out.println("Butterer interrupted.");
}
System.out.println("Butterer off.");
}
}
/**
* 涂果酱的任务。
*/
class Jammer implements Runnable {
private ToastQueue butteredQueue;
private ToastQueue finishedQueue;
public Jammer(ToastQueue butteredQueue, ToastQueue finishedQueue) {
this.finishedQueue = finishedQueue;
this.butteredQueue = butteredQueue;
}
@Override
public void run() {
try {
while(!Thread.interrupted()) {
//在取得下一个吐司之前会一直阻塞
Toast toast = butteredQueue.take();
toast.jam();
System.out.println(toast);
finishedQueue.put(toast);
}
} catch (InterruptedException e) {
System.out.println("Jammer interrupted.");
}
System.out.println("Jammer off.");
}
}
/**
* 吃吐司的人,消费者。
*/
class Eater implements Runnable {
private ToastQueue finishedQueue;
private int count = 0;
public Eater (ToastQueue finishedQueue) {
this.finishedQueue = finishedQueue;
}
@Override
public void run() {
try {
while(!Thread.interrupted()) {
//在取得下一个吐司之前会一直阻塞
Toast toast = finishedQueue.take();
//验证取得的吐司是有序的,而且状态是JAMMED的
if (toast.getId() != count++ ||
toast.getStatus() != Toast.Status.JAMMED) {
System.out.println("Error -> " + toast);
System.exit(-1);
} else {
//吃掉吐司
System.out.println(toast + "->Eaten");
}
}
} catch (InterruptedException e) {
System.out.println("Eater interrupted.");
}
System.out.println("Eater off.");
}
}
public class ToastOMatic {
public static void main(String[] args) throws Exception {
ToastQueue dryQueue = new ToastQueue();
ToastQueue butteredQueue = new ToastQueue();
ToastQueue finishedQueue = new ToastQueue();
ExecutorService exec = Executors.newCachedThreadPool();
exec.execute(new Toaster(dryQueue));
exec.execute(new Butterer(dryQueue, butteredQueue));
exec.execute(new Jammer(butteredQueue, finishedQueue));
exec.execute(new Eater(finishedQueue));
TimeUnit.SECONDS.sleep(5);
exec.shutdownNow();
}
}
执行结果(可能的结果):
Toast id: 0, status: DRY
Toast id: 0, status: BUTTERED
Toast id: 0, status: JAMMED
Toast id: 0, status: JAMMED->Eaten
Toast id: 1, status: DRY
Toast id: 1, status: BUTTERED
Toast id: 1, status: JAMMED
Toast id: 1, status: JAMMED->Eaten
Toast id: 2, status: DRY
Toast id: 2, status: BUTTERED
Toast id: 2, status: JAMMED
Toast id: 2, status: JAMMED->Eaten
Toast id: 3, status: DRY
Toast id: 3, status: BUTTERED
Toast id: 3, status: JAMMED
Toast id: 3, status: JAMMED->Eaten
Toast id: 4, status: DRY
Toast id: 4, status: BUTTERED
Toast id: 4, status: JAMMED
Toast id: 4, status: JAMMED->Eaten
Toast id: 5, status: DRY
Toast id: 5, status: BUTTERED
Toast id: 5, status: JAMMED
Toast id: 5, status: JAMMED->Eaten
Toast id: 6, status: DRY
Toast id: 6, status: BUTTERED
Toast id: 6, status: JAMMED
Toast id: 6, status: JAMMED->Eaten
Toast id: 7, status: DRY
Toast id: 7, status: BUTTERED
Toast id: 7, status: JAMMED
Toast id: 7, status: JAMMED->Eaten
Eater interrupted.
Eater off.
Butterer interrupted.
Toaster interrupted.
Toaster off.
Jammer interrupted.
Jammer off.
Butterer off.
Toast是一个使用enum值的优秀示例。注意,这个示例中没有任何显式的同步(即使用Lock对象或者synchronized关键字的同步),因为同步已经由队列和系统的设计隐式的管理了——每片Toast在任何时刻都只由一个任务在操作。因为队列的阻塞,使得处理过程将被自动的挂起和恢复。你可以看到由BlockingQueue产生的简化十分明显。在使用显式的wait()和notifyAll()时存在的类和类之间的耦合被消除了,因为每个类都只和它的BlockingQueue通信。