java线程池,利用Exceutors创建不同的线程池满足不同场景需求:
- newSingleThreadExecutor() 创建一个单线程的线程池。这个线程池只有一个线程在工作,也就是相当于单线程串行执行所有任务。如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它。此线程池保证所有任务的执行顺序按照任务的提交顺序执行。
- newFixedThreadPool(int nThreads) 创建固定大小的线程池。每次提交一个任务就创建一个线程,直到线程达到线程池的最大大小。线程池的大小一旦达到最大值就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。
- newCachedThreadPool() 创建一个可缓存的线程池。如果线程池的大小超过了处理任务所需要的线程, 那么就会回收部分空闲(默认60秒不执行任务)的线程,当任务数增加时,此线程池又可以智能的添加新线程来处理任务。此线程池不会对线程池大小做限制,线程池大小完全依赖于操作系统(或者说JVM)能够创建的最大线程大小
- 优点就是灵活创建线程,因地制宜,任务少时很省资源。缺点就是可创建的线程上限太大,源代码里是Integer.MAX_VALUE大小,这个数量有点可怕
- newScheduledThreadPool() 创建一个大小无限的线程池。此线程池支持定时以及周期性执行任务的需求。
- newWorkStealingPool() jdk8引入的,内部会构建ForkJoinPool,利用working-stealing算法,并行的处理任务,但是不保证处理顺序
Fork/Join框架
- 把大任务分割成若干个小任务并行执行,最终汇总每个小任务结果后得到大任务的框架
- work-stealing算法:某个线程从其他线程队列里窃取任务来执行
因为分割成若干个小任务由多个线程去执行,就会出现有的线程已经完成任务而有的还未完成任务,已经完成的线程就闲置了,为了提升效率,让已经完成任务的线程去其他线程窃取队列里的任务来执行。为了减少窃取线程对其他线程的竞争,通常会使用双端队列,执行任务的线程从头部拿任务执行,窃取线程是从队列尾部拿任务执行
问:为什么要使用线程池?
- 降低资源消耗(通过重复利用已创建的线程来工作,降低创建线程和销毁线程的消耗)
- 提高线程的可管理性(线程是稀缺资源,如果无限制的创建会不仅会消耗系统资源还会降低系统的稳定性,使用线程池可以统一的分配、调优、监控)
Executor的框架图
JUC的三个Executor接口
- Executor:运行新任务的简单接口,将人物提交和任务执行细节解耦
- 通过源码看到newCachedThreadPool是返回的ExecutorService newSingleTreadExecutor是返回的FinalizableDelegatedExecutorService 最终都是继承的Executor
- 接口Executor只有一个方法就是execute,对于不同的实现它可能是创建一个新线程立即启动,也可能是使用已有的线程来运行传入的任务,也可能是根据线程池容量或阻塞队列的容量来决定是否将传入的任务放入阻塞队列中或者拒绝接受任务
- ExecutorService:具备管理执行器和任务生命周期方法,提交任务机制更完善
- ExecutorService是Executor的扩展接口 提供了更方便的管理方法 最常用的是 shutdown submit
- submit参数有Callable、Runnable两种 并返回Future
- ScheduledExecutorService:支持Future和定期执行任务
ThreadPoolExecutor的构造函数
- corePoolSize:核心线程数量
- maximunPoolSize:线程不够用时能够创建的最大线程数
- workQueue:任务等待队列(当前线程数量大于等于corePoolSize的时候,将任务封装成work放入workQueue中。不同的队列排队机制不同)
- keepAliveTime:线程池维护线程的空闲时间,线程空闲超过这个时间就会被销毁
- threadFactory:创建新线程,默认使用Executors.defaultThreadFactory(),新创建的线程是一样的优先级、非守护线程
ps:newCachedThreadPool传入的队列是容量为0的SynchronousQueue,(Java 6的并发编程包中的SynchronousQueue是一个没有数据缓冲的BlockingQueue,生产者线程对其的插入操作put必须等待消费者的移除操作take,反过来也一样)
handler:线程池的饱和策略
- AbortPolicy:直接抛出异常,这是默认策略
- CallerRunsPolicy: 用调用者所在多线程来执行任务
- DiscardOldestPolicy:丢弃队列中最靠前的任务,并执行当前任务
- DiscardPolicy:直接丢弃任务
- 实现RejectedExecutionHandler接口自定义handler处理
execute方法执行流程如下:
线程池的状态:
- RUNNING:能够接受新任务,并且也能处理阻塞队列中的任务
- SHUTDOWN:不能接受新任务,但可以处理存量任务
- STOP:不再接受新任务,也不处理存量任务
- TIDYING:所有任务都已终止,正在进行最后的打扫工作,有效线程数为0
- TERMINATED:terminated()方法执行完成后进入该状态(该方法什么也不做只是标识)
状态转换图:
工作线程的生命周期:
问:如何选择线程池大小?(没有绝对的算法或规定,是靠经验累计总结出来的)
- CPU密集型:线程数=按照核数或者核数+1(因为如果线程太多会导致过多的上下文切换,导致不必要的开销)
- I/O密集型:线程数量=CPU核数*(1+平均等待时间/平均工作时间)
ps:
阿里编码规范指出:线程池不允许使用Executors去创建,而是通过ThreadPoolExecutor的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。 说明:Executors各个方法的弊端:
1)newFixedThreadPool和newSingleThreadExecutor:
主要问题是堆积的请求处理队列可能会耗费非常大的内存,甚至OOM。
2)newCachedThreadPool和newScheduledThreadPool:
主要问题是线程数最大数是Integer.MAX_VALUE,可能会创建数量非常多的线程,甚至OOM。
例子:使用Guava的ThreadFactoryBuilder
输出:
不加重试的输出是:
从例子中看出 maxPoolSize + QueueSize < taskNum 就会抛出拒绝异常 如果不catch这个异常程序无法结束(这里重试机制只是个demo,正确的做法是实现RejectedExecutionHandler接口自定义handler处理)