当你使用线程来同时执行多个任务时,可以通过使用锁(互斥)来同步两个任务的行为,从而使得一个任务不会干涉另一个任务的资源。也就是说,如果两个任务在交替着使用某项共享资源(通常是内存),你可以使用互斥来是的任何时刻只有一个任务可以访问这项资源。那么,如果线程之间是协作关系,我们必须保证某些步骤在其他步骤之前先被处理。举个例子:必须先挖房子的地基,接下来才能并行的铺设钢结构和构建水泥部件,而这两项任务又必须在浇注混凝土之前完成,等等。
当任务协作时,关键问题是这些任务之间的握手。为了实现握手,我们使用了相同的基础特性:互斥。在这种情况下,互斥能够确保只有一个任务可以响应某个信号,这样就可以根除任何可能的竞争条件。在互斥之上,我们为任务添加了一种途径,可以将其自身挂起,直到某些外部条件发生变化(例如:地基已经挖好),表示是时候让这个任务向前进行了为止。本文,我们将浏览任务间的握手问题,这种握手可以通过Object的方法wait()和notify()来安全地实现。JavaSE5的并发类库还提供了具有await()和signal()方法的Condition对象。
wait()与notifyAll()
wait()使你可以等待某个条件发生变化,而改变这个条件超出了当前方法的控制能力。通常,这种条件将由另一个任务来改变。你肯定不想在你的任务测试这个条件的同时,不断地进行空循环,这被称为忙等待,通常是一种不良的CPU周期使用方式。因此wait()会在等待外部世界产生变化的时候将任务挂起,并且只有在notify()或notifyAll()发生时,即表示发生了某些感兴趣的事物,这个任务才被唤醒并去检查所发生的变化。因此wait()提供了一种在任务之间对活动同步的方式。
调用sleep()的时候锁并没有被释放,调用yield()也属于这种情况,理解这一点很重要。另一方面,当一个任务在方法里遇到了对wait()的调用的时候,线程的执行被挂起,对象上的锁被释放。因为wait()将释放锁,这就意味着另一个任务可以获得这个锁,因此在该对象(现在是未锁定的)中的其他synchronized方法可以在wait()期间被调用。因为这些其他的方法通常将会产生改变,而这种改变正是使被挂起的任务重新唤醒所感兴趣的变化。因此,当你调用wait()时,就是在声明:“我已经刚刚做完所有能做的事情,因此我要在这里等待,但是我希望其他的synchronized操作在条件适合的情况下能够执行。”
有两种形式的wait(),分别是sleep(long millis)和sleep(),第一种方式接受好描述作为参数,含义与sleep()方法里参数的意思相同,都是指“在此期间暂停”。但与sleep()不同的是,对于wait()而言:
在wait()期间,对象锁是释放的
可以通过notify()、notifyAll(),或者令时间到期,从wait()中恢复执行
第二种,也是更常用形式,它不接受任何参数,这种wait()将无线等待下去,直到线程接受到notify()或者notifyAll()消息。
wait()、notify()、notifyAll()有一个比较特殊的方面,那就是这些方法的基类是Object的一部分,而不是Thread类的一部分。尽管开始看起来有点奇怪——仅仅针对线程的功能却作为通用基类的一部分而实现,不过这是有道理的,因为这些方法操作的锁也是所有对象的一部分。实际上,**只能在同步控制方法或者同步控制块里调用wait()、notify()、和notifyAll()**(因为不用操作锁,所以sleep()可以在非同步控制方法里调用)。
如果在非同步控制方法里调用这些方法,程序能通过编译,但在运行的时候,将得到IllegalMonitorStateException异常,并伴随着一些模糊的消息,比如:当前线程不是锁的拥有者。消息的意思是,调用wait()、notify()和notifyAll()的任务在调用这些方法之前必须“拥有”(获取)对象的锁。
可以让另一个对象执行某种操作以维护其自己的锁。要这么做的话,必须首先得到对象的锁。比如,如果要向对象x发送notifyAll(),那么就必须在能够得到x的锁的同步块中这么做:
synchronized(x) {
x.notifyAll();
}
让我们来看一个简单的示例,WaxOMatic.java有两个过程:一个是将蜡涂到Car上,一个是抛光它。抛光任务在涂蜡任务之后完成,而涂蜡任务在涂另一层蜡之前,必须等待抛光任务完成。WaxOn和WaxOff都使用了Car对象,该对象在这些任务等待条件变化的时候,使用wait()和notifyAll()来挂起和重新启动这些任务:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
class Car {
private boolean waxOn = false;//是否上蜡
//上蜡
public synchronized void waxed() {
waxOn = true;
notify();
}
//抛光
public synchronized void buffed() {
waxOn = false;
notify();
}
//等待上蜡
public synchronized void waitForWaxing() throws InterruptedException {
while(waxOn == false) {
this.wait();
}
}
//等待抛光
public synchronized void waitForBuffing() throws InterruptedException {
while(waxOn == true) {
this.wait();
}
}
}
class WaxOnTask implements Runnable {
private Car car;
private String name;
public WaxOnTask(String name, Car car) {
this.name = name;
this.car = car;
}
@Override
public void run() {
try {
while(!Thread.interrupted()) {
System.out.println("[" + name + "] is Wax on!");//正在上蜡
TimeUnit.MILLISECONDS.sleep(500);
car.waxed();//上蜡完成
car.waitForBuffing();//等待抛光
}
} catch (InterruptedException e) {
System.out.println("[" + name + "] Exiting WaxOnTask via interrupt.");
}
}
}
class BuffTask implements Runnable {
private Car car;
private String name;
public BuffTask(String name, Car car) {
this.name = name;
this.car = car;
}
@Override
public void run() {
try {
while(!Thread.interrupted()) {
car.waitForWaxing();//等待上蜡
System.out.println("[" + name + "] Buffing...");//正在抛光
TimeUnit.MILLISECONDS.sleep(500);
car.buffed();//抛光完成
}
} catch (InterruptedException e) {
System.out.println("[" + name + "] Exiting BuffTask via interrupt.");
}
}
}
public class WaxOMatic {
public static void main(String[] args) throws Exception {
Car car = new Car();
ExecutorService exec = Executors.newCachedThreadPool();
//上蜡
exec.execute(new WaxOnTask("Wax", car));
//抛光
exec.execute(new BuffTask("Buff", car));
//运行一段时间,停止ExecutorService
TimeUnit.SECONDS.sleep(5);
exec.shutdownNow();
}
}
执行结果:
[Wax] is Wax on!
[Buff] Buffing...
[Wax] is Wax on!
[Buff] Buffing...
[Wax] is Wax on!
[Buff] Buffing...
[Wax] is Wax on!
[Buff] Buffing...
[Wax] is Wax on!
[Buff] Buffing...
[Wax] is Wax on!
[Buff] Exiting BuffTask via interrupt.
[Wax] Exiting WaxOnTask via interrupt.
这里,Car有一个单一的boolean属性waxOn,表示涂蜡-抛光的处理状态。
在waitForWaxing()中将检查waxOn标志,如果它为false,那么这个调用任务将通过调用wait()方法而挂起。这个行为发生在synchronized方法中这一点很重要,因为在这样的方法中,任务已经获得了锁。当你调用wait()时,线程被挂起,而锁被释放。所被释放是这一点的本质所在,因为为了安全地改变对象的状态,其他某个任务就必须能够获得这个锁。
WaxOnTask.run()表示给汽车打蜡过程的第一个步骤,因此它将执行它的操作:调用sleep()以模拟需要打蜡的时间,然后告知汽车打蜡结束,并调用waitForBuffing(),这个方法会用一个wait()来挂起这个任务,直至BuffTask任务调用这辆车的buffed(),从而改变状态并调用notifyAll()为止。另一方面,BuffTask.run()立即进入waitForWaxing(),并因此而被挂起,直至WaxOnTask涂完蜡并且waxed()被调用。整个运行过程中,你可以看到当控制权在两个任务之间来回交互传递时,这两个步骤过程在不断的重复。5秒钟之后,shutdownNow()方法发送给每个线程的interrupt信号会终止每个线程。