页面查询多项数据组合的线程池设计 | 京东云技术团队

京东云开发者
• 阅读 351

背景

我们应对并发场景时一般会采用下面方式去预估线程池的线程数量,比如QPS需求是1000,平均每个任务需要执行的时间是t秒,那么我们需要的线程数是t * 1000。

但是在一些情况下,这个t是不好估算的,即便是估算出来了,在实际的线程环境上也需要进行验证和微调。比如在本文所阐述分页查询的数据项组合场景中。

1、数据组合依赖不同的上游接接口, 它们的响应时间参差不齐,甚至差距还非常大。有些接口支持批量查询而另一些则不支持批量查询。有些接口因为性能问题还需要考虑降级和平滑方案。

2、为了提升用户体验,这里的查询设计了动态列,因此每一次访问所需要组合的数据项和数量也是不同的。

因此这里如果需要估算出一个合理的t是不太现实的。

方案

一种可动态调节的策略,根据监控的反馈对线程池进行微调。整体设计分为装配逻辑线程池封装设计。

1、装配逻辑

查询结果,拆分分片(水平拆分),并行装配(垂直拆分),获得装配项列表(动态列), 并行装配每一项。

页面查询多项数据组合的线程池设计 | 京东云技术团队

2、线程池封装

可调节的核心线程数、最大线程数、线程保持时间,队列大小,提交任务重试等待时间,提交任务重试次数。 固定异常拒绝策略。

调节参数:

字段 名称 说明
corePoolSize 核心线程数 参考线程池定义
maximumPoolSize 最大线程数 参考线程池定义
keepAliveTime 线程存活时间 参考线程池定义
queueSize 队列长度 参考线程池定义
resubmitSleepMillis 提交任务重试等待时间 添加任务被拒绝后重试时的等待时间
resubmitTimes 提交任务重试次数 添加任务被拒绝后重试添加的最大次数
    @Data
    private static class PoolPolicy {

        /** 核心线程数 */
        private Integer corePoolSize;

        /** 最大线程数 */
        private Integer maximumPoolSize;

        /** 线程存活时间 */
        private Integer keepAliveTime;

        /** 队列容量 */
        private Integer queueSize;

        /** 重试等待时间 */
        private Long resubmitSleepMillis;

        /** 重试次数 */
        private Integer resubmitTimes;
    }

创建线程池:

线程池的创建考虑了动态的需求,满足根据压测结果进行微调的要求。首先缓存旧的线程池后再创建新的线程,当新的线程池创建成功后再去关闭旧的线程池。保证在这个替换过程中不影响正在执行的业务。线程池使用了中断策略,用户可以及时感知到系统繁忙并保证了系统资源占用的安全。

public void reloadThreadPool(PoolPolicy poolPolicy) {
    if (poolPolicy == null) {
        throw new RuntimeException("The thread pool policy cannot be empty.");
    }
    if (poolPolicy.getCorePoolSize() == null) {
        poolPolicy.setCorePoolSize(0);
    }
    if (poolPolicy.getMaximumPoolSize() == null) {
        poolPolicy.setMaximumPoolSize(Runtime.getRuntime().availableProcessors() + 1);
    }
    if (poolPolicy.getKeepAliveTime() == null) {
        poolPolicy.setKeepAliveTime(60);
    }
    if (poolPolicy.getQueueSize() == null) {
        poolPolicy.setQueueSize(Runtime.getRuntime().availableProcessors() + 1);
    }
    if (poolPolicy.getResubmitSleepMillis() == null) {
        poolPolicy.setResubmitSleepMillis(200L);
    }
    if (poolPolicy.getResubmitTimes() == null) {
        poolPolicy.setResubmitTimes(5);
    }
    // - 线程池策略没有变化直接返回已有线程池。
    ExecutorService original = this.executorService;
    this.executorService = new ThreadPoolExecutor(
            poolPolicy.getCorePoolSize(),
            poolPolicy.getMaximumPoolSize(),
            poolPolicy.getKeepAliveTime(), TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(poolPolicy.getQueueSize()),
            new ThreadFactoryBuilder().setNameFormat(threadNamePrefix + "-%d").setDaemon(true).build(),
            new ThreadPoolExecutor.AbortPolicy());
    this.poolPolicy = poolPolicy;
    if (original != null) {
        original.shutdownNow();
    }
}

