Spring 异步调用,多线程,一行代码实现

Stella981
• 阅读 651

Spring 异步调用,多线程

  • 概述

  • 快速入门

  • 异步回调

  • 异步异常处理

  • 自定义执行器


1、概述


在日常开发中,我们的逻辑都是同步调用,顺序执行。但是在某些情况下我们希望异步调用,将主线程和部分逻辑分开,以达到程序更快速的执行,提升性能。例如,高并发的接口,用户操作日志等。

异步调用,对应的是同步调用。

  • 同步调用:指程序按照 定义顺序 依次执行,每一行程序都必须等待上一行程序执行完成之后才能执行;

  • 异步调用:指程序在顺序执行时,不等异步调用返回执行结果,就执行后面的程序。

考虑到异步的可靠性,我们一般会考虑引入消息队列,例如: RabbitMQ、RocketMQ、Kafka 等等。 但是在一些时候,我们不需要如此高的可靠性,可以使用进程内的队列或线程池。

public static void main(String[] args) { // 创建线程池。这里只是临时测试,开发规范 ExecutorService executor = Executors.newFixedThreadPool(10); ​ // 提交任务到线程池中执行。 executor.submit(new Runnable() { ​ @Override public void run() { System.out.println("听说我被异步调用了"); } ​ }); }

在进程内的队列或者线程池,相对不可靠的原因是,队列和线程池中的任务仅仅存储在内存中,如何JVM进程被异常关闭,将会导致丢失,未被执行。

而分布式消息队列,异步调用会以一个消息的形式,存储在消息服务器上,所以即使JVM进程被异常中断,消息依然在消息服务队列的服务器上

所以使用进程内的队列或者线程池来实现异步调用的话,一定要尽可能的保证JVM进程的优雅关闭,保证他们在关闭前被执行完。

在 Spring Framework 的 Spring Task 模块,提供了 [@Async](https://my.oschina.net/553709938) 注解,可以添加在方法上,自动实现该方法的异步调用

简单来说,我们可以像使用 [@Transactional](https://my.oschina.net/u/3770144) 声明式事务,使用 Spring Task 提供的 [@Async](https://my.oschina.net/553709938) 注解,声明式异步。而在实现原理上,也是基于 Spring AOP 拦截,实现异步提交该操作到线程池中,达到异步调用的目的。

2、快速入门

2.1 引入依赖

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"\> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.2.1.RELEASE</version> </parent> <modelVersion>4.0.0</modelVersion> ​ <artifactId>lab-29-async-demo</artifactId> ​ <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> ​ <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies> ​ </project> ​

因为 Spring Task 是 Spring Framework 的模块,所以在我们引入 spring-boot-web 依赖后,无需特别引入它。

2.2 Application


创建Application类,添加@EnableAsync 开启 @Async 的支持

@SpringBootApplication @EnableAsync // 开启 @Async 的支持 public class Application { ​ public static void main(String[] args) { SpringApplication.run(Application.class, args); } }

  • 在类上添加 @EnableAsync 注解,启用异步功能。

2.3 DemoService

package cn.iocoder.springboot.lab29.asynctask.service; ​ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.annotation.AsyncResult; import org.springframework.stereotype.Service; import org.springframework.util.concurrent.ListenableFuture; ​ import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; ​ @Service public class DemoService { ​ private Logger logger = LoggerFactory.getLogger(getClass());

public Integer execute01() { logger.info("[execute01]"); sleep(10); return 1; } ​ public Integer execute02() { logger.info("[execute02]"); sleep(5); return 2; } ​ private static void sleep(int seconds) { try { Thread.sleep(seconds * 1000); } catch (InterruptedException e) { throw new RuntimeException(e); } } ​ @Async public Integer zhaoDaoNvPengYou(Integer a, Integer b) { throw new RuntimeException("程序员不需要女朋友"); } ​ } ​

  • 定义 execute01execute02方法,分别模拟sleep 10秒和5秒。

  • 同时在方法中,使用logger打印日志,方便我们看到每个方法的执行时间,和执行的线程

2.4 同步调用测试

编写DemoServiceTest测试类,添加#task01()方法,同步调用上述方法,代码如下:

@RunWith(SpringRunner.class) @SpringBootTest(classes = Application.class) public class DemoServiceTest { ​ private Logger logger = LoggerFactory.getLogger(getClass()); ​ @Autowired private DemoService demoService; ​ @Test public void task01() { long now = System.currentTimeMillis(); logger.info("[task01][开始执行]"); ​ demoService.execute01(); demoService.execute02(); ​ logger.info("[task01][结束执行,消耗时长 {} 毫秒]", System.currentTimeMillis() - now); } }

运行单元测试,打印日志如下:

2020-06-02 09:16:03.391  INFO 3108 --- [      main] c.i.s.l.a.service.DemoServiceTest       : [task01][开始执行] 2020-06-02 09:16:03.402  INFO 3108 --- [      main] c.i.s.l.asynctask.service.DemoService   : [execute01] 2020-06-02 09:16:13.403  INFO 3108 --- [      main] c.i.s.l.asynctask.service.DemoService   : [execute02] 2020-06-02 09:16:18.403  INFO 3108 --- [      main] c.i.s.l.a.service.DemoServiceTest       : [task01][结束执行,消耗时长 15012 毫秒]

  • 两个方法都按顺序执行,执行时间15秒。

  • 都在主线程执行。

2.5 异步调用测试


修改DemoServiceTest ,增加 execute01Async()execute02Async()异步调用方法,代码:

@Async public Integer execute01Async() { return this.execute01(); } ​ @Async public Integer execute02Async() { return this.execute02(); }

  • execute01Async()execute01Async()上,添加@Async实现异步调用

修改DemoServiceTest类, 编写 #task02() 方法,异步调用上述的两个方法。

@Test public void task02() { long now = System.currentTimeMillis(); logger.info("[task02][开始执行]"); ​ demoService.execute01Async(); demoService.execute02Async(); ​ logger.info("[task02][结束执行,消耗时长 {} 毫秒]", System.currentTimeMillis() - now); }

打印日志:

2020-06-02 10:57:41.643 INFO 14416 --- [main] c.i.s.l.a.service.DemoServiceTest       : [task02][开始执行] 2020-06-02 10:57:41.675 INFO 14416 --- [main] o.s.s.concurrent.ThreadPoolTaskExecutor : Initializing ExecutorService 'applicationTaskExecutor' 2020-06-02 10:57:41.682 INFO 14416 --- [main] c.i.s.l.a.service.DemoServiceTest       : [task02][结束执行,消耗时长 39 毫秒]

  • DemoService 的两个方法,异步执行,所以主线程只消耗 39毫秒左右。注意,实际这两个方法,并没有执行完成。

  • DemoService 的两个方法,都在异步线程池中执行。

2.6 等待异步调用完成测试

在上面的【2.5 异步调用测试】异步调用中,两个方法只是异步调用,方法没有执行完。在一些业务场景中,我们达到异步调用效果,同时主线程有返回结果,就需要主线程阻塞等待异步调用的结果。

修改DemoService,添加execute01AsyncWithFuture()execute01AsyncWithFuture()异步调用,并返回 Future 对象 。代码:

@Async public Future<Integer> execute01AsyncWithFuture() { return AsyncResult.forValue(this.execute01()); } ​ @Async public Future<Integer> execute02AsyncWithFuture() { return AsyncResult.forValue(this.execute02()); }

  • 在这里两个异步方法中,添加了AsyncResult.forValue(this.execute02());,返回带有执行结果的Future对象

修改DemoServiceTest类, 编写 #task02() 方法,异步调用上述的两个方法,并阻塞线程等待异步调用返回结果

代码:

@Test public void task03() throws ExecutionException, InterruptedException { long now = System.currentTimeMillis(); logger.info("[task03][开始执行]"); ​ // 执行任务 Future<Integer> execute01Result = demoService.execute01AsyncWithFuture(); Future<Integer> execute02Result = demoService.execute02AsyncWithFuture(); // 阻塞等待结果 execute01Result.get(); execute02Result.get(); ​ logger.info("[task03][结束执行,消耗时长 {} 毫秒]", System.currentTimeMillis() - now); }

  • 异步调用两个方法,并返回对应Future对象。这两个的异步调用逻辑,可以并行执行。

  • Future对象的get()方法,效果:阻塞线程等待返回结果。

打印日志:

2020-06-02 13:56:43.955  INFO 7828 --- [ main] c.i.s.l.a.service.DemoServiceTest       : [task03][开始执行] 2020-06-02 13:56:43.987  INFO 7828 --- [ main] o.s.s.concurrent.ThreadPoolTaskExecutor : Initializing ExecutorService 'applicationTaskExecutor' 2020-06-02 13:56:44.008  INFO 7828 --- [ task-1] c.i.s.l.asynctask.service.DemoService   : [execute01] 2020-06-02 13:56:44.008  INFO 7828 --- [ task-2] c.i.s.l.asynctask.service.DemoService   : [execute02] 2020-06-02 13:56:54.008  INFO 7828 --- [ main] c.i.s.l.a.service.DemoServiceTest       : [task03][结束执行,消耗时长 10053 毫秒]

  • 两个异步调用方法,分别由线程池 task-1task-2 同时执行。 因为主线程阻塞等待执行结果 ,执行时间10秒,当同时有多个异步调用,线程阻塞等待,执行时间由消耗最长的异步调用逻辑所决定。

2.7 应用配置文件

在application中,添加spring Task配置

spring: task: # Spring 执行器配置,对应 TaskExecutionProperties 配置类。对于 Spring 异步任务,会使用该执行器。 execution: thread-name-prefix: task- # 线程池的线程名的前缀。默认为 task- ,建议根据自己应用来设置 pool: # 线程池相关 core-size: 8 # 核心线程数,线程池创建时候初始化的线程数。默认为 8 。 max-size: 20 # 最大线程数,线程池最大的线程数,只有在缓冲队列满了之后,才会申请超过核心线程数的线程。默认为 Integer.MAX_VALUE keep-alive: 60s # 允许线程的空闲时间,当超过了核心线程之外的线程,在空闲时间到达之后会被销毁。默认为 60 秒 queue-capacity: 200 # 缓冲队列大小,用来缓冲执行任务的队列的大小。默认为 Integer.MAX_VALUE 。 allow-core-thread-timeout: true # 是否允许核心线程超时,即开启线程池的动态增长和缩小。默认为 true 。 shutdown: await-termination: true # 应用关闭时,是否等待定时任务执行完成。默认为 false ,建议设置为 true await-termination-period: 60 # 等待任务完成的最大时长,单位为秒。默认为 0 ,根据自己应用来设置

  • Spring 本身依赖了Spring Task

  • spring.task.execution配置项, Spring Task 调度任务的配置 ,对应TaskExecutionProperties配置类

  • Spring Boot TaskExecutionAutoConfiguration 自动化配置类, 实现了Spring Task 自动配置,创建了ThreadPoolTaskExecutor基于线程池的任务执行器,实际上ThreadPoolTaskExecutor就是ThreadPoolExecutor的分装,主要增加执行任务,并返回 ListenableFuture 对象功能。

之前说的异步的可靠性,要优雅的关闭进程。spring.task.execution.shutdown配置关闭,是为了实现Spring Task的优雅关闭。异步任务在执行过程中,如果应用开始关闭,异步任务需要使用的Bean被销毁,例如:需要访问数据库连接池,这时候异步任务还在执行中,一旦需要访问数据库,但是没有对应的Bean将会导致报错。

  • 通过配置await-termination: true,实现在应用关闭时,等待异步任务执行完成。这样在应用关闭时,Spring 会等待 ThreadPoolTaskExecutor执行完任务,再销毁Bean

  • 应用关闭时,在某些业务场景下我们不可能让Spring一直等待,异步任务的完成。通过配置await-termination-period: 60,设置Spring最大等待时间,时间一到将不再等待异步任务完成。

3、异步回调

业务场景中,执行完异步任务,可能需回调。下面介绍异步执行完成后,实现自定义回调。

3.1、AsyncResult 源码解释

2.6 等待异步调用完成 中,我们看到的 AsyncResult类 表示异步结果。返回结果分为两种情况:

  • 执行成功时,调用AsyncResult#forValue(V value) 静态方法,返回成功的 ListenableFuture对象,

    源码:

    /** * Create a new async result which exposes the given value from {@link Future#get()}. * @param value the value to expose * @since 4.2 * @see Future#get() */ public static <V> ListenableFuture<V> forValue(V value) { return new AsyncResult<>(value, null); }

  • 执行异常时,调用 AsyncResult#forExecutionException(Throwable ex) 静态方法,返回异常的 ListenableFuture 对象。源码:

    /** * Create a new async result which exposes the given exception as an * {@link ExecutionException} from {@link Future#get()}. * @param ex the exception to expose (either an pre-built {@link ExecutionException} * or a cause to be wrapped in an {@link ExecutionException}) * @since 4.2 * @see ExecutionException */ public static <V> ListenableFuture<V> forExecutionException(Throwable ex) { return new AsyncResult<>(null, ex); }

AsyncResult 同时也实现了 ListenableFuture接口,提供异步执行结果回调处理。

public class AsyncResult<V> implements ListenableFuture<V>

ListenableFuture接口,源码:

public interface ListenableFuture<T> extends Future<T> { ​ // 添加回调方法,统一处理成功和异常的情况。 void addCallback(ListenableFutureCallback<? super T> callback); ​ // 添加成功和失败的回调方法,分别处理成功和异常的情况。 void addCallback(SuccessCallback<? super T> successCallback, FailureCallback failureCallback); ​ ​ // 将 ListenableFuture 转换成 JDK8 提供的 CompletableFuture 。 // 这样,后续我们可以使用 ListenableFuture 来设置回调 default CompletableFuture<T> completable() { CompletableFuture<T> completable = new DelegatingCompletableFuture<>(this); addCallback(completable::complete, completable::completeExceptionally); return completable; } ​ }

ListenableFuture继承了Future,所以AsyncResult 也实现了Future的接口,源码:

public interface Future<V> { ​ // 如果任务还没开始,执行 cancel(...) 方法将返回 false; // 如果任务已经启动,执行 cancel(true) 方法将以中断执行此任务线程的方式来试图停止任务,如果停止成功,返回 true ; // 当任务已经启动,执行 cancel(false) 方法将不会对正在执行的任务线程产生影响(让线程正常执行到完成),此时返回 false ; // 当任务已经完成,执行 cancel(...) 方法将返回 false 。 // mayInterruptRunning 参数表示是否中断执行中的线程。 boolean cancel(boolean mayInterruptIfRunning); ​ // 如果任务完成前被取消,则返回 true 。 boolean isCancelled(); ​ // 如果任务执行结束,无论是正常结束或是中途取消还是发生异常,都返回 true 。 boolean isDone(); ​ // 获取异步执行的结果,如果没有结果可用,此方法会阻塞直到异步计算完成。 V get() throws InterruptedException, ExecutionException;

// 获取异步执行结果,如果没有结果可用,此方法会阻塞,但是会有时间限制,如果阻塞时间超过设定的 timeout 时间,该方法将抛出异常。 V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }

AsyncResult 中对addCallback(...)方法回调的实现,源码:

@Override public void addCallback(ListenableFutureCallback<? super V> callback) { addCallback(callback, callback); } ​ @Override public void addCallback(SuccessCallback<? super V> successCallback, FailureCallback failureCallback) { try { if (this.executionException != null) { // 《1》 failureCallback.onFailure(exposedException(this.executionException)); } else { // 《2》 successCallback.onSuccess(this.value); } } catch (Throwable ex) { // 《3》 // Ignore } } ​ // 从 ExecutionException 中,获得原始异常。 private static Throwable exposedException(Throwable original) { if (original instanceof ExecutionException) { Throwable cause = original.getCause(); if (cause != null) { return cause; } } return original; }

  • ListenableFutureCallback 知道 ,ListenableFutureCallback 接口同时继承了 SuccessCallbackFailureCallback接口

public interface ListenableFutureCallback<T> extends SuccessCallback<T>, FailureCallback

  • 《1》,如果是异常处理结果调用 failureCallback回调

  • 《2》,如果是成功处理结果调用successCallback回调

  • 《3》,如果回调逻辑发生异常,直接忽略。假设多个回调,其中一个出现议程,不会影响其他的回调。

实际上,AsyncResult 是作为异步执行的结果。既然是结果,执行就已经完成。所以,在我们调用 #addCallback(...) 接口方法来添加回调时,必然直接使用回调处理执行的结果

AsyncResult 对 Future 定义的所有方法,实现代码如下:

// AsyncResult.java ​ @Override public boolean cancel(boolean mayInterruptIfRunning) { return false; // 因为是 AsyncResult 是执行结果,所以直接返回 false 表示取消失败。 } ​ @Override public boolean isCancelled() { return false; // 因为是 AsyncResult 是执行结果,所以直接返回 false 表示未取消。 } ​ @Override public boolean isDone() { return true; // 因为是 AsyncResult 是执行结果,所以直接返回 true 表示已完成。 } ​ @Override @Nullable public V get() throws ExecutionException { // 如果发生异常,则抛出该异常。 if (this.executionException != null) { throw (this.executionException instanceof ExecutionException ? (ExecutionException) this.executionException : new ExecutionException(this.executionException)); } // 如果执行成功,则返回该 value 结果 return this.value; } ​ @Override @Nullable public V get(long timeout, TimeUnit unit) throws ExecutionException { return get(); }

3.2 ListenableFutureTask

在我们调用使用 @Async 注解的方法时,如果方法返回的类型是 ListenableFuture 的情况下,实际方法返回的是 ListenableFutureTask 对象。

ListenableFutureTask 类,也实现 ListenableFuture 接口,继承 FutureTask 类,ListenableFuture 的 FutureTask 实现类。

ListenableFutureTask 对 ListenableFuture 定义的 #addCallback(...) 方法,实现源码如下:

private final ListenableFutureCallbackRegistry<T> callbacks = new ListenableFutureCallbackRegistry<T>(); ​ @Override public void addCallback(ListenableFutureCallback<? super T> callback) { this.callbacks.addCallback(callback); } ​ @Override public void addCallback(SuccessCallback<? super T> successCallback, FailureCallback failureCallback) { this.callbacks.addSuccessCallback(successCallback); this.callbacks.addFailureCallback(failureCallback); }

  • 可以看到在ListenableFutureTask中,暂存回调到ListenableFutureCallbackRegistry

ListenableFutureTask 对 FutureTask 已实现的 #done() 方法,进行重写。实现源码如下:

@Override protected void done() { Throwable cause; try { // 获得执行结果 T result = get(); // 执行成功,执行成功的回调 this.callbacks.success(result); return; } catch (InterruptedException ex) { // 如果有中断异常 InterruptedException异常,则打断当前线程,直接返回 Thread.currentThread().interrupt(); return; } catch (ExecutionException ex) { // 如果有 ExecutionException 异常,获取真实异常,并设置到cause中 cause = ex.getCause(); if (cause == null) { cause = ex; } } catch (Throwable ex) { cause = ex; // 设置异常到 cause 中 } // 执行异常,执行异常的回调 this.callbacks.failure(cause); }

3.3 具体示例

修改 DemoService 的代码,增加 #execute02() 的异步调用,并返回 ListenableFuture 对象。代码如下:

@Async public ListenableFuture<Integer> execute01AsyncWithListenableFuture() { try { //int i = 1 / 0; return AsyncResult.forValue(this.execute02()); } catch (Throwable ex) { return AsyncResult.forExecutionException(ex); } }

  • 根据执行的结果,包装出成功还是异常的 AsyncResult 对象。

DemoServiceTest 测试类,编写 #task04() 方法,异步调用上述的方法,在塞等待执行完成的同时,添加相应的回调 Callback 方法。代码:

@Test public void task04() throws ExecutionException, InterruptedException { long now = System.currentTimeMillis(); logger.info("[task04][开始执行]"); ​ // <1> 执行任务 ListenableFuture<Integer> execute01Result = demoService.execute01AsyncWithListenableFuture(); logger.info("[task04][execute01Result 的类型是:({})]",execute01Result.getClass().getSimpleName()); execute01Result.addCallback(new SuccessCallback<Integer>() { // <2.1> 增加成功的回调 ​ @Override public void onSuccess(Integer result) { logger.info("[onSuccess][result: {}]", result); } ​ }, new FailureCallback() { // <2.1> 增加失败的回调 ​ @Override public void onFailure(Throwable ex) { logger.info("[onFailure][发生异常]", ex); } ​ }); execute01Result.addCallback(new ListenableFutureCallback<Integer>() { // <2.2> 增加成功和失败的统一回调 ​ @Override public void onSuccess(Integer result) { logger.info("[onSuccess][result: {}]", result); } ​ @Override public void onFailure(Throwable ex) { logger.info("[onFailure][发生异常]", ex); } ​ }); // <3> 阻塞等待结果 execute01Result.get(); ​ logger.info("[task04][结束执行,消耗时长 {} 毫秒]", System.currentTimeMillis() - now); }

  • <1> 处,调用 DemoService#execute01AsyncWithListenableFuture() 方法,异步调用该方法,并返回 ListenableFutureTask 对象。这里,我们看下打印的日志。

    2020-06-08 14:13:16.738 INFO 5060 --- [ main] c.i.s.l.a.service.DemoServiceTest : [task04][execute01Result 的类型是:(ListenableFutureTask)]

  • <2.1> 处,增加成功的回调和失败的回调。

  • <2.2> 处,增加成功和失败的统一回调。

  • <3> 处,阻塞等待结果。执行完成后,我们会看到回调被执行,打印日志如下:

    2020-06-08 14:13:21.752  INFO 5060 --- [   main] c.i.s.l.a.service.DemoServiceTest   : [task04][结束执行,消耗时长 5057 毫秒] 2020-06-08 14:13:21.752  INFO 5060 --- [ task-1] c.i.s.l.a.service.DemoServiceTest   : [onSuccess][result: 2] 2020-06-08 14:13:21.752  INFO 5060 --- [ task-1] c.i.s.l.a.service.DemoServiceTest   : [onSuccess][result: 2]

4. 异步异常处理器

通过实现 AsyncUncaughtExceptionHandler 接口,达到对异步调用的异常的统一处理。

创建 GlobalAsyncExceptionHandler 类,全局统一的异步调用异常的处理器。代码:

@Component public class GlobalAsyncExceptionHandler implements AsyncUncaughtExceptionHandler { ​ private Logger logger = LoggerFactory.getLogger(getClass()); ​ @Override public void handleUncaughtException(Throwable ex, Method method, Object... params) { logger.error("[handleUncaughtException][method({}) params({}) 发生异常]", method, params, ex); } ​ }

  • 类上,我们添加了 @Component 注解,考虑到胖友可能会注入一些 Spring Bean 到属性中。

  • 实现 #handleUncaughtException(Throwable ex, Method method, Object... params) 方法,打印异常日志。

注意,AsyncUncaughtExceptionHandler 只能拦截返回类型非 Future 的异步调用方法。通过看 AsyncExecutionAspectSupport#handleError(Throwable ex, Method method, Object... params) 的源码,可以很容易得到这个结论,代码:

// AsyncExecutionAspectSupport.java ​ protected void handleError(Throwable ex, Method method, Object... params) throws Exception { // 重点!!!如果返回类型是 Future ,则直接抛出该异常。 if (Future.class.isAssignableFrom(method.getReturnType())) { ReflectionUtils.rethrowException(ex); } else { // 否则,交给 AsyncUncaughtExceptionHandler 来处理。 // Could not transmit the exception to the caller with default executor try { this.exceptionHandler.obtain().handleUncaughtException(ex, method, params); } catch (Throwable ex2) { logger.warn("Exception handler for async method '" + method.toGenericString() + "' threw unexpected exception itself", ex2); } } }

  • 对了,AsyncExecutionAspectSupport 是 AsyncExecutionInterceptor 的父类哟。所以哟,返回类型为 Future 的异步调用方法,需要通过「3. 异步回调」来处理。

4.2 AsyncConfig


创建 AsyncConfig 类,配置异常处理器。代码:

@Configuration @EnableAsync // 开启 @Async 的支持 public class AsyncConfig implements AsyncConfigurer { ​ @Autowired private GlobalAsyncExceptionHandler exceptionHandler; ​ @Override public Executor getAsyncExecutor() { return null; } ​ @Override public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() { return exceptionHandler; } ​ }

  • 在类上添加 @EnableAsync 注解,启用异步功能。这样「2. Application」 的 @EnableAsync 注解,也就可以去掉了。

  • 实现 AsyncConfigurer 接口,实现异步相关的全局配置。 此时此刻,胖友有没想到 SpringMVC 的 WebMvcConfigurer 接口。

  • 实现 #getAsyncUncaughtExceptionHandler() 方法,返回我们定义的 GlobalAsyncExceptionHandler 对象。

  • 实现 #getAsyncExecutor() 方法,返回 Spring Task 异步任务的默认执行器。这里,我们返回了 null ,并未定义默认执行器。所以最终会使用 TaskExecutionAutoConfiguration 自动化配置类创建出来的 ThreadPoolTaskExecutor 任务执行器,作为默认执行器。

4.3 DemoService

DemoService 类,增加 #zhaoDaoNvPengYou(...) 的异步调用。代码如下:

@Async public Integer zhaoDaoNvPengYou(Integer a, Integer b) { throw new RuntimeException("异步全局异常"); }

4.4 简单测试

@Test public void testZhaoDaoNvPengYou() throws InterruptedException { demoService.zhaoDaoNvPengYou(1, 2); ​ // sleep 1 秒,保证异步调用的执行 Thread.sleep(1000); }

运行单元测试,执行日志如下:

2020-06-08 15:26:35.120 ERROR 11388 --- [         task-1] .i.s.l.a.c.a.GlobalAsyncExceptionHandler : [handleUncaughtException][method(public java.lang.Integer cn.iocoder.springboot.lab29.asynctask.service.DemoService.zhaoDaoNvPengYou(java.lang.Integer,java.lang.Integer)) params([1, 2]) 发生异常] ​ java.lang.RuntimeException: 异步全局异常

5. 自定义执行器

在 上面 中,我们使用 Spring Boot TaskExecutionAutoConfiguration 自动化配置类,实现自动配置 ThreadPoolTaskExecutor 任务执行器。

本小节,我们希望两个自定义 ThreadPoolTaskExecutor 任务执行器,实现不同方法,分别使用这两个 ThreadPoolTaskExecutor 任务执行器。

5.1 引入依赖

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"\> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.2.1.RELEASE</version> </parent> <modelVersion>4.0.0</modelVersion> ​ <artifactId>lab-29-async-demo</artifactId> ​ <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> ​ <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies> ​ </project>

  • 和 上面引入依赖 一致。

5.2 应用配置文件


application.yml 中,添加 Spring Task 定时任务的配置,如下:

spring: task: # Spring 执行器配置,对应 TaskExecutionProperties 配置类。对于 Spring 异步任务,会使用该执行器。 execution-one: thread-name-prefix: task-one- # 线程池的线程名的前缀。默认为 task- ,建议根据自己应用来设置 pool: # 线程池相关 core-size: 8 # 核心线程数,线程池创建时候初始化的线程数。默认为 8 。 max-size: 20 # 最大线程数,线程池最大的线程数,只有在缓冲队列满了之后,才会申请超过核心线程数的线程。默认为 Integer.MAX_VALUE keep-alive: 60s # 允许线程的空闲时间,当超过了核心线程之外的线程,在空闲时间到达之后会被销毁。默认为 60 秒 queue-capacity: 200 # 缓冲队列大小,用来缓冲执行任务的队列的大小。默认为 Integer.MAX_VALUE 。 allow-core-thread-timeout: true # 是否允许核心线程超时,即开启线程池的动态增长和缩小。默认为 true 。 shutdown: await-termination: true # 应用关闭时,是否等待定时任务执行完成。默认为 false ,建议设置为 true await-termination-period: 60 # 等待任务完成的最大时长,单位为秒。默认为 0 ,根据自己应用来设置 # Spring 执行器配置,对应 TaskExecutionProperties 配置类。对于 Spring 异步任务,会使用该执行器。 execution-two: thread-name-prefix: task-two- # 线程池的线程名的前缀。默认为 task- ,建议根据自己应用来设置 pool: # 线程池相关 core-size: 8 # 核心线程数,线程池创建时候初始化的线程数。默认为 8 。 max-size: 20 # 最大线程数,线程池最大的线程数,只有在缓冲队列满了之后,才会申请超过核心线程数的线程。默认为 Integer.MAX_VALUE keep-alive: 60s # 允许线程的空闲时间,当超过了核心线程之外的线程,在空闲时间到达之后会被销毁。默认为 60 秒 queue-capacity: 200 # 缓冲队列大小,用来缓冲执行任务的队列的大小。默认为 Integer.MAX_VALUE 。 allow-core-thread-timeout: true # 是否允许核心线程超时,即开启线程池的动态增长和缩小。默认为 true 。 shutdown: await-termination: true # 应用关闭时,是否等待定时任务执行完成。默认为 false ,建议设置为 true await-termination-period: 60 # 等待任务完成的最大时长,单位为秒。默认为 0 ,根据自己应用来设置

  • spring.task 配置项下,我们新增了 execution-oneexecution-two 两个执行器的配置。在格式上,我们保持和在「2.7 应用配置文件」看到的 spring.task.exeuction 一致,方便我们后续复用 TaskExecutionProperties 属性配置类来映射。

5.3 AsyncConfig


创建 AsyncConfig 类,配置两个执行器。代码如下:

@Configuration @EnableAsync // 开启 @Async 的支持 public class AsyncConfig { ​ public static final String EXECUTOR_ONE_BEAN_NAME = "executor-one"; public static final String EXECUTOR_TWO_BEAN_NAME = "executor-two"; ​ @Configuration public static class ExecutorOneConfiguration { ​ @Bean(name = EXECUTOR_ONE_BEAN_NAME + "-properties") @Primary @ConfigurationProperties(prefix = "spring.task.execution-one") // 读取 spring.task.execution-one 配置到 TaskExecutionProperties 对象 public TaskExecutionProperties taskExecutionProperties() { return new TaskExecutionProperties(); } ​ @Bean(name = EXECUTOR_ONE_BEAN_NAME) public ThreadPoolTaskExecutor threadPoolTaskExecutor() { // 创建 TaskExecutorBuilder 对象 TaskExecutorBuilder builder = createTskExecutorBuilder(this.taskExecutionProperties()); // 创建 ThreadPoolTaskExecutor 对象 return builder.build(); } ​ } ​ @Configuration public static class ExecutorTwoConfiguration { ​ @Bean(name = EXECUTOR_TWO_BEAN_NAME + "-properties") @ConfigurationProperties(prefix = "spring.task.execution-two") // 读取 spring.task.execution-two 配置到 TaskExecutionProperties 对象 public TaskExecutionProperties taskExecutionProperties() { return new TaskExecutionProperties(); } ​ @Bean(name = EXECUTOR_TWO_BEAN_NAME) public ThreadPoolTaskExecutor threadPoolTaskExecutor() { // 创建 TaskExecutorBuilder 对象 TaskExecutorBuilder builder = createTskExecutorBuilder(this.taskExecutionProperties()); // 创建 ThreadPoolTaskExecutor 对象 return builder.build(); } } ​ private static TaskExecutorBuilder createTskExecutorBuilder(TaskExecutionProperties properties) { // Pool 属性 TaskExecutionProperties.Pool pool = properties.getPool(); TaskExecutorBuilder builder = new TaskExecutorBuilder(); builder = builder.queueCapacity(pool.getQueueCapacity()); builder = builder.corePoolSize(pool.getCoreSize()); builder = builder.maxPoolSize(pool.getMaxSize()); builder = builder.allowCoreThreadTimeOut(pool.isAllowCoreThreadTimeout()); builder = builder.keepAlive(pool.getKeepAlive()); // Shutdown 属性 TaskExecutionProperties.Shutdown shutdown = properties.getShutdown(); builder = builder.awaitTermination(shutdown.isAwaitTermination()); builder = builder.awaitTerminationPeriod(shutdown.getAwaitTerminationPeriod()); // 其它基本属性 builder = builder.threadNamePrefix(properties.getThreadNamePrefix()); //       builder = builder.customizers(taskExecutorCustomizers.orderedStream()::iterator); //       builder = builder.taskDecorator(taskDecorator.getIfUnique()); return builder; } ​ } ​

  • 参考 Spring Boot TaskExecutionAutoConfiguration 自动化配置类,我们创建了 ExecutorOneConfiguration 和 ExecutorTwoConfiguration 配置类,来分别创建 Bean 名字为 executor-oneexecutor-two 两个执行器。

5.4 DemoService


@Service public class DemoService { ​ private Logger logger = LoggerFactory.getLogger(getClass()); ​ @Async(AsyncConfig.EXECUTOR_ONE_BEAN_NAME) public Integer execute01() { logger.info("[execute01]"); return 1; } ​ @Async(AsyncConfig.EXECUTOR_TWO_BEAN_NAME) public Integer execute02() { logger.info("[execute02]"); return 2; } ​ }

  • @Async 注解上,我们设置了其使用的执行器的 Bean 名字。

5.5 简单测试


@RunWith(SpringRunner.class) @SpringBootTest(classes = Application.class) public class DemoServiceTest { ​ @Autowired private DemoService demoService; ​ @Test public void testExecute() throws InterruptedException { demoService.execute01(); demoService.execute02(); ​ // sleep 1 秒,保证异步调用的执行 Thread.sleep(1000); } ​ } ​

运行单元测试,执行日志如下:

2020-06-08 15:38:28.846  INFO 12020 --- [     task-one-1] c.i.s.l.asynctask.service.DemoService   : [execute01] 2020-06-08 15:38:28.846  INFO 12020 --- [     task-two-1] c.i.s.l.asynctask.service.DemoService   : [execute02]

  • 从日志中,我们可以看到,#execute01() 方法在 executor-one 执行器中执行,而 #execute02() 方法在 executor-two 执行器中执行。
点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
待兔 待兔
3个月前
手写Java HashMap源码
HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程22
Jacquelyn38 Jacquelyn38
3年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
Stella981 Stella981
3年前
KVM调整cpu和内存
一.修改kvm虚拟机的配置1、virsheditcentos7找到“memory”和“vcpu”标签,将<namecentos7</name<uuid2220a6d1a36a4fbb8523e078b3dfe795</uuid
Easter79 Easter79
3年前
Twitter的分布式自增ID算法snowflake (Java版)
概述分布式系统中,有一些需要使用全局唯一ID的场景,这种时候为了防止ID冲突可以使用36位的UUID,但是UUID有一些缺点,首先他相对比较长,另外UUID一般是无序的。有些时候我们希望能使用一种简单一些的ID,并且希望ID能够按照时间有序生成。而twitter的snowflake解决了这种需求,最初Twitter把存储系统从MySQL迁移
Wesley13 Wesley13
3年前
mysql设置时区
mysql设置时区mysql\_query("SETtime\_zone'8:00'")ordie('时区设置失败,请联系管理员!');中国在东8区所以加8方法二:selectcount(user\_id)asdevice,CONVERT\_TZ(FROM\_UNIXTIME(reg\_time),'08:00','0
Wesley13 Wesley13
3年前
00:Java简单了解
浅谈Java之概述Java是SUN(StanfordUniversityNetwork),斯坦福大学网络公司)1995年推出的一门高级编程语言。Java是一种面向Internet的编程语言。随着Java技术在web方面的不断成熟,已经成为Web应用程序的首选开发语言。Java是简单易学,完全面向对象,安全可靠,与平台无关的编程语言。
Wesley13 Wesley13
3年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Python进阶者 Python进阶者
9个月前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这