JDK中线程池满后再放入队列

Wesley13
• 阅读 558

    JDK中ThreadPoolExecutor有coreSize、maxSize,只有当线程数到coreSize且队列满后才会增加线程数到maxSize.

    想要达到的效果是线程数到maxSize后再放入队列。

方案一

    覆写ThreadPoolExecutor的execute()

    List-1

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    //workerCountOf()获取线程数,线程worker数只是int类型二进制位的前几位
    int c = ctl.get();
    //1
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    //2
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    //3
    else if (!addWorker(command, false))
        reject(command);
}

线程数的表示,是用int类型的二进制的左边几位标示的,所以才需要调用workCountOf()方法来获取

  1. 如果线程数小于coreSize,那么增加worker线程
  2. 将任务放入到队列中,如果成功表示队列没有满
  3. 队列满了,此时尝试去增加worker线程数,让worker线程数达到maxSize

    如果我们自己定义一个CustomThreadPoolExecutor,然后覆写execute(),那么你会发现ctl、_workerCountOf、_addWorker都是private的,子类上根本访问不了,所以这个方案是不行的

方案二

    自己定义一个CustomThreadPoolExecutor,之后将JDK中ThreadPoolExecutor中的内容全部拷贝过来,之后再改写execute()的实现,但是这个成本很大。

    最重要的,连拒绝策略也不能使用JDK里面的了,因为如下List-2所示第二个参数是ThreadPoolExecutor

    List-2

public interface RejectedExecutionHandler {
    void rejectedExecution(Runnable var1, ThreadPoolExecutor var2);
}

    此方案不行

方案三

  1. 自定义queue,改写offer逻辑
  2. 覆写ThreadPoolExecutor的execute()

    List-3

public class TaskQueue<R extends Runnable> extends ArrayBlockingQueue<Runnable> {
    private EagerThreadPoolExecutor executor;

    public TaskQueue(int size, boolean fair) {
        super(size, fair);
    }

    @Override
    public boolean offer(Runnable runnable) {
        if (executor == null) {
            throw new RejectedExecutionException("The task queue does not have executor!");
        }
        int currentPoolThreadSize = executor.getPoolSize();
        // have free worker. put task into queue to let the worker deal with task.
        //1
        if (executor.getSubmittedTaskCount() < currentPoolThreadSize) {
            return super.offer(runnable);
        }
        // return false to let executor create new worker.
        //2
        if (currentPoolThreadSize < executor.getMaximumPoolSize()) {
            return false;
        }
        // currentPoolThreadSize >= max
        //3
        return super.offer(runnable);
    }

    /**
     * retry offer task
     *
     * @param o task
     * @return offer success or not
     * @throws RejectedExecutionException if executor is terminated.
     */
    public boolean retryOffer(Runnable o, long timeout, TimeUnit unit) throws InterruptedException {
        if (executor.isShutdown()) {
            throw new RejectedExecutionException("Executor is shutdown!");
        }
        return super.offer(o, timeout, unit);
    }

    public void setExecutor(EagerThreadPoolExecutor executor){
        this.executor = executor;
    }
}

TaskQueue的offer方法是,放入队列时被调用

  1. 如果当前提交的task数少于线程池中线程是数量,那么直接调用父类的offer,将task放入队列,不新建线程,因此此时肯定有空闲的线程
  2. 此时线程池中没有空闲的线程,而且线程数量少于设置的maxSize,此时返回false,让线程池去创建新的线程
  3. 此时线程数量大于等于maxSize,将task放入任务队列中

EagerThreadPoolExecutor继承ThreadPoolExecutor,改写execute()的实现:

    List-4

public class EagerThreadPoolExecutor extends ThreadPoolExecutor {
    private final AtomicInteger submittedTaskCount = new AtomicInteger(0);

