利用DUCC配置平台实现一个动态化线程池

京东云开发者
• 阅读 538

作者:京东零售 张宾

1.背景

在后台开发中,会经常用到线程池技术,对于线程池核心参数的配置很大程度上依靠经验。然而,由于系统运行过程中存在的不确定性,我们很难一劳永逸地规划一个合理的线程池参数。在对线程池配置参数进行调整时,一般需要对服务进行重启,这样修改的成本就会偏高。一种解决办法就是,将线程池的配置放到配置平台侧,系统运行期间开发人员根据系统运行情况对核心参数进行动态配置。

本文以公司DUCC配置平台作为服务配置中心,以修改线程池核心线程数、最大线程数为例,实现一个简单的动态化线程池。

2.代码实现

当前项目中使用的是Spring 框架提供的线程池类ThreadPoolTaskExecutor,而ThreadPoolTaskExecutor底层又使用里了JDK中线程池类ThreadPoolExecutor,线程池类ThreadPoolExecutor有两个成员方法setCorePoolSize、setMaximumPoolSize可以在运行时设置核心线程数和最大线程数。

setCorePoolSize方法执行流程是:首先会覆盖之前构造函数设置的corePoolSize,然后,如果新的值比原始值要小,当多余的工作线程下次变成空闲状态的时候会被中断并销毁,如果新的值比原来的值要大且工作队列不为空,则会创建新的工作线程。流程图如下:

利用DUCC配置平台实现一个动态化线程池

setMaximumPoolSize方法: 首先会覆盖之前构造函数设置的maximumPoolSize,然后,如果新的值比原来的值要小,当多余的工作线程下次变成空闲状态的时候会被中断并销毁。

Spring 框架提供的线程池类ThreadPoolTaskExecutor,此类封装了对ThreadPoolExecutor有两个成员方法setCorePoolSize、setMaximumPoolSize的调用。

利用DUCC配置平台实现一个动态化线程池

基于以上源代码分析,要实现一个简单的动态线程池需要以下几步:

(1)定义一个动态线程池类,继承ThreadPoolTaskExecutor,目的跟非动态配置的线程池类ThreadPoolTaskExecutor区分开;

(2)定义和实现一个动态线程池配置定时刷的类,目的定时对比ducc配置的线程池数和本地应用中线程数是否一致,若不一致,则更新本地动态线程池线程池数;

(3)引入公司ducc配置平台相关jar包并创建一个动态线程池配置key;

(4)定义和实现一个应用启动后根据动态线程池Bean和从ducc配置平台拉取配置刷新应用中的线程数配置;

接下来代码一一实现:

(1)动态线程池类

/**
 * 动态线程池
 *
 */
public class DynamicThreadPoolTaskExecutor extends ThreadPoolTaskExecutor {
}

(2)动态线程池配置定时刷新类

@Slf4j
public class DynamicThreadPoolRefresh implements InitializingBean {
    /**
     * Maintain all automatically registered and manually registered DynamicThreadPoolTaskExecutor.
     */
    private static final ConcurrentMap<String, DynamicThreadPoolTaskExecutor> DTP_REGISTRY = new ConcurrentHashMap<>();

