Spring Batch是一个轻量级的、完全面向Spring的批处理框架,可以应用于企业级的大批量数据处理系统。Spring Batch以POJO和大家熟知的Spring框架为基础,使开发者能够更容易地访问和利用企业级服务。Spring Batch提供大量可重复使用的数据处理功能,包括日志记录/跟踪、事务管理、作业处理统计、作业重新启动、跳过以及资源管理等重要功能。
业务方案:
1、批处理定期提交。
2、并行批处理:并行处理工作。
3、企业消息驱动处理
4、大规模的并行处理
5、手动或是有计划的重启
6、局部处理:跳过记录(如:回滚时跳过出错的记录)
技术目标:
1、利用Spring编程模型:使程序员专注于业务处理,让Spring框架管理流程。
2、明确分离批处理的执行环境和应用。
3、提供核心的,共通的接口。
4、提供开箱即用(out of the box)的、简单的默认核心执行接口。
5、借助Spring框架提供配置、自定义和扩展服务。
6、所有现有的核心服务都可以很容易地被替换和扩展,而不影响基础层。
7、提供一个简单的部署模式,利用Maven构建独立的Jar文件。
简单入门例子:
build.gradle
// Build-script classpath: makes the Spring Boot Gradle plugin available to this build.
buildscript {
    ext {
        springBootGradlePluginVersion = '1.5.1.RELEASE'
    }
    repositories {
        // Aliyun mirror of the public Maven repositories. Use HTTPS so artifacts cannot be
        // tampered with in transit (newer Gradle versions also reject plain-HTTP repositories).
        maven { url 'https://maven.aliyun.com/nexus/content/groups/public' }
        mavenCentral()
    }
    dependencies {
        classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootGradlePluginVersion}")
    }
}
apply plugin: 'java'
apply plugin: 'eclipse'
apply plugin: 'idea'
apply plugin: 'org.springframework.boot'

// Name and version of the produced jar.
jar {
    baseName = 'spring-batch'
    version = '0.1.0'
}

repositories {
    // Same mirror as the buildscript block, over HTTPS to avoid insecure artifact downloads.
    maven { url 'https://maven.aliyun.com/nexus/content/groups/public' }
    mavenCentral()
}

sourceCompatibility = 1.8
targetCompatibility = 1.8

// 'compile'/'testCompile' are kept to match the Gradle generation used with the
// Spring Boot 1.5.x plugin; on newer Gradle they become 'implementation'/'testImplementation'.
dependencies {
    compile("org.springframework.boot:spring-boot-starter-batch")
    // In-memory database; presumably backs the auto-configured DataSource / batch job repository.
    compile("org.hsqldb:hsqldb")
    testCompile("junit:junit")
}
sample-data.csv
Jill,Doe
Joe,Doe
Justin,Doe
Jane,Doe
John,Doe
Jill,Doe
Joe,Doe
Justin,Doe
Jane,Doe
John,Doe
---,---
Jill,Doe
Joe,Doe
Justin,Doe
Jane,Doe
John,Doe
Jill,Doe
Joe,Doe
Justin,Doe
Jane,Doe
---,---
John,Doe
Jill,Doe
Joe,Doe
Justin,Doe
Jane,Doe
John,Doe
Jill,Doe
Joe,Doe
Justin,Doe
Jane,Doe
John,Doe
BatchApplication.java
package xin.lowang.springbatch;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
 * Spring Boot entry point for the Spring Batch sample.
 *
 * <p>{@link SpringBootApplication} turns on auto-configuration and component scanning rooted at
 * this class's package, so configuration classes in sub-packages are discovered.
 */
@SpringBootApplication
public class BatchApplication {

    /**
     * Builds a {@link SpringApplication} with this class as its primary source and runs it.
     *
     * @param args command-line arguments, handed through to Spring Boot unchanged
     */
    public static void main(String[] args) {
        SpringApplication application = new SpringApplication(BatchApplication.class);
        application.run(args);
    }
}
BatchJobConfig.java
package xin.lowang.springbatch.config;
import java.util.Collections;
import java.util.List;
import javax.sql.DataSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import xin.lowang.springbatch.domain.Person;
import xin.lowang.springbatch.processor.PersonItemProcessor;
/**
 * Spring Batch configuration defining the {@code importUserJob}.
 *
 * <p>The job runs two steps in order:
 * <ol>
 *   <li>{@code printStep}: reads {@code sample-data.csv} from the classpath, passes each
 *       {@link Person} through {@link PersonItemProcessor}, and prints the first name to stdout.</li>
 *   <li>{@code logStep}: feeds one hard-coded {@link Person} straight to a logging writer
 *       (no processor).</li>
 * </ol>
 * A JDBC writer bean ({@link #dbWriter()}) is also declared but is not used by either step.
 */
