SpringBatch系列入门之Tasklet

Stella981
• 阅读 682

SpringBatch系列入门之Tasklet

1、概要

Spring Batch是一个开源的批量处理框架,Spring Batch提供了类和API来读写资源,管理事务,作业处理统计、重启、以及分区技术处理大量数据。在Spring Batch中,一个作业任务可以由多个step组成,每个任务又都可以分为Read-Process-Write或者是tasklet

  • 对于“Read-Process-Write”过程,它是指从资源(csv、xml或数据库)中“读取”数据,“处理”它并“写入”它到其他资源(csv、xml和数据库)。例如,步骤可以从CSV文件中读取数据,对其进行处理并将其写入数据库。Spring Batch提供了许多定制类来读/写CSV、XML和数据库。

  • 对于“单个”操作任务(tasklet),它意味着只执行单个任务,比如在步骤启动或完成之后或之前清理资源。

  • 这些步骤可以链接在一起作为作业运行。

2、项目依赖

<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-batch</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <scope>runtime</scope>
        </dependency>

3、编写一个简单的Tasklet

public class MessageTasklet implements Tasklet {


    @Override
    public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
        String message = (String) chunkContext.getStepContext().getJobParameters().get("message");
        ExecutionContext jobContext = chunkContext.getStepContext().getStepExecution().getJobExecution().getExecutionContext();
        
        jobContext.put("message", message);
        //打印传入的参数
        System.out.println(message);

        return RepeatStatus.FINISHED;
    }
}

4、Job配置

@Configuration
public class TaskletJobConfiguration {

    @Autowired
    private JobBuilderFactory jobs;

    @Autowired
    private StepBuilderFactory steps;

    @Bean
    public Job taskletJob() {
        return this.jobs.get("taskletJob")
                .start(step())
                .build();
    }

    @Bean
    protected Step step() {
        return steps
                .get("step")
                .tasklet(messageTasklet())
                .build();
    }

    @Bean
    public MessageTasklet messageTasklet() {
        MessageTasklet tasklet = new MessageTasklet();
        return tasklet;
    }

}

5、参数配置

#初始化Spring Batch 数据表
spring.batch.initialize-schema=always

#工程启动时不执行任务
spring.batch.job.enabled=false

spring.datasource.url=jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull
spring.datasource.username=username
spring.datasource.password=password
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.type=com.zaxxer.hikari.HikariDataSource
spring.datasource.hikari.pool-name=HikariPool
#最大连接数,小于等于0会被重置为默认值10;大于零小于1会被重置为minimum-idle的值
spring.datasource.hikari.maximum-pool-size=10
#连接超时时间:毫秒,小于250毫秒,否则被重置为默认值30秒
spring.datasource.hikari.connection-timeout=60000
#最小空闲连接,默认值10,小于0或大于maximum-pool-size,都会重置为maximum-pool-size
spring.datasource.hikari.minimum-idle=10
#空闲连接超时时间,默认值600000(10分钟),大于等于max-lifetime且max-lifetime>0,会被重置为0;不等于0且小于10秒,会被重置为10秒。
# 只有空闲连接数大于最大连接数且空闲时间超过该值,才会被释放
spring.datasource.hikari.idle-timeout=500000
#连接最大存活时间.不等于0且小于30秒,会被重置为默认值30分钟.设置应该比mysql设置的超时时间短
spring.datasource.hikari.max-lifetime=540000
#连接测试查询
spring.datasource.hikari.connection-test-query=SELECT 1

6、接口测试

@RestController
@Slf4j
public class JobLauncherController {


    @Autowired
    JobLauncher jobLauncher;

    @Autowired
    Job job;

    @RequestMapping("/launchjob")
    public String handle() throws Exception {
        String parameter = UUID.randomUUID().toString();
        try {
            //接口每次都重新生成一个UUID,如果参数完全相同,日志会提示任务已经执行成功,不能重复执行
            JobParameters jobParameters = new JobParametersBuilder().addString("message", "Welcome To Spring Batch World!" + parameter)
                    .toJobParameters();
            jobLauncher.run(job, jobParameters);
        } catch (Exception e) {
            log.error("", e);
        }

        return parameter;
    }
}

最后不要忘记在启动类上加上注解@EnableBatchProcessing

7、Job拦截器

@Component
public class InterceptingJobExecution implements JobExecutionListener {
    @Override
    public void beforeJob(JobExecution jobExecution) {
        System.out.println("Intercepting Job Execution - Before Job!");
    }

    @Override
    public void afterJob(JobExecution jobExecution) {
        System.out.println("Intercepting Job Execution - after Job!");
    }
}

实现JobExecutionListener接口后在Job配置的地方增加一下listener即可,如下:

@Autowired
    InterceptingJobExecution interceptingJobExecution;
@Bean
    public Job taskletJob() {
        return this.jobs.get("taskletJob")
                .start(step()).listener(interceptingJobExecution)
                .build();
    }

8、源码

https://github.com/cattles/fucking-great-springbatch

9、参考文档

SpringBatch系列入门之Tasklet

点赞
收藏
评论区
推荐文章
待兔 待兔
5个月前
手写Java HashMap源码
HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程22
Wesley13 Wesley13
3年前
JPA多数据源分布式事务处理
前言多数据源的事务处理是个老生常谈的话题,跨两个数据源的事务管理也算是分布式事务的范畴,在同一个JVM里处理多数据源的事务,比较经典的处理方案是JTA(基于XA协议建模的java标准事务抽象)XA(XA事务协议),常见的JTA实现框架有Atomikos、Bitronix、Narayana,Spring对这些框架都有组件封装,基本可以做到开箱即用程
Easter79 Easter79
3年前
Spring中AOP实现
1.什么是SpringAOP什么是aop:AspectOrientedProgramming的缩写,面向切面编程,通过预编译和动态代理实现程序功能的统一维护的一种技术主要功能:日志记录,性能统计,安全控制,事务处理,异常处理等2.SpringAOP框架的用途提供了声明
Stella981 Stella981
3年前
SpringBatch系列之Remote
1、概要前面的文章介绍了SpringBatch并发并行的批处理能力,但是还不够,单台机器的性能终归有极限,因此我们有些场景就可以考虑使用多台机器来处理。本文我们将介绍remotechunking,第一篇简单介绍SpringBatch多机器处理披露任务的能力。2、什么是remotechunking
Stella981 Stella981
3年前
SpringBatch系列之并发并行能力
1、概要大多数任务都能够通过简单的单进程单线程任务处理好,但是还有一大部分现实诉求无法满足。批量任务存在两种并行模式单进程、多线程多进程我们也可以细分为多线程Step(单进程)MultithreadStep并行Step(单进程)ParallelSteps对Step进行远程分块(
Stella981 Stella981
3年前
Laravel处理session(会话)的方法详解
在Web应用程序中,有必要识别跨越请求的用户并为每个用户保存数据,为此,像Laravel这样的框架提供了一种称为会话的机制。本篇文章就来为大家介绍关于Laravel处理session(会话)的方法。!laravel(https://oscimg.oschina.net/oscnet/f7951cdc35af1b61cc4dd6bd63973e2924
京东云开发者 京东云开发者
10个月前
Spring事务实现原理
1、引言spring的springtx模块提供了对事务管理支持,使用spring事务可以让我们从复杂的事务处理中得到解脱,无需要去处理获得连接、关闭连接、事务提交和回滚等这些操作。spring事务有编程式事务和声明式事务两种实现方式。编程式事务是通过编写代