背景:
如果一个任务由多个子任务组成,并且需要在子任务全部执行完成后,由主线程对所有子任务的结果进行封装,可以采用如下几种方式:
1、基于Guava ListenableFuture 进行;
2、基于FutureTask 和CountDownLatch进行
3、基于FutureTask进行;
4、基于CompletionService进行
5、基于BlockingQueue进行
说明:
2、3 的区别在于线程池是否每次都新建并 shutdown;
4、5 本质上是同一种思路(ExecutorCompletionService 内部也是用 BlockingQueue 保存已完成的任务)
/**
 * Runs 20 {@code CountTask}s on a Guava listening executor and gathers their results.
 *
 * <p>The pool has twice as many threads as there are available processors. Every
 * submitted future gets a callback that prints its result (or the failure's stack
 * trace). After all tasks are submitted the calling thread blocks on the combined
 * future and prints the result list. The futures are combined with
 * {@code Futures.successfulAsList}, so a failed or cancelled task contributes a
 * {@code null} entry instead of failing the whole list.
 *
 * <p>Exceptions are printed, not propagated. The pool is shut down in a
 * {@code finally} block so its threads are released even when waiting fails.
 */
public static void listenableFuture() {
    try {
        ListeningExecutorService pool = MoreExecutors.listeningDecorator(
                Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2));
        try {
            List<ListenableFuture<Integer>> futures = new ArrayList<>();
            for (int i = 0; i < 20; i++) {
                final ListenableFuture<Integer> future = pool.submit(new CountTask());
                futures.add(future);
                Futures.addCallback(future, new FutureCallback<Integer>() {
                    @Override
                    public void onSuccess(Integer result) {
                        System.out.println(result);
                    }

                    @Override
                    public void onFailure(Throwable t) {
                        t.printStackTrace();
                    }
                });
            }
            System.out.println("submit success");
            // Blocks the calling thread until every submitted task has completed.
            ListenableFuture<List<Integer>> ret = Futures.successfulAsList(futures);
            List<Integer> res = ret.get();
            System.out.println(res);
        } finally {
            // Always release the worker threads, even if submitting or waiting failed.
            pool.shutdown();
        }
        System.out.println("shutdown success");
    } catch (InterruptedException e) {
        // Restore the interrupt flag so callers can still observe the interruption.
        Thread.currentThread().interrupt();
        e.printStackTrace();
    } catch (Exception e) {
        e.printStackTrace();
    }
}
/**
 * Runs 20 {@code CountTask}s on a cached thread pool and waits for them with a
 * {@link CountDownLatch} before printing each result.
 *
 * <p>Each task is constructed with the shared latch, whose initial count equals the
 * number of tasks. NOTE(review): this relies on {@code CountTask} (defined elsewhere)
 * counting the latch down exactly once per task, including on failure; otherwise
 * {@code await()} never returns. Confirm against {@code CountTask}.
 *
 * @throws Exception if waiting is interrupted or a task's computation failed
 */
public static void countDownCount() throws Exception {
    int threadNum = 20;
    ExecutorService executor = Executors.newCachedThreadPool();
    try {
        CountDownLatch count = new CountDownLatch(threadNum);
        List<FutureTask<Integer>> futureTasks = new ArrayList<>(threadNum);
        for (int i = 0; i < threadNum; i++) {
            CountTask task = new CountTask(count);
            FutureTask<Integer> futureTask = new FutureTask<>(task);
            // A FutureTask is already a Future; execute() runs it without wrapping it
            // in a second, unused Future the way submit() would.
            executor.execute(futureTask);
            futureTasks.add(futureTask);
        }
        // Blocks the main thread until the latch has been counted down to zero.
        count.await();
        System.out.println("执行完成");
        for (FutureTask<Integer> futureTask : futureTasks) {
            Integer ret = futureTask.get();
            System.out.println(ret);
        }
    } finally {
        // Release the pool even if await() or get() threw.
        executor.shutdown();
    }
    System.out.println("测试完成");
}
/**
 * Runs 20 {@code CountTask}s on a cached thread pool and collects the results
 * through their {@link FutureTask}s alone, without a latch.
 *
 * <p>{@code shutdown()} is called right after submission. It does not block: it only
 * stops the pool from accepting new tasks while the already-submitted ones keep
 * running. The waiting happens in {@code FutureTask.get()}, which blocks until the
 * corresponding task has finished.
 *
 * @throws Exception if waiting is interrupted or a task's computation failed
 */
public static void futureTaskCount() throws Exception {
    int threadNum = 20;
    ExecutorService executor = Executors.newCachedThreadPool();
    List<FutureTask<Integer>> futureTasks = new ArrayList<>(threadNum);
    try {
        for (int i = 0; i < threadNum; i++) {
            CountTask task = new CountTask();
            FutureTask<Integer> futureTask = new FutureTask<>(task);
            // A FutureTask is already a Future; execute() runs it without wrapping it
            // in a second, unused Future the way submit() would.
            executor.execute(futureTask);
            futureTasks.add(futureTask);
        }
    } finally {
        // Non-blocking: previously submitted tasks still run to completion. Done in
        // finally so the pool is released even if submission fails part-way.
        executor.shutdown();
    }
    System.out.println("shutdown");
    for (FutureTask<Integer> futureTask : futureTasks) {
        // get() is what actually blocks until this task has completed.
        Integer ret = futureTask.get();
        System.out.println(ret);
    }
    System.out.println("测试完成");
}
/**
 * Runs 20 {@code CountTask}s through a {@link CompletionService} and prints each
 * result as soon as its task completes.
 *
 * <p>The backing fixed pool has twice as many threads as there are available
 * processors. {@code take()} blocks until some task has finished, so results are
 * printed in completion order rather than submission order.
 *
 * @throws Exception if waiting is interrupted or a task's computation failed
 */
public static void completionCount() throws Exception {
    int threadNum = 20;
    ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2);
    try {
        CompletionService<Integer> pool = new ExecutorCompletionService<Integer>(executor);
        for (int i = 0; i < threadNum; i++) {
            pool.submit(new CountTask());
        }
        // Exactly one take() per submitted task; each call waits for the next completion.
        for (int i = 0; i < threadNum; i++) {
            Integer ret = pool.take().get();
            System.out.println("输出结果" + ret);
        }
        System.out.println("测试完成");
    } finally {
        // Fixed-pool threads never time out, so the pool must be shut down even when
        // take() or get() throws; otherwise the worker threads leak.
        executor.shutdown();
    }
}
/**
 * Stores the {@link Future} of every submitted task in a blocking queue and then
 * processes them together, printing the sum of all results.
 *
 * <p>Ten {@code CountTask}s are submitted to a cached thread pool. The futures are
 * enqueued at submission time, so they are consumed in submission order; each
 * {@code get()} blocks until that particular task has finished.
 *
 * @throws Exception if waiting is interrupted or a task's computation failed
 */
public static void blockingQueueCount() throws Exception {
    ExecutorService exec = Executors.newCachedThreadPool();
    try {
        BlockingQueue<Future<Integer>> queue = new LinkedBlockingQueue<Future<Integer>>();
        for (int i = 0; i < 10; i++) {
            Future<Integer> future = exec.submit(new CountTask());
            queue.add(future);
        }
        int sum = 0;
        // Snapshot the size first: take() shrinks the queue while we iterate.
        int queueSize = queue.size();
        for (int i = 0; i < queueSize; i++) {
            sum += queue.take().get();
        }
        System.out.println("总数为:" + sum);
    } finally {
        // Release the pool even if take() or get() threw.
        exec.shutdown();
    }
}