ForkJoinPool源码简单解析

Stella981
• 阅读 616

ForkJoin框架之ForkJoinTask

ForkJoinPool源码简单解析  java

阅读约 62 分钟

前言

在前面的文章"CompletableFuture和响应式编程"中提到了ForkJoinTask和ForkJoinPool,后者毫无疑问是一个线程池,前者则是一个类似FutureTask经典定义的概念.

官方有一个非常无语的解释:ForkJoinTask就是运行在ForkJoinPool的一个任务抽象,ForkJoinPool就是运行ForkJoinTask的线程池.

ForkJoin框架包含ForkJoinTask,ForkJoinWorkerThread,ForkJoinPool和若干ForkJoinTask的子类,它的核心在于分治和工作窍取,最大程度利用线程池中的工作线程,避免忙的忙死,饿的饿死.

ForkJoinTask可以理解为类线程但比线程轻量的实体,在ForkJoinPool中运行的少量ForkJoinWorkerThread可以持有大量的ForkJoinTask和它的子任务.ForkJoinTask同时也是一个轻量的Future,使用时应避免较长阻塞和io.

ForkJoinTask在JAVA8中应用广泛,但它是一个抽象类,它的子类派生了各种用途,如后续计划单独介绍的CountedCompleter,以及若干JAVA8中stream api定义的与并行流有关的各种操作(ops).

源码

首先看ForkJoinTask的签名.

public abstract class ForkJoinTask<V> implements Future<V>, Serializable 

从签名上看,ForkJoinTask实现了future,也可以序列化,但它不是一个Runnable或Callable.

ForkJoinTask虽然可以序列化,但它只对运行前和后敏感,对于执行过程中不敏感.

先来看task的运行字段:

//volatie修饰的任务状态值,由ForkJoinPool或工作线程修改.
volatile int status; 
static final int DONE_MASK = 0xf0000000;//用于屏蔽完成状态位. static final int NORMAL = 0xf0000000;//表示正常完成,是负值. static final int CANCELLED = 0xc0000000;//表示被取消,负值,且小于NORMAL static final int EXCEPTIONAL = 0x80000000;//异常完成,负值,且小于CANCELLED static final int SIGNAL = 0x00010000;//用于signal,必须不小于1<<16,默认为1<<16. static final int SMASK = 0x0000ffff;//后十六位的task标签 

很显然,DONE_MASK能够过滤掉所有非NORMAL,非CANCELLED,非EXCEPTIONAL的状态,字段的含义也很直白,后面的SIGNAL和SMASK还不明确,后面再看.