    public EagerThreadPoolExecutor(int coreSize, int maxSize, long l, TimeUnit timeUnit, BlockingQueue<Runnable> blockingQueue, ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {
        super(coreSize, maxSize, l, timeUnit, blockingQueue, threadFactory, rejectedExecutionHandler);
    }

    @Override
    protected void beforeExecute(Thread thread, Runnable runnable) {
        //可以结合afterExecute统计执行耗时
    }

    @Override
    public void execute(Runnable command) {
        if (command == null) {
            throw new NullPointerException();
        }
        // do not increment in method beforeExecute!
        //1
        submittedTaskCount.incrementAndGet();
        try {
            super.execute(command);
        } catch (RejectedExecutionException rx) {
            // retry to offer the task into queue.
            final TaskQueue queue = (TaskQueue) super.getQueue();
            try {
                //2
                if (!queue.retryOffer(command, 0, TimeUnit.MILLISECONDS)) {
                    submittedTaskCount.decrementAndGet();
                    throw new RejectedExecutionException("Queue capacity is full.", rx);
                }
            } catch (InterruptedException x) {
                submittedTaskCount.decrementAndGet();
                throw new RejectedExecutionException(x);
            }
        } catch (Throwable t) {
            // decrease any way
            //3
            submittedTaskCount.decrementAndGet();
            throw t;
        }
    }

    @Override
    protected void afterExecute(Runnable runnable, Throwable throwable) {
        //ThreadPoolExecutor的勾子方法,在task执行完后需要将池中已提交的任务数 - 1
        //afterExecute和beforeExecute是在runWorker中调用,即使有异常,也不会抛出RejectedExecutionException异常
        submittedTaskCount.decrementAndGet();
        if (throwable!=null) {
            LOG.error("线程池中线程执行出错", throwable);
        }
    }

    public int getSubmittedTaskCount() {
        return submittedTaskCount.get();
    }
}
  1. execute()开始时,将提交的任务数加1,之后调用父类的execute()方法,如List-1中,当线程数达到coreSize后,就会调用queue.offer(),即List-3中的offer(),我们会判断线程数是否少于maxSize,如果少于那么返回false,之后ThreadPoolExecutor.execute()方法会去新增线程
  2. 2处如果被拒绝了,说明队列满了而且线程数达到了maxSize,此时我们再重试一次,将task放入队列中,为什么还要重试一次?List-3中offer中做了一些操作,有可能这期间队列就有空了,所以要重试下。
  3. 3处捕获到任何的异常,之后将task数减去1,3处的Throwable是捕获不到2处抛出的RejectedExecutionException的

    为什么afterExecute()方法中还要将task数减去1呢?

ThreadPoolExecutor中,beforeExecute()和afterExecute()是在runWorker的run()中被调用的,分别在Runnable.run()的前后被调用,而且线程池中抛出异常,在线程池外面是捕获不到的,所以外面需要的afterExecute()中将task数减去1

    改进:我们可以将List-4中使用的AtomicInteger改为JDK8的LongAddr以提高性能

Reference

  1. https://github.com/apache/dubbo/blob/fb5761683729f63c715e39c78a8b7b75278050c9/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/support/eager/EagerThreadPoolExecutor.java
点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
待兔 待兔
5个月前
手写Java HashMap源码
HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程22
Jacquelyn38 Jacquelyn38
3年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
Wesley13 Wesley13
3年前
Java日期时间API系列31
  时间戳是指格林威治时间1970年01月01日00时00分00秒起至现在的总毫秒数,是所有时间的基础,其他时间可以通过时间戳转换得到。Java中本来已经有相关获取时间戳的方法,Java8后增加新的类Instant等专用于处理时间戳问题。 1获取时间戳的方法和性能对比1.1获取时间戳方法Java8以前
Stella981 Stella981
3年前
KVM调整cpu和内存
一.修改kvm虚拟机的配置1、virsheditcentos7找到“memory”和“vcpu”标签,将<namecentos7</name<uuid2220a6d1a36a4fbb8523e078b3dfe795</uuid
Easter79 Easter79
3年前
Twitter的分布式自增ID算法snowflake (Java版)
概述分布式系统中,有一些需要使用全局唯一ID的场景,这种时候为了防止ID冲突可以使用36位的UUID,但是UUID有一些缺点,首先他相对比较长,另外UUID一般是无序的。有些时候我们希望能使用一种简单一些的ID,并且希望ID能够按照时间有序生成。而twitter的snowflake解决了这种需求,最初Twitter把存储系统从MySQL迁移
Wesley13 Wesley13
3年前
00:Java简单了解
浅谈Java之概述Java是SUN(StanfordUniversityNetwork),斯坦福大学网络公司)1995年推出的一门高级编程语言。Java是一种面向Internet的编程语言。随着Java技术在web方面的不断成熟,已经成为Web应用程序的首选开发语言。Java是简单易学,完全面向对象,安全可靠,与平台无关的编程语言。
Wesley13 Wesley13
3年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Python进阶者 Python进阶者
11个月前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这