一、为什么需要使用线程池
1、减少了创建和销毁线程的次数,每个工作线程都可以被重复利用,可执行多个任务。
2、可以根据系统的承受能力,调整线程池中工作线线程的数目,防止因为消耗过多的内存,而把服务器累趴下(每个线程需要大约1MB内存,线程开的越多,消耗的内存也就越大,最后死机)。
Java中创建和销毁一个线程是比较昂贵的操作,需要系统调用。频繁创建和销毁线程会影响系统性能。于是线程池应运而生。
作用:
1、限制系统中执行线程的数量。 根据系统的环境情况,可以自动或手动设置线程数量,达到运行的最佳效果。 少了浪费了系统资源,多了造成系统拥挤效率不高。
用线程池控制线程数量,其他线程排 队等候。
一个任务执行完毕,再从队列的中取最前面的任务开始执行。 若队列中没有等待进程,线程池的这一资源处于等待。 当一个新任务需要运行时,
如果线程池 中有等待的工作线程,就可以开始运行了;否则进入等待队列。
Java里面线程池的顶级接口是Executor,但是严格意义上讲Executor并不是一个线程池,而只是一个执行线程的工具。真正的线程池接口是ExecutorService。
二、Executor框架
接口:Executor,CompletionService,ExecutorService,ScheduledExecutorService
抽象类:AbstractExecutorService
实现类:ExecutorCompletionService,ThreadPoolExecutor,ScheduledThreadPoolExecutor
它的子类和实现主要包括ExecutorService,ScheduledExecutorService,ThreadPoolExecutor,ScheduledThreadPoolExecutor,ForkJoinPool等
Executor:Executor是一个接口,其只定义了一个execute()方法:void execute(Runnable command);
,只能提交Runnable形式的任务,不支持提交Callable带有返回值的任务。
ExecutorService:ExecutorService在Executor的基础上加入了线程池的生命周期管理,我们可以通过ExecutorService#shutdown或者ExecutorService#shutdownNow方法来关闭我们的线程池。
ThreadPoolExecutor 继承自 AbstractExecutorService 实现了 ExecutorService 接口,
ScheduledThreadPoolExecutor 继承自 ThreadPoolExecutor 实现了 ExecutorService 和 ScheduledExecutorService 接口
构造函数:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
corePoolSize:线程池的核心线程数目,当一个请求进来时如果当前线程池中线程数量小于这个值,则直接通过ThreadFactory新建一个线程来处理这个请求,如果已有线程数量大于等于这个值则将请求放入阻塞队列中。
maximumPoolSize:最大线程个数,当大于了这个值就会将准备新加的异步任务由一个丢弃处理机制来处理,大于 corePoolSize 且小于 maximumPoolSize 则新建 Thread 执行,但是当通过newFixedThreadPool 创建的时候,corePoolSize 和 maximumPoolSize 是一样的,而corePoolSize 是先执行的,所以他会先被放入等待队列而不会执行到下面的丢弃处理中;
workQueue:阻塞队列,超过corePoolSize部分的请求放入这个阻塞队列中等待执行。阻塞队列分为有界阻塞队列和无界阻塞队列。在创建阻塞队列时如果我们指定了这个队列的“capacity”则这个队列就是有界的,否则是无界的。这里有一点需要注意:使用线程池之前请明确是否真的需要无界阻塞队列,如果阻塞队列是无界的,会导致大量的请求堆积,进而造成内存溢出系统崩溃。
任务缓存队列
任务缓存队列,即workQueue,它用来存放等待执行的任务。
workQueue的类型为BlockingQueue<Runnable>,通常可以取下面三种类型:
1)有界任务队列ArrayBlockingQueue:基于数组的先进先出队列,此队列创建时必须指定大小;
2)无界任务队列LinkedBlockingQueue:基于链表的先进先出队列,如果创建时没有指定此队列大小,则默认为Integer.MAX_VALUE;
3)直接提交队列synchronousQueue:这个队列比较特殊,它不会保存提交的任务,而是将直接新建一个线程来执行新来的任务。
keepAliveTime :表示线程没有任务时最多保持多久然后停止。默认情况下,只有线程池中线程数大于corePoolSize 时,keepAliveTime 才会起作用。换句话说,当线程池中的线程数大于corePoolSize,并且一个线程空闲时间达到了keepAliveTime,那么就是shutdown。
threadFactory:是一个线程池工厂,主要用来为线程池创建线程,我们可以定制一个ThreadFactory来达到统一命名我们线程池中的线程的目的。如果没指定的话,默认会使用Executors.defaultThreadFactory(),可以使用默认的 default实现,也可以自己去包装和传递,主要实现 newThread 方法即可;
handler:即当任务提交失败的时候,会调用这个处理器,ThreadPoolExecutor内置了多个实现,比如抛异常、直接抛弃等。这里也需要根据业务场景进行设置,比如说当队列积压的时候,针对性的对线程池扩容或者发送告警等策略。当参数 maximumPoolSize 达到后丢弃处理的方法实现,java 提供了 5种丢弃处理的方法,当然也可以自己弄,主要是要实现接口 RejectedExecutionHandler 中rejectedExecution(Runnabler, ThreadPoolExecutor e) 方法,java 默认使用的是AbortPolicy,他的作用是当出现这种情况的时候抛出一个异常;通常得到线程池后会调用其中的 submit 或 execute 方法去提交执行异步任务,其实 submit 方法最终会调用execute 方法来进行操作,只是他提供了一个 Future托管返回值的处理而已,当你调用需要有返回值的信息时用它来处理是比较好的,这个 Future 会包装 Callable 信息。
拒绝策略:
1、AbortPolicy:丢弃任务并抛出RejectedExecutionException
2、CallerRunsPolicy:只要线程池未关闭,该策略直接在调用者线程中,运行当前被丢弃的任务。显然这样做不会真的丢弃任务,但是,任务提交线程的性能极有可能会急剧下降。
3、DiscardOldestPolicy:丢弃队列中最老的一个请求,也就是即将被执行的一个任务,并尝试再次提交当前任务。
4、DiscardPolicy:丢弃任务,不做任何处理。
Java通过Executors提供四种线程池,分别为: newCachedThreadPool 、newFixedThreadPool 、newScheduledThreadPool 、newSingleThreadExecutor
newCachedThreadPool:创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue());
底层调用的是ThreadPoolExecutor方法,传入一个同步的阻塞队列实现
ThreadPoolExecupublic ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
实例如下:
public class CachePool {
private static Runnable getThread(final int i){
return new Runnable() {
@Override
public void run() {
try {
Thread.sleep(1000);
}catch (Exception e){
}
System.out.println(i);
}
};
}
public static void main(String args[]){
ExecutorService cachePool = Executors.newCachedThreadPool();
for (int i=1;i<=10;i++){
cachePool.execute(getThread(i));
}
}
}
newFixedThreadPool : 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
创建一个可重用固定线程数的线程池,以共享的无界队列方式来运行这些线程。在任意点,在大多数 nThreads 线程会处于处理任务的活动状态。如果在所有线程处于活动状态时提交附加任务,则在有可用线程之前,附加任务将在队列中等待。如果在关闭前的执行期间由于失败而导致任何线程终止,那么一个新线程将代替它执行后续的任务(如果需要)。在某个线程被显式地关闭之前,池中的线程将一直存在。
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue
实例如下:
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
public class ThreadPool {
public static void main(String[] args) {
fixedThreadPool();
}
/**
* 固定大小的线程池
*
* 同时可以处理【参数】个任务,多余的任务会排队,当处理完一个马上就会去接着处理排队中的任务。
* Callable的任务在后面的blog有更详细的文章说明
*/
private static void fixedThreadPool(){
ExecutorService es = Executors.newFixedThreadPool(2);
//加入5个任务
for(int i=1 ; i<5; i++){
final int task = i;
es.execute(new Runnable() {
@Override
public void run() {
for(int j=1; j<=2; j++){
System.out.println("现在运行的是第【 " + task + "】任务");
System.out.println(Thread.currentThread().getName() + "is work , now loop to " + j);
if(j==2){
System.out.println("任务 【" + task + "】运行完成");
}
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
});
}
System.out.println("5个Runnable任务submit完成!!");
//加入5个Callable任务,该任务执行完后是有返回值的则会发生堵塞,也就是取到5个任务的结果后才会继续往下走
for(int i=1 ; i<=5; i++){
final int task = i;
Future<Integer> future = es.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Callable 任务【" + task + "】运行");
return new Random().nextInt(100);
}
});
//如果注释取结果的代码,则不会堵塞
/*try {
System.out.println("任务【" + i + "】返回的结果:" + future.get());
} catch (Exception e) {
e.printStackTrace();
}*/
}
System.out.println("5个Callable任务submit完成!!" + System.currentTimeMillis() );
//虽然shutdown方法是等所有任务跑完后才真正停掉线程池,但该方法不会造成堵塞,也就是这代码运行后,下一行代码会立刻运行
es.shutdown();
System.out.println("主程序shutdown后退出!!" + System.currentTimeMillis());
//暴力的直接终止线程池
//es.shutdownNow();
//awaitTermination方法是堵塞式的,只有等真的把线程池停掉才会让程序继续往下执行
try {
es.awaitTermination(2, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("主程序后awaitTermination退出!!" + System.currentTimeMillis());
}
}
newScheduledThreadPool:创建一个定长线程池,支持定时及周期性任务执行。 ScheduledThreadPoolExecutor:ThreadPoolExecutor子类,它在ThreadPoolExecutor基础上加入了任务定时执行的功能。
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
newSingleThreadExecutor:创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}