//标记当前task的completion状态,同时根据情况唤醒等待该task的线程.
private int setCompletion(int completion) { for (int s;;) { //开启一个循环,如果当前task的status已经是各种完成(小于0),则直接返回status,这个status可能是某一次循环前被其他线程完成. if ((s = status) < 0) return s; //尝试将原来的status设置为它与completion按位或的结果. if (U.compareAndSwapInt(this, STATUS, s, s | completion)) { if ((s >>> 16) != 0) //此处体现了SIGNAL的标记作用,很明显,只要task完成(包含取消或异常),或completion传入的值不小于1<<16, //就可以起到唤醒其他线程的作用. synchronized (this) { notifyAll(); } //cas成功,返回参数中的completion. return completion; } } } 

前面用注释解释了这个方法的逻辑,显然该方法是阻塞的,如果传入的参数不能将status设置为负值会如何?

显然,可能会有至多一次的成功cas,并且若满足唤醒的条件,会尝试去唤醒线程,甚至可能因为为了唤醒其他线程而被阻塞在synchonized代码块外;也可能没有一次成功的cas,直到其他线程成功将status置为完成.

//final修饰,运行ForkJoinTask的核心方法.
final int doExec() {
    int s; boolean completed; //仅未完成的任务会运行,其他情况会忽略. if ((s = status) >= 0) { try { //调用exec completed = exec(); } catch (Throwable rex) { //发生异常,用setExceptionalCompletion设置结果 return setExceptionalCompletion(rex); } if (completed) //正常完成,调用前面说过的setCompletion,参数为normal,并将返回值作为结果s. s = setCompletion(NORMAL); } //返回s return s; } //记录异常并且在符合条件时传播异常行为 private int setExceptionalCompletion(Throwable ex) { //首先记录异常信息到结果 int s = recordExceptionalCompletion(ex); if ((s & DONE_MASK) == EXCEPTIONAL) //status去除非完成态标志位(只保留前4位),等于EXCEPTIONAL.内部传播异常 internalPropagateException(ex); return s; } //internalPropagateException方法是一个空方法,留给子类实现,可用于completer之间的异常传递 void internalPropagateException(Throwable ex) {} //记录异常完成 final int recordExceptionalCompletion(Throwable ex) { int s; if ((s = status) >= 0) { //只能是异常态的status可以记录. //hash值禁止重写,不使用子类的hashcode函数. int h = System.identityHashCode(this); final ReentrantLock lock = exceptionTableLock; //异常锁,加锁 lock.lock(); try { //抹除脏异常,后面叙述 expungeStaleExceptions(); //异常表数组.ExceptionNode后面叙述. ExceptionNode[] t = exceptionTable;//exceptionTable是一个全局的静态常量,后面叙述 //用hash值和数组长度进行与运算求一个初始的索引 int i = h & (t.length - 1); for (ExceptionNode e = t[i]; ; e = e.next) { //找到空的索引位,就创建一个新的ExceptionNode,保存this,异常对象并退出循环 if (e == null) { t[i] = new ExceptionNode(this, ex, t[i]);//(1) break; } if (e.get() == this) //已设置在相同的索引位置的链表中,退出循环.//2 break; //否则e指向t[i]的next,进入下个循环,直到发现判断包装this这个ForkJoinTask的ExceptionNode已经出现在t[i]这个链表并break(2), //或者直到e是null,意味着t[i]出发开始的链表并无包装this的ExceptionNode,则将构建一个新的ExceptionNode并置换t[i], //将原t[i]置为它的next(1).整个遍历判断和置换过程处在锁中进行. } } finally { lock.unlock(); } //记录成功,将当前task设置为异常完成. s = setCompletion(EXCEPTIONAL); } return s; } //exceptionTable声明 private static final ExceptionNode[] exceptionTable;//全局异常node表 private static final ReentrantLock exceptionTableLock;//上面用到的锁,就是一个普通的可重入锁. private static final ReferenceQueue<Object> exceptionTableRefQueue;//变量表引用队列,后面详述. private static final int EXCEPTION_MAP_CAPACITY = 32;//异常表的固定容量,不大,只有32而且是全局的. //初始化在一个静态代码块. static { exceptionTableLock = new ReentrantLock(); exceptionTableRefQueue = new ReferenceQueue<Object>(); exceptionTable = new ExceptionNode[EXCEPTION_MAP_CAPACITY];//容量 try { U = sun.misc.Unsafe.getUnsafe(); Class<?> k = ForkJoinTask.class; STATUS = U.objectFieldOffset (k.getDeclaredField("status")); } catch (Exception e) { throw new Error(e); } } //先来看ExceptionNode内部类的实现 //签名,实现了一个ForkJoinTask的弱引用. static final class ExceptionNode extends WeakReference<ForkJoinTask<?>> { final Throwable ex; ExceptionNode next; final long thrower; // use id not ref to avoid weak cycles final int hashCode; // store task hashCode before weak ref disappears ExceptionNode(ForkJoinTask<?> task, Throwable ex, ExceptionNode next) { super(task, exceptionTableRefQueue);//指向弱引用的构造函数,保存引用为task,队列为全局的exceptionTableRefQueue. this.ex = ex;//抛出的异常的引用 this.next = next;//数组中的ExceptionNode以链表形式存在,前面分析过,先入者为后入者的next this.thrower = Thread.currentThread().getId();//保存抛出异常的线程id(严格来说是创建了this的线程) this.hashCode = System.identityHashCode(task);//哈希码保存关联task的哈希值. } } //清除掉异常表中的脏数据,仅在持有全局锁时才可使用.前面看到在记录新的异常信息时要进行一次清除尝试 private static void expungeStaleExceptions() { //循环条件,全局exceptionTableRefQueue队列不为空,前面说过ExceptionNode是弱引用,当它被回收时会被放入此队列. for (Object x; (x = exceptionTableRefQueue.poll()) != null;) { //从队首依次取出元素. if (x instanceof ExceptionNode) { //计算在全局exceptionTable中的索引. int hashCode = ((ExceptionNode)x).hashCode; ExceptionNode[] t = exceptionTable; int i = hashCode & (t.length - 1); //取出node ExceptionNode e = t[i]; ExceptionNode pred = null; //不停遍历,直到e是null为止. while (e != null) { //e的next ExceptionNode next = e.next;//2 //x是队首出队的元素.它与e相等说明找到 if (e == x) { //e是一个链表的元素,pred表示它是否有前置元素 if (pred == null) //无前置元素,说明e在链表首部,直接将首部元素指向next即可. t[i] = next; else //有前置元素,说明循环过若干次,将当前e出链表 pred.next = next; //在链表中发现x即break掉内循环,继续从exceptionTableRefQueue的队首弹出新的元素. break; } //只要发现当前e不是x,准备下一次循环,pred指向e.e指向next,进行下一个元素的比较. pred = e; e = next; } } } } 

到此doExec(也是每个ForkJoinTask的执行核心过程)就此结束.

很明显,ForkJoinTask的doExec负责了核心的执行,它留下了exec方法给子类实现,而重点负责了后面出现异常情况的处理.处理的逻辑前面已论述,在产生异常时尝试将异常存放在全局的execptionTable中,存放的结构为数组+链表,按哈希值指定索引,每次存放新的异常时,顺便清理上一次已被gc回收的ExceptionNode.所有ForkJoinTask共享了一个exceptionTable,因此必然在有关的几个环节要进行及时的清理.除了刚刚论述的过程,还有如下的几处:

ForkJoinPool源码简单解析

前面论述了recordExceptionalCompletion,一共有四处使用了expungeStaleException,将已回收的ExceptionNode从引用队列中清除.

clearExceptionalCompletion在对一个ForkJoinTask重新初始化时使用,我们在前面提到序列化时说过,ForkJoinTask的序列化结果只保留了两种情况:运行前,运行结束.重新初始化一个ForkJoinTask,就要去除任何中间状态,包含自身产出的已被回收的异常node,而expungeStaleExceptions显然也顺便帮助其他task清除.

getThrowableException是查询task运行结果时调用,如一些get/join方法,很明显,记录这个异常的作用就在于返回给get/join,在这一块顺便清理已被回收的node,尤其是将自己运行时生成的node清除.

helpExpungeStaleExceptions是提供给ForkJoinPool在卸载worker时使用,顺便帮助清理全局异常表.

使用它们的方法稍后再论述,先来继续看ForkJoinTask的源码.

//内部等待任务完成,直到完成或超时.
final void internalWait(long timeout) { int s; //status小于0代表已完成,直接忽略wait. //未完成,则试着加上SIGNAL的标记,令完成任务的线程唤醒这个等待. if ((s = status) >= 0 && U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) { //加锁,只有一个线程可以进入. synchronized (this) { //再次判断未完成.等待timeout,且忽略扰动异常. if (status >= 0) try { wait(timeout); } catch (InterruptedException ie) { } else //已完成则响醒其他等待者. notifyAll(); } } } 

internalWait方法逻辑很简单,首先判断是否未完成,满足未完成,则将标记位加上SIGNAL(可能已有别的线程做过),随后加锁double check status,还未完成则等待并释放锁,若发现已完成,或在后续被唤醒后发现已完成,则唤醒其他等待线程.通过notifyAll的方式避免了通知丢失.

同时,它的使用方法目前只有一个ForkJoinPool::awaitJoin,在该方法中使用循环的方式进行internalWait,满足了每次按截止时间或周期进行等待,同时也顺便解决了虚假唤醒.

继续看externalAwaitDone函数.它体现了ForkJoin框架的一个核心:外部帮助.

//外部线程等待一个common池中的任务完成.
private int externalAwaitDone() { int s = ((this instanceof CountedCompleter) ? //当前task是一个CountedCompleter,尝试使用common ForkJoinPool去外部帮助完成,并将完成状态返回. ForkJoinPool.common.externalHelpComplete( (CountedCompleter<?>)this, 0) : //当前task不是CountedCompleter,则调用common pool尝试外部弹出该任务并进行执行, //status赋值doExec函数的结果,若弹出失败(其他线程先行弹出)赋0. ForkJoinPool.common.tryExternalUnpush(this) ? doExec() : 0); if (s >= 0 && (s = status) >= 0) { //检查上一步的结果,即外部使用common池弹出并执行的结果(不是CountedCompleter的情况),或外部尝试帮助CountedCompleter完成的结果 //status大于0表示尝试帮助完成失败. //扰动标识,初值false boolean interrupted = false; do { //循环尝试,先给status标记SIGNAL标识,便于后续唤醒操作. if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) { synchronized (this) { if (status >= 0) { try { //CAS成功,进同步块发现double check未完成,则等待. wait(0L); } catch (InterruptedException ie) { //若在等待过程中发生了扰动,不停止等待,标记扰动. interrupted = true; } } else //进同步块发现已完成,则唤醒所有等待线程. notifyAll(); } } } while ((s = status) >= 0);//循环条件,task未完成. if (interrupted) //循环结束,若循环中间曾有扰动,则中断当前线程. Thread.currentThread().interrupt(); } //返回status return s; } 

externalAwaitDone的逻辑不复杂,在当前task为ForkJoinPool.common的情况下可以在外部进行等待和尝试帮助完成.方法会首先根据ForkJoinTask的类型进行尝试帮助,并返回当前的status,若发现未完成,则进入下面的等待唤醒逻辑.该方法的调用者为非worker线程.

相似的方法:externalInterruptibleAwaitDone

private int externalInterruptibleAwaitDone() throws InterruptedException { int s; //不同于externalAwaitDone,入口处发现当前线程已中断,则立即抛出中断异常. if (Thread.interrupted()) throw new InterruptedException(); if ((s = status) >= 0 && (s = ((this instanceof CountedCompleter) ? ForkJoinPool.common.externalHelpComplete( (CountedCompleter<?>)this, 0) : ForkJoinPool.common.tryExternalUnpush(this) ? doExec() : 0)) >= 0) { while ((s = status) >= 0) { if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) { synchronized (this) { if (status >= 0) //wait时也不catch中断异常,发生即抛出. wait(0L); else notifyAll(); } } } } return s; } 

externalInterruptibleAwaitDone的逻辑与externalAwaitDone相似,只是对中断异常的态度为抛,后者为catch.

它们的使用点,externalAwaitDone为doJoin或doInvoke方法调用,externalInterruptibleAwaitDone为get方法调用,很明显,join操作不可扰动,get则可以扰动.

下面来看看doJoin和doInvoke

//join的核心方法
private int doJoin() { int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w; //已完成,返回status,未完成再尝试后续 return (s = status) < 0 ? s : //未完成,当前线程是ForkJoinWorkerThread,从该线程中取出workQueue,并尝试将 //当前task出队然后执行,执行的结果是完成则返回状态,否则使用当线程池所在的ForkJoinPool的awaitJoin方法等待. ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ? (w = (wt = (ForkJoinWorkerThread)t).workQueue). tryUnpush(this) && (s = doExec()) < 0 ? s : wt.pool.awaitJoin(w, this, 0L) : //当前线程不是ForkJoinWorkerThread,调用前面说的externalAwaitDone方法. externalAwaitDone(); } //invoke的核心方法 private int doInvoke() { int s; Thread t; ForkJoinWorkerThread wt; //先尝试本线程执行,不成功才走后续流程 return (s = doExec()) < 0 ? s : ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ? //与上一个方法基本相同,但在当前线程是ForkJoinWorkerThread时不尝试将该task移除栈并执行,而是等 (wt = (ForkJoinWorkerThread)t).pool. awaitJoin(wt.workQueue, this, 0L) : externalAwaitDone(); } 

到此终于可以看一些公有对外方法了.有了前面的基础,再看get,join,invoke等方法非常简单.

//get方法还有get(long time)的变种.
public final V get() throws InterruptedException, ExecutionException { int s = (Thread.currentThread() instanceof ForkJoinWorkerThread) ? //当前线程是ForkJoinWorkerThread则调用前面提过的doJoin方法. //否则调用前述externalInterruptibleAwaitDone doJoin() : externalInterruptibleAwaitDone(); Throwable ex; if ((s &= DONE_MASK) == CANCELLED) //异常处理,取消的任务,抛出CancellationException. throw new CancellationException(); if (s == EXCEPTIONAL && (ex = getThrowableException()) != null) //异常处理,调用getThrowableException获取异常,封进ExecutionException. throw new ExecutionException(ex); //无异常处理,返回原始结果. return getRawResult(); } //getRawResult默认为一个抽象实现,在ForkJoinTask中,并未保存该结果的字段. public abstract V getRawResult(); //getThrowableException方法 private Throwable getThrowableException() { //不是异常标识,直接返回null,从方法名的字面意思看,要返回一个可抛出的异常. if ((status & DONE_MASK) != EXCEPTIONAL) return null; //系统哈希码来定位ExceptionNode int h = System.identityHashCode(this); ExceptionNode e; final ReentrantLock lock = exceptionTableLock; //加异常表全局锁 lock.lock(); try { //先清理已被回收的异常node,前面已述. expungeStaleExceptions(); ExceptionNode[] t = exceptionTable; e = t[h & (t.length - 1)]; //循环找出this匹配的异常node while (e != null && e.get() != this) e = e.next; } finally { lock.unlock(); } Throwable ex; //前面找不出异常node或异常node中存放的异常为null,则返回null if (e == null || (ex = e.ex) == null) return null; if (e.thrower != Thread.currentThread().getId()) { //不是当前线程抛出的异常. Class<? extends Throwable> ec = ex.getClass(); try { Constructor<?> noArgCtor = null;//该异常的无参构造器 Constructor<?>[] cs = ec.getConstructors();//该异常类公有构造器 for (int i = 0; i < cs.length; ++i) { Constructor<?> c = cs[i]; Class<?>[] ps = c.getParameterTypes(); if (ps.length == 0) //构建器参数列表长度0说明存在无参构造器,存放. noArgCtor = c; else if (ps.length == 1 && ps[0] == Throwable.class) { //发现有参构造器且参数长度1且第一个参数类型是Throwable,说明可以存放cause. //反射将前面取出的ex作为参数,反射调用该构造器创建一个要抛出的Throwable. Throwable wx = (Throwable)c.newInstance(ex); //反射失败,异常会被catch,返回ex,否则返回wx. return (wx == null) ? ex : wx; } } if (noArgCtor != null) { //在尝试了寻找有参无参构造器,并发现只存在无参构造器的情况,用无参构造器初始化异常. Throwable wx = (Throwable)(noArgCtor.newInstance()); if (wx != null) { //将ex设置为它的cause并返回它的实例. wx.initCause(ex); return wx; } } } catch (Exception ignore) { //此方法不可抛出异常,一定要成功返回. } } //有参无参均未成功,返回找到的异常. return ex; } //join公有方法 public final V join() { int s; if ((s = doJoin() & DONE_MASK) != NORMAL) //调用doJoin方法阻塞等待的结果不是NORMAL,说明有异常或取消.报告异常. reportException(s); //等于NORMAL,正常执行完毕,返回原始结果. return getRawResult(); } //报告异常,可在前一步判断执行status是否为异常态,然后获取并重抛异常. private void reportException(int s) { //参数s必须用DONE_MASK处理掉前4位以后的位. if (s == CANCELLED) //传入的状态码等于取消,抛出取消异常. throw new CancellationException(); if (s == EXCEPTIONAL) //使用前面的getThrowableException方法获取异常并重新抛出. rethrow(getThrowableException()); } //invoke公有方法. public final V invoke() { int s; //先尝试执行 if ((s = doInvoke() & DONE_MASK) != NORMAL) //doInvoke方法的结果status只保留完成态位表示非NORMAL,则报告异常. reportException(s); //正常完成,返回原始结果. return getRawResult(); } 

终于,读到此处的读者将关键的方法线串了起来,前述的所有内部方法,常量和变量与公有接口的关系已经明了.

很显然,ForkJoinTask是个抽象类,且它并未保存任务的完成结果,也不负责这个结果的处理,但声明并约束了返回结果的抽象方法getRawResult供子类实现.

因此,ForkJoinTask的自身关注任务的完成/异常/未完成,子类关注这个结果的处理.

每当获取到任务的执行状态时,ForkJoinTask可根据status来判断是否是异常/正常完成,并进入相应的处理逻辑,最终使用子类实现的方法完成一个闭环.

如果理解为将ForkJoinTask和子类的有关代码合并起来,在结果/完成状态/异常信息这一块,相当于同时有三个part在合作.

第一个part:status字段,它同时表示了未完成/正常完成/取消/异常完成等状态,也同时告诉有关等待线程是否要唤醒其他线程(每个线程等待前会设置SIGNAL),同时留出了后面16位对付其他情况.

第二个part:result,在ForkJoinTask见不到它,也没有相应的字段,子类也未必需要提供这个result字段,前面提到的CountedCompleter就没有提供这个result,它的getRawResult会固定返回null.但是CountedCompleter可以继承子类并实现这个result的保存与返回(道格大神在注释中举出了若干典型代码例子),在JAVA8中,stream api中的并行流也会保存每一步的计算结果,并对结果进行合并.

第三个part:异常.在ForkJoinTask中已经完成了所有异常处理流程和执行流程的定义,重点在于异常的存放,它是由ForkJoinTask的类变量进行存放的,结构为数组+链表,且元素利用了弱引用,借gc帮助清除掉已经被回收的ExceptionNode,显然在gc之前必须得到使用.而异常随时可以发生并进行record入列,但相应的能消费掉这个异常的只有相应的外部的get,join,invoke等方法或者内部扩展了exec()等方式,得到其他线程执行的task异常结果的情况.巧妙的是,只有外部调用者调用(get,invoke,join)时,这个异常信息才足够重要,需要rethrow出去并保存关键的堆栈信息;而内部线程在访问一些非自身执行的任务时,往往只需要status判断是否异常即可,在exec()中fork新任务的,也往往必须立即join这些新的子任务,这就保证了能够及时得到子任务中的异常堆栈(即使拿不到堆栈也知道它失败了).

经过前面的论述,ForkJoinTask的执行和异常处理已经基本论结,但是,一个ForkJoinTask在创建之后是如何运行的?显然,它不是一个Runnable,也不是Callable,不能直接submit或execute到普通的线程池.

临时切换到ForkJoinPool的代码,前面提到过,ForkJoinTask的官方定义就是可以运行在ForkJoinPool中的task.

//ForkJoinPool代码,submit一个ForkJoinTask到ForkJoinPool,并将该task自身返回.
//拿到返回的task,我们就可以进行前述的get方法了.
public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
    if (task == null) throw new NullPointerException(); externalPush(task); return task; } //execute,不返回.类似普通线程池提交一个runnable的行为. public void execute(ForkJoinTask<?> task) { if (task == null) throw new NullPointerException(); externalPush(task); } 

显然,若要使用一个自建的ForkJoinPool,可以使用execute或submit函数提交入池,然后用前述的get方法和变种方法进行.这是一种运行task的方式.

前面论述过的invoke方法会先去先去尝试本地执行,然后才去等待,故我们自己new一个ForkJoinTask,一样可以通过invoke直接执行,这是第二种运行task的方式.

前面论述的join方法在某种情况下也是一种task的运行方式,在当前线程是ForkJoinWorkerThread时,会去尝试将task出队并doExec,也就是会先用本线程执行一次,不成功才干等,非ForkJoinWorkerThread则直接干等了.显然我们可以自己构建一个ForkJoinWorkerThread并去join,这时会将任务出队并执行(但存在一个问题:什么时候入队).且出队后若未执行成功,则awaitJoin(参考ForkJoinPool::awaitJoin),此时因任务已出队,不会被窃取或帮助(在awaitJoin中会有helpStealer,但其实任务是当前线程自己"偷走"了),似乎完全要靠自己了.但并不表示ForkJoinTask子类无法获取这个已出队的任务,比如CountedCompleter使用时,可以在compute中新生成的Completer时,将源CountedCompleter(ForkJoinTask的子类)作为新生成的CountedCountedCompleter的completer(该子类中的一个字段),这样,若有一个ForkJoinWorkerThread窃取了这个新生成的CountedCompleter,可以通过completer链表找到先前被出队的CountedCompleter(ForkJoinTask).关于CountedCompleter单独文章详述.

除此之外呢?包含前面提到的使用join操作不是ForkJoinWorkerThread调用的情况,不使用ForkJoinPool的submit execute入池,如何能让一个ForkJoinTask在将来执行?我们来看后面的方法.

//fork方法,将当前任务入池.
public final ForkJoinTask<V> fork() {
    Thread t;
    if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
        //如果当前线程是ForkJoinWorkerThread,将任务压入该线程的任务队列. ((ForkJoinWorkerThread)t).workQueue.push(this); else //否则调用common池的externalPush方法入队. ForkJoinPool.common.externalPush(this); return this; }

显然,我们还可以通过对一个ForkJoinTask进行fork方法入池,入哪个池完全取决于当前线程的类型.这是第四种让任务能被运行的方式.

同样,我们也看到了第五种方式,ForkJoinPool.common其实就是一个常量保存的ForkJoinPool,它能够调用externalPush,我们自然也可以直接new一个ForkJoinPool,然后将当前task进行externalPush,字面意思外部压入.这种办法,非ForkJoinWorkerThread也能将任务提交到非common的ForkJoinPool.

从名字来看,ForkJoinTask似乎已经说明了一切,按照官方的注释也是如此.对一个task,先Fork压队,再Join等待执行结果,这是一个ForkJoinTask的执行周期闭环(但不要简单理解为生命周期,前面提到过,任务可以被重新初始化,而且重新初始化时还会清空ExceptionNode数组上的已回收成员).

到此为止,ForkJoinTask的核心函数和api已经基本了然,其它同类型的方法以及周边的方法均不难理解,如invokeAll的各种变种.下面来看一些"周边"类型的函数.有前述的基础,它们很好理解.

//取消一个任务的执行,直接将status设置成CANCELLED,设置后判断该status 是否为CANCELLED,是则true否则false.
public boolean cancel(boolean mayInterruptIfRunning) { return (setCompletion(CANCELLED) & DONE_MASK) == CANCELLED; } //判断是否完成,status小于0代表正常完成/异常完成/取消,很好理解. public final boolean isDone() { return status < 0; } //判断当前任务是否取消. public final boolean isCancelled() { //status前4位 return (status & DONE_MASK) == CANCELLED; } public final boolean isCompletedAbnormally() { //是否为异常完成,前面说过,CANCELLED和EXCEPTIONAL均小于NORMAL return status < NORMAL; } //是否正常完成. public final boolean isCompletedNormally() { //完成态位等于NORMAL return (status & DONE_MASK) == NORMAL; } //获取异常. public final Throwable getException() { int s = status & DONE_MASK; //当为正常完成或未完成时,返回null. return ((s >= NORMAL) ? null : //是取消时,新建一个取消异常. (s == CANCELLED) ? new CancellationException() : //不是取消,参考前面提到的getThrowableException. getThrowableException()); } //使用异常完成任务. public void completeExceptionally(Throwable ex) { //参考前述的setExceptionalCompletion, //ex已经是运行时异常或者Error,直接使用ex完成,若是受检异常,包装成运行时异常. setExceptionalCompletion((ex instanceof RuntimeException) || (ex instanceof Error) ? ex : new RuntimeException(ex)); } //使用value完成任务. public void complete(V value) { try { //设置原始结果,它是一个空方法.前面说过ForkJoinTask没有维护result之类的结果字段,子类可自行发挥. setRawResult(value); } catch (Throwable rex) { //前述步骤出现异常,就用异常方式完成. setExceptionalCompletion(rex); return; } //前面的结果执行完,标记当前为完成. setCompletion(NORMAL); } //安静完成任务.直接用NORMAL setCompletion,没什么好说的. public final void quietlyComplete() { setCompletion(NORMAL); } //安静join,它不会返回result也不会抛出异常.处理集合任务时,如果需要所有任务都被执行而不是一个执行出错(取消)其他也跟着出错的情况下, //很明显适用,这不同于invokeAll,静态方法invokeAll或invoke(ForkJoinTask,ForkJoinTask)会在任何一个任务出现异常后取消执行并抛出. public final void quietlyJoin() { doJoin(); } //安静执行一次,不返回结果不抛出异常,没什么好说的. public final void quietlyInvoke() { doInvoke(); } //重新初台化当前task public void reinitialize() { if ((status & DONE_MASK) == EXCEPTIONAL) //如果当前任务是异常完成的,清除异常.该方法参考前面的论述. clearExceptionalCompletion(); else //否则重置status为0. status = 0; } //反fork. public boolean tryUnfork() { Thread t; //当前线程是ForkJoinWorkerThread,从它的队列尝试移除. return (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ? ((ForkJoinWorkerThread)t).workQueue.tryUnpush(this) : //当前线程不是ForkJoinWorkerThread,用common池外部移除. ForkJoinPool.common.tryExternalUnpush(this)); }

上面是一些简单的周边方法,大多并不需要再论述了,unfork方法很明显在某些场景下不会成功,显然,当一个任务刚刚入队并未进行后续操作时,很可能成功.按前面所述,当对一个任务进行join时,可能会成功的弹出当前任务并执行,此时不可能再次弹出;当一个任务被其他线程窃取或被它本身执行的也不会弹出.

再来看一些老朋友,在前面的文章"CompletableFuture和响应式编程"一文中,作者曾着重强调过它将每个要执行的动作进行压栈(未能立即执行的情况),而栈中的元素Completion即是ForkJoinTask的子类,而标记该Completion是否被claim的方法和周边方法如下:

//获取ForkJoinTask的标记,返回结果为short型
public final short getForkJoinTaskTag() { //status的后16位 return (short)status; } //原子设置任务的标记位. public final short setForkJoinTaskTag(short tag) { for (int s;;) { //不停循环地尝试将status的后16位设置为tag. if (U.compareAndSwapInt(this, STATUS, s = status, //替换的结果,前16位为原status的前16位,后16位为tag. (s & ~SMASK) | (tag & SMASK))) //返回被换掉的status的后16位. return (short)s; } } //循环尝试原子设置标记位为tag,前提是原来的标记位等于e,成功true失败false public final boolean compareAndSetForkJoinTaskTag(short e, short tag) { for (int s;;) { if ((short)(s = status) != e) //如果某一次循环的原标记位不是e,则返回false. return false; //同上个方法 if (U.compareAndSwapInt(this, STATUS, s, (s & ~SMASK) | (tag & SMASK))) return true; } }

还记得CompletableFuture在异步执行Completion时要先claim吗?claim方法中,会尝试设置这个标记位.这是截止jdk8中CompletableFuture使用到ForkJoinTask的功能.

目前来看,在CompletableFuture的内部实现Completion还没有使用到ForkJoinTask的其他属性,比如放入一个ForkJoinPool执行(没有任何前面总结的调用,比如用ForkJoinPool的push,execute,submit等,也没有fork到common池).但是很明显,道格大神令它继承自ForkJoinTask不可能纯粹只为了使用区区一个标记位,试想一下,在如此友好支持响应式编程的CompletableFuture中传入的每一个action都可以生成若干新的action,那么CompletableFuture负责将这些action封装成Completion放入ForkJoinPool执行,将最大化利用到ForkJoin框架的工作窃取和外部帮助的功效,强力结合分治思想,这将是多么优雅的设计.或者在jdk9-12中已经出现了相应的Completion实现(尽管作者写过JAVA9-12,遗憾的是也没有去翻它们的源码).

另外,尽管Completion的众多子类也没有result之类的表示结果的字段,但它的一些子类通过封装,实际上间接地将这个Completion所引用的dep的result作为了自己的"result",当然,getRawResult依旧是null,但是理念却是相通的.

以上是ForkJoinTask的部分核心源码,除了上述的源码外,还有一些同属于ForkJoinTask的核心源码部分,比如其他的public方法(参考join fork invoke 即可),一些利用ForkJoinPool的实现,要深入了解ForkJoinPool才能了解的方法,一些不太难的静态方法等,这些没有必要论述了.

除了核心源码外,ForkJoinTask也提供了对Runnable,Callable的适配器实现,这块很好理解,简单看一看.

//对Runnable的实现,如果在ForkJoinPool中提交一个runnable,会用它封装成ForkJoinTask
static final class AdaptedRunnable<T> extends ForkJoinTask<T> implements RunnableFuture<T> { final Runnable runnable; T result; AdaptedRunnable(Runnable runnable, T result) { //不能没有runnable if (runnable == null) throw new NullPointerException(); this.runnable = runnable; //对runnable做适配器时,可以提交将结果传入,并设置为当前ForkJoinTask子类的result. //前面说过,ForkJoinTask不以result作为完成标记,判断一个任务是否完成或异常,使用status足以, //返回的结果才使用result. this.result = result; } public final T getRawResult() { return result; } public final void setRawResult(T v) { result = v; } //前面说过提交入池的ForkJoinTask最终会运行doExec,而它会调用exec,此处会调用run. public final boolean exec() { runnable.run(); return true; } public final void run() { invoke(); } private static final long serialVersionUID = 5232453952276885070L;//序列化用 } //无结果的runnable适配器 static final class AdaptedRunnableAction extends ForkJoinTask<Void> implements RunnableFuture<Void> { final Runnable runnable; AdaptedRunnableAction(Runnable runnable) { if (runnable == null) throw new NullPointerException(); this.runnable = runnable; } //区别就是result固定为null,也不能set public final Void getRawResult() { return null; } public final void setRawResult(Void v) { } public final boolean exec() { runnable.run(); return true; } public final void run() { invoke(); } private static final long serialVersionUID = 5232453952276885070L; } //对runnable的适配器,但强制池中的工作线程在执行任务发现异常时抛出 static final class RunnableExecuteAction extends ForkJoinTask<Void> { final Runnable runnable; RunnableExecuteAction(Runnable runnable) { if (runnable == null) throw new NullPointerException(); this.runnable = runnable; } //默认null结果,set也是空实现 public final Void getRawResult() { return null; } public final void setRawResult(Void v) { } public final boolean exec() { runnable.run(); return true; } void internalPropagateException(Throwable ex) { //前面说过doExec会被执行,它会调exec并catch,在catch块中设置当前任务为异常完成态, //然后调用internalPropagateException方法,而在ForkJoinTask中默认为空实现. //此处将异常重新抛出,将造成worker线程抛出异常. rethrow(ex); } private static final long serialVersionUID = 5232453952276885070L; } //对callable的适配器,当将callable提交至ForkJoinPool时使用. static final class AdaptedCallable<T> extends ForkJoinTask<T> implements RunnableFuture<T> { final Callable<? extends T> callable; T result; AdaptedCallable(Callable<? extends T> callable) { if (callable == null) throw new NullPointerException(); this.callable = callable; } //字段中有一个result,直接使用它返回. public final T getRawResult() { return result; } //result可外部直接设置. public final void setRawResult(T v) { result = v; } public final boolean exec() { try { //默认的result用call函数设置. result = callable.call(); return true; } catch (Error err) { //catch住Error,抛出 throw err; } catch (RuntimeException rex) { //catch住运行时异常,抛出 throw rex; } catch (Exception ex) { //catch住受检异常,包装成运行时异常抛出. throw new RuntimeException(ex); } } //run方法一样只是调用invoke,进而调用doExec. public final void run() { invoke(); } private static final long serialVersionUID = 2838392045355241008L; } //runnable生成适配器的工具方法 public static ForkJoinTask<?> adapt(Runnable runnable) { return new AdaptedRunnableAction(runnable); } //指定结果设置runnable的适配器工具方法 public static <T> ForkJoinTask<T> adapt(Runnable runnable, T result) { return new AdaptedRunnable<T>(runnable, result); } //对callable生成适配器的方法. public static <T> ForkJoinTask<T> adapt(Callable<? extends T> callable) { return new AdaptedCallable<T>(callable); } 

以上的代码都不复杂,只要熟悉了ForkJoinTask的本身代码结构,对于这一块了解非常容易,这也间接说明了ForkJoinPool中是如何处理Runnable和Callable的(因为ForkJoinPool本身也是一种线程池,可以接受提交Callable和Runnable).

将runnable提交到pool时,可以指定result,也可以不指定,也可以用submit或execute方法区分异常处理行为,ForkJoinPool会自行选择相应的适配器.

将callable 提交到pool时,pool会选择对callable的适配器,它的结果将为task的结果,它的异常将为task的异常.

到此为止,ForkJoinTask的源码分析完成.

后语

本文详细分析了ForkJoinTask的源码,并解释了前文CompletableFuture中Completion与它的关联,以及分析了Completion继承自ForkJoinTask目前已带来的功能利用(tag)和将来可能增加的功用(一个Completion产生若干多个Completion并在ForkJoinPool中运行,还支持工作窃取).

同时本文也对ForkJoinPool和ForkJoinWorkerThread,以及CountedCompleter和Stream api中的并行流进行了略微的描述.

在文章的最后,或许有一些新手读者会好奇,我们究竟什么时候会使用ForkJoinTask?

首先,如果你在项目中大肆使用了流式计算,并使用了并行流,那么你已经在使用了.

前面提过,官方解释ForkJoinTask可以视作比线程轻量许多的实体,也是轻量的Future.结合在源码中时不时出来秀存在感的ForkJoinWorkerThread,显然它就是据说比普通线程轻量一些的线程,在前面的源码中可以看出,它维护了一组任务的队列,每个线程负责完成队列中的任务,也可以偷其他线程的任务,甚至池外的线程都可以时不时地来个join,顺便帮助出队执行任务.

显然,对于重计算,轻io,轻阻塞的任务,适合使用ForkJoinPool,也就使用了ForkJoinTask,你不会认为它可以提交runnable和callable,就可以不用ForkJoinTask了吧?前面的适配器ForkJoinPool在这种情况下必用的,可以去翻相应的源码.

本章没有去详述CountedCompleter,但前面论述时说过,你可以在exec()中将一个计算复杂的任务拆解为小的子任务,然后将子任务入池执行,父任务合并子任务的结果.这种分治的算法此前基本是在单线程模式下运行,使用ForkJoinTask,则可以将这种计算交给一个ForkJoinPool中的所有线程并行执行.

转自: https://segmentfault.com/a/1190000019549838?utm_source=tag-newest

点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
待兔 待兔
5个月前
手写Java HashMap源码
HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程22
Jacquelyn38 Jacquelyn38
3年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
Stella981 Stella981
3年前
Android So动态加载 优雅实现与原理分析
背景:漫品Android客户端集成适配转换功能(基于目标识别(So库35M)和人脸识别库(5M)),导致apk体积50M左右,为优化客户端体验,决定实现So文件动态加载.!(https://oscimg.oschina.net/oscnet/00d1ff90e4b34869664fef59e3ec3fdd20b.png)点击上方“蓝字”关注我
Wesley13 Wesley13
3年前
mysql设置时区
mysql设置时区mysql\_query("SETtime\_zone'8:00'")ordie('时区设置失败,请联系管理员!');中国在东8区所以加8方法二:selectcount(user\_id)asdevice,CONVERT\_TZ(FROM\_UNIXTIME(reg\_time),'08:00','0
Wesley13 Wesley13
3年前
00:Java简单了解
浅谈Java之概述Java是SUN(StanfordUniversityNetwork),斯坦福大学网络公司)1995年推出的一门高级编程语言。Java是一种面向Internet的编程语言。随着Java技术在web方面的不断成熟,已经成为Web应用程序的首选开发语言。Java是简单易学,完全面向对象,安全可靠,与平台无关的编程语言。
Stella981 Stella981
3年前
Django中Admin中的一些参数配置
设置在列表中显示的字段,id为django模型默认的主键list_display('id','name','sex','profession','email','qq','phone','status','create_time')设置在列表可编辑字段list_editable
Wesley13 Wesley13
3年前
35岁是技术人的天花板吗?
35岁是技术人的天花板吗?我非常不认同“35岁现象”,人类没有那么脆弱,人类的智力不会说是35岁之后就停止发展,更不是说35岁之后就没有机会了。马云35岁还在教书,任正非35岁还在工厂上班。为什么技术人员到35岁就应该退役了呢?所以35岁根本就不是一个问题,我今年已经37岁了,我发现我才刚刚找到自己的节奏,刚刚上路。
Wesley13 Wesley13
3年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Python进阶者 Python进阶者
11个月前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这