一.前言
之前在整理线程使用的时候,无意间看到了ForkJoinPool,在JDK1.7时(新)加入的,就学习了如何使用;
二. ForkJoinPool 使用
2.1 ForkJoinPool的使用姿势
ForkJoinPool采用工作窃取算法,将一个大任务根据阈值分割成很多个子任务,最后根据场景是否要合并子任务运算结果;
根据是否需要合并子任务运算结果,任务需要继承抽象类**RecursiveAction,RecursiveTask
我们只需要实现抽象方法 protected Void compute(),定义子任务拆分规则和任务算法就可以了;
我们查看类继承图可知其属于Future的一个实现,Future的使用在之前有过介绍;
有2种使用方案:
- fork : 递归划分子任务,无需合并子任务结果;
- fork & join : 递归划分子任务,最后合并子任务计算结果;
2.2 Only Fork
不需要合并子任务运算结果的场景;
下面的模拟场景是将集合sender中的元素发送到receiver中;
package com.river.thread;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.StopWatch;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
@Slf4j
public class ForkJoinPoolTest {
private final static List<Integer> sender = new ArrayList<Integer>(21000000);
private final static List<Integer> receiver = new ArrayList<>(21000000);
private final static List<Integer> receiver2 = new ArrayList<>(21000000);
private final static AtomicInteger i = new AtomicInteger(0);
static {
log.info("prepare data");
while (i.get() < 21000000) {
sender.add(i.get());
i.incrementAndGet();
}
log.info("prepare over");
}
public static void main(String[] args) throws InterruptedException, ExecutionException {
StopWatch stopWatch = new StopWatch();
stopWatch.start();
SendTask sendTask = new SendTask(0, 210000, sender);
log.info("Task Start !");
ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();
forkJoinPool.submit(sendTask);
forkJoinPool.awaitTermination(10, TimeUnit.SECONDS);
forkJoinPool.shutdown();
stopWatch.stop();
log.info("sender.size -> {}", sender.size());
log.info("receiver.size -> {}", receiver.size());
log.info("TotalTimeMillis1 -> "+stopWatch.getTotalTimeMillis());
}
@Slf4j
@AllArgsConstructor
public static class SendTask extends RecursiveTask<Void> {
//定义递归子任务的阈值
private final static int preSize = 100;
private int start;
private int end;
private List<Integer> tempList;
private final static AtomicInteger taskId = new AtomicInteger(0);
/**
* The main computation performed by this task.
*/
@Override
protected Void compute() {
if (end - start < preSize) {
//log.info("add start {} to end {}", start, end);
for (int i = start; i < end; i++) {
add(this.tempList.get(i));
}
} else {
int middle = (start + end) / 2;
RecursiveTask sendTaskLeft = new SendTask(start, middle, this.tempList);
RecursiveTask sendTaskRight = new SendTask(middle, end, this.tempList);
SendTask.invokeAll(sendTaskLeft, sendTaskRight);
}
return null;
}
//防止并发,list.add()方法并发插入,因为在第一次没有在add方法做同步限制,导致并发,找个好久问题,如果发短信什么的业务操作不需要做同步处理
public void add(int i){
synchronized (SendTask.class) {
receiver.add(i);
}
}
}
}
我们查看日志:
2018-09-15 16:49:41.383 myAppName [main] INFO com.river.thread.ForkJoinPoolTest - prepare data
2018-09-15 16:49:46.986 myAppName [main] INFO com.river.thread.ForkJoinPoolTest - prepare over
2018-09-15 16:49:46.987 myAppName [main] INFO com.river.thread.ForkJoinPoolTest - Task Start !
2018-09-15 16:49:47.106 myAppName [main] INFO com.river.thread.ForkJoinPoolTest - sender.size -> 21000000
2018-09-15 16:49:47.108 myAppName [main] INFO com.river.thread.ForkJoinPoolTest - receiver.size -> 2100000
2018-09-15 16:49:47.108 myAppName [main] INFO com.river.thread.ForkJoinPoolTest - TotalTimeMillis1 -> 119
我们在查看线程情况,有如下4条线程在执行任务,并且我的电脑就是4核的;
ForkJoinPool.commonPool-worker-3
ForkJoinPool.commonPool-worker-2
ForkJoinPool.commonPool-worker-1
main
在这可以看到,与我们之前使用多线程不同的是,这里将main也作为执行任务线程之一,
2.3 Fork & Join
package com.river.thread;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.StopWatch;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
@Slf4j
public class ForkJoinPoolTest {
private final static List<Integer> sender = new ArrayList<Integer>(21000000);
private final static List<Integer> receiver = new ArrayList<>(21000000);
private final static List<Integer> receiver2 = new ArrayList<>(21000000);
private final static AtomicInteger i = new AtomicInteger(0);
static {
log.info("prepare data");
while (i.get() < 21000000) {
sender.add(i.get());
i.incrementAndGet();
}
log.info("prepare over");
}
public static void main(String[] args) throws InterruptedException, ExecutionException {
int count = 5000000;
SumTask sumTask = new SumTask(0, count);
ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();
ForkJoinTask<Integer> submit = forkJoinPool.submit(sumTask);
System.out.println(submit.get());
int s = 0;
for (int i = 0;i<=count;i++) {
s +=i;
}
System.out.println(s);
}
@Slf4j
@AllArgsConstructor
public static class SumTask extends RecursiveTask<Integer>{
private final static int threshold = 5000;
private int start;
private int end;
/**
* The main computation performed by this task.
*
* @return the result of the computation
*/
@Override
protected Integer compute() {
int sum = 0;
if (end - start < threshold){
for (int i = start; i< end; i++){
sum +=i;
}
}else {
int middle = (start + end) / 2;
SumTask sumTask = new SumTask(start, middle);
SumTask sumTask1 = new SumTask(middle, end);
SumTask.invokeAll(sumTask, sumTask1);
sum = sumTask.join() + sumTask1.join();
}
return sum;
}
}
}
三.相关使用
在JDK8中lamdba有个stream操作parallelStream,底层也是使用ForkJoinPool实现的;
我们可以通过Executors.newWorkStealingPool(int parallelism)快速创建ForkJoinPool线程池,无参默认使用CPU数量的线程数执行任务;