    /**
     * @param threadPoolBeanName
     * @param threadPoolTaskExecutor
     */
    public static void registerDynamicThreadPool(String threadPoolBeanName, DynamicThreadPoolTaskExecutor threadPoolTaskExecutor) {
        log.info("DynamicThreadPool register ThreadPoolTaskExecutor, threadPoolBeanName: {}, executor: {}", threadPoolBeanName, ExecutorConverter.convert(threadPoolBeanName, threadPoolTaskExecutor.getThreadPoolExecutor()));
        DTP_REGISTRY.putIfAbsent(threadPoolBeanName, threadPoolTaskExecutor);
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        this.refresh();
        //创建定时任务线程池
        ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1, (new BasicThreadFactory.Builder()).namingPattern("DynamicThreadPoolRefresh-%d").daemon(true).build());
        //延迟1秒执行,每个1分钟check一次
        executorService.scheduleAtFixedRate(new RefreshThreadPoolConfig(), 1000L, 60000L, TimeUnit.MILLISECONDS);
    }

    private void refresh() {
        String dynamicThreadPool = "";
        try {
            if (DTP_REGISTRY.isEmpty()) {
                log.debug("DynamicThreadPool refresh DTP_REGISTRY is empty");
                return;
            }
            dynamicThreadPool = DuccConfigUtil.getValue(DuccConfigConstants.DYNAMIC_THREAD_POOL);
            if (StringUtils.isBlank(dynamicThreadPool)) {
                log.debug("DynamicThreadPool refresh dynamicThreadPool not config");
                return;
            }
            log.debug("DynamicThreadPool refresh dynamicThreadPool:{}", dynamicThreadPool);
            List<ThreadPoolProperties> threadPoolPropertiesList = JsonUtil.json2Object(dynamicThreadPool, new TypeReference<List<ThreadPoolProperties>>() {
            });
            if (CollectionUtils.isEmpty(threadPoolPropertiesList)) {
                log.error("DynamicThreadPool refresh dynamicThreadPool json2Object error!{}", dynamicThreadPool);
                return;
            }
            for (ThreadPoolProperties properties : threadPoolPropertiesList) {
                doRefresh(properties);
            }
        } catch (Exception e) {
            log.error("DynamicThreadPool refresh exception!dynamicThreadPool:{}", dynamicThreadPool, e);
        }
    }

    /**
     * @param properties
     */
    private void doRefresh(ThreadPoolProperties properties) {
        if (StringUtils.isBlank(properties.getThreadPoolBeanName())
                || properties.getCorePoolSize() < 1
                || properties.getMaxPoolSize() < 1
                || properties.getMaxPoolSize() < properties.getCorePoolSize()) {
            log.error("DynamicThreadPool refresh, invalid parameters exist, properties: {}", properties);
            return;
        }
        DynamicThreadPoolTaskExecutor threadPoolTaskExecutor = DTP_REGISTRY.get(properties.getThreadPoolBeanName());
        if (Objects.isNull(threadPoolTaskExecutor)) {
            log.warn("DynamicThreadPool refresh, DTP_REGISTRY not found {}", properties.getThreadPoolBeanName());
            return;
        }
        ThreadPoolProperties oldProp = ExecutorConverter.convert(properties.getThreadPoolBeanName(), threadPoolTaskExecutor.getThreadPoolExecutor());
        if (Objects.equals(oldProp.getCorePoolSize(), properties.getCorePoolSize())
                && Objects.equals(oldProp.getMaxPoolSize(), properties.getMaxPoolSize())) {
            log.warn("DynamicThreadPool refresh, properties of [{}] have not changed.", properties.getThreadPoolBeanName());
            return;
        }
        if (!Objects.equals(oldProp.getCorePoolSize(), properties.getCorePoolSize())) {
            threadPoolTaskExecutor.setCorePoolSize(properties.getCorePoolSize());
            log.info("DynamicThreadPool refresh, corePoolSize changed!{} {}", properties.getThreadPoolBeanName(), properties.getCorePoolSize());
        }
        if (!Objects.equals(oldProp.getMaxPoolSize(), properties.getMaxPoolSize())) {
            threadPoolTaskExecutor.setMaxPoolSize(properties.getMaxPoolSize());
            log.info("DynamicThreadPool refresh, maxPoolSize changed!{} {}", properties.getThreadPoolBeanName(), properties.getMaxPoolSize());
        }

        ThreadPoolProperties newProp = ExecutorConverter.convert(properties.getThreadPoolBeanName(), threadPoolTaskExecutor.getThreadPoolExecutor());
        log.info("DynamicThreadPool refresh result!{} oldProp:{},newProp:{}", properties.getThreadPoolBeanName(), oldProp, newProp);
    }

    private class RefreshThreadPoolConfig extends TimerTask {
        private RefreshThreadPoolConfig() {
        }

        @Override
        public void run() {
            DynamicThreadPoolRefresh.this.refresh();
        }
    }

}

线程池配置类

@Data
public class ThreadPoolProperties {
    /**
     * 线程池名称
     */
    private String threadPoolBeanName;
    /**
     * 线程池核心线程数量
     */
    private int corePoolSize;
    /**
     * 线程池最大线程池数量
     */
    private int maxPoolSize;
}

