Java并发编程(五) ForkJoinPool的使用

Wesley13
• 阅读 587

一.前言

之前在整理线程使用的时候,无意间看到了ForkJoinPool,在JDK1.7时(新)加入的,就学习了如何使用;

二. ForkJoinPool 使用

2.1 ForkJoinPool的使用姿势

ForkJoinPool采用工作窃取算法,将一个大任务根据阈值分割成很多个子任务,最后根据场景是否要合并子任务运算结果;

根据是否需要合并子任务运算结果,任务需要继承抽象类**RecursiveAction,RecursiveTask,**后者为需要合并子任务结果,泛型为结果类型; 

我们只需要实现抽象方法 protected Void compute(),定义子任务拆分规则和任务算法就可以了;

                                                                           Java并发编程(五) ForkJoinPool的使用

我们查看类继承图可知其属于Future的一个实现,Future的使用在之前有过介绍;

有2种使用方案:

  • fork : 递归划分子任务,无需合并子任务结果;
  • fork & join : 递归划分子任务,最后合并子任务计算结果;

2.2 Only Fork

不需要合并子任务运算结果的场景;

下面的模拟场景是将集合sender中的元素发送到receiver中;

package com.river.thread;

import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.StopWatch;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

@Slf4j
public class ForkJoinPoolTest {

    private final static List<Integer> sender = new ArrayList<Integer>(21000000);

    private final static List<Integer> receiver = new ArrayList<>(21000000);
    private final static List<Integer> receiver2 = new ArrayList<>(21000000);

    private final static AtomicInteger i = new AtomicInteger(0);

    static {
        log.info("prepare data");
        while (i.get() < 21000000) {
            sender.add(i.get());
            i.incrementAndGet();
        }
        log.info("prepare over");
    }


    public static void main(String[] args) throws InterruptedException, ExecutionException {
        StopWatch stopWatch = new StopWatch();
        stopWatch.start();
        SendTask sendTask = new SendTask(0, 210000, sender);
        log.info("Task Start !");
        ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();
        forkJoinPool.submit(sendTask);
        forkJoinPool.awaitTermination(10, TimeUnit.SECONDS);
        forkJoinPool.shutdown();
        stopWatch.stop();
        log.info("sender.size -> {}", sender.size());
        log.info("receiver.size -> {}", receiver.size());
        log.info("TotalTimeMillis1 -> "+stopWatch.getTotalTimeMillis());
       
    }

    @Slf4j
    @AllArgsConstructor
    public static class SendTask extends RecursiveTask<Void> {
        //定义递归子任务的阈值
        private final static int preSize = 100;

        private int start;

        private int end;

        private List<Integer> tempList;

        private final static AtomicInteger taskId = new AtomicInteger(0);

        /**
         * The main computation performed by this task.
         */
        @Override
        protected Void compute() {
            if (end - start < preSize) {
                //log.info("add start {} to end {}", start, end);
                for (int i = start; i < end; i++) {
                    add(this.tempList.get(i));
                }
            } else {
                int middle = (start + end) / 2;
                RecursiveTask sendTaskLeft = new SendTask(start, middle, this.tempList);
                RecursiveTask sendTaskRight = new SendTask(middle, end, this.tempList);
               
                SendTask.invokeAll(sendTaskLeft, sendTaskRight);
            }
            return null;
        }
           
        //防止并发,list.add()方法并发插入,因为在第一次没有在add方法做同步限制,导致并发,找个好久问题,如果发短信什么的业务操作不需要做同步处理
        public void add(int i){
            synchronized (SendTask.class) {
                 receiver.add(i);
            }
        }

    }
}

我们查看日志:

2018-09-15 16:49:41.383 myAppName [main] INFO  com.river.thread.ForkJoinPoolTest - prepare data
2018-09-15 16:49:46.986 myAppName [main] INFO  com.river.thread.ForkJoinPoolTest - prepare over
2018-09-15 16:49:46.987 myAppName [main] INFO  com.river.thread.ForkJoinPoolTest - Task Start !
2018-09-15 16:49:47.106 myAppName [main] INFO  com.river.thread.ForkJoinPoolTest - sender.size -> 21000000
2018-09-15 16:49:47.108 myAppName [main] INFO  com.river.thread.ForkJoinPoolTest - receiver.size -> 2100000
2018-09-15 16:49:47.108 myAppName [main] INFO  com.river.thread.ForkJoinPoolTest - TotalTimeMillis1 -> 119

我们在查看线程情况,有如下4条线程在执行任务,并且我的电脑就是4核的;

ForkJoinPool.commonPool-worker-3
ForkJoinPool.commonPool-worker-2
ForkJoinPool.commonPool-worker-1
main

在这可以看到,与我们之前使用多线程不同的是,这里将main也作为执行任务线程之一,

2.3 Fork & Join

package com.river.thread;

import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.StopWatch;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

@Slf4j
public class ForkJoinPoolTest {

    private final static List<Integer> sender = new ArrayList<Integer>(21000000);

    private final static List<Integer> receiver = new ArrayList<>(21000000);
    private final static List<Integer> receiver2 = new ArrayList<>(21000000);

    private final static AtomicInteger i = new AtomicInteger(0);

    static {
        log.info("prepare data");
        while (i.get() < 21000000) {
            sender.add(i.get());
            i.incrementAndGet();
        }
        log.info("prepare over");
    }


    public static void main(String[] args) throws InterruptedException, ExecutionException {
        int count  = 5000000;
        SumTask sumTask = new SumTask(0, count);
        ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();
        ForkJoinTask<Integer> submit = forkJoinPool.submit(sumTask);
        System.out.println(submit.get());
        int s = 0;
        for (int i = 0;i<=count;i++) {
            s +=i;
        }
        System.out.println(s);
    }

    @Slf4j
    @AllArgsConstructor
    public static class SumTask extends RecursiveTask<Integer>{

        private final static int threshold = 5000;

        private int start;

        private int end;


        /**
         * The main computation performed by this task.
         *
         * @return the result of the computation
         */
        @Override
        protected Integer compute() {
            int sum = 0;
            if (end - start < threshold){
                for (int i = start; i< end; i++){
                    sum +=i;
                }
            }else {
                int middle = (start + end) / 2;
                SumTask sumTask = new SumTask(start, middle);
                SumTask sumTask1 = new SumTask(middle, end);
                SumTask.invokeAll(sumTask, sumTask1);
                sum = sumTask.join() + sumTask1.join();
            }
            return sum;
        }
    }
}

三.相关使用

在JDK8中lamdba有个stream操作parallelStream,底层也是使用ForkJoinPool实现的;

我们可以通过Executors.newWorkStealingPool(int parallelism)快速创建ForkJoinPool线程池,无参默认使用CPU数量的线程数执行任务;

点赞
收藏
评论区
推荐文章
待兔 待兔
3个月前
手写Java HashMap源码
HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程22
Wesley13 Wesley13
3年前
java多线程之ForkJoinPool
转https://www.cnblogs.com/lixuwu/p/7979480.html(https://www.oschina.net/action/GoToLink?urlhttps%3A%2F%2Fwww.cnblogs.com%2Flixuwu%2Fp%2F7979480.html)阅读目录使用(http
Wesley13 Wesley13
3年前
java 线程池入门
一简介线程的使用在java中占有极其重要的地位,在jdk1.4极其之前的jdk版本中,关于线程池的使用是极其简陋的。在jdk1.5之后这一情况有了很大的改观。Jdk1.5之后加入了java.util.concurrent包,这个包中主要介绍java中线程以及线程池的使用。为我们在开发中处理线程的问题提供了非常大的帮助。二:线程池
Stella981 Stella981
3年前
Executors相关的类(线程池)
一、概述Java是天生就支持并发的语言,支持并发意味着多线程,线程的频繁创建在高并发及大数据量是非常消耗资源的,因为java提供了线程池。在jdk1.5以前的版本中,线程池的使用是及其简陋的,但是在JDK1.5后,有了很大的改善。JDK1.5之后加入了java.util.concurrent包,java.util.concurrent包的加入给
Wesley13 Wesley13
3年前
Java并发包线程池之ForkJoinPool即ForkJoin框架(二)
前言前面介绍了ForkJoinPool相关的两个类ForkJoinTask、ForkJoinWorkerThread,现在开始了解ForkJoinPool。ForkJoinPool也是实现了ExecutorService的线程池。但ForkJoinPool不同于其他类型的ExecutorService,主要是因为它使用了窃取工作机制:池中的所有线程
Stella981 Stella981
3年前
ForkJoin 框架
packagecom.wkx.test.forkJoin.containValue;importjava.util.concurrent.ExecutionException;importjava.util.concurrent.ForkJoinPool;importjava.util.concurre
Stella981 Stella981
3年前
ForkJoinPool源码简单解析
ForkJoin框架之ForkJoinTask(https://www.oschina.net/action/GoToLink?urlhttps%3A%2F%2Fsegmentfault.com%2Fa%2F1190000019549838)
Wesley13 Wesley13
3年前
2万字Java并发编程面试题整理(含答案,建议收藏)
Java并发编程1、在java中守护线程和本地线程区别?2、线程与进程的区别?3、什么是多线程中的上下文切换?4、死锁与活锁的区别,死锁与饥饿的区别?5、Java中用到的线程调度算法是什么?6、什么是线程组,为什么在Java中不推荐使用?7、为什么使用Executor框架?8、在Java
为什么mysql不推荐使用雪花ID作为主键
作者:毛辰飞背景在mysql中设计表的时候,mysql官方推荐不要使用uuid或者不连续不重复的雪花id(long形且唯一),而是推荐连续自增的主键id,官方的推荐是auto_increment,那么为什么不建议采用uuid,使用uuid究
javalover123 javalover123
1年前
Java并行流指北
Java并行流,方便了并发操作,但是不注意可能会导致问题。如最大线程数,怎么控制并发数,类加载器,线程上下文变化,ForkJoinPool的execute、submit、invoke方法的区别等。