Executors相关的类(线程池)

Stella981
• 阅读 713

一、概述

Java是天生就支持并发的语言,支持并发意味着多线程,线程的频繁创建在高并发及大数据量是非常消耗资源的,因为java提供了线程池。在jdk1.5以前的版本中,线程池的使用是及其简陋的,但是在JDK1.5后,有了很大的改善。JDK1.5之后加入了java.util.concurrent包,java.util.concurrent包的加入给予开发人员开发并发程序以及解决并发问题很大的帮助。这篇文章主要介绍下并发包下的Executor接口,Executor接口虽然作为一个非常旧的接口(JDK1.5 2004年发布),但是很多程序员对于其中的一些原理还是不熟悉,因此写这篇文章来介绍下Executor接口,同时巩固下自己的知识。如果文章中有出现错误,欢迎大家指出。

二、Executors工厂类

对于数据库连接,我们经常听到数据库连接池这个概念。因为建立数据库连接时非常耗时的一个操作,其中涉及到网络IO的一些操作。因此就想出把连接通过一个连接池来管理。需要连接的话,就从连接池里取一个。当使用完了,就“关闭”连接,这不是正在意义上的关闭,只是把连接放回到我们的池里,供其他人在使用。所以对于线程,也有了线程池这个概念,其中的原理和数据库连接池是差不多的,因此java jdk中也提供了线程池的功能。

线程池的作用:线程池就是限制系统中使用线程的数量以及更好的使用线程

    根据系统的运行情况,可以自动或手动设置线程数量,达到运行的最佳效果:配置少了,将影响系统的执行效率,配置多了,又会浪费系统的资源。用线程池配置数量,其他线程排队等候。当一个任务执行完毕后,就从队列中取一个新任务运行,如果没有新任务,那么这个线程将等待。如果来了一个新任务,但是没有空闲线程的话,那么把任务加入到等待队列中。

为什么要用线程池:

  1. 减少线程创建和销毁的次数,使线程可以多次复用
  2. 可以根据系统情况,调整线程的数量。防止创建过多的线程,消耗过多的内存(每个线程1M左右)

Java里面线程池的顶级接口是Executor,但是严格意义上讲Executor并不是一个线程池,而只是一个执行线程的工具。真正的线程池接口是ExecutorService。Executors类,提供了一系列工厂方法用于创先线程池,返回的线程池都实现了ExecutorService接口。

比较重要的几个类:

ExecutorService

真正的线程池接口。

ScheduledExecutorService

能和Timer/TimerTask类似,解决那些需要任务重复执行的问题。

ThreadPoolExecutor

ExecutorService的默认实现。

ScheduledThreadPoolExecutor

继承ThreadPoolExecutor的ScheduledExecutorService接口实现,周期性任务调度的类实现。

要配置一个线程池是比较复杂的,尤其是对于线程池的原理不是很清楚的情况下,很有可能配置的线程池不是较优的,因此在Executors类里面提供了一些静态工厂,生成一些常用的线程池。

1. newSingleThreadExecutor

public static ExecutorService newSingleThreadExecutor()

public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory)

创建一个单线程的线程池。这个线程池只有一个线程在工作,也就是相当于单线程串行执行所有任务。如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它。此线程池保证所有任务的执行顺序按照任务的提交顺序执行。

2. newFixedThreadPool

public static ExecutorService newFixedThreadPool(int nThreads)

public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory)

创建固定大小的线程池。每次提交一个任务就创建一个线程,直到线程达到线程池的最大大小。线程池的大小一旦达到最大值就会保持不变,在提交新任务,任务将会进入等待队列中等待。如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。

3. newCachedThreadPool

public static ExecutorService newCachedThreadPool()

public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory)

创建一个可缓存的线程池。如果线程池的大小超过了处理任务所需要的线程,

那么就会回收部分空闲(60秒处于等待任务到来)的线程,当任务数增加时,此线程池又可以智能的添加新线程来处理任务。此线程池的最大值是Integer的最大值(2^31-1)。

4. newScheduledThreadPool

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory)

创建一个大小无限的线程池。此线程池支持定时以及周期性执行任务的需求。

实例:

注:使用了java8的lambda表达式以及stream

1**.**newSingleThreadExecutor