(3)引入公司ducc配置平台相关jar包并创建一个动态线程池配置key

ducc配置平台使用见:https://cf.jd.com/pages/viewpage.action?pageId=403477057

动态线程池配置key:dynamic.thread.pool

配置value:

[
  {
    "threadPoolBeanName": "submitOrderThreadPoolTaskExecutor",
    "corePoolSize": 32,
    "maxPoolSize": 128
  }
]

(4) 应用启动刷新应用本地动态线程池配置

@Slf4j
public class DynamicThreadPoolPostProcessor implements BeanPostProcessor {

    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        if (bean instanceof DynamicThreadPoolTaskExecutor) {
            DynamicThreadPoolRefresh.registerDynamicThreadPool(beanName, (DynamicThreadPoolTaskExecutor) bean);
        }
        return bean;
    }
}

3.动态线程池应用

动态线程池Bean声明

    <!-- 普通线程池 -->
    <bean id="threadPoolTaskExecutor" class="com.jd.concurrent.ThreadPoolTaskExecutorWrapper">
        <!-- 核心线程数,默认为 -->
        <property name="corePoolSize" value="128"/>
        <!-- 最大线程数,默认为Integer.MAX_VALUE -->
        <property name="maxPoolSize" value="512"/>
        <!-- 队列最大长度,一般需要设置值>=notifyScheduledMainExecutor.maxNum;默认为Integer.MAX_VALUE -->
        <property name="queueCapacity" value="500"/>
        <!-- 线程池维护线程所允许的空闲时间,默认为60s -->
        <property name="keepAliveSeconds" value="60"/>
        <!-- 线程池对拒绝任务(无线程可用)的处理策略,目前只支持AbortPolicy、CallerRunsPolicy;默认为后者 -->
        <property name="rejectedExecutionHandler">
            <!-- AbortPolicy:直接抛出java.util.concurrent.RejectedExecutionException异常 -->
            <!-- CallerRunsPolicy:主线程直接执行该任务,执行完之后尝试添加下一个任务到线程池中,可以有效降低向线程池内添加任务的速度 -->
            <!-- DiscardOldestPolicy:抛弃旧的任务、暂不支持;会导致被丢弃的任务无法再次被执行 -->
            <!-- DiscardPolicy:抛弃当前任务、暂不支持;会导致被丢弃的任务无法再次被执行 -->
            <bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy"/>
        </property>
    </bean>
    <!-- 动态线程池 -->
    <bean id="submitOrderThreadPoolTaskExecutor" class="com.jd.concurrent.DynamicThreadPoolTaskExecutor">
        <!-- 核心线程数,默认为 -->
        <property name="corePoolSize" value="32"/>
        <!-- 最大线程数,默认为Integer.MAX_VALUE -->
        <property name="maxPoolSize" value="128"/>
        <!-- 队列最大长度,一般需要设置值>=notifyScheduledMainExecutor.maxNum;默认为Integer.MAX_VALUE -->
        <property name="queueCapacity" value="500"/>
        <!-- 线程池维护线程所允许的空闲时间,默认为60s -->
        <property name="keepAliveSeconds" value="60"/>
        <!-- 线程池对拒绝任务(无线程可用)的处理策略,目前只支持AbortPolicy、CallerRunsPolicy;默认为后者 -->
        <property name="rejectedExecutionHandler">
            <!-- AbortPolicy:直接抛出java.util.concurrent.RejectedExecutionException异常 -->
            <!-- CallerRunsPolicy:主线程直接执行该任务,执行完之后尝试添加下一个任务到线程池中,可以有效降低向线程池内添加任务的速度 -->
            <!-- DiscardOldestPolicy:抛弃旧的任务、暂不支持;会导致被丢弃的任务无法再次被执行 -->
            <!-- DiscardPolicy:抛弃当前任务、暂不支持;会导致被丢弃的任务无法再次被执行 -->
            <bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy"/>
        </property>
    </bean>
    <!-- 动态线程池刷新配置 -->
    <bean class="com.jd.concurrent.DynamicThreadPoolPostProcessor"/>
    <bean class="com.jd.concurrent.DynamicThreadPoolRefresh"/>

