并发编程-ExecutorCompletionService解析

京东云开发者
• 阅读 945

1、简单介绍



我们在并发编程中,目前大部分做法都是将任务添加到线程池中,并拿到Future对象,将其添加到集合中,等所有任务都添加到线程池后,在通过遍历Future集合,调用future.get()来获取每个任务的结果,这样可以使得先添加到线程池的任务先等待其完成,但是并不能保证第一个添加到线程池的任务就是第一个执行完成的,所以会出现这种情况,后面添加到线程池的任务已经完成了,但是还必须要等待第一个任务执行完成并处理结果后才能处理接下来的任务。

如果想要不管添加到线程池的任务的顺序,先完成的任务先进行处理,那么就需要用到ExecutorCompletionService这个工具了。



2、源码解析



ExecutorCompletionService实现了CompletionService接口。CompletionService接种有有以下方法。

public interface CompletionService<V> {
    // 提交任务
    Future<V> submit(Callable<V> task);
    // 提交任务
    Future<V> submit(Runnable task, V result);
    // 获取任务结果,带抛出异常
    Future<V> take() throws InterruptedException;
    // 获取任务结果
    Future<V> poll();
    // 获取任务结果,带超时
    Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException;
}



可以看到接口中的方法非常简单,只有提交任务以及获取任务结果两类方法。

我们再看下实现类ExecutorCompletionService中的代码。

public class ExecutorCompletionService<V> implements CompletionService<V> {
    private final Executor executor;
    private final AbstractExecutorService aes;
    private final BlockingQueue<Future<V>> completionQueue;

    /**
     * FutureTask的子类,重写FutureTask完成后的done方法
     */
    private class QueueingFuture extends FutureTask<Void> {
        QueueingFuture(RunnableFuture<V> task) {
            super(task, null);
            this.task = task;
        }
    // task任务执行完成后将任务放到队列中
        protected void done() { completionQueue.add(task); }
        private final Future<V> task;
    }

    private RunnableFuture<V> newTaskFor(Callable<V> task) {
        if (aes == null)
            return new FutureTask<V>(task);
        else
            return aes.newTaskFor(task);
    }

    private RunnableFuture<V> newTaskFor(Runnable task, V result) {
        if (aes == null)
            return new FutureTask<V>(task, result);
        else
            return aes.newTaskFor(task, result);
    }

    /**
     * 构造方法,传入一个线程池,创建一个队列
     */
    public ExecutorCompletionService(Executor executor) {
        if (executor == null)
            throw new NullPointerException();
        this.executor = executor;
        this.aes = (executor instanceof AbstractExecutorService) ?
            (AbstractExecutorService) executor : null;
        this.completionQueue = new LinkedBlockingQueue<Future<V>>();
    }

    /**
     * 构造方法,传入线程池和队列
     */
    public ExecutorCompletionService(Executor executor,
                                     BlockingQueue<Future<V>> completionQueue) {
        if (executor == null || completionQueue == null)
            throw new NullPointerException();
        this.executor = executor;
        this.aes = (executor instanceof AbstractExecutorService) ?
            (AbstractExecutorService) executor : null;
        this.completionQueue = completionQueue;
    }

    // 提交一个task任务,最终将任务封装成QueueingFuture并由指定的线程池执行
    public Future<V> submit(Callable<V> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<V> f = newTaskFor(task);
        executor.execute(new QueueingFuture(f));
        return f;
    }

    // 提交一个task任务,最终将任务封装成QueueingFuture并由指定的线程池执行
    public Future<V> submit(Runnable task, V result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<V> f = newTaskFor(task, result);
        executor.execute(new QueueingFuture(f));
        return f;
    }

    // 从队列中获取执行完成的RunnableFuture对象,take方法会阻塞直到有数据
    public Future<V> take() throws InterruptedException {
        return completionQueue.take();
    }

    // 从队列中获取执行完成的RunnableFuture对象
    public Future<V> poll() {
        return completionQueue.poll();
    }