@Configuration
@EnableBatchProcessing
public class BatchJobConfig {

    @Autowired
    public JobBuilderFactory jobBuilderFactory;

    @Autowired
    public StepBuilderFactory stepBuilderFactory;

    @Autowired
    public DataSource dataSource;

    /**
     * Reads comma-delimited lines from the classpath resource {@code sample-data.csv}; the two
     * columns are bound by property name to {@link Person#setFirstName} and
     * {@link Person#setLastName}.
     */
    @Bean
    public FlatFileItemReader<Person> reader() {
        // Collaborators are configured explicitly rather than with "double brace" anonymous
        // subclasses, which create an extra class each and capture a reference to this config.
        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
        tokenizer.setNames(new String[] {"firstName", "lastName"});

        BeanWrapperFieldSetMapper<Person> fieldSetMapper = new BeanWrapperFieldSetMapper<Person>();
        fieldSetMapper.setTargetType(Person.class);

        DefaultLineMapper<Person> lineMapper = new DefaultLineMapper<Person>();
        lineMapper.setFieldSetMapper(fieldSetMapper);
        lineMapper.setLineTokenizer(tokenizer);

        FlatFileItemReader<Person> reader = new FlatFileItemReader<>();
        reader.setResource(new ClassPathResource("sample-data.csv"));
        reader.setLineMapper(lineMapper);
        return reader;
    }

    /** Processor that turns each {@link Person} into a copy with upper-cased names. */
    @Bean
    public PersonItemProcessor processor() {
        return new PersonItemProcessor();
    }

    /**
     * Writer that inserts each {@link Person} into the {@code people} table, binding the
     * {@code :firstName}/{@code :lastName} parameters from the bean's properties.
     *
     * <p>NOTE(review): no schema for {@code people} is visible here, and no step uses this writer;
     * confirm the table exists before wiring it in.
     */
    @Bean
    public JdbcBatchItemWriter<Person> dbWriter() {
        JdbcBatchItemWriter<Person> writer = new JdbcBatchItemWriter<Person>();
        writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<Person>());
        writer.setSql("INSERT INTO people (first_name, last_name) VALUES (:firstName, :lastName)");
        writer.setDataSource(dataSource);
        return writer;
    }

    /** Writer that prints {@code "<firstName> is ok"} to standard output for every item. */
    @Bean
    public PrintItemWriter<Person> printWriter() {
        return new PrintItemWriter<Person>() {
            @Override
            public void write(List<? extends Person> items) throws Exception {
                items.forEach(person -> System.out.println(person.getFirstName() + " is ok"));
            }
        };
    }

    /** Writer that logs {@code "person <toString> is ok"} at INFO level for every item. */
    @Bean
    public LogItemWriter<Person> logWriter() {
        return new LogItemWriter<Person>() {
            @Override
            public void write(List<? extends Person> items) throws Exception {
                items.forEach(person -> log.info("person {} is ok", person));
            }
        };
    }

    // Step and job definitions.

    /** Step 1: CSV reader -> upper-casing processor -> stdout writer, in chunks of 2 items. */
    @Bean
    public Step printStep() {
        return stepBuilderFactory.get("printStep")
                .<Person, Person>chunk(2)
                .reader(reader())
                .processor(processor())
                .writer(printWriter())
                .build();
    }

    /** Step 2: logs a single in-memory {@link Person}; this step has no processor. */
    @Bean
    public Step logStep() {
        // Person's constructor is (lastName, firstName): surname "Wang", given name "chenghao".
        // The previous call passed them the other way round, putting the surname in firstName.
        Person person = new Person("Wang", "chenghao");
        return stepBuilderFactory
                .get("logStep")
                .<Person, Person>chunk(2)
                .reader(new ListItemReader<>(Collections.singletonList(person)))
                .writer(logWriter())
                .build();
    }

    /**
     * The {@code importUserJob}: runs {@link #printStep()} and then {@link #logStep()}.
     * The {@link RunIdIncrementer} supplies a fresh {@code run.id} parameter on each launch so the
     * job can be started again.
     */
    @Bean
    public Job importUserJob() {
        return jobBuilderFactory
                .get("importUserJob")
                .incrementer(new RunIdIncrementer())
                .flow(printStep())
                .next(logStep())
                .end()
                .build();
    }

    /**
     * No-op base {@link ItemWriter}; {@link #printWriter()} supplies the real {@code write}.
     * Static so that instances do not hold a hidden reference to the configuration object.
     */
    public static class PrintItemWriter<T> implements ItemWriter<T> {
        @Override
        public void write(List<? extends T> items) throws Exception {
            // Intentionally empty: subclasses provide the behavior.
        }
    }

    /**
     * No-op base {@link ItemWriter} exposing a logger to subclasses; {@link #logWriter()} supplies
     * the real {@code write}. Static for the same reason as {@link PrintItemWriter}.
     */
    public static class LogItemWriter<T> implements ItemWriter<T> {
        protected static final Logger log = LoggerFactory.getLogger(LogItemWriter.class);

        @Override
        public void write(List<? extends T> items) throws Exception {
            // Intentionally empty: subclasses provide the behavior.
        }
    }
}
Person.java
package xin.lowang.springbatch.domain;
/**
 * Mutable bean holding a person's first and last name.
 *
 * <p>Provides a no-arg constructor plus getters/setters so it can be populated by property name.
 */
public class Person {

    private String lastName;
    private String firstName;

    /** Creates a person whose names are both {@code null}. */
    public Person() {
    }

    /**
     * Creates a person with the given names.
     *
     * <p>Careful: the parameter order is last name first, then first name.
     *
     * @param lastName  family name
     * @param firstName given name
     */
    public Person(String lastName, String firstName) {
        this.firstName = firstName;
        this.lastName = lastName;
    }

    /** @return the given name, possibly {@code null} */
    public String getFirstName() {
        return firstName;
    }

    /** @param firstName the new given name */
    public void setFirstName(String firstName) {
        this.firstName = firstName;
    }

    /** @return the family name, possibly {@code null} */
    public String getLastName() {
        return lastName;
    }

    /** @param lastName the new family name */
    public void setLastName(String lastName) {
        this.lastName = lastName;
    }

    /** Renders as {@code "firstName: <first>, lastName: <last>"}; {@code null} prints as "null". */
    @Override
    public String toString() {
        return new StringBuilder("firstName: ")
                .append(firstName)
                .append(", lastName: ")
                .append(lastName)
                .toString();
    }
}
PersonItemProcessor.java
package xin.lowang.springbatch.processor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.item.ItemProcessor;
import xin.lowang.springbatch.domain.Person;
/**
 * {@link ItemProcessor} that returns a new {@link Person} whose first and last names are
 * upper-cased copies of the input's. The input item itself is not modified.
 */
public class PersonItemProcessor implements ItemProcessor<Person, Person> {

    private static final Logger log = LoggerFactory.getLogger(PersonItemProcessor.class);

    /**
     * Upper-cases both name fields and logs the transformation at INFO level.
     *
     * @param item the person to transform; left unchanged
     * @return a new person with upper-cased names; a {@code null} name stays {@code null}
     * @throws Exception as permitted by the {@link ItemProcessor} contract
     */
    @Override
    public Person process(final Person item) throws Exception {
        final String firstName = toUpper(item.getFirstName());
        final String lastName = toUpper(item.getLastName());
        // Person's constructor takes (lastName, firstName).
        final Person transformed = new Person(lastName, firstName);
        log.info("Transform {} to {}", item, transformed);
        return transformed;
    }

    /** Upper-cases {@code value} independently of the default locale; {@code null} passes through. */
    private static String toUpper(String value) {
        // Locale.ROOT keeps the result stable regardless of the JVM's default locale
        // (e.g. under a Turkish locale, "i" would otherwise not become "I").
        return value == null ? null : value.toUpperCase(java.util.Locale.ROOT);
    }
}