一、场景引入
前不久做过一个根据下载指令定时下载文件到服务器的需求。轮询下载的周期需要根据下载任务量的大小动态修改,下载任务密集的时候就周期缩小,下载任务少量时就扩大周期时间。java本身和第三方开源框架Spring共有三种执行定时任务的方式:
- Java自带的java.util.Timer类:这个类允许你调度一个java.util.TimerTask任务。(这种方式比较古老,自从第三方开源框架出现以来,基本不使用java自带的定时任务组件)
- 开源的第三方框架: Quartz 或者 elastic-job , 但是这个比较复杂和重量级,适用于分布式场景下的定时任务,可以根据需要多实例部署定时任务。
- 使用Spring提供的注解: @Schedule 。 如果定时任务执行时间较短,并且比较单一,可以使用这个注解。
以上三种执行定时任务的方式都只能固定执行周期,一旦定时任务跑起来之后就不可能修改周期,只能修改周期后重新启动,现在需要动态修改执行周期,故这三种方式都不能使用。
二、解决方式
既然不能用通过传统的方式,那就要想到强大的第三方开源框架带来的便利,Spring从3.0版本开始在框架中加入了一个新的定时任务线程池配置类,即: org.springframework.scheduling.concurrent包中有一个
ThreadPoolTaskScheduler类,它继承了抽象类 ExecutorConfigurationSupport 并实现了 AsyncListenableTaskExecutor, SchedulingTaskExecutor, TaskScheduler三个接口 ,其中实现了
TaskScheduler中的一个方法:ScheduledFuture<?> schedule(Runnable task, Trigger trigger)源码:
@Overridepublic ScheduledFuture<?> schedule(Runnable task, Trigger trigger) { ScheduledExecutorService executor = getScheduledExecutor(); try { ErrorHandler errorHandler = (this.errorHandler != null ? this.errorHandler : TaskUtils.getDefaultErrorHandler(true)); return new ReschedulingRunnable(task, trigger, executor, errorHandler).schedule(); } catch (RejectedExecutionException ex) { throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex); }}通过源码即可发现只要将对应的定时任务的线程以及包含cron表达式的 Trigger 参数传入即可按指定的周期启动定时任务。通过源码也可以发现它的线程池大小默认是1:
// ScheduledThreadPoolExecutor.setRemoveOnCancelPolicy(boolean) only available on JDK 7+private static final boolean setRemoveOnCancelPolicyAvailable = ClassUtils.hasMethod(ScheduledThreadPoolExecutor.class, "setRemoveOnCancelPolicy", boolean.class);private volatile int poolSize = 1;private volatile boolean removeOnCancelPolicy = false;private volatile ErrorHandler errorHandler;private volatile ScheduledExecutorService scheduledExecutor; 也就是说定时任务默认是单线程串行执行,如果同时需要执行多个定时任务的话,需要对线程池的大小进行配置,我配置的是线程大小是20,如下:
@Bean(autowire = Autowire.BY_NAME, value = "threadPoolConfigBean")public ThreadPoolTaskScheduler taskScheduler() { ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler(); scheduler.setPoolSize(20); return scheduler;}然后编写定时任务接口并实现:
public interface Task { /** * 开始任务 */ String startTask(); /** * 停止任务 */ void stopCron(); /** * 重新触发任务 * * @param cron * @return */ boolean triggerAgain(String cron);}
实现类:
public class CacheInfoDownloadTask implements Task { Environment env = SpringUtils.getBean(Environment.class); // spring上下文取bean private ThreadPoolTaskScheduler threadPoolTaskScheduler = (ThreadPoolTaskScheduler) SpringUtils.getBean("threadPoolConfigBean"); private CacheInfoDownloadBll cacheInfoDownloadBll = SpringUtils.getBean(CacheInfoDownloadBll.class); private ScheduledFuture<?> future; private static CacheInfoDownloadTask instance; // 通过单例向外提供唯一实例 public static synchronized CacheInfoDownloadTask getInstance() { if (null == instance) { instance = new CacheInfoDownloadTask(); } return instance; } @Override public String startTask() { String taskCorn = env.getProperty("cacheInfoDownload.schedule"); future = threadPoolTaskScheduler.schedule(new CacheInfoDownloadJob(), new CronTrigger(taskCorn)); log.info("缓存信息拉取开始...执行周期:{}", taskCorn); return "ok"; } @Override public void stopCron() { if (future != null) { future.cancel(true); } log.info("缓存信息拉取任务停止成功..."); } @Override public boolean triggerAgain(String cron) { this.stopCron(); future = this.threadPoolTaskScheduler.schedule(new CacheInfoDownloadJob(), new CronTrigger(cron)); log.info("缓存信息拉取任务更改cron表达式后再次触发成功...执行周期:{}", cron); return true; } private class CacheInfoDownloadJob implements Runnable { @Override public void run() { String schoolId = env.getProperty("school.id"); boolean result = cacheInfoDownloadBll.getCacheInfo(schoolId); if (result) log.info("缓存信息拉取并下载任务执行成功..."); else log.error("缓存信息拉取后存储失败..."); } }}定时任务编写成功后,使用静态工厂模式在Spring任务启动同时启动定时任务,在修改定时任务的执行周期时,只需把调用triggerAgain(String cron)方法即可修改:
@Component@Order(value = 1)public class TaskTriggerOrderConfig implements CommandLineRunner { @Override public void run(String... strings) throws Exception { TaskFactory.getInstance(DOWNLOAD_TASK).startTask(); }}