批处理任务的主要业务逻辑都是在Step中去完成的。可以将Job理解为运行Step的框架,而Step理解为业务功能
1.Step配置
Step是Job中的工作单元,每一个Step涵盖了单行记录的处理闭环。下图是一个Step的简要结构:
一个Step通常涵盖三个部分:读数据(Reader)、处理数据(Processor)和写数据(Writer)。但是并不是所有的Step都需要自身来完成数据的处理,比如存储过程等方式是通过外部功能来完成,因此Spring Batch提供了2种Step的处理方式:
TaskletStep is used when either only reading or writing the data item is required.(只读或者只写)
ChunkStep is used when both reading and writing the data item is required.(可以读和写的操作)
TaskletStep 常用在非数据库层的业务处理,ChunkStep通常用于数据库的批处理
2.面向分片的处理过程
一个Step通常涵盖三个部分:读数据(Reader)、处理数据(Processor)和写数据(Writer)。但是并不是所有的Step都需要自身来完成数据的处理,比如存储过程等方式是通过外部功能来完成,因此Spring Batch提供了2种Step的处理方式:
在Spring Batch中所谓的事物和数据事物的概念一样,就是一次性提交多少数据。如果在聚合数据期间出现任何错误,所有的这些数据都将不执行写入。
3.面向对象配置Step
@Bean
public Job sampleJob(JobRepository jobRepository, Step sampleStep) {
return this.jobBuilderFactory.get("sampleJob")
.repository(jobRepository)
.start(sampleStep)
.build();
}
@Bean
public Step sampleStep(PlatformTransactionManager transactionManager) {
return this.stepBuilderFactory.get("sampleStep")
.transactionManager(transactionManager)
.<String, String>chunk(10) //分片大小
.reader(itemReader()) //reader配置
.writer(itemWriter()) //write配置
.build();
}
观察sampleStep方法:
reader: 使用ItemReader提供读数据的方法。
write:ItemWrite提供写数据的方法。
transactionManager:使用默认的 PlatformTransactionManager 对事物进行管理。当配置好事物之后
Spring Batch会自动对事物进行管理,无需开发人员显示操作。
chunk:指定一次性数据提交的记录数,因为任务是基于Step分次处理的,当累计到chunk配置的次数则进
行一次提交。提交的内容除了业务数据,还有批处理任务运行相关的元数据。
是否使用ItemProcessor是一个可选项。如果没有Processor可以将数据视为读取并直接写入。
3.1分片大小
Step
使用PlatformTransactionManager
管理事物。每次事物提交的间隔根据chunk
方法中配置的数据执行。如果设置为1,那么在每一条数据处理完之后都会调用ItemWrite
进行提交。提交间隔设置太小,那么会浪费需要多不必要的资源,提交间隔设置的太长,会导致事物链太长占用空间,并且出现失败会导致大量数据回滚。因此设定一个合理的间隔是非常必要的,这需要根据实际业务情况、性能要求、以及数据安全程度来设定。如果没有明确的评估目标,设置为10~20较为合适
3.2配置step重启
前文介绍了Job的重启,但是每次重启对Step也是有很大的影响的,因此需要特定的配置。
3.2.1配置重启的次数
某些Step可能用于处理一些先决的任务,所以当Job再次重启时这Step就没必要再执行,可以通过设置startLimit来限定某个Step重启的次数。当设置为1时候表示仅仅运行一次,而出现重启时将不再执行:
@Bean
public Step step1() {
return this.stepBuilderFactory.get("step1")
.<String, String>chunk(10)
.reader(itemReader())
.writer(itemWriter())
.startLimit(1)//重启次数 这里只运行一次
.build();
}
3.2.2配置已经完成任务的Step
在单个JobInstance的上下文中,如果某个Step已经处理完毕(COMPLETED)那么在默认情况下重启之后这个Step并不会再执行。可以通过设置allow-start-if-complete为true告知框架每次重启该Step都要执行:
@Bean
public Step step1() {
return this.stepBuilderFactory.get("step1")
.<String, String>chunk(10)
.reader(itemReader())
.writer(itemWriter())
.allowStartIfComplete(true)
.build();
}
4.配置略过逻辑
某些时候在任务处理单个记录时中出现失败并不应该停止任务,而应该跳过继续处理下一条数据。是否跳过需要根据业务来判定,因此框架提供了跳过机制交给开发人员使用。如何配置跳过机制:
@Bean
public Step step1() {
return this.stepBuilderFactory.get("step1")
.<String, String>chunk(10)
.reader(flatFileItemReader())
.writer(itemWriter())
.faultTolerant()
.skipLimit(10)//出现异常次数的限制
.skip(Exception.class)//抛出指定异常时略过
.build();
}
码的含义是当处理过程中抛出Exception异常时就跳过该条记录的处理。skip-limit(skipLimit方法)配置的参数表示当跳过的次数超过数值时则会导致整个Step失败,从而停止继续运行。还可以通过反向配置的方式来忽略某些异常:
@Bean
public Step step1() {
return this.stepBuilderFactory.get("step1")
.<String, String>chunk(10)
.reader(flatFileItemReader())
.writer(itemWriter())
.faultTolerant()
.skipLimit(10)
.skip(Exception.class)
.noSkip(FileNotFoundException.class)
.build();
}
skip表示要当捕捉到Exception异常就跳过。但是Exception有很多继承类,此时可以使用noSkip方法指定某些异常不能跳过。
5.设置重试逻辑
当处理记录出个异常之后并不希望他立即跳过或者停止运行,而是希望可以多次尝试执行直到失败:
@Bean
public Step step1() {
return this.stepBuilderFactory.get("step1")
.<String, String>chunk(2)
.reader(itemReader())
.writer(itemWriter())
.faultTolerant()
.retryLimit(3)
.retry(DeadlockLoserDataAccessException.class)
.build();
}
retry(DeadlockLoserDataAccessException.class)表示只有捕捉到该异常才会重试,retryLimit(3)表示最多重试3次,faultTolerant()表示启用对应的容错功能。
6.事务回滚控制
默认情况下,无论是设置了重试(retry)还是跳过(skip),只要从Writer抛出一个异常都会导致事物回滚。如果配置了skip机制,那么在Reader中抛出的异常不会导致回滚。有些从Writer抛出一个异常并不需要回滚数据,noRollback属性为Step提供了不必进行事物回滚的异常配置:
@Bean
public Step step1() {
return this.stepBuilderFactory.get("step1")
.<String, String>chunk(2)
.reader(itemReader())
.writer(itemWriter())
.faultTolerant()
.noRollback(ValidationException.class) //不必回滚的异常
.build();
}
7.事务读取数据的缓存
一次Setp分为Reader、Processor和Writer三个阶段,这些阶段统称为Item。默认情况下如果错误不是发生在Reader阶段,那么没必要再去重新读取一次数据。但是某些场景下需要Reader部分也需要重新执行,比如Reader是从一个JMS队列中消费消息,当发生回滚的时候消息也会在队列上重放,因此也要将Reader纳入到回滚的事物中,根据这个场景可以使用readerIsTransactionalQueue来配置数据重读:
@Bean
public Step step1() {
return this.stepBuilderFactory.get("step1")
.<String, String>chunk(2)
.reader(itemReader())
.writer(itemWriter())
.readerIsTransactionalQueue() //数据重读
.build();
}
8.事务属性
事物的属性包括隔离等级(isolation)、传播方式(propagation)以及过期时间(timeout)。关于事物的控制详见Spring Data Access的说明,下面是相关配置的方法:
@Bean
public Step step1() {
//配置事物属性
DefaultTransactionAttribute attribute = new DefaultTransactionAttribute();
attribute.setPropagationBehavior(Propagation.REQUIRED.value());
attribute.setIsolationLevel(Isolation.DEFAULT.value());
attribute.setTimeout(30);
return this.stepBuilderFactory.get("step1")
.<String, String>chunk(2)
.reader(itemReader())
.writer(itemWriter())
.transactionAttribute(attribute) //设置事物属性
.build();
}
9.向Step注册ItemStream
ItemStream是用于每一个阶段(Reader、Processor、Writer)的“生命周期回调数据处理器”,后续的文章会详细介绍ItemStream。在4.×版本之后默认注入注册了通用的ItemStream。
有2种方式将ItemStream注册到Step中,一是使用stream方法:
@Bean
public Step step1() {
return this.stepBuilderFactory.get("step1")
.<String, String>chunk(2)
.reader(itemReader())
.writer(compositeItemWriter())
.stream(fileItemWriter1())
.stream(fileItemWriter2())
.build();
}
二是使用相关方法的代理:
@Bean
public CompositeItemWriter compositeItemWriter() {
List<ItemWriter> writers = new ArrayList<>(2);
writers.add(fileItemWriter1());
writers.add(fileItemWriter2());
CompositeItemWriter itemWriter = new CompositeItemWriter();
itemWriter.setDelegates(writers);
return itemWriter;
}
Spring Batch提供了多个接口以满足不同事件的监听。
10.1.StepExecutor拦截器
在Step执行的过程中会产生各种各样的事件,开发人员可以利用各种Listener接口对Step及Item进行监听。通常在创建一个Step的时候添加拦截器:
@Bean
public Step step1() {
return this.stepBuilderFactory.get("step1")
.<String, String>chunk(10)
.reader(reader())
.writer(writer())
.listener(chunkListener()) //添加拦截器
.build();
}
10.2.StepExecutionListener
StepExecutionListener可以看做一个通用的Step拦截器,他的作用是在Step开始之前和结束之后进行拦截处理:
void beforeStep(StepExecution stepExecution); //Step执行之前
ExitStatus afterStep(StepExecution stepExecution); //Step执行完毕之后
}
在结束的时候开发人员可以自己定义返回的ExitStatus,用于配合流程控制(见后文)实现对整个Step执行过程的控制。
10.3.ChunkListener
ChunkListener是在数据事物发生的两端被触发。chunk的配置决定了处理多少项记录才进行一次事物提交,ChunkListener的作用就是对一次事物开始之后或事物提交之后进行拦截:
public interface ChunkListener extends StepListener {
void beforeChunk(ChunkContext context); //事物开始之后,ItemReader调用之前
void afterChunk(ChunkContext context); //事物提交之后
void afterChunkError(ChunkContext context); //事物回滚之后
}
10.4.ItemReaderListener
该接口用于对Reader相关的事件进行监控:
public interface ItemReadListener<T> extends StepListener {
void beforeRead();
void afterRead(T item);
void onReadError(Exception ex);
}
beforeRead在每次Reader调用之前被调用,afterRead在每次Reader成功返回之后被调用,而onReadError会在出现异常之后被调用,可以将其用于记录异常日志。
10.5.ItemProcessListener
ItemProcessListener和ItemReadListener类似,是围绕着ItemProcessor进行处理的:
public interface ItemProcessListener<T, S> extends StepListener {
void beforeProcess(T item); //processor执行之前
void afterProcess(T item, S result); //processor直线成功之后
void onProcessError(T item, Exception e); //processor执行出现异常
}
10.6.ItemWriteListener
ItemWriteListener的功能和ItemReadListener、ItemReadListener类似,但是需要注意的是它接收和处理的数据对象是一个List。List的长度与chunk配置相关。
public interface ItemWriteListener<S> extends StepListener {
void beforeWrite(List<? extends S> items);
void afterWrite(List<? extends S> items);
void onWriteError(Exception exception, List<? extends S> items);
}
10.7.SkipListener
ItemReadListener、ItemProcessListener和ItemWriteListener都提供了错误拦截处理的机制,但是没有处理跳过(skip)的数据记录。因此框架提供了SkipListener来专门处理那么被跳过的记录:
void onSkipInRead(Throwable t); //Read期间导致跳过的异常
void onSkipInProcess(T item, Throwable t); //Process期间导致跳过的异常
void onSkipInWrite(S item, Throwable t); //Write期间导致跳过的异常
}
SkipListener的价值是可以将那些未能成功处理的记录在某个位置保存下来,然后交给其他批处理进一步解决,或者人工来处理。Spring Batch保证以下2个特征:
1.跳过的元素只会出现一次。
2.SkipListener始终在事物提交之前被调用,这样可以保证监听器使用的事物资源不会被业务事物影响。
11.TaskletStep
面向分片(Chunk-oriented processing )的过程并不是Step的唯一执行方式。比如用数据库的存储过程来处理数据,这个时候使用标准的Reader、Processor、Writer会很奇怪,针对这些情况框架提供了TaskletStep。
TaskletStep是一个非常简单的接口,仅有一个方法——execute。TaskletStep会反复的调用这个方法直到获取一个RepeatStatus.FINISHED返回或者抛出一个异常。所有的Tasklet调用都会包装在一个事物中。
注册一个TaskletStep非常简单,只要添加一个实现了Tasklet接口的类即可:
@Bean
public Step step1() {
return this.stepBuilderFactory.get("step1")
.tasklet(myTasklet()) //注入Tasklet的实现
.build();
}
12.1.TaskletAdapter
与ItemReader和itemriter接口的其他适配器一样,Tasklet接口包含一个实现,它允许自己适应任何预先存在的类:TaskletAdapter。这可能有用的一个示例是用于更新一组记录上的标志的现有DAO。可以使用TaskletAdapter调用此类,而不必为Tasklet接口编写适配器。
以下示例演示如何在Java中定义TaskletAdapter:
@Bean
public MethodInvokingTaskletAdapter myTasklet() {
MethodInvokingTaskletAdapter adapter = new MethodInvokingTaskletAdapter();
adapter.setTargetObject(fooDao());
adapter.setTargetMethod("updateFoo");
return adapter;
}
12.2.实现Tasklet
任何批处理作业都包含必须在主处理开始之前完成的步骤,以便设置各种资源,或者在处理完成之后清理这些资源。对于处理大量文件的作业,通常需要在将某些文件成功上载到其他位置后在本地删除这些文件。下面的示例(取自Spring批处理示例项目)是一个具有这样一个职责的Tasklet实现:
public class FileDeletingTasklet implements Tasklet, InitializingBean {
private Resource directory;
public RepeatStatus execute(StepContribution contribution,
ChunkContext chunkContext) throws Exception {
File dir = directory.getFile();
Assert.state(dir.isDirectory());
File[] files = dir.listFiles();
for (int i = 0; i < files.length; i++) {
boolean deleted = files[i].delete();
if (!deleted) {
throw new UnexpectedJobExecutionException("Could not delete file " +
files[i].getPath());
}
}
return RepeatStatus.FINISHED;
}
public void setDirectoryResource(Resource directory) {
this.directory = directory;
}
public void afterPropertiesSet() throws Exception {
Assert.notNull(directory, "directory must be set");
}
}
之前的tasklet实现会删除给定目录中的所有文件。应该注意,execute方法只被调用一次。剩下的就是引用步骤中的tasklet。
以下示例演示如何在Java中引用步骤中的tasklet:
@Bean
public Job taskletJob() {
return this.jobBuilderFactory.get("taskletJob")
.start(deleteFilesInDir())
.build();
}
@Bean
public Step deleteFilesInDir() {
return this.stepBuilderFactory.get("deleteFilesInDir")
.tasklet(fileDeletingTasklet())
.build();
}
@Bean
public FileDeletingTasklet fileDeletingTasklet() {
FileDeletingTasklet tasklet = new FileDeletingTasklet();
tasklet.setDirectoryResource(new FileSystemResource("target/test-outputs/test-dir"));
return tasklet;
}
13.控制Step执行流程
13.1.执行顺序
默认情况下。Step与Step之间是顺序执行的,如下图:
顺序执行通过next方法来标记:
@Bean
public Job job() {
return this.jobBuilderFactory.get("job")
.start(stepA())
.next(stepB()) //顺序执行
.next(stepC())
.build();
}
13.2.条件执行
在顺序执行的过程中,在整个执行链条中有一个Step执行失败则整个Job就会停止。但是通过条件执行,可以指定各种情况下的执行分支:
为了实现更加复杂的控制,可以通过Step执行后的退出命名来定义条件分之。先看一个简单的代码:
@Bean
public Job job() {
return this.jobBuilderFactory.get("job")
.start(stepA()) //启动时执行的step
.on("*").to(stepB()) //默认跳转到stepB
.from(stepA()).on("FAILED").to(stepC()) //当返回的ExitStatus为"FAILED"时,执行。
.end()
.build();
}
这里使用来表示默认处理,是一个通配符表示处理任意字符串,对应的还可以使用?表示匹配任意字符。在Spring Batch(1)——数据批处理概念一文中介绍了Step的退出都会有ExitStatus
,命名都来源于它。下面是一个更加全面的代码。
1.配置拦截器处理ExitCode:
public class SkipCheckingListener extends StepExecutionListenerSupport {
public ExitStatus afterStep(StepExecution stepExecution) {
String exitCode = stepExecution.getExitStatus().getExitCode();
if (!exitCode.equals(ExitStatus.FAILED.getExitCode()) &&
stepExecution.getSkipCount() > 0) { //当Skip的Item大于0时,则指定ExitStatus的内容
return new ExitStatus("COMPLETED WITH SKIPS");
}
else {
return null;
}
}
}
拦截器指示当有一个以上被跳过的记录时,返回的ExitStatus为"COMPLETED WITH SKIPS"。对应的控制流程:
@Bean
public Job job() {
return this.jobBuilderFactory.get("job")
.start(step1()).on("FAILED").end() //执行失败直接退出
.from(step1()).on("COMPLETED WITH SKIPS").to(errorPrint1()) //有跳过元素执行 errorPrint1()
.from(step1()).on("*").to(step2()) //默认(成功)情况下执行 Step2
.end()
.build();
}
13.3.Step的停机退出机制
Spring Batch为Job
提供了三种退出机制,这些机制为批处理的执行提供了丰富的控制方法。在介绍退出机制之前需要回顾一下 数据批处理概念一文中关于StepExecution
的内容。在StepExecution
中有2个表示状态的值,一个名为status
,另外一个名为exitStatus
。前者也被称为BatchStatus
。
前面以及介绍了ExitStatus
的使用,他可以控制Step执行链条的条件执行过程。除此之外BatchStatus
也会参与到过程的控制。
End退出
默认情况下(没有使用end、fail方法结束),Job要顺序执行直到退出,这个退出称为end。这个时候,BatchStatus=COMPLETED、ExitStatus=COMPLETED,表示成功执行。
除了Step链式处理自然退出,也可以显示调用end来退出系统。看下面的例子:
@Bean
public Job job() {
return this.jobBuilderFactory.get("job")
.start(step1()) //启动
.next(step2()) //顺序执行
.on("FAILED").end()
.from(step2()).on("*").to(step3()) //条件执行
.end()
.build();
}
上面的代码,step1到step2是顺序执行,当step2的exitStatus返回"FAILED"时则直接End退出。其他情况执行Step3。
Fail退出
除了end还可以使用fail退出,这个时候,BatchStatus=FAILED、ExitStatus=EARLY TERMINATION,表示执行失败。这个状态与End最大的区别是Job会尝试重启执行新的JobExecution。看下面代码的例子:
@Bean
public Job job() {
return this.jobBuilderFactory.get("job")
.start(step1()) //执行step1
.next(step2()).on("FAILED").fail() //step2的ExitStatus=FAILED 执行fail
.from(step2()).on("*").to(step3()) //否则执行step3
.end()
.build();
}
在指定的节点中断
Spring Batch还支持在指定的节点退出,退出后下次重启会从中断的点继续执行。中断的作用是某些批处理到某个步骤后需要人工干预,当干预完之后又接着处理:
@Bean
public Job job() {
return this.jobBuilderFactory.get("job")
//如果step1的ExitStatus=COMPLETED则在step2中断
.start(step1()).on("COMPLETED").stopAndRestart(step2())
//否则直接退出批处理
.end()
.build();
}
13.4.程序化流程的分支
可以直接进行编码来控制Step之间的扭转,Spring Batch提供了JobExecutionDecider接口来协助分支管理:
public FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution) {
String status;
if (someCondition()) {
status = "FAILED";
}
else {
status = "COMPLETED";
}
return new FlowExecutionStatus(status);
}
}
接着将MyDecider作为过滤器添加到配置过程中:
@Bean
public Job job() {
return this.jobBuilderFactory.get("job")
.start(step1())
.next(decider()).on("FAILED").to(step2())
.from(decider()).on("COMPLETED").to(step3())
.end()
.build();
}
13.5流程分裂
线性处理过程中,流程都是一个接着一个执行的。但是为了满足某些特殊的需要,Spring Batch提供了执行的过程分裂并行Step的方法。参看下面的Job配置:
@Bean
public Job job() {
Flow flow1 = new FlowBuilder<SimpleFlow>("flow1")
.start(step1())
.next(step2())
.build();//并行流程1
Flow flow2 = new FlowBuilder<SimpleFlow>("flow2")
.start(step3())
.build();//并行流程2
return this.jobBuilderFactory.get("job")
.start(flow1)
.split(new SimpleAsyncTaskExecutor()) //创建一个异步执行任务
.add(flow2)
.next(step4()) //2个分支执行完毕之后再执行step4。
.end()
.build();
}
这里表示flow1和flow2会并行执行,待2者执行成功后执行step4。
13.6.数据绑定
在Job或Step的任何位置,都可以获取到统一配置的数据。比如使用标准的Spring Framework方式:
@Bean
public FlatFileItemReader flatFileItemReader(@Value("${input.file.name}") String name) {
return new FlatFileItemReaderBuilder<Foo>()
.name("flatFileItemReader")
.resource(new FileSystemResource(name))
...
}
当我们通过配置文件(application.properties中 input.file.name=filepath)或者jvm参数(-Dinput.file.name=filepath)指定某些数据时,都可以通过这种方式获取到对应的配置参数。
此外,也可以从JobParameters从获取到Job运行的上下文参数:
@StepScope
@Bean
public FlatFileItemReader flatFileItemReader(@Value("#{jobParameters['input.file.name']}") String name) {
return new FlatFileItemReaderBuilder<Foo>()
.name("flatFileItemReader")
.resource(new FileSystemResource(name))
...
}
无论是JobExecution还是StepExecution,其中的内容都可以通过这种方式去获取参数,例如:
@StepScope
@Bean
public FlatFileItemReader flatFileItemReader(@Value("#{jobExecutionContext['input.file.name']}") String name) {
return new FlatFileItemReaderBuilder<Foo>()
.name("flatFileItemReader")
.resource(new FileSystemResource(name))
...
}
或者
@StepScope
@Bean
public FlatFileItemReader flatFileItemReader(@Value("#{stepExecutionContext['input.file.name']}") String name) {
return new FlatFileItemReaderBuilder<Foo>()
.name("flatFileItemReader")
.resource(new FileSystemResource(name))
...
}
Step Scope
Job Scope的概念和 Step Scope类似,都是用于标识在到了某个执行时间段再添加和注入Bean。@JobScope用于告知框架知道JobInstance存在时候才初始化对应的@Bean:
Job Scope
Job Scope的概念和 Step Scope类似,都是用于标识在到了某个执行时间段再添加和注入Bean。@JobScope用于告知框架知道JobInstance存在时候才初始化对应的@Bean:
@JobScope
@Bean
// 初始化获取 jobParameters中的参数
public FlatFileItemReader flatFileItemReader(@Value("#{jobParameters[input]}") String name) {
return new FlatFileItemReaderBuilder<Foo>()
.name("flatFileItemReader")
.resource(new FileSystemResource(name))
...
}
@JobScope
@Bean
// 初始化获取jobExecutionContext中的参数
public FlatFileItemReader flatFileItemReader(@Value("#{jobExecutionContext中的参数['input.name']}") String name) {
return new FlatFileItemReaderBuilder<Foo>()
.name("flatFileItemReader")
.resource(new FileSystemResource(name))
...
}