任务提交:

线程池封装对象中使用的线程池拒绝策略是AbortPolicy,因此在线程数和阻塞队列到达上限后会触发异常。另外在这里为了保证提交的成功率利用重试策略实现了一定程度的延迟处理,具体场景中可以结合业务特点进行适当的调节和配置。

public <T> Future<T> submit(Callable<T> task) {
    RejectedExecutionException exception = null;
    Future<T> future = null;
    for (int i = 0; i < this.poolPolicy.getResubmitTimes(); i++) {
        try {
            // - 添加任务
            future = this.executorService.submit(task);
            exception = null;
            break;
        } catch (RejectedExecutionException e) {
            exception = e;
            this.theadSleep(this.poolPolicy.getResubmitSleepMillis());
        }
    }
    if (exception != null) {
        throw exception;
    }
    return future;
}

监控:

1、submit提交的监控

见代码中的「监控点①」,在submit方法中添加监控点,监控key的需要添线程池封装对象的线程名称前缀,用于区分具体的线程池对象。

「监控点①」用于监控添加任务的动作是否正常,以便对线程池对象及策略参数进行微调。

public <T> Future<T> submit(Callable<T> task) {
    // - 监控点①
    CallerInfo callerInfo = Profiler.registerInfo(UmpConstant.THREAD_POOL_WAP + threadNamePrefix,
                UmpConstant.APP_NAME,
                UmpConstant.UMP_DISABLE_HEART,
                UmpConstant.UMP_ENABLE_TP);
    RejectedExecutionException exception = null;
    Future<T> future = null;
    for (int i = 0; i < this.poolPolicy.getResubmitTimes(); i++) {
        try {
            // - 添加任务
            future = this.executorService.submit(task);
            exception = null;
            break;
        } catch (RejectedExecutionException e) {
            exception = e;
            this.theadSleep(this.poolPolicy.getResubmitSleepMillis());
        }
    }
    if (exception != null) {
        // - 监控点①
        Profiler.functionError(callerInfo);
        throw exception;
    }
    // - 监控点①
    Profiler.registerInfoEnd(callerInfo);
    return future;
}

2、线程池并行任务

见代码的「监控点②」,分别在添加任务和任务完成后。

「监控点②」实时统计在线程中执行的总任务数量,用于评估线程池的任务的数量的满载水平。

/** 任务并行数量统计 */
private AtomicInteger parallelTaskCount = new AtomicInteger(0);

public <T> Future<T> submit(Callable<T> task) {
    RejectedExecutionException exception = null;
    Future<T> future = null;
    for (int i = 0; i < this.poolPolicy.getResubmitTimes(); i++) {
        try {
            // - 添加任务
            future = this.executorService.submit(()-> {
                T rst = task.call();
                // - 监控点②
                log.info("{} - Parallel task count {}", this.threadNamePrefix,  this.parallelTaskCount.decrementAndGet());
                return rst;
            });
            // - 监控点②
            log.info("{} + Parallel task count {}", this.threadNamePrefix,  this.parallelTaskCount.incrementAndGet());
            exception = null;
            break;
        } catch (RejectedExecutionException e) {
            exception = e;
            this.theadSleep(this.poolPolicy.getResubmitSleepMillis());
        }
    }
    if (exception != null) {
        throw exception;
    }
    return future;
}

3、调节

线程池封装对象策略的调节时机

1)上线前基于流量预估的压测阶段;

2)上线后跟进监控数据和线程池中任务的满载水平进行人工微调,也可以通过JOB在指定的时间自动调整;

3)大促前依据往期大促峰值来调高相关参数。

线程池封装对象策略的调节经验

1)访问时长要求较低时,我们可以考虑调小线程数和阻塞队列,适当调大提交任务重试等待时间和次数,以便降低资源占用。