    // 从队列中获取执行完成的RunnableFuture对象
    public Future<V> poll(long timeout, TimeUnit unit)
            throws InterruptedException {
        return completionQueue.poll(timeout, unit);
    }

}



通过观察实现类中的代码,我们可以发现这个方法非常简单,其原理分为以下几步:

1、在构造ExecutorCompletionService对象时,需要传入给定的线程池或者阻塞队列。

2、当我们提交任务到ExecutorCompletionService时,会将提交的任务包装成QueueingFuture对象,然后交由我们指定的线程池来执行。

3、当任务执行完成后,QueueingFuture对象会执行最终的done方法(QueueingFuture对象重新的方法),将RunnableFuture对象添加到指定的阻塞队列中。

4、我们可以通过poll或者take方法来获取队列中的RunnableFuture对象,以便获取执行结果。

由此可以发现我们获取到的任务执行结果,与提交到线程池的任务顺序是无关的,哪个任务先完成,就会被添加到队列中,我们就可以先获取执行结果。



3、使用场景



1、当我们不关注提交到线程池任务顺序以及任务执行完成获取结果的顺序时,我们就可以使用ExecutorCompletionService这个来执行任务。以下是示例代码。

void solve(Executor e, Collection<Callable<Result>> solvers) throws InterruptedException, ExecutionException {
        CompletionService<Result> ecs = new ExecutorCompletionService<Result>(e);
        for (Callable<Result> s : solvers) {
            ecs.submit(s);
        }
        int n = solvers.size();
        for (int i = 0; i < n; ++i) {
            Result r = ecs.take().get();
            if (r != null) {
                use(r);
            }
        }
    }

2、当多个任务同时执行,我们只需要获取第一个任务的执行结果,其余结果不需要关心时,也可以通过ExecutorCompletionService来执行任务。以下是示例代码。

void solve(Executor e, Collection<Callable<Result>> solvers) throws InterruptedException {
        CompletionService<Result> ecs = new ExecutorCompletionService<Result>(e);
        int n = solvers.size();
        List<Future<Result>> futures = new ArrayList<Future<Result>>(n);
        Result result = null;
        try {
            for (Callable<Result> s : solvers) {
                futures.add(ecs.submit(s));
            }

            for (int i = 0; i < n; ++i) {
                try {
                    Result r = ecs.take().get();
                    if (r != null) {
                        result = r;
                        break;
                    }
                } catch (ExecutionException ignore) {
                }
            }
        } finally {
            for (Future<Result> f : futures) {
                f.cancel(true);
            }
        }

        if (result != null) {
            use(result);
        }
    }



4、代码实践



在业务上我们有这种场景,我们有一批订单进行批量更新,每处理完一单,我们都需要维护一下处理进度,保证订单处理进度实时更新成最新的进度数据,我们此时用到的就是ExecutorCompletionService。

protected void parallelBatchUpdateWaybill(Map<String, LwbMain> lwbMainMap, Map<String, UpdateWaybillTaskDetail> taskDetailMap) {
        long start = System.currentTimeMillis();
        log.info("{} 并行批量更新订单开始:{}", traceId, taskNo);
        int total = lwbMainMap.size();
        BlockingQueue<Future<String>> blockingQueue = new LinkedBlockingQueue<>(total + 2);
        ExecutorCompletionService<String> executorCompletionService = new ExecutorCompletionService<>(parallelUpdateWaybillExecutorService, blockingQueue);
        for (Map.Entry<String, UpdateWaybillTaskDetail> entry : taskDetailMap.entrySet()) {
            String lwbNo = entry.getKey();
            LwbMain lwbMain = lwbMainMap.get(lwbNo);
            UpdateWaybillTaskDetail taskDetail = entry.getValue();
            executorCompletionService.submit(() -> this.updateSingleWaybill(lwbMain, taskDetail), "done");
        }

        for (int current = 0; current < taskDetailMap.size(); current++) {
            try {
                executorCompletionService.take().get();
            } catch (Exception e) {
                log.error("{} 获取并行批量更新订单结果异常:{}", traceId, e.getMessage(), e);
            } finally {
                jimClient.incr(importTaskNo);
            }
        }

        long end = System.currentTimeMillis();
        log.info("{} 并行批量更新订单结束:{},耗时:{}", traceId, taskNo, (end - start));
    }
