目前为止,该教程重点讲述了最初作为Java平台一部分的低级别API。这些API对于非常基本的任务来说已经足够,但是对于更高级的任务就需要更高级的API。特别是针对充分利用了当今多处理器和多核系统的大规模并发应用程序。 本节,我们将着眼于Java 5.0新增的一些高级并发特征。大多数特征已经在新的java.util.concurrent包中实现。Java集合框架中也定义了新的并发数据结构。
- 锁对象提供了可以简化许多并发应用的锁的惯用法。
- Executors为加载和管理线程定义了高级API。Executors的实现由java.util.concurrent包提供,提供了适合大规模应用的线程池管理。
- 并发集合简化了大型数据集合管理,且极大的减少了同步的需求。
- 原子变量有减小同步粒度和避免内存一致性错误的特征。
- 并发随机数(JDK7)提供了高效的多线程生成伪随机数的方法。
1. 锁对象
同步代码依赖于一种简单的可重入锁。这种锁使用简单,但也有诸多限制。
java.util.concurrent.locks 包提供了更复杂的锁。我们不会详细考察这个包,但会重点关注其最基本的接口,锁。 锁对象作用非常类似同步代码使用的隐式锁。如同隐式锁,每次只有一个线程可以获得锁对象。通过关联 Condition 对象,锁对象也支持wait/notify机制。 锁对象之于隐式锁最大的优势在于,它们有能力收回获得锁的尝试。如果当前锁对象不可用,或者锁请求超时(如果超时时间已指定),tryLock方法会收回获取锁的请求。如果在锁获取前,另一个线程发送了一个中断,lockInterruptibly方法也会收回获取锁的请求。 让我们使用锁对象来解决我们在 活跃度 中见到的死锁问题。Alphonse和Gaston已经把自己训练成能注意到朋友何时要鞠躬。我们通过要求Friend对象在双方鞠躬前必须先获得锁来模拟这次改善。下面是改善后模型的源代码,Safelock。为了展示其用途广泛,我们假设Alphonse和Gaston对于他们新发现的稳定鞠躬的能力是如此入迷,以至于他们无法不相互鞠躬。
Java代码
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.Random;
public class Safelock {
static class Friend {
private final String name;
private final Lock lock = new ReentrantLock();
public Friend(String name) {
this.name = name;
}
public String getName() {
return this.name;
}
public boolean impendingBow(Friend bower) {
Boolean myLock = false;
Boolean yourLock = false;
try {
myLock = lock.tryLock();
yourLock = bower.lock.tryLock();
} finally {
if (! (myLock && yourLock)) {
if (myLock) {
lock.unlock();
}
if (yourLock) {
bower.lock.unlock();
}
}
}
return myLock && yourLock;
}
public void bow(Friend bower) {
if (impendingBow(bower)) {
try {
System.out.format("%s: %s has"
+ " bowed to me!%n",
this.name, bower.getName());
bower.bowBack(this);
} finally {
lock.unlock();
bower.lock.unlock();
}
} else {
System.out.format("%s: %s started"
+ " to bow to me, but saw that"
+ " I was already bowing to"
+ " him.%n",
this.name, bower.getName());
}
}
public void bowBack(Friend bower) {
System.out.format("%s: %s has" +
" bowed back to me!%n",
this.name, bower.getName());
}
}
static class BowLoop implements Runnable {
private Friend bower;
private Friend bowee;
public BowLoop(Friend bower, Friend bowee) {
this.bower = bower;
this.bowee = bowee;
}
public void run() {
Random random = new Random();
for (;;) {
try {
Thread.sleep(random.nextInt(10));
} catch (InterruptedException e) {}
bowee.bow(bower);
}
}
}
public static void main(String[] args) {
final Friend alphonse =
new Friend("Alphonse");
final Friend gaston =
new Friend("Gaston");
new Thread(new BowLoop(alphonse, gaston)).start();
new Thread(new BowLoop(gaston, alphonse)).start();
}
}
2. 执行器(Executors)
在之前所有的例子中,Thread对象表示的线程和Runnable对象表示的线程所执行的任务之间是紧耦合的。这对于小型应用程序来说没问题,但对于大规模并发应用来说,合理的做法是将线程的创建与管理和程序的其他部分分离开。封装这些功能的对象就是执行器,接下来的部分将讲详细描述执行器。
3. Executor接口
java.util.concurrent中包括三个Executor接口:
- Executor,一个运行新任务的简单接口。
- ExecutorService,扩展了Executor接口。添加了一些用来管理执行器生命周期和任务生命周期的方法。
- ScheduledExecutorService,扩展了ExecutorService。支持Future和定期执行任务。
通常来说,指向Executor对象的变量应被声明为以上三种接口之一,而不是具体的实现类。
Executor接口
Executor 接口只有一个execute方法,用来替代通常创建(启动)线程的方法。例如:r是一个Runnable对象,e是一个Executor对象。可以使用
Java代码
- e.execute(r);
来代替
Java代码
- (new Thread(r)).start();
但execute方法没有定义具体的实现方式。对于不同的Executor实现,execute方法可能是创建一个新线程并立即启动,但更有可能是使用已有的工作线程运行r,或者将r放入到队列中等待可用的工作线程。(我们将在线程池一节中描述工作线程。)
ExecutorService接口
ExecutorService 接口在提供了execute方法的同时,新加了更加通用的submit方法。submit方法除了和execute方法一样可以接受Runnable对象作为参数,还可以接受Callable对象作为参数。使用Callable对象可以能使任务返还执行的结果。通过submit方法返回的Future对象可以读取Callable任务的执行结果,或是管理Callable任务和Runnable任务的状态。 ExecutorService也提供了批量运行Callable任务的方法。最后,ExecutorService还提供了一些关闭执行器的方法。如果需要支持即时关闭,执行器所执行的任务需要正确处理中断。
ScheduledExecutorService接口
ScheduledExecutorService 扩展ExecutorService接口并添加了schedule方法。调用schedule方法可以在指定的延时后执行一个Runnable或者Callable任务。ScheduledExecutorService接口还定义了按照指定时间间隔定期执行任务的scheduleAtFixedRate方法和scheduleWithFixedDelay方法。
4. 线程池
在java.util.concurrent包中多数的执行器实现都使用了由工作线程组成的线程池,工作线程独立于所它所执行的Runnable任务和Callable任务,并且常用来执行多个任务。 使用工作线程可以使创建线程的开销最小化。
在大规模并发应用中,创建大量的Thread对象会占用占用大量系统内存,分配和回收这些对象会产生很大的开销。一种最常见的线程池是固定大小的线程池。这种线程池始终有一定数量的线程在运行,如果一个线程由于某种原因终止运行了,线程池会自动创建一个新的线程来代替它。需要执行的任务通过一个内部队列提交给线程,当没有更多的工作线程可以用来执行任务时,队列保存额外的任务。 使用固定大小的线程池一个很重要的好处是可以实现优雅退化。例如一个Web服务器,每一个HTTP请求都是由一个单独的线程来处理的,如果为每一个HTTP都创建一个新线程,那么当系统的开销超出其能力时,会突然地对所有请求都停止响应。如果限制Web服务器可以创建的线程数量,那么它就不必立即处理所有收到的请求,而是在有能力处理请求时才处理。 创建一个使用线程池的执行器最简单的方法是调用 java.util.concurrent.Executors 的 newFixedThreadPool 方法。Executors类还提供了下列一下方法:
- newCachedThreadPool方法创建了一个可扩展的线程池。适合用来启动很多短任务的应用程序。
- newSingleThreadExecutor方法创建了每次执行一个任务的执行器。
- 还有一些创建ScheduledExecutorService执行器的方法。
如果上面的方法都不满足需要,可以尝试 java.util.concurrent.ThreadPoolExecutor 或者 java.util.concurrent.ScheduledThreadPoolExecutor 。
5. Fork/Joint
fork/join框架是ExecutorService接口的一种具体实现,目的是为了帮助你更好地利用多处理器带来的好处。它是为那些能够被递归地拆解成子任务的工作类型量身设计的。其目的在于能够使用所有可用的运算能力来提升你的应用的性能。 类似于ExecutorService接口的其他实现,fork/join框架会将任务分发给线程池中的工作线程。fork/join框架的独特之处在与它使用工作窃取(work-stealing)算法。完成自己的工作而处于空闲的工作线程能够从其他仍然处于忙碌(busy)状态的工作线程处窃取等待执行的任务。 fork/join框架的核心是 ForkJoinPool 类,它是对AbstractExecutorService类的扩展。ForkJoinPool实现了工作偷取算法,并可以执行 ForkJoinTask 任务。
基本使用方法
使用fork/join框架的第一步是编写执行一部分工作的代码。你的代码结构看起来应该与下面所示的伪代码类似:
Java代码
- if (当前这个任务工作量足够小)
- 直接完成这个任务
- else
- 将这个任务或这部分工作分解成两个部分
- 分别触发(invoke)这两个子任务的执行,并等待结果
你需要将这段代码包裹在一个ForkJoinTask的子类中。不过,通常情况下会使用一种更为具体的的类型,或者是 RecursiveTask (会返回一个结果),或者是 RecursiveAction 。 当你的ForkJoinTask子类准备好了,创建一个代表所有需要完成工作的对象,然后将其作为参数传递给一个ForkJoinPool实例的invoke()方法即可。
要清晰,先模糊
想要了解fork/join框架的基本工作原理,接下来的这个例子会有所帮助。假设你想要模糊一张图片。原始的source图片由一个整数的数组表示,每个整数表示一个像素点的颜色数值。与source图片相同,模糊之后的destination图片也由一个整数数组表示。 对图片的模糊操作是通过对source数组中的每一个像素点进行处理完成的。处理的过程是这样的:将每个像素点的色值取出,与周围像素的色值(红、黄、蓝三个组成部分)放在一起取平均值,得到的结果被放入destination数组。因为一张图片会由一个很大的数组来表示,这个流程会花费一段较长的时间。如果使用fork/join框架来实现这个模糊算法,你就能够借助多处理器系统的并行处理能力。下面是上述算法结合fork/join框架的一种简单实现:
Java代码
public class ForkBlur extends RecursiveAction {
private int[] mSource;
private int mStart;
private int mLength;
private int[] mDestination;
// Processing window size; should be odd.
private int mBlurWidth = 15;
public ForkBlur(int[] src, int start, int length, int[] dst) {
mSource = src;
mStart = start;
mLength = length;
mDestination = dst;
}
protected void computeDirectly() {
int sidePixels = (mBlurWidth - 1) / 2;
for (int index = mStart; index < mStart + mLength; index++) {
// Calculate average.
float rt = 0, gt = 0, bt = 0;
for (int mi = -sidePixels; mi <= sidePixels; mi++) {
int mindex = Math.min(Math.max(mi + index, 0),
mSource.length - 1);
int pixel = mSource[mindex];
rt += (float)((pixel & 0x00ff0000) >> 16)
/ mBlurWidth;
gt += (float)((pixel & 0x0000ff00) >> 8)
/ mBlurWidth;
bt += (float)((pixel & 0x000000ff) >> 0)
/ mBlurWidth;
}
// Reassemble destination pixel.
int dpixel = (0xff000000 ) |
(((int)rt) << 16) |
(((int)gt) << 8) |
(((int)bt) << 0);
mDestination[index] = dpixel;
}
}
接下来你需要实现父类中的compute()方法,它会直接执行模糊处理,或者将当前的工作拆分成两个更小的任务。数组的长度可以作为一个简单的阀值来判断任务是应该直接完成还是应该被拆分。
Java代码
protected static int sThreshold = 100000;
protected void compute() {
if (mLength < sThreshold) {
computeDirectly();
return;
}
int split = mLength / 2;
invokeAll(new ForkBlur(mSource, mStart, split, mDestination),
new ForkBlur(mSource, mStart + split, mLength - split,
mDestination));
}
如果前面这个方法是在一个RecursiveAction的子类中,那么设置任务在ForkJoinPool中执行就再直观不过了。通常会包含以下一些步骤:
(1) 创建一个表示所有需要完成工作的任务。
Java代码
- // source image pixels are in src
- // destination image pixels are in dst
- ForkBlur fb = new ForkBlur(src, 0, src.length, dst);
(2) 创建将要用来执行任务的ForkJoinPool。
Java代码
- ForkJoinPool pool = new ForkJoinPool();
(3) 执行任务。
Java代码
- pool.invoke(fb);
想要浏览完成的源代码,请查看 ForkBlur ,其中还包含一些创建destination图片文件的额外代码。
标准实现
除了能够使用fork/join框架来实现能够在多处理系统中被并行执行的定制化算法(如前文中的ForkBlur.java例子),在Java SE中一些比较常用的功能点也已经使用fork/join框架来实现了。在Java SE 8中,java.util.Arrays类的一系列parallelSort()方法就使用了fork/join来实现。这些方法与sort()系列方法很类似,但是通过使用fork/join框架,借助了并发来完成相关工作。在多处理器系统中,对大数组的并行排序会比串行排序更快。这些方法究竟是如何运用fork/join框架并不在本教程的讨论范围内。想要了解更多的信息,请参见Java API文档。 其他采用了fork/join框架的方法还包括java.util.streams包中的一些方法,此包是作为Java SE 8发行版中 Project Lambda 的一部分。想要了解更多信息,请参见 Lambda Expressions 一节。
6. 并发集合
java.util.concurrent包囊括了Java集合框架的一些附加类。它们也最容易按照集合类所提供的接口来进行分类:
- BlockingQueue定义了一个先进先出的数据结构,当你尝试往满队列中添加元素,或者从空队列中获取元素时,将会阻塞或者超时。
- ConcurrentMap是java.util.Map的子接口,定义了一些有用的原子操作。移除或者替换键值对的操作只有当key存在时才能进行,而新增操作只有当key不存在时。使这些操作原子化,可以避免同步。ConcurrentMap的标准实现是ConcurrentHashMap,它是HashMap的并发模式。
- ConcurrentNavigableMap是ConcurrentMap的子接口,支持近似匹配。ConcurrentNavigableMap的标准实现是ConcurrentSkipListMap,它是TreeMap的并发模式。
- 所有这些集合,通过 在集合里新增对象和访问或移除对象的操作之间,定义一个happens-before的关系,来帮助程序员避免内存一致性错误。
7. 原子变量
java.util.concurrent.atomic 包定义了对单一变量进行原子操作的类。所有的类都提供了get和set方法,可以使用它们像读写volatile变量一样读写原子类。就是说,同一变量上的一个set操作对于任意后续的get操作存在happens-before关系。原子的compareAndSet方法也有内存一致性特点,就像应用到整型原子变量中的简单原子算法。 为了看看这个包如何使用,让我们返回到最初用于演示线程干扰的 Counter 类:
Java代码
class Counter {
private int c = 0;
public void increment() {
c++;
}
public void decrement() {
c--;
}
public int value() {
return c;
}
}
使用同步是一种使Counter类变得线程安全的方法,如 SynchronizedCounter :
Java代码
- class SynchronizedCounter {
- private int c = 0;
- public synchronized void increment() {
- c++;
- }
- public synchronized void decrement() {
- c--;
- }
- public synchronized int value() {
- return c;
- }
- }
对于这个简单的类,同步是一种可接受的解决方案。但是对于更复杂的类,我们可能想要避免不必要同步所带来的活跃度影响。将int替换为AtomicInteger允许我们在不进行同步的情况下阻止线程干扰,如 AtomicCounter :
Java代码
import java.util.concurrent.atomic.AtomicInteger;
class AtomicCounter {
private AtomicInteger c = new AtomicInteger(0);
public void increment() {
c.incrementAndGet();
}
public void decrement() {
c.decrementAndGet();
}
public int value() {
return c.get();
}
8. 并发随机数
在JDK7中,java.util.concurrent包含了一个相当便利的类,ThreadLocalRandom,当应用程序期望在多个线程或ForkJoinTasks中使用随机数时。
对于并发访问,使用TheadLocalRandom代替Math.random()可以减少竞争,从而获得更好的性能。
你只需调用ThreadLocalRandom.current(), 然后调用它的其中一个方法去获取一个随机数即可。下面是一个例子:
Java代码
- int r = ThreadLocalRandom.current().nextInt(4,77);