前面讲的同步并发工具有些比较简单,所以篇幅也比较短,今天要讲的线程池非常重要,所以会是一个大章哦。已经预见留言区如下:
“太长不看...”
“看到ThreadPoolExecutor构造已睡着”
“精力饱满而来,昏昏欲睡而去...”
学习的确需要投入精力,尤其涉及到一些细节,对别人能做到不明觉厉,为何不自己一览山上风景呢?
一、何为线程池
如果你接触过对线池,比如数据库连接池,他们要解决的问题是:对象的启动耗费资源比较多,最好能做到只启动一次,然后重复使用。我们平常用到static final
去修饰常量,也是这个意思。
对于线程来说,启动线程相对来说耗费时间会久一点,而且线程开太多,又会耗费内存,导致GC压力,所以我们需要有控制和管理的手段。
线程池顾名思义就是一个池子,使用了它之后,创建线程变成了从池子获得线程;关闭线程变成了把线程归还给池子。
二、先来讲讲Executors
与ExecutorService
JDK中自然提供了一套线程池的实现,就是ThreadPoolExecutor
。
为了更简单的使用ThreadPoolExecutor
,JDK提供了线程池工厂类Executors
,我们来看看他都与那些工厂方法:
newFixedThreadPool(int nThreads)
构建一个拥有固定线程数量的线程池newSingleThreadExecutor()
构建一个只拥有一个线程的线程池newCachedThreadPool()
构建一个弹性的线程池,需要多少就有多少newSingleThreadScheduledExecutor()
构建一个只拥有一个线程的计划任务线程池newScheduledThreadPool(int corePoolSize)
构建一个弹性的计划任务线程池
我们先来看一个例子:
ExecutorService executorService = Executors.newSingleThreadExecutor();
Future<Integer> future = executorService.submit(()->{return 12;});
System.out.println(future.get());
executorService.shutdown();
我们看到了几个新的类,Executors
的工厂类方法会返回一个ExecutorService
,实际上他是ThreadPoolExecutor
的父接口。ExecutorService
提供了submit()
方法,把一个Runnable
或Callback
提交到线程池去执行,这里的入参是Callback
,Callback
是一个回调接口,没有入参但有返回值。submit()
方法提交了一个任务之后,会返回一个Future
用来表示异步计算的结果,这里也就是获得Callback
所返回的结果。
那么如果submit()
入参是Runnable
呢,这样的话Future
一般只会获取到null
。ExecutorService
是继承自Executor
接口,有时你不需要用Future
来获取异步计算的结果,只是想从线程池取出线程来执行一些任务,那么可以:
ExecutorService executorService = Executors.newSingleThreadExecutor();
executorService.execute(()->{});
executorService.shutdown();
需要注意的是线程池创建之后是不会自动关闭的,需要手动调用shutdown()
方法,ExecutorService
还有一个shutdownNow()
方法,表示立即结束,而不是在所有线程工作完成后优雅的结束。
1、ScheduledExecutorService
在上面我们看到Executors
的工厂方法中有两个会返回ScheduledExecutorService
,分别是newSingleThreadScheduledExecutor()
和newScheduledThreadPool(int corePoolSize)
,这个跟计划任务有关了,类似Linux的at命令。
主要有两个方法来对任务进行周期性的调度:
scheduleAtFixedRate()
以上个任务的执行时间为起点,之后的period时间,调度下一次任务scheduleWithFixedDelay()
以上个任务的结束时间为起点,经过delay时间进行任务调度
三、重头戏之ThreadPoolExecutor
Executors
的工厂方法比较合适初学者使用,简单直接。如果你想做一个高玩,那么就不得不去探索一下核心线程池的内部实现了。
来看一下Executor
的newFixedThreadPool()
方法:
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
会发现他其实是返回了一个ThreadPoolExecutor
对象,可以确定ThreadPoolExecutor
就是线程池的实现类了,那ThreadPoolExecutor
构造函数的这几个参数都是什么意思呢? 看一下ThreadPoolExecutor
最丰富的构造,其他构造都是调用这个的:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
我们一个个来分析一下,会发现他的完善和强大:
int corePoolSize
指定了线程池核心(最少)线程数量int maximumPoolSize
指定了线程池最大线程数量long keepAliveTime
指定了线程存活时间,也就是当有线程被归还的时候,他的存活时间,毕竟线程一直运行着也是耗费资源TimeUnit unit
线程存活时间单位BlockingQueue<Runnable> workQueue
任务队列,保存被提交但未被执行的任务。如果线程池已满且所有线程都在执行任务,那么后来提交的任务就会暂时保存在这个队列中ThreadFactory threadFactory
创建线程时用到的线程工厂类RejectedExecutionHandler handler
拒绝策略。当任务队列workQueue也满了的时候,再有任务提交到线程池,要通过什么把他拒绝掉
一下看到这么多,还是挺吓人的。其实前面几个参数还比较通俗易懂,比较难理解的在于后三个参数,我们来一一拆解一下。
1、workQueue
如果对BlockingQueue
不熟悉,请参考https://my.oschina.net/lizaizhong/blog/1840206。
这个workQueue
有以下几种选择:
SynchronousQueue
这是一个比较特殊的队列,本身没有容量,来一个就得消费一个。可以看到Executors.newCachedThreadPool()
就是使用的这个队列,因为newCachedThreadPool()
中的maximumPoolSize
为无限大ArrayBlockingQueue
有界队列LinkedBlockingQueue
无界队列PriorityBlockingQueue
优先级队列
2、threadFactory
threadFactory
顾名思义就是线程工厂。如果我们想对线程池中的线程进行一些自定义配置,那么可以重写ThreadFactory
的newThread()
方法:
ThreadFactory threadFactory = new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("myThread-"+thread.getId());
thread.setDaemon(true);
return thread;
}
};
3、rejectedExecutionHandler
当线程池线程都被取出,并且任务队列里的任务也排满的时候,新进入的任务怎么办?只能被拒绝。
JDK提供了四种拒绝策略,分别如下:
AbortPolicy
该策略直接抛出异常,阻止系统正常工作CallerRunsPolicy
只要线程池未关闭,该策略直接在调用者(main)线程中运行当前被丢弃的任务。这种策略容易造成调用者线程的性能急剧下降DiscardOldestPolicy
该策略丢弃任务队列中最老(最早)的一个请求,并尝试再次提交当前任务请求DiscardPolicy
该策略默默丢弃无法处理的任务
4、扩展线程池
ThreadPoolExecutor
的扩展机制类似与拦截器,但并未提供接口方法,而是需要重写这些方法,主要是三个方法:
ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 5, 10, TimeUnit.MINUTES , new SynchronousQueue<>()) {
@Override
protected void beforeExecute(Thread t, Runnable r) {
System.out.println("准备执行");
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
System.out.println("执行完成");
}
@Override
protected void terminated() {
System.out.println("线程池退出");
}
};
四、不要使用Executors(What?)
如果仔细查看一下Executors
的几个工厂方法,例如newSingleThreadExecutor()
:
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
会发现他返回的线程池,请求任务队列是一个无界队列,那么这样容易出现什么问题呢?想必你已经想到了,容易出现任务大量堆积,导致OOM。
其他的工厂方法也存在这个问题,总结起来就是:
fixedThreadPool
和singleThreadPool
请求任务队列为无界队列,容易OOMcachedThreadPool
和scheduledThreadPool
创建线程的数量为Integer.MAX_VALUE
,容易OOM
Executors
的这个问题,在大型项目中尤其需要注意。所以根据前辈们的血泪史,我们得到的宝贵经验就是:不要使用Executors
获得线程池,最好选择自己来构建。
五、submit提交任务无异常的问题
如果你用线程池提交了一个Runnable
任务,例如:
executor.submit(()->{System.out.println(1/0);});
结果是不会输出任何东西,即使Runnable
任务重抛出了异常。为啥会出现这种情况呢?感兴趣的同学可以自己研究下源码,主要涉及的类有FutureTask
和RunnableAdapter
,主要看FutureTask
的run()方法即可了解原因。
不抛异常可是非常危险,为了解决这个问题,建议像线程池提交Runnable
类型的任务使用execute()
方法比较好!