前言
前面介绍了ForkJoinPool相关的两个类ForkJoinTask、ForkJoinWorkerThread,现在开始了解ForkJoinPool。ForkJoinPool也是实现了ExecutorService的线程池。但ForkJoinPool不同于其他类型的ExecutorService,主要是因为它使用了窃取工作机制:池中的所有线程都试图查找和执行提交给池和/或由其他活动任务创建的任务(如果不存在工作,则最终阻塞等待工作)。但ForkJoinPool并不是为了代替其他两个线程池,大家所适用的场景各不相同。ForkJoinPool主要是为了执行ForkJoinTask而存在,而ForkJoinTask在上一文已经讲过是一种可以将任务进行递归分解执行从而提高执行并行度的任务,那么ForkJoinPool线程池当然主要就是为了完成这些可递归分解任务的调度执行,加上一些对线程池生命周期的控制,以及提供一些对池的状态检查方法(例如getStealCount),用于帮助开发、调优和监视fork/join应用程序。同样,方法toString以方便的形式返回池状态的指示,以便进行非正式监视。
由于ForkJoinPool的源码太长并且其中涉及到的设计实现非常复杂,目前理解有限,只能做大概的原理阐述以及一些使用示例。
ForkJoinPool原理
在原理开始之前,先了解一下其构造方法,其参数最多的一个构造方法如下:
1 public ForkJoinPool(int parallelism,
2 ForkJoinWorkerThreadFactory factory,
3 UncaughtExceptionHandler handler,
4 boolean asyncMode) {
5 //...........
6 }
构造ForkJoinPool可以指定如下四个参数:
parallelism: 并行度,默认为CPU核心数,最小为1。为1的时候相当于单线程执行。
factory:工作线程工厂,用于创建ForkJoinWorkerThread。
handler:处理工作线程运行任务时的异常情况类,默认为null。
asyncMode:是否为异步模式,默认为 false。这里的同步/异步并不是指F/J框架本身是采用同步模式还是采用异步模式工作,而是指其中的工作线程的工作方式。在F/J框架中,每个工作线程(Worker)都有一个属于自己的任务队列(WorkQueue),这是一个底层采用数组实现的双向队列。同步是指:对于工作线程(Worker)自身队列中的任务,采用后进先出(LIFO)的方式执行;异步是指:对于工作线程(Worker)自身队列中的任务,采用先进先出(FIFO)的方式执行。为true的异步模式只在不join任务结果的消息传递框架中非常有用,因此一般如果任务的结果需要通过join合并,则该参数都设为false。
创建 ForkJoinPool实例除了使用构造方法外,从JDK8开始,还提供了一个静态的commonPool(),该方法可以通过指定系统参数的方式(System.setProperty(?,?))定义“并行度、线程工厂和异常处理类”;并且它使用的是同步模式,也就是说可以支持任务合并(join)。使用该方法返回的ForkJoinPool实例一般来说都能满足大多数的使用场景。
内部数据结构
ForkJoinPool采用了哈希数组 + 双端队列的方式存放任务,但这里的任务分为两类,一类是通过execute、submit 提交的外部任务,另一类是ForkJoinWorkerThread工作线程通过fork/join分解出来的工作任务,ForkJoinPool并没有把这两种任务混在一个任务队列中,对于外部任务,会利用Thread内部的随机probe值映射到哈希数组的偶数槽位中的提交队列中,这种提交队列是一种数组实现的双端队列称之为Submission Queue,专门存放外部提交的任务。对于ForkJoinWorkerThread工作线程,每一个工作线程都分配了一个工作队列,这也是一个双端队列,称之为Work Queue,这种队列都会被映射到哈希数组的奇数槽位,每一个工作线程fork/join分解的任务都会被添加到自己拥有的那个工作队列中。
在ForkJoinPool中的属性 WorkQueue[] workQueues 就是我们所说的哈希数组,其元素就是内部类WorkQueue实现的基于数组的双端队列。该哈希数组的长度为2的幂,并且支持扩容。如下就是该哈希数组的示意结构图:
如图,提交队列位于哈希数组workQueue的奇数索引槽位,工作线程的工作队列位于偶数槽位,默认情况下,asyncMode为false,因此工作线程把工作队列当着栈一样使用,将分解的子任务推入工作队列的top端,取任务的时候也从top端取(前面关于双端队列的介绍中,凡是双端队列都会有两个分别指向队列两端的指针,这里就是图上画出的base和top),而当某些工作线程的任务为空的时候,就会从其他队列(不限于workQueue,也会是提交队列)窃取(steal)任务,如图示拥有workQueue2的工作线程从workQueue1中窃取了一个任务,窃取任务的时候采用的是先进先出FIFO的策略(即从base端窃取任务),这样不但可以避免在取任务的时候与拥有其队列的工作线程发生冲突,从而减小竞争,还可以辅助其完成比较大的任务。而当asyncMode为true的话,拥有该工作队列的工作线程将按照先进先出的策略从base端取任务,这一般只用于不需要返回结果的任务,或者事件消息传递框架。
上图展示了提交任务与分解的任务在ForkJoinPool内部的组织形式,并且简单的阐述了工作窃取机制的原理,其实,工作窃取机制的实现过程还包含很多细节,并没有怎么简单,例如还有一种“互助机制”,假设工作线程2窃取了工作线程1的任务之后,通过fork/join又分解产生了子任务,这些子任务会进入工作线程2的工作队列中,这时候如果工作线程1把剩余的任务都完成了,当他发现自己的任务被别人窃取的话,那么它会试着去窃取工作线程2的任务,(你偷了我的,现在我就要偷你的),这就是互助机制。
关于ForkJoinPool的源码太复杂就不分析了,可以参考如下几篇文章:https://www.jianshu.com/apps?utm_medium=desktop&utm_source=navbar-apps 和https://www.cnblogs.com/zhuxudong/p/10122688.html
一些public方法
除了同ThreadPoolExecutor线程池一样重写了AbstractExecutorService的一些方法例如,submit(异步提交任务返回Future)、execute(异步提交任务无返回值)、invokeAll(批量执行该类没有重写)、invokeAny(只要有一个执行结束就返回,该类没有重写)等方法之外,ForkJoinPool增加了一些自己特有的用于监视其状态的方法可供使用者调用,但一般来说这些方法都用不上,除了那些提交任务的方法:
awaitQuiescence,等待线程池空闲
isQuiescent,如果线程池处于空闲状态,返回true。
getActiveThreadCount,获取正在执行任务或窃取任务的线程个数。
getQueuedSubmissionCount(),获取提交提交给线程池但还没开始执行的任务个数。就是所有提交队列中任务之和。
hasQueuedSubmissions(),若getQueuedSubmissionCount不为0,返回true,表示存在任务还在提交队列没被执行。
getQueuedTaskCount(),返回所有工作线程工作队列中的所有任务个数。
getRunningThreadCount(),返回正在运行的没有被阻塞(例如调用Join会被阻塞)的线程个数。
getStealCount(),返回线程从另一个线程的工作队列中窃取的任务总数的估计值。
getParallelism(),返回线程池的并行度,构造方法中有这个参数。
getAsyncMode(),返回工作线程从其任务队列中取走任务的模式,默认为false,表示LIFO后进先出,否则是FIFO,构造方法中有这个参数。
使用示例
对于Fork/Join的使用,也是围绕ForkJoinTask的三个抽象子类的不同作用进行的,我们知道ForkJoinTask的三个子类是RecursiveAction、RecursiveTask和CountedCompleter,分别用于不返回结果,返回结果以及完成触发指定操作的操作。
对于RecursiveAction的使用,最容易让人想到的例子就是,最一个集合或者数组中的所有元素进行自增运行的操作:
1 public class IncrementTask extends RecursiveAction {
2
3 static final int THRESHOLD = 10;
4 final long[] array;
5 final int lo, hi;
6
7 IncrementTask(long[] array, int lo, int hi) { // 构造方法,指定数组下标范围
8 this.array = array;
9 this.lo = lo;
10 this.hi = hi;
11 }
12
13 // 实现抽象类的接口,这个任务所执行的主要计算。
14 protected void compute() {
15 if (hi - lo < THRESHOLD) { // 数组索引区间小于阈值(任务足够小)
16 for (int i = lo; i < hi; ++i)
17 array[i]++; // 对每个元素自增1
18 } else {
19 int mid = (lo + hi) >>> 1; // 拆分一半
20 invokeAll(new IncrementTask(array, lo, mid), new IncrementTask(array, mid, hi)); // 一起执行
21 }
22 }
23
24 public static void main(String[] args) {
25 long[] array = ... ;//一个很长的数组
26 new IncrementTask(array, 0, array.length).invoke(); //隐式的使用了ForkJoinPool.commonPool()
27 }
28 }
View Code
对于RecursiveAction这种不返回结果的并行运算其实还有很多应用场景,比如Java Doc中使用其来对一个数组的元素进行排序,有比如要将一批数据持久化到数据库等。
1 static class SortTask extends RecursiveAction {
2 final long[] array; final int lo, hi;
3
4 SortTask(long[] array, int lo, int hi) { //构造方法,指定数组下标范围
5 this.array = array; this.lo = lo; this.hi = hi;
6 }
7 SortTask(long[] array) { this(array, 0, array.length); } //构造方法,默认全部数组
8
9 //实现抽象类的接口,这个任务所执行的主要计算。
10 protected void compute() {
11 if (hi - lo < THRESHOLD) //数组索引区间小于阈值(任务足够小)
12 sortSequentially(lo, hi); //排序
13 else {
14 int mid = (lo + hi) >>> 1; //拆分一半
15 invokeAll(new SortTask(array, lo, mid), new SortTask(array, mid, hi)); //一起执行
16 merge(lo, mid, hi); //合并结果
17 }
18 }
19 // implementation details follow:
20 static final int THRESHOLD = 1000; //数组索引最小区间阈值
21
22 void sortSequentially(int lo, int hi) {
23 Arrays.sort(array, lo, hi);
24 }
25 void merge(int lo, int mid, int hi) { //合并结果
26 long[] buf = Arrays.copyOfRange(array, lo, mid);
27 for (int i = 0, j = lo, k = mid; i < buf.length; j++)
28 array[j] = (k == hi || buf[i] < array[k]) ? buf[i++] : array[k++];
29 }
30 }
Java Doc提供的排序示例
对于RecursiveTask的使用,也是最常用的,例如求和运算1+2+3+4+…+1000000000。
1 public class SumTask extends RecursiveTask<Long>{
2
3 private static final int THRESHOLD = 100000;
4 private int start;//开始下标
5 private int end;//结束下标
6
7
8 public SumTask(int start, int end) {
9 this.start = start;
10 this.end = end;
11 }
12
13 private long computeByUnit() {
14 long sum = 0L;
15 for (int i = start; i < end; i++) {
16 sum += i;
17 }
18 return sum;
19 }
20
21 @Override
22 protected Long compute() {
23 //如果当前任务的计算量在阈值范围内,则直接进行计算
24 if (end - start < THRESHOLD) {
25 return computeByUnit();
26 } else {//如果当前任务的计算量超出阈值范围,则进行计算任务拆分
27 //计算中间索引
28 int middle = (start + end) / 2;
29 //定义子任务-迭代思想
30 SumTask left = new SumTask(start, middle);
31 SumTask right = new SumTask(middle, end);
32 //划分子任务-fork
33 left.fork();
34 //right.fork(); 尾部递归消除
35 //合并计算结果-join
36 return left.join() + right.invoke();
37 }
38 }
39
40 public static void main(String[] args) throws Exception{
41 //这两种方式提交都可以
42 long result = new SumTask(1, 1000000001).invoke(); //不响应中断,必须等待执行完成
43
44 Future<Long> future = ForkJoinPool.commonPool().submit(new SumTask(1, 1000000001));
45 result = future.get(); //可以被中断
46 }
47
48 }
View Code
提交任务到ForkJoinPool可以直接使用ForkJoinTask的invoke,隐式的使用ForkJoinPool.commonPool()池,也可以显示的创建ForkJoinPool实例通过submit提交,通过Future.get的方法获取结果,区别在于后者支持中断,前者必须等待任务完成才会返回。
对于CountedCompleter的使用,由于该类设计成可以构建任务树,所以使用起来灵活多变,CountedCompleter类本身只是提供了一种并发执行任务的设计理念,想要使用它必须要对其有很深的理解才行。这里列举一下Java Doc给出的示例吧:
**示例一**及其三个改进版本,该示例只是一种简单的递归分解任务,但比起传统的fork/join来说,其树形任务结构依然更加可取,因为它们减少了线程间的通信并提高了负载平衡:
1 //版本一
2 class MyOperation { void apply(E e) { ... } } //具体的任务执行逻辑
3
4 class ForEach extends CountedCompleter {
5
6 public static void forEach(E[] array, MyOperation op) {
7 new ForEach(null, array, op, 0, array.length).invoke(); //隐式调用ForkJoinPool.commonPool()执行该任务
8 }
9
10 final E[] array;
11 final MyOperation op;
12 final int lo, hi;
13
14 ForEach(CountedCompleter p, E[] array, MyOperation op, int lo, int hi) {
15 super(p);
16 this.array = array;
17 this.op = op;
18 this.lo = lo;
19 this.hi = hi;
20 }
21
22 public void compute() { // version 1
23 if (hi - lo >= 2) { //分解任务
24 int mid = (lo + hi) >>> 1;
25 setPendingCount(2); // 必须在fork之前设置挂起计数器
26 new ForEach(this, array, op, mid, hi).fork(); // fork右子节点
27 new ForEach(this, array, op, lo, mid).fork(); // fork左子节点
28 }
29 else if (hi > lo) //直接执行足够小的任务
30 op.apply(array[lo]);
31 tryComplete(); //尝试完成该任务,要么递减挂起任务数,已经到0的话,就将父节点的挂起数减1,以此类推
32 }
33 }
34
35 //版本2,尾部递归消除
36 class ForEach<E> ...
37 public void compute() { // version 2
38 if (hi - lo >= 2) {
39 int mid = (lo + hi) >>> 1;
40 setPendingCount(1); // 挂起计数器为1
41 new ForEach(this, array, op, mid, hi).fork(); // fork右子节点
42 new ForEach(this, array, op, lo, mid).compute(); // 直接执行左子节点,尾部递归消除
43 }
44 else {
45 if (hi > lo)
46 op.apply(array[lo]);
47 tryComplete();
48 }
49 }
50 }
51
52 //版本3
53 public void compute() { // version 3
54 int l = lo, h = hi;
55 while (h - l >= 2) { //迭代循环fork每一个任务,而不用创建左子节点
56 int mid = (l + h) >>> 1;
57 addToPendingCount(1);//每一次设置挂起任务数为1
58 new ForEach(this, array, op, mid, h).fork(); // right child
59 h = mid;
60 }
61 if (h > l)
62 op.apply(array[l]);
63 propagateCompletion(); //传播完成
64 }
65 }
View Code
_示例二_,搜索数组:
1 class Searcher extends CountedCompleter {
2 final E[] array;
3 final AtomicReference result; //记录搜索结果
4 final int lo, hi;
5
6 Searcher(CountedCompleter p, E[] array, AtomicReference result, int lo, int hi) {
7 super(p);
8 this.array = array; this.result = result; this.lo = lo; this.hi = hi;
9 }
10
11 public E getRawResult() { return result.get(); }//返回结果
12
13 public void compute() { // 与ForEach 版本3类似
14 int l = lo, h = hi;
15 while (result.get() == null && h >= l) { //迭代循环创建一个任务
16 if (h - l >= 2) { //分解任务
17 int mid = (l + h) >>> 1;
18 addToPendingCount(1); //设置挂起任务数为1
19 new Searcher(this, array, result, mid, h).fork();
20 h = mid;
21 }
22 else { //足够小,只有一个元素了
23 E x = array[l];
24 //找到我们想要的数据了,设置到result中
25 if (matches(x) && result.compareAndSet(null, x))
26 quietlyCompleteRoot(); // 根任务现在可以获取结果了
27 break;
28 }
29 }
30 tryComplete(); // normally complete whether or not found
31 }
32 boolean matches(E e) { ... } // 如果找到了返回true
33
34 public static E search(E[] array) {
35 return new Searcher(null, array, new AtomicReference(), 0, array.length).invoke();
36 }
37 }
View Code
_示例三_,map-reduce:
1 class MyMapper { E apply(E v) { ... } }
2
3 class MyReducer { E apply(E x, E y) { ... } }
4
5 class MapReducer extends CountedCompleter {
6 final E[] array; final MyMapper mapper;
7 final MyReducer reducer; final int lo, hi;
8 MapReducer sibling;
9 E result;
10 MapReducer(CountedCompleter p, E[] array, MyMapper mapper, MyReducer reducer, int lo, int hi) {
11 super(p);
12 this.array = array; this.mapper = mapper;
13 this.reducer = reducer; this.lo = lo; this.hi = hi;
14 }
15 public void compute() {
16 if (hi - lo >= 2) { //分解任务
17 int mid = (lo + hi) >>> 1;
18 MapReducer left = new MapReducer(this, array, mapper, reducer, lo, mid);
19 MapReducer right = new MapReducer(this, array, mapper, reducer, mid, hi);
20 left.sibling = right;
21 right.sibling = left;
22 setPendingCount(1); // 只有右任务的挂起的
23 right.fork(); //fork又任务
24 left.compute(); // 直接执行左子节点
25 }
26 else { //任务足够小
27 if (hi > lo)
28 result = mapper.apply(array[lo]); //直接执行
29 tryComplete();
30 }
31 }
32 public void onCompletion(CountedCompleter caller) {
33 if (caller != this) {
34 MapReducer child = (MapReducer)caller;
35 MapReducer sib = child.sibling; //兄弟任务
36 if (sib == null || sib.result == null)
37 result = child.result; //没有兄弟任务那就是最小那个任务
38 else
39 //合并兄弟任务的结果
40 result = reducer.apply(child.result, sib.result);
41 }
42 }
43 public E getRawResult() { return result; } //返回结果
44
45 public static E mapReduce(E[] array, MyMapper mapper, MyReducer reducer) {
46 return new MapReducer(null, array, mapper, reducer,
47 0, array.length).invoke();
48 }
49 }
View Code
示例四,完全遍历:
1 class MapReducer extends CountedCompleter { // version 2
2 final E[] array;
3 final MyMapper mapper;
4 final MyReducer reducer; final int lo, hi;
5 MapReducer forks, next; // record subtask forks in list
6 E result;
7 MapReducer(CountedCompleter p, E[] array, MyMapper mapper,
8 MyReducer reducer, int lo, int hi, MapReducer next) {
9 super(p);
10 this.array = array; this.mapper = mapper;
11 this.reducer = reducer; this.lo = lo; this.hi = hi;
12 this.next = next;
13 }
14 public void compute() {
15 int l = lo, h = hi;
16 while (h - l >= 2) { //循环创建任务
17 int mid = (l + h) >>> 1;
18 addToPendingCount(1);
19 (forks = new MapReducer(this, array, mapper, reducer, mid, h, forks)).fork();
20 h = mid;
21 }
22 if (h > l) //任务足够小,直接执行
23 result = mapper.apply(array[l]);
24
25 // 通过减少和推进子任务链接来完成流程
26 //利用firstComplete,nextComplete两个方法遍历
27 for (CountedCompleter c = firstComplete(); c != null; c = c.nextComplete()) {
28 for (MapReducer t = (MapReducer)c, s = t.forks; s != null; s = t.forks = s.next)
29 t.result = reducer.apply(t.result, s.result);
30 }
31 }
32 public E getRawResult() { return result; }
33
34 public static E mapReduce(E[] array, MyMapper mapper, MyReducer reducer) {
35 return new MapReducer(null, array, mapper, reducer,
36 0, array.length, null).invoke();
37 }
38 }}
View Code
示例五,触发器:
1 class HeaderBuilder extends CountedCompleter<...> { ... }
2
3 class BodyBuilder extends CountedCompleter<...> { ... }
4
5 class PacketSender extends CountedCompleter<...> {
6 PacketSender(...) { super(null, 1); ... } // trigger on second completion
7 public void compute() { } // never called
8 public void onCompletion(CountedCompleter caller) { sendPacket(); }
9 }
10
11 // sample use:
12 PacketSender p = new PacketSender();
13 new HeaderBuilder(p, ...).fork();
14 new BodyBuilder(p, ...).fork();
View Code
说实话,这几个示例除了前面三个,其他的我基本上也没完全理解,先记录着吧,关于RecursiveAction、RecursiveTask的更多示例,可以查看Java Doc的类注释。其实一般来说,利用那两个简单的RecursiveAction、RecursiveTask已经足够解决我们的问题了,在没有理解CountedCompleter之前,最好不要使用它。
总结
其实要想详细的理解ForkJoinPool,也就是Fork/Join框架的实现过程是一个非常耗时耗精力的事情,我这里只是记录了我自己对ForkJoinPool粗浅的理解,所谓总结也就谈不上有多深入了,但我们学习这些框架结构的时候,最主要的其实也还是学习其设计思想与理念,并不一定非要每一句代码都分析到,对于ForkJoinPool线程池和ForkJoinTask、ForkJoinWorkerThread三者实现的Fork/Join框架,目前按我的理解它就是为了利于现有计算机多核心的架构,将一些可以进行分解的任务进行递归分解之后并行的执行,有些地方也将这种思想称之为分治法,并在执行完成之后利于递归的原理将任务结果汇聚起来,从而达到快速解决复杂任务的高效编程设计。对于ForkJoinPool其内部采用了很多为了加快并行度而实现的设计思想,例如工作窃取机制,互助机制,垃圾回收机制等等都是我们值得学习借鉴的地方。