点赞
收藏
评论区
推荐文章
Wesley13 Wesley13
3年前
java 面试知识点笔记(十三)多线程与并发
java线程池,利用Exceutors创建不同的线程池满足不同场景需求:1.newSingleThreadExecutor() 创建一个单线程的线程池。这个线程池只有一个线程在工作,也就是相当于单线程串行执行所有任务。如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它。此线程池保证所有任务的执行顺序按照任务的提交顺序执行。2.
Wesley13 Wesley13
3年前
java目前可以通过以下几种方式进行定时任务
1、单机部署模式Timer:jdk中自带的一个定时调度类,可以简单的实现按某一频度进行任务执行。提供的功能比较单一,无法实现复杂的调度任务。ScheduledExecutorService:也是jdk自带的一个基于线程池设计的定时任务类。其每个调度任务都会分配到线程池中的一个线程执行,所以其任务是并发执行的,互不影响。
Wesley13 Wesley13
3年前
java_线程池
血一样的教训,今天上午参加了一家现场面试java。在这之前,我一直认为我的java基础还是可以的,而今天一问三不知。现在将面试的问题整理出来一、说说java中的线程池?  1.线程池:线程池是线程的集合,不用自己创建线程,把线程直接给线程池,由线程池处理。   2.过程:首先,使用线程池可以重复利用已有的线程继续执行任务,避免线程在
Wesley13 Wesley13
3年前
java executor
在java.util.concurrent包中的ExecutorService的实现就是壹個线程池的实现任务的委托(TaskDelegation)壹旦线程把任务委托给ExecutorService,该线程就会继续执行与运行任务无关的其它任务。Executor框架的两级调度模型在HotSpotVM的线程模型中,Java线程
Stella981 Stella981
3年前
ExecutorService 线程池 (转发)
1.ExecutorServicejava.util.concurrent.ExecutorService接口。用来设置线程池并执行多线程任务。它有以下几个方法。Future<?java.util.concurrent.ExecutorService.submit(Runnabletask)提交任务并执行,返回代表这个任务的future
Wesley13 Wesley13
3年前
Java基础教程——线程池
启动新线程,需要和操作系统进行交互,成本比较高。使用线程池可以提高性能——线程池会提前创建大量的空闲线程,随时待命执行线程任务。在执行完了一个任务之后,线程会回到空闲状态,等待执行下一个任务。(这个任务,就是Runnable的run()方法,或Callable的call()方法)。Java5之前需要手动实现线程池,Java5之
Wesley13 Wesley13
3年前
Java多线程之任务执行
Java多线程之任务执行一、在线程中执行任务1.串行的执行任务在应用程序中可以通过多种策略来调度任务,而其中的策略能够更好的利用潜在的并发性。_最简单的策略就是在单个线程中串行的执行各项任务。_public class SingleThreadWebServer {
Wesley13 Wesley13
3年前
Java中的线程池
java中的线程池是运用场景最多的并发框架,几乎所有需要异步或并发执行任务的程序都可以使用线程池。在开发过程中,合理使用线程池能够带来三个好处。第一:降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。第二:提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。第三:提高线程的可管理性。线程是稀缺
并发编程-FutureTask解析 | 京东物流技术团队
通过本文可以了解FutureTask任务执行的方式以及Future.get已阻塞的方式获取线程执行的结果原理,并且从代码中可以了解FutureTask的任务执行状态以及状态的变化过程。
京东云开发者 京东云开发者
11个月前
ThreadPoolExecutor线程池内部处理浅析 | 京东物流技术团队
我们知道如果程序中并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束时,会因为频繁创建线程而大大降低系统的效率,因此出现了线程池的使用方式,它可以提前创建好线程来执行任务。本文主要通过java的ThreadPoolExecutor来查看线程池