业务类注入Spring Bean后,直接使用即可

 @Resource
 private ThreadPoolTaskExecutor submitOrderThreadPoolTaskExecutor;


 Runnable asyncTask = ()->{...};
 CompletableFuture.runAsync(asyncTask, this.submitOrderThreadPoolTaskExecutor);

4.小结

本文从实际项目的业务痛点场景出发,并基于公司已有的ducc配置平台简单实现了线程池线程数量可配置。

点赞
收藏
评论区
推荐文章
Wesley13 Wesley13
3年前
java各种面试问题
二、Java多线程相关线程池的原理,为什么要创建线程池?创建线程池的方式;线程的生命周期,什么时候会出现僵死进程;说说线程安全问题,什么实现线程安全,如何实现线程安全;创建线程池有哪几个核心参数?如何合理配置线程池的大小?volatile、ThreadLocal的使用场景和原理;
Wesley13 Wesley13
3年前
JAVA多线程学习
Java通过Excutors提供四种线程池:newCachedThreadPool        创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。newFixedThreadPool        创建一个定长线程,可控制线程最大并发数
Stella981 Stella981
3年前
Jetty调优文档
1.      线程池线程池线程资源大小确定了服务器的服务能力默认大小不一定能满足生产环境线程分配方式决定了服务器的资源利用效率固定线程数处理多任务,代表:JDK的ThreadPoolExecutor       以最大的线程数为限处理多任务,代表:jetty自带的QueuedThreadPoolje
Wesley13 Wesley13
3年前
Java多线程之线程池7大参数、底层工作原理、拒绝策略详解
Java多线程之线程池7大参数详解目录企业面试题线程池7大参数源码线程池7大参数详解底层工作原理详解线程池的4种拒绝策略理论简介面试的坑:线程池实际中使用哪一个?1\.企业面试题线程池的工作原理,几个重要参数,然后给了具体几个参数分析线程池会怎么做,最后问阻塞队列用是什么?线程池的构造类的方
Wesley13 Wesley13
3年前
Java通过Executors提供四种线程池
Java通过Executors提供四种线程池,分别为:newCachedThreadPool创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。newFixedThreadPool创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。newScheduledThreadPool创建
Wesley13 Wesley13
3年前
(CSDN 迁移) JAVA多线程实现
前几篇文章中分别介绍了单线程化线程池(newSingleThreadExecutor)可控最大并发数线程池(newFixedThreadPool)可回收缓存线程池(newCachedThreadPool)newScheduledThreadPool用于构造安排线程池,能够根据需要安排在给定延迟后运行命令或者定期地执行。在JAVA文档的介绍
Stella981 Stella981
3年前
Noark入门之线程模型
0x00单线程多进程单线程与单进程多线程的目的都是想尽可能的利用CPU,减少CPU的空闲时间,特别是多核环境,今天咱不做深度解读,跳过...0x01线程池锁最早的一部分游戏服务器是采用线程池的方式来处理玩家的业务请求,以达最大限度的利用多核优势来提高处理业务能力。但线程池同时也带来了并发问题,为了解决同一玩家多个业务请求不被
Wesley13 Wesley13
3年前
Java 基础知识(七)
1.创建线程池1)newCacheThreadPool 创建一个可缓存的线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程 2)newFixedThreadPool  创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待 3)newScheduledThreadPool  创建一个定长线程池,支持
Easter79 Easter79
3年前
ThreadPoolExecutor和ScheduledThreadPoolExecutor
ThreadPoolExecutor构造方法参数说明corePoolSize核心线程数,默认情况下核心线程会一直存活,即使处于闲置状态也不会受存keepAliveTime限制。除非将allowCoreThreadTimeOut设置为true。maximumPoolSize线程池所能容纳的最大线程数。超过这个数的线程将被阻塞。当任务队列为没有
京东云开发者 京东云开发者
7个月前
动态线程池思想学习及实践
相关文档美团线程池实践:线程池思想解析:引言在后台项目开发过程中,我们常常借助线程池来实现多线程任务,以此提升系统的吞吐率和响应性;而线程池的参数配置却是一个难以合理评估的值,虽然业界也针对cpu密集型,IO密集型等场景给出了一些参数配置的经验与方案,但是