2)访问时长要求较高时,就需要调大线程数并保证相对较小的阻塞队列,调小提交任务的重试等待时间和次数甚至分别调成0和1(即关闭重试提交逻辑)。

作者:京东零售 王文明

来源:京东云开发者社区 转载请注明来源

点赞
收藏
评论区
推荐文章
Wesley13 Wesley13
3年前
java 面试知识点笔记(十三)多线程与并发
java线程池,利用Exceutors创建不同的线程池满足不同场景需求:1.newSingleThreadExecutor() 创建一个单线程的线程池。这个线程池只有一个线程在工作,也就是相当于单线程串行执行所有任务。如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它。此线程池保证所有任务的执行顺序按照任务的提交顺序执行。2.
Wesley13 Wesley13
3年前
Java基础教程——线程池
启动新线程,需要和操作系统进行交互,成本比较高。使用线程池可以提高性能——线程池会提前创建大量的空闲线程,随时待命执行线程任务。在执行完了一个任务之后,线程会回到空闲状态,等待执行下一个任务。(这个任务,就是Runnable的run()方法,或Callable的call()方法)。Java5之前需要手动实现线程池,Java5之
Stella981 Stella981
3年前
Noark入门之线程模型
0x00单线程多进程单线程与单进程多线程的目的都是想尽可能的利用CPU,减少CPU的空闲时间,特别是多核环境,今天咱不做深度解读,跳过...0x01线程池锁最早的一部分游戏服务器是采用线程池的方式来处理玩家的业务请求,以达最大限度的利用多核优势来提高处理业务能力。但线程池同时也带来了并发问题,为了解决同一玩家多个业务请求不被
Wesley13 Wesley13
3年前
Java变成思想
Executor:线程池CatchedThreadPool:创建与所需数量相同的线程,在回收旧线程是停止创建新县城。FixedThreadPool:创建一定数量的线程,所有任务公用这些线程。SingleThreadPool:线程数量为1的FixedThreadPool,并且执行有序。如果需要得到线程返回值,要实现Callbale接口
Wesley13 Wesley13
3年前
Java中的线程池
java中的线程池是运用场景最多的并发框架,几乎所有需要异步或并发执行任务的程序都可以使用线程池。在开发过程中,合理使用线程池能够带来三个好处。第一:降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。第二:提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。第三:提高线程的可管理性。线程是稀缺
Wesley13 Wesley13
3年前
Java多线程之线程池
 newFixedThreadPool:固定线程池,核心线程数和最大线程数固定相等,而空闲存活时间为0毫秒,说明此参数也无意义,工作队列为最大为Integer.MAX\_VALUE大小的阻塞队列。当执行任务时,如果线程都很忙,就会丢到工作队列等有空闲线程时再执行,队列满就执行默认的拒绝策略 newCachedThreadPool:带缓冲
Wesley13 Wesley13
3年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
你真的了解@Async吗? | 京东云技术团队
开发中会碰到一些耗时较长或者不需要立即得到执行结果的逻辑,比如消息推送、商品同步等都可以使用异步方法,这时我们可以用到@Async。但是直接使用@Async会有风险,当我们没有指定线程池时,他会默认使用其Spring自带的SimpleAsyncTaskExecutor线程池,会不断的创建线程,当并发大的时候会严重影响性能。所以可以将异步指定线程池使用
京东云开发者 京东云开发者
11个月前
ThreadPoolExecutor线程池内部处理浅析 | 京东物流技术团队
我们知道如果程序中并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束时,会因为频繁创建线程而大大降低系统的效率,因此出现了线程池的使用方式,它可以提前创建好线程来执行任务。本文主要通过java的ThreadPoolExecutor来查看线程池
京东云开发者 京东云开发者
7个月前
"线程池中线程异常后:销毁还是复用?"
一、一个线程池中的线程异常了,那么线程池会怎么处理这个线程?需要说明,本文的线程池都是java.util.concurrent.ExecutorService线程池,本文将围绕验证,阅读源码俩方面来解析这个问题。二、代码验证2.1验证execute提交线程