1、概要
Spring Batch是一个开源的批量处理框架,Spring Batch提供了类和API来读写资源,管理事务,作业处理统计、重启、以及分区技术处理大量数据。在Spring Batch中,一个作业任务可以由多个step组成,每个任务又都可以分为Read-Process-Write或者是tasklet
对于“Read-Process-Write”过程,它是指从资源(csv、xml或数据库)中“读取”数据,“处理”它并“写入”它到其他资源(csv、xml和数据库)。例如,步骤可以从CSV文件中读取数据,对其进行处理并将其写入数据库。Spring Batch提供了许多定制类来读/写CSV、XML和数据库。
对于“单个”操作任务(tasklet),它意味着只执行单个任务,比如在步骤启动或完成之后或之前清理资源。
这些步骤可以链接在一起作为作业运行。
2、项目依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>runtime</scope>
</dependency>
3、编写一个简单的Tasklet
public class MessageTasklet implements Tasklet {
@Override
public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
String message = (String) chunkContext.getStepContext().getJobParameters().get("message");
ExecutionContext jobContext = chunkContext.getStepContext().getStepExecution().getJobExecution().getExecutionContext();
jobContext.put("message", message);
//打印传入的参数
System.out.println(message);
return RepeatStatus.FINISHED;
}
}
4、Job配置
@Configuration
public class TaskletJobConfiguration {
@Autowired
private JobBuilderFactory jobs;
@Autowired
private StepBuilderFactory steps;
@Bean
public Job taskletJob() {
return this.jobs.get("taskletJob")
.start(step())
.build();
}
@Bean
protected Step step() {
return steps
.get("step")
.tasklet(messageTasklet())
.build();
}
@Bean
public MessageTasklet messageTasklet() {
MessageTasklet tasklet = new MessageTasklet();
return tasklet;
}
}
5、参数配置
#初始化Spring Batch 数据表
spring.batch.initialize-schema=always
#工程启动时不执行任务
spring.batch.job.enabled=false
spring.datasource.url=jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull
spring.datasource.username=username
spring.datasource.password=password
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.type=com.zaxxer.hikari.HikariDataSource
spring.datasource.hikari.pool-name=HikariPool
#最大连接数,小于等于0会被重置为默认值10;大于零小于1会被重置为minimum-idle的值
spring.datasource.hikari.maximum-pool-size=10
#连接超时时间:毫秒,小于250毫秒,否则被重置为默认值30秒
spring.datasource.hikari.connection-timeout=60000
#最小空闲连接,默认值10,小于0或大于maximum-pool-size,都会重置为maximum-pool-size
spring.datasource.hikari.minimum-idle=10
#空闲连接超时时间,默认值600000(10分钟),大于等于max-lifetime且max-lifetime>0,会被重置为0;不等于0且小于10秒,会被重置为10秒。
# 只有空闲连接数大于最大连接数且空闲时间超过该值,才会被释放
spring.datasource.hikari.idle-timeout=500000
#连接最大存活时间.不等于0且小于30秒,会被重置为默认值30分钟.设置应该比mysql设置的超时时间短
spring.datasource.hikari.max-lifetime=540000
#连接测试查询
spring.datasource.hikari.connection-test-query=SELECT 1
6、接口测试
@RestController
@Slf4j
public class JobLauncherController {
@Autowired
JobLauncher jobLauncher;
@Autowired
Job job;
@RequestMapping("/launchjob")
public String handle() throws Exception {
String parameter = UUID.randomUUID().toString();
try {
//接口每次都重新生成一个UUID,如果参数完全相同,日志会提示任务已经执行成功,不能重复执行
JobParameters jobParameters = new JobParametersBuilder().addString("message", "Welcome To Spring Batch World!" + parameter)
.toJobParameters();
jobLauncher.run(job, jobParameters);
} catch (Exception e) {
log.error("", e);
}
return parameter;
}
}
最后不要忘记在启动类上加上注解@EnableBatchProcessing
7、Job拦截器
@Component
public class InterceptingJobExecution implements JobExecutionListener {
@Override
public void beforeJob(JobExecution jobExecution) {
System.out.println("Intercepting Job Execution - Before Job!");
}
@Override
public void afterJob(JobExecution jobExecution) {
System.out.println("Intercepting Job Execution - after Job!");
}
}
实现JobExecutionListener
接口后在Job配置的地方增加一下listener即可,如下:
@Autowired
InterceptingJobExecution interceptingJobExecution;
@Bean
public Job taskletJob() {
return this.jobs.get("taskletJob")
.start(step()).listener(interceptingJobExecution)
.build();
}
8、源码
https://github.com/cattles/fucking-great-springbatch