public class SingleThreadExecutorTest {

    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        IntStream.range(0, 5).forEach(i -> executor.execute(() -> {
            String threadName = Thread.currentThread().getName();
            System.out.println("finished: " + threadName);
        }));

        try {
            //close pool
            executor.shutdown();
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            if (!executor.isTerminated()) {
                executor.shutdownNow();
            }
        }
    }
}

输出结果:

finished: pool-1-thread-1
finished: pool-1-thread-1
finished: pool-1-thread-1
finished: pool-1-thread-1
finished: pool-1-thread-1

线程名都一样,说明是同一个线程

2.newFixedThreadPool

public class FixedThreadExecutorTest {

    public static void main(String[] args) {

        ExecutorService executor = Executors.newFixedThreadPool(3);

        IntStream.range(0, 6).forEach(i -> executor.execute(() -> {
            try {
                TimeUnit.SECONDS.sleep(1);
                String threadName = Thread.currentThread().getName();
                System.out.println("finished: " + threadName);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }));
//close pool 同上...
    
    }
}

输出结果:

finished: pool-1-thread-2
finished: pool-1-thread-3
finished: pool-1-thread-1
finished: pool-1-thread-1
finished: pool-1-thread-2
finished: pool-1-thread-3

只创建了三个线程

3.newCachedThreadPool

public class CachedThreadExecutorTest {

    public static void main(String[] args) {

        ExecutorService executor = Executors.newCachedThreadPool();
        IntStream.range(0, 6).forEach(i -> executor.execute(() -> {
            try {
                TimeUnit.SECONDS.sleep(1);
                String threadName = Thread.currentThread().getName();
                System.out.println("finished: " + threadName);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }));
         //close pool 同上... 
    }
}

输出结果:

finished: pool-1-thread-1
finished: pool-1-thread-2
finished: pool-1-thread-3
finished: pool-1-thread-6
finished: pool-1-thread-5
finished: pool-1-thread-4

创建了6个线程

4.newScheduledThreadPool

public class ScheduledThreadExecutorTest {

    public static void main(String[] args) {
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);

        executor.scheduleAtFixedRate(() -> System.out.println(System.currentTimeMillis()), 1000, 2000, TimeUnit.MILLISECONDS);

            //close pool 同上
    }
}

输出结果:

1457967177735
1457967179736
1457967181735

三:ThreadPoolExecutor详解

java.uitl.concurrent.ThreadPoolExecutor类是线程池中最核心的一个类,因此如果要透彻地了解Java中的线程池,必须先了解这个类。下面我们来看一下ThreadPoolExecutor类的具体实现源码。

  在ThreadPoolExecutor类中提供了四个构造方法:

public class ThreadPoolExecutor extends AbstractExecutorService {
    .....
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
            BlockingQueue<Runnable> workQueue);
 
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
            BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory);
 
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
            BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler);
 
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
        BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);
    ...
}

从上面的代码可以得知,ThreadPoolExecutor继承了AbstractExecutorService类,并提供了四个构造器,事实上,通过观察每个构造器的源码具体实现,发现前面三个构造器都是调用的第四个构造器进行的初始化工作。

ThreadPoolExecutor的完整构造方法的签名是:

ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
  • corePoolSize:核心池的大小,这个参数跟后面讲述的线程池的实现原理有非常大的关系。在创建了线程池后,默认情况下,线程池中并没有 任何线程,而是等待有任务到来才创建线程去执行任务,除非调用了prestartAllCoreThreads()或者 prestartCoreThread()方法,从这2个方法的名字就可以看出,是预创建线程的意思,即在没有任务到来之前就创建 corePoolSize个线程或者一个线程。默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,当线 程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中;

  • maximumPoolSize:线程池最大线程数,这个参数也是一个非常重要的参数,它表示在线程池中最多能创建多少个线程;

  • keepAliveTime:表示线程没有任务执行时最多保持多久时间会终止。默认情况下,只有当线程池中的线程数大于corePoolSize 时,keepAliveTime才会起作用,直到线程池中的线程数不大于corePoolSize。即当线程池中的线程数大于corePoolSize 时,如果一个线程空闲的时间达到keepAliveTime,则会终止,直到线程池中的线程数不超过corePoolSize。但是如果调用了 allowCoreThreadTimeOut(boolean)方法,在线程池中的线程数不大于corePoolSize 时,keepAliveTime参数也会起作用,直到线程池中的线程数为0;

  • unit:参数keepAliveTime的时间单位,有7种取值,在TimeUnit类中有7种静态属性:

    TimeUnit.DAYS; //天 TimeUnit.HOURS; //小时 TimeUnit.MINUTES; //分钟 TimeUnit.SECONDS; //秒 TimeUnit.MILLISECONDS; //毫秒 TimeUnit.MICROSECONDS; //微妙 TimeUnit.NANOSECONDS; //纳秒

  • workQueue:一个阻塞队列,用来存储等待执行的任务,这个参数的选择也很重要,会对线程池的运行过程产生重大影响,一般来说,这里的阻塞队列有以下几种选择:

    ArrayBlockingQueue; //有界队列
    LinkedBlockingQueue; //无界队列
    SynchronousQueue; //特殊的一个队列,只有存在等待取出的线程时才能加入队列,可以说容量为0,是无界队列
    

     ArrayBlockingQueue和PriorityBlockingQueue使用较少,一般使用LinkedBlockingQueue和Synchronous。线程池的排队策略与BlockingQueue有关。

  • threadFactory:线程工厂,主要用来创建线程;

  • handler:表示当拒绝处理任务时的策略,有以下四种取值:

    ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。 
    ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。 
    ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
    ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务 
    

    ThreadPoolExecutorExecutors****类的底层实现。

    在JDK帮助文档中,有如此一段话:

    “强烈建议程序员使用较为方便的 Executors 工厂方法 Executors.newCachedThreadPool()(无界线程池,可以进行自动线程回收)、Executors.newFixedThreadPool(int)(固定大小线程池)Executors.newSingleThreadExecutor()(单个后台线程)

    它们均为大多数使用场景预定义了设置。”

    下面介绍一下几个类的源码:

    **ExecutorService   newFixedThreadPool (int nThreads):**固定大小线程池。

    可以看到,corePoolSize和maximumPoolSize的大小是一样的(实际上,后面会介绍,如果使用无界queue的话 maximumPoolSize参数是没有意义的),keepAliveTime和unit的设值表明什么?-就是该实现不想keep alive!最后的BlockingQueue选择了LinkedBlockingQueue,该queue有一个特点,他是无界的。

    public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
    0L, TimeUnit.MILLISECONDS,
    new LinkedBlockingQueue());
    }

**ExecutorService   newSingleThreadExecutor()**:单线程

public static ExecutorService newSingleThreadExecutor() {   
          return new FinalizableDelegatedExecutorService   
              (new ThreadPoolExecutor(1, 1,   
                                      0L, TimeUnit.MILLISECONDS,   
                                      new LinkedBlockingQueue<Runnable>()));   
      }

**ExecutorService newCachedThreadPool()**:无界线程池,可以进行自动线程回收

这个实现就有意思了。首先是无界的线程池,所以我们可以发现maximumPoolSize为big big。其次BlockingQueue的选择上使用SynchronousQueue。可能对于该BlockingQueue有些陌生,简单说:该 QUEUE中,每个插入操作必须等待另一个线程的对应移除操作。

public static ExecutorService newCachedThreadPool() {   
           return new ThreadPoolExecutor(0, Integer.MAX_VALUE,   
                                       60L, TimeUnit.SECONDS,   
                                        new SynchronousQueue<Runnable>());   
    }

从上面给出的ThreadPoolExecutor类的代码可以知道,ThreadPoolExecutor继承了AbstractExecutorService,AbstractExecutorService是一个抽象类,它实现了ExecutorService接口。

public abstract class AbstractExecutorService implements ExecutorService

而ExecutorService又是继承了Executor接口

public interface ExecutorService extends Executor

我们看一下Executor接口的实现:

public interface Executor {
    void execute(Runnable command);
}

到这里,大家应该明白了ThreadPoolExecutor、AbstractExecutorService、ExecutorService和Executor几个之间的关系了。

  Executor是一个顶层接口,在它里面只声明了一个方法execute(Runnable),返回值为void,参数为Runnable类型,从字面意思可以理解,就是用来执行传进去的任务的;

  然后ExecutorService接口继承了Executor接口,并声明了一些方法:submit、invokeAll、invokeAny以及shutDown等;

  抽象类AbstractExecutorService实现了ExecutorService接口,基本实现了ExecutorService中声明的所有方法;

  然后ThreadPoolExecutor继承了类AbstractExecutorService。

  在ThreadPoolExecutor类中有几个非常重要的方法:

public void execute(Runnable command)

public <T> Future<T> submit(Callable<T> task)

public void shutdown()

public List<Runnable> shutdownNow()  //返回未执行的任务

execute()方法实际上是Executor中声明的方法,在ThreadPoolExecutor进行了具体的实现,这个方法是ThreadPoolExecutor的核心方法,通过这个方法可以向线程池提交一个任务,交由线程池去执行。

  submit()方法是在ExecutorService中声明的方法,在AbstractExecutorService就已经有了具体的实现,在ThreadPoolExecutor中 并没有对其进行重写,这个方法也是用来向线程池提交任务的,但是它和execute()方法不同,它能够返回任务执行的结果,去看submit()方法的 实现,会发现它实际上还是调用的execute()方法,只不过它利用了Future来获取任务执行结果(Future相关内容将在下一篇讲述)。

  shutdown()和shutdownNow()是用来关闭线程池的。

点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
待兔 待兔
4个月前
手写Java HashMap源码
HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程22
Wesley13 Wesley13
3年前
java 线程池入门
一简介线程的使用在java中占有极其重要的地位,在jdk1.4极其之前的jdk版本中,关于线程池的使用是极其简陋的。在jdk1.5之后这一情况有了很大的改观。Jdk1.5之后加入了java.util.concurrent包,这个包中主要介绍java中线程以及线程池的使用。为我们在开发中处理线程的问题提供了非常大的帮助。二:线程池
Wesley13 Wesley13
3年前
java线程池ThreadPoolExecutor八种拒绝策略浅析
前言谈到java的线程池最熟悉的莫过于ExecutorService接口了,jdk1.5新增的java.util.concurrent包下的这个api,大大的简化了多线程代码的开发。而不论你用FixedThreadPool还是CachedThreadPool其背后实现都是ThreadPoolExecutor。ThreadPoolExecutor是一
zdd小小菜鸟 zdd小小菜鸟
2年前
多线程面试
多线程篇1.为什么要使用线程池tex避免频繁地创建和销毁线程,达到线程对象的重用。另外,使用线程池还可以根据项目灵活地控制并发的数目。2.java中如何获取到线程dump文件tex死循环、死锁、阻
Stella981 Stella981
3年前
Executor框架
任务是一组逻辑工作单元,而线程则是使任务异步执行的机制。线程池简化了线程的管理工作,并且java.util.concurrent提供了一种灵活的线程池实现作为Executor框架的一部分。在Java类库中,任务执行的主要抽象不是Thread,而是Executor,如下所示:publicinterfaceExecutor{void
Wesley13 Wesley13
3年前
Java多线程之线程池7大参数、底层工作原理、拒绝策略详解
Java多线程之线程池7大参数详解目录企业面试题线程池7大参数源码线程池7大参数详解底层工作原理详解线程池的4种拒绝策略理论简介面试的坑:线程池实际中使用哪一个?1\.企业面试题线程池的工作原理,几个重要参数,然后给了具体几个参数分析线程池会怎么做,最后问阻塞队列用是什么?线程池的构造类的方
Easter79 Easter79
3年前
Twitter的分布式自增ID算法snowflake (Java版)
概述分布式系统中,有一些需要使用全局唯一ID的场景,这种时候为了防止ID冲突可以使用36位的UUID,但是UUID有一些缺点,首先他相对比较长,另外UUID一般是无序的。有些时候我们希望能使用一种简单一些的ID,并且希望ID能够按照时间有序生成。而twitter的snowflake解决了这种需求,最初Twitter把存储系统从MySQL迁移
Stella981 Stella981
3年前
Noark入门之线程模型
0x00单线程多进程单线程与单进程多线程的目的都是想尽可能的利用CPU,减少CPU的空闲时间,特别是多核环境,今天咱不做深度解读,跳过...0x01线程池锁最早的一部分游戏服务器是采用线程池的方式来处理玩家的业务请求,以达最大限度的利用多核优势来提高处理业务能力。但线程池同时也带来了并发问题,为了解决同一玩家多个业务请求不被
Python进阶者 Python进阶者
10个月前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这