Java并发系列8

Wesley13
• 阅读 569

前面讲的同步并发工具有些比较简单,所以篇幅也比较短,今天要讲的线程池非常重要,所以会是一个大章哦。已经预见留言区如下:

“太长不看...”
“看到ThreadPoolExecutor构造已睡着”
“精力饱满而来,昏昏欲睡而去...”

学习的确需要投入精力,尤其涉及到一些细节,对别人能做到不明觉厉,为何不自己一览山上风景呢?

一、何为线程池

如果你接触过对线池,比如数据库连接池,他们要解决的问题是:对象的启动耗费资源比较多,最好能做到只启动一次,然后重复使用。我们平常用到static final去修饰常量,也是这个意思。
对于线程来说,启动线程相对来说耗费时间会久一点,而且线程开太多,又会耗费内存,导致GC压力,所以我们需要有控制和管理的手段。
线程池顾名思义就是一个池子,使用了它之后,创建线程变成了从池子获得线程;关闭线程变成了把线程归还给池子。

二、先来讲讲ExecutorsExecutorService

JDK中自然提供了一套线程池的实现,就是ThreadPoolExecutor
为了更简单的使用ThreadPoolExecutor,JDK提供了线程池工厂类Executors,我们来看看他都与那些工厂方法:

  • newFixedThreadPool(int nThreads) 构建一个拥有固定线程数量的线程池
  • newSingleThreadExecutor() 构建一个只拥有一个线程的线程池
  • newCachedThreadPool() 构建一个弹性的线程池,需要多少就有多少
  • newSingleThreadScheduledExecutor() 构建一个只拥有一个线程的计划任务线程池
  • newScheduledThreadPool(int corePoolSize) 构建一个弹性的计划任务线程池

我们先来看一个例子:

        ExecutorService executorService = Executors.newSingleThreadExecutor();
        Future<Integer> future = executorService.submit(()->{return 12;});
        System.out.println(future.get());
        executorService.shutdown();

我们看到了几个新的类,Executors的工厂类方法会返回一个ExecutorService,实际上他是ThreadPoolExecutor的父接口。
ExecutorService提供了submit()方法,把一个RunnableCallback提交到线程池去执行,这里的入参是CallbackCallback是一个回调接口,没有入参但有返回值。submit()方法提交了一个任务之后,会返回一个Future用来表示异步计算的结果,这里也就是获得Callback所返回的结果。
那么如果submit()入参是Runnable呢,这样的话Future一般只会获取到null
ExecutorService是继承自Executor接口,有时你不需要用Future来获取异步计算的结果,只是想从线程池取出线程来执行一些任务,那么可以:

        ExecutorService executorService = Executors.newSingleThreadExecutor();
        executorService.execute(()->{});
        executorService.shutdown();

需要注意的是线程池创建之后是不会自动关闭的,需要手动调用shutdown()方法,ExecutorService还有一个shutdownNow()方法,表示立即结束,而不是在所有线程工作完成后优雅的结束。

1、ScheduledExecutorService

在上面我们看到Executors的工厂方法中有两个会返回ScheduledExecutorService,分别是newSingleThreadScheduledExecutor()newScheduledThreadPool(int corePoolSize),这个跟计划任务有关了,类似Linux的at命令。
主要有两个方法来对任务进行周期性的调度:

  • scheduleAtFixedRate() 以上个任务的执行时间为起点,之后的period时间,调度下一次任务
  • scheduleWithFixedDelay() 以上个任务的结束时间为起点,经过delay时间进行任务调度

三、重头戏之ThreadPoolExecutor

Executors的工厂方法比较合适初学者使用,简单直接。如果你想做一个高玩,那么就不得不去探索一下核心线程池的内部实现了。
来看一下ExecutornewFixedThreadPool()方法:

    public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>(),
                                      threadFactory);
    }

会发现他其实是返回了一个ThreadPoolExecutor对象,可以确定ThreadPoolExecutor就是线程池的实现类了,那ThreadPoolExecutor构造函数的这几个参数都是什么意思呢? 看一下ThreadPoolExecutor最丰富的构造,其他构造都是调用这个的:

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) 

我们一个个来分析一下,会发现他的完善和强大:

  • int corePoolSize 指定了线程池核心(最少)线程数量
  • int maximumPoolSize 指定了线程池最大线程数量
  • long keepAliveTime 指定了线程存活时间,也就是当有线程被归还的时候,他的存活时间,毕竟线程一直运行着也是耗费资源
  • TimeUnit unit 线程存活时间单位
  • BlockingQueue<Runnable> workQueue 任务队列,保存被提交但未被执行的任务。如果线程池已满且所有线程都在执行任务,那么后来提交的任务就会暂时保存在这个队列中
  • ThreadFactory threadFactory 创建线程时用到的线程工厂类
  • RejectedExecutionHandler handler 拒绝策略。当任务队列workQueue也满了的时候,再有任务提交到线程池,要通过什么把他拒绝掉

一下看到这么多,还是挺吓人的。其实前面几个参数还比较通俗易懂,比较难理解的在于后三个参数,我们来一一拆解一下。

1、workQueue

如果对BlockingQueue不熟悉,请参考https://my.oschina.net/lizaizhong/blog/1840206
这个workQueue有以下几种选择:

  • SynchronousQueue 这是一个比较特殊的队列,本身没有容量,来一个就得消费一个。可以看到Executors.newCachedThreadPool()就是使用的这个队列,因为newCachedThreadPool()中的maximumPoolSize为无限大
  • ArrayBlockingQueue 有界队列
  • LinkedBlockingQueue 无界队列
  • PriorityBlockingQueue 优先级队列

2、threadFactory

threadFactory顾名思义就是线程工厂。如果我们想对线程池中的线程进行一些自定义配置,那么可以重写ThreadFactorynewThread()方法:

        ThreadFactory threadFactory = new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setName("myThread-"+thread.getId());
                thread.setDaemon(true);
                return thread;
            }
        };

3、rejectedExecutionHandler

当线程池线程都被取出,并且任务队列里的任务也排满的时候,新进入的任务怎么办?只能被拒绝。
JDK提供了四种拒绝策略,分别如下:

  • AbortPolicy 该策略直接抛出异常,阻止系统正常工作
  • CallerRunsPolicy 只要线程池未关闭,该策略直接在调用者(main)线程中运行当前被丢弃的任务。这种策略容易造成调用者线程的性能急剧下降
  • DiscardOldestPolicy 该策略丢弃任务队列中最老(最早)的一个请求,并尝试再次提交当前任务请求
  • DiscardPolicy 该策略默默丢弃无法处理的任务

4、扩展线程池

ThreadPoolExecutor的扩展机制类似与拦截器,但并未提供接口方法,而是需要重写这些方法,主要是三个方法:

        ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 5, 10, TimeUnit.MINUTES    , new SynchronousQueue<>()) {
            @Override
            protected void beforeExecute(Thread t, Runnable r) {
                System.out.println("准备执行");
            }

            @Override
            protected void afterExecute(Runnable r, Throwable t) {
                System.out.println("执行完成");
            }

            @Override
            protected void terminated() {
                System.out.println("线程池退出");
            }
        };

四、不要使用Executors(What?)

如果仔细查看一下Executors的几个工厂方法,例如newSingleThreadExecutor():

    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

会发现他返回的线程池,请求任务队列是一个无界队列,那么这样容易出现什么问题呢?想必你已经想到了,容易出现任务大量堆积,导致OOM。
其他的工厂方法也存在这个问题,总结起来就是:

  • fixedThreadPoolsingleThreadPool请求任务队列为无界队列,容易OOM
  • cachedThreadPoolscheduledThreadPool创建线程的数量为Integer.MAX_VALUE,容易OOM

Executors的这个问题,在大型项目中尤其需要注意。所以根据前辈们的血泪史,我们得到的宝贵经验就是:不要使用Executors获得线程池,最好选择自己来构建。

五、submit提交任务无异常的问题

如果你用线程池提交了一个Runnable任务,例如:

        executor.submit(()->{System.out.println(1/0);});

结果是不会输出任何东西,即使Runnable任务重抛出了异常。为啥会出现这种情况呢?感兴趣的同学可以自己研究下源码,主要涉及的类有FutureTaskRunnableAdapter,主要看FutureTask的run()方法即可了解原因。
不抛异常可是非常危险,为了解决这个问题,建议像线程池提交Runnable类型的任务使用execute()方法比较好!

点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
待兔 待兔
4个月前
手写Java HashMap源码
HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程22
Jacquelyn38 Jacquelyn38
3年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
Stella981 Stella981
3年前
KVM调整cpu和内存
一.修改kvm虚拟机的配置1、virsheditcentos7找到“memory”和“vcpu”标签,将<namecentos7</name<uuid2220a6d1a36a4fbb8523e078b3dfe795</uuid
Easter79 Easter79
3年前
Twitter的分布式自增ID算法snowflake (Java版)
概述分布式系统中,有一些需要使用全局唯一ID的场景,这种时候为了防止ID冲突可以使用36位的UUID,但是UUID有一些缺点,首先他相对比较长,另外UUID一般是无序的。有些时候我们希望能使用一种简单一些的ID,并且希望ID能够按照时间有序生成。而twitter的snowflake解决了这种需求,最初Twitter把存储系统从MySQL迁移
Wesley13 Wesley13
3年前
mysql设置时区
mysql设置时区mysql\_query("SETtime\_zone'8:00'")ordie('时区设置失败,请联系管理员!');中国在东8区所以加8方法二:selectcount(user\_id)asdevice,CONVERT\_TZ(FROM\_UNIXTIME(reg\_time),'08:00','0
Wesley13 Wesley13
3年前
00:Java简单了解
浅谈Java之概述Java是SUN(StanfordUniversityNetwork),斯坦福大学网络公司)1995年推出的一门高级编程语言。Java是一种面向Internet的编程语言。随着Java技术在web方面的不断成熟,已经成为Web应用程序的首选开发语言。Java是简单易学,完全面向对象,安全可靠,与平台无关的编程语言。
Wesley13 Wesley13
3年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Python进阶者 Python进阶者
10个月前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这