前面讲的同步并发工具有些比较简单,所以篇幅也比较短,今天要讲的线程池非常重要,所以会是一个大章哦。已经预见留言区如下:
“太长不看...”
“看到ThreadPoolExecutor构造已睡着”
“精力饱满而来,昏昏欲睡而去...”
学习的确需要投入精力,尤其涉及到一些细节,对别人能做到不明觉厉,为何不自己一览山上风景呢?
一、何为线程池
如果你接触过对线池,比如数据库连接池,他们要解决的问题是:对象的启动耗费资源比较多,最好能做到只启动一次,然后重复使用。我们平常用到static final去修饰常量,也是这个意思。
对于线程来说,启动线程相对来说耗费时间会久一点,而且线程开太多,又会耗费内存,导致GC压力,所以我们需要有控制和管理的手段。
线程池顾名思义就是一个池子,使用了它之后,创建线程变成了从池子获得线程;关闭线程变成了把线程归还给池子。
二、先来讲讲Executors与ExecutorService
JDK中自然提供了一套线程池的实现,就是ThreadPoolExecutor。
为了更简单的使用ThreadPoolExecutor,JDK提供了线程池工厂类Executors,我们来看看他都与那些工厂方法:
newFixedThreadPool(int nThreads)构建一个拥有固定线程数量的线程池newSingleThreadExecutor()构建一个只拥有一个线程的线程池newCachedThreadPool()构建一个弹性的线程池,需要多少就有多少newSingleThreadScheduledExecutor()构建一个只拥有一个线程的计划任务线程池newScheduledThreadPool(int corePoolSize)构建一个弹性的计划任务线程池
我们先来看一个例子:
ExecutorService executorService = Executors.newSingleThreadExecutor();
Future<Integer> future = executorService.submit(()->{return 12;});
System.out.println(future.get());
executorService.shutdown();
我们看到了几个新的类,Executors的工厂类方法会返回一个ExecutorService,实际上他是ThreadPoolExecutor的父接口。ExecutorService提供了submit()方法,把一个Runnable或Callback提交到线程池去执行,这里的入参是Callback,Callback是一个回调接口,没有入参但有返回值。submit()方法提交了一个任务之后,会返回一个Future用来表示异步计算的结果,这里也就是获得Callback所返回的结果。
那么如果submit()入参是Runnable呢,这样的话Future一般只会获取到null。ExecutorService是继承自Executor接口,有时你不需要用Future来获取异步计算的结果,只是想从线程池取出线程来执行一些任务,那么可以:
ExecutorService executorService = Executors.newSingleThreadExecutor();
executorService.execute(()->{});
executorService.shutdown();
需要注意的是线程池创建之后是不会自动关闭的,需要手动调用shutdown()方法,ExecutorService还有一个shutdownNow()方法,表示立即结束,而不是在所有线程工作完成后优雅的结束。
1、ScheduledExecutorService
在上面我们看到Executors的工厂方法中有两个会返回ScheduledExecutorService,分别是newSingleThreadScheduledExecutor()和newScheduledThreadPool(int corePoolSize),这个跟计划任务有关了,类似Linux的at命令。
主要有两个方法来对任务进行周期性的调度:
scheduleAtFixedRate()以上个任务的执行时间为起点,之后的period时间,调度下一次任务scheduleWithFixedDelay()以上个任务的结束时间为起点,经过delay时间进行任务调度
三、重头戏之ThreadPoolExecutor
Executors的工厂方法比较合适初学者使用,简单直接。如果你想做一个高玩,那么就不得不去探索一下核心线程池的内部实现了。
来看一下Executor的newFixedThreadPool()方法:
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
会发现他其实是返回了一个ThreadPoolExecutor对象,可以确定ThreadPoolExecutor就是线程池的实现类了,那ThreadPoolExecutor构造函数的这几个参数都是什么意思呢? 看一下ThreadPoolExecutor最丰富的构造,其他构造都是调用这个的:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
我们一个个来分析一下,会发现他的完善和强大:
int corePoolSize指定了线程池核心(最少)线程数量int maximumPoolSize指定了线程池最大线程数量long keepAliveTime指定了线程存活时间,也就是当有线程被归还的时候,他的存活时间,毕竟线程一直运行着也是耗费资源TimeUnit unit线程存活时间单位BlockingQueue<Runnable> workQueue任务队列,保存被提交但未被执行的任务。如果线程池已满且所有线程都在执行任务,那么后来提交的任务就会暂时保存在这个队列中ThreadFactory threadFactory创建线程时用到的线程工厂类RejectedExecutionHandler handler拒绝策略。当任务队列workQueue也满了的时候,再有任务提交到线程池,要通过什么把他拒绝掉
一下看到这么多,还是挺吓人的。其实前面几个参数还比较通俗易懂,比较难理解的在于后三个参数,我们来一一拆解一下。
1、workQueue
如果对BlockingQueue不熟悉,请参考https://my.oschina.net/lizaizhong/blog/1840206。
这个workQueue有以下几种选择:
SynchronousQueue这是一个比较特殊的队列,本身没有容量,来一个就得消费一个。可以看到Executors.newCachedThreadPool()就是使用的这个队列,因为newCachedThreadPool()中的maximumPoolSize为无限大ArrayBlockingQueue有界队列LinkedBlockingQueue无界队列PriorityBlockingQueue优先级队列
2、threadFactory
threadFactory顾名思义就是线程工厂。如果我们想对线程池中的线程进行一些自定义配置,那么可以重写ThreadFactory的newThread()方法:
ThreadFactory threadFactory = new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("myThread-"+thread.getId());
thread.setDaemon(true);
return thread;
}
};
3、rejectedExecutionHandler
当线程池线程都被取出,并且任务队列里的任务也排满的时候,新进入的任务怎么办?只能被拒绝。
JDK提供了四种拒绝策略,分别如下:
AbortPolicy该策略直接抛出异常,阻止系统正常工作CallerRunsPolicy只要线程池未关闭,该策略直接在调用者(main)线程中运行当前被丢弃的任务。这种策略容易造成调用者线程的性能急剧下降DiscardOldestPolicy该策略丢弃任务队列中最老(最早)的一个请求,并尝试再次提交当前任务请求DiscardPolicy该策略默默丢弃无法处理的任务
4、扩展线程池
ThreadPoolExecutor的扩展机制类似与拦截器,但并未提供接口方法,而是需要重写这些方法,主要是三个方法:
ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 5, 10, TimeUnit.MINUTES , new SynchronousQueue<>()) {
@Override
protected void beforeExecute(Thread t, Runnable r) {
System.out.println("准备执行");
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
System.out.println("执行完成");
}
@Override
protected void terminated() {
System.out.println("线程池退出");
}
};
四、不要使用Executors(What?)
如果仔细查看一下Executors的几个工厂方法,例如newSingleThreadExecutor():
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
会发现他返回的线程池,请求任务队列是一个无界队列,那么这样容易出现什么问题呢?想必你已经想到了,容易出现任务大量堆积,导致OOM。
其他的工厂方法也存在这个问题,总结起来就是:
fixedThreadPool和singleThreadPool请求任务队列为无界队列,容易OOMcachedThreadPool和scheduledThreadPool创建线程的数量为Integer.MAX_VALUE,容易OOM
Executors的这个问题,在大型项目中尤其需要注意。所以根据前辈们的血泪史,我们得到的宝贵经验就是:不要使用Executors获得线程池,最好选择自己来构建。
五、submit提交任务无异常的问题
如果你用线程池提交了一个Runnable任务,例如:
executor.submit(()->{System.out.println(1/0);});
结果是不会输出任何东西,即使Runnable任务重抛出了异常。为啥会出现这种情况呢?感兴趣的同学可以自己研究下源码,主要涉及的类有FutureTask和RunnableAdapter,主要看FutureTask的run()方法即可了解原因。
不抛异常可是非常危险,为了解决这个问题,建议像线程池提交Runnable类型的任务使用execute()方法比较好!


