实战代码(二):Springboot Batch实现定时数据迁移

一、理论基础

1.1 Batch是什么

Spring Batch是Spring全家桶中的一员,是一个轻量级的批处理框架,比较实际的应用场景是数据迁移,比如将csv文件中的数据迁移到MySQL。

优势在于上手简单,编码规范化,能以较少的代码实现强大的功能。和ETL工具-kettle功能类似,但是定制性比较强

应用场景集中在各种DB、文件等各种已经存在的历史数据,貌似不支持消息队列的实时监听(如果有知道如何实现的,一定要告诉我),实时数据监听可以使用Storm等流式数据处理框架

1.2 基础概念

  • ItemReader:读取数据,有多个封装好的类,可以支持多种数据源,如csv、jdbc等,也可以自定义功能实现。
  • ItemWriter:输出数据,有Reader配套的封装类,同样可以自定义功能实现,如输出到消息队列。
  • ItemProcessor:数据处理模块,输入为Reader读取的数据,输出为Writer的输入。
  • Step:数据操作的步骤,包括:ItemReader->ItemProcessor->ItemWriter 整个数据流
  • Job:待执行的任务,每个job可以有一个或多个step
  • JobRepository:注册job的容器
  • JobLauncher:启动job
  • JobLocator:可以根据jobName获取到指定的job,可以配合JobRepository、JobLauncher来手动启动job

1.3 如何开发一个Batch并启动

  • 确认输入输出,分别定义InputEntity和OutputEntity
  • 编写Reader,输入为各种数据源(csv、MySQL等),输出为InputEntity,数据库的可以选择封装好的类: JdbcCursorItemReader<T>
  • 编写Processor,输入为InputEntity,输出为OutputEntity,继承ItemProcessor<T, T>,实现process方法即可
  • 编写Writer,输入为OutputEntity,输出为指定的数据源(MySQL等)
  • 配置Step和Job

抛却必要配置,实现一个迁移任务就是这么简单

二、实战代码

2.0 创建测试表

数据源表

CREATE TABLE `article` (
  `title` varchar(64) DEFAULT NULL COMMENT '标题',
  `content` varchar(255) DEFAULT NULL COMMENT '内容',
  `event_occurred_time` varchar(32) DEFAULT NULL COMMENT '事件发生时间'
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='文章';

输出的数据表

CREATE TABLE `article_detail` (
  `title` varchar(64) DEFAULT NULL COMMENT '标题',
  `content` varchar(255) DEFAULT NULL COMMENT '内容',
  `event_occurred_time` varchar(32) DEFAULT NULL COMMENT '事件发生时间',
  `source` varchar(255) DEFAULT NULL COMMENT '文章来源',
  `description` varchar(255) DEFAULT NULL COMMENT '描述信息'
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='文章详情';

2.1 依赖引入

# 本实例基于Springboot 2.X版本
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-batch</artifactId>
</dependency>

2.2 配置文件

spring:
  batch:
    job:
      # 默认为true,程序启动时Job会自动执行;false,需要手动启动任务(jobLaucher.run)
      enabled: false
    # spring batch默认情况下需要在数据库中创建元数据表,always:每次都会检查表存不存在,不存在会自动创建;never:不会自动创建,如果表不存在,则会报错;
    initialize-schema: never 

如需手动创建元数据表,请参考最后面的附录

2.3 配置JobRepository

@Bean
public JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor(JobRegistry jobRegistry){
    JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor = new JobRegistryBeanPostProcessor();
    jobRegistryBeanPostProcessor.setJobRegistry(jobRegistry);
    return jobRegistryBeanPostProcessor;
}

如果没有该项配置,则手动启动时会报错No job configuration with the name [XJob] was registered

2.4 可选配置

2.4.1 内存模式

/**
* - NoPersistence 无持久化
*/
@Component
public class NoPersistenceBatchConfigurer extends DefaultBatchConfigurer {
    @Override
    public void setDataSource(DataSource dataSource) {
    }
}

加了此项配置后,不会在数据库中创建元数据表,所有的job都是在内存中管理。程序重启后,任务信息会丢失,复杂的任务场景不建议加此配置,对于不需要严格任务管理的任务来讲比较合适。

2.4.2 任务监听

@Component
@Slf4j
public class JobListener extends JobExecutionListenerSupport {

    @Override
    public void afterJob(JobExecution jobExecution) {
        if(jobExecution.getStatus() == BatchStatus.COMPLETED) {
            log.info("任务[{}]执行成功,参数:[{}]", jobExecution.getJobInstance().getJobName(),
                    jobExecution.getJobParameters().getString("executedTime"));
        } else {
            log.info("任务[{}]执行失败", jobExecution.getJobInstance().getJobName());
            // TODO something
        }
    }
}

如果不需要在任务成功或者失败后做一些操作的话可以不加监听器,因为Batch自身包含日志执行情况日志(info级别),包括执行结果、执行参数、执行耗费时间等

2.5 定义输入、输出实体

Article:输入

@Data
public class Article {

    private String title;

    private String content;

    private String eventOccurredTime;
}

ArticleDetail:待输出的数据结构

@Data
public class ArticleDetail {

    private String title;

    private String content;

    private String eventOccurredTime;

    private String source;

    private String description;
}

2.6 Reader

2.6.1 JdbcCursorItemReader

/**
 * 普通读取模式
 * - MySQL会将所有的纪录读到内存中
 * - 数据量大的话内存占用会很高
 */
public JdbcCursorItemReader<Article> getArticle(String executedTime) {
    String lastExecutedTime = "2020-01-01 00:00:00";
    String sql = StringUtils.join("SELECT * FROM article WHERE event_occurred_time >= '",
            lastExecutedTime, "' AND event_occurred_time < '", executedTime, "'");
    return new JdbcCursorItemReaderBuilder<Article>()
            .dataSource(dataSource)
            .sql(sql)
            .fetchSize(10)
            .name("getArticle")
            .beanRowMapper(Article.class)
            .build();
}

2.6.2 分页读取

/**
     * 分页读取模式
     * - 只要分页合理配置,内存占用可控
     */
    public JdbcPagingItemReader<Article> getArticlePaging(String executedTime) {
        String lastExecutedTime = "";
        Map<String, Object> parameterValues = new HashMap<>(2);
        parameterValues.put("startTime", lastExecutedTime);
        parameterValues.put("stopTime", executedTime);
        return new JdbcPagingItemReaderBuilder<Article>()
                .dataSource(dataSource)
                .name("getArticlePaging")
                .fetchSize(10)
                .parameterValues(parameterValues)
                .pageSize(10)
                .rowMapper(new ArticleMapper())
                .queryProvider(articleProvider())
                .build();
    }

    private PagingQueryProvider articleProvider() {
        Map<String, Order> sortKeys = new HashMap<>(1);
        sortKeys.put("event_occurred_time", Order.ASCENDING);
        MySqlPagingQueryProvider provider = new MySqlPagingQueryProvider();
        provider.setSelectClause("title, content, event_occurred_time");
        provider.setFromClause("article");
        provider.setWhereClause("event_occurred_time >= :startTime AND event_occurred_time < :stopTime");
        provider.setSortKeys(sortKeys);
        return provider;
    }

2.6.3 说明

  • 可以继承ItemReader<T>,实现自定义功能的Reader
  • 分页虽然对于资源的使用时可控的,但是效率会低很多,需要合理设置每一页的数据量。
    • 如果有很多个任务一起执行,是看总数据量,比如有五个任务,每个任务采集的数据量为10W,那么设置分页的时候,要考虑到50W的数据量的内存占用情况
  • JdbcCursorItemReader在内存足够的情况下可以使用,效率很高

2.7 Processor

2.7.1 示例代码

@Component
public class ArticleProcessor implements ItemProcessor<Article, ArticleDetail> {

    @Override
    public ArticleDetail process(Article data) throws Exception {
        ArticleDetail articleDetail = new ArticleDetail();
        BeanUtils.copyProperties(data, articleDetail);
        articleDetail.setSource("weibo");
        articleDetail.setDescription("这是一条来源于微博的新闻");
        return articleDetail;
    }
}

2.7.2 说明

  • processor只需要继承ItemProcessor<T1, T2>实现其中的process方法即可。
    • T1是Reader读取的数据实体
    • T2是要输出到Writer的数据实体,也就是Writer的输入数据实体

2.8 Writer

2.8.1 JdbcBatchItemWriter

@Component
public class ArticleJdbcWriter {

    private final DataSource dataSource;

    public ArticleJdbcWriter(DataSource dataSource) {
        this.dataSource = dataSource;
    }

    public JdbcBatchItemWriter<ArticleDetail> writer() {
        return new JdbcBatchItemWriterBuilder<ArticleDetail>()
                .itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
                .sql("INSERT INTO article_detail (title, content, event_occurred_time, source, description) VALUES (:title, :content, :eventOccurredTime, :source, :description)")
                .dataSource(dataSource)
                .build();
    }
}

2.8.2 自定义writer

@Slf4j
public class ArticleWriter implements ItemWriter<ArticleDetail> {

    @Override
    public void write(List<? extends ArticleDetail> list) throws Exception {
        log.info("list的大小等于job中设置的chunkSize, size = {}", list.size());
        // TODO 此处可输出数据,比如输出到消息队列
        list.forEach(article -> log.info("输出测试,title:{}", article.getTitle()));
    }
}

2.8.3 说明

  • 继承ItemWriter<T>,实现writer方法即可
  • T是Processor的输出
  • list是Step中设置的chunkSize,也就是每次提交到writer的数据量

2.9 Step与Job

2.9.1 示例代码

@Configuration
@EnableBatchProcessing
public class ArticleBatchJob {

    @Autowired
    public JobBuilderFactory jobBuilderFactory;
    @Autowired
    public StepBuilderFactory stepBuilderFactory;
    @Autowired
    private ArticleReaderDemo articleReader;
    @Autowired
    private ArticleProcessor articleProcessor;
    @Autowired
    private ArticleJdbcWriter articleJdbcWriter;

    @Bean(name = "articleReader")
    @StepScope
    public JdbcPagingItemReader<Article> batchReader(@Value("#{jobParameters['executedTime']}") String executedTime) {
        return articleReader.getArticlePaging(executedTime);
    }

    @Bean(name = "articleWriter")
    public ItemWriter<ArticleDetail> batchWriter() {
//      return articleJdbcWriter.writer();
        return new ArticleWriter();
    }

    @Bean(name = "articleJob")
    public Job batchJob(JobListener listener, Step articleStep) {
        return jobBuilderFactory.get("articleJob")
                .listener(listener)
                .incrementer(new RunIdIncrementer())
                .flow(articleStep)
                .end()
                .build();
    }

    @Bean(name = "articleStep")
    public Step step(JdbcPagingItemReader<Article> articleReader, ItemWriter<ArticleDetail> articleWriter) {
        return stepBuilderFactory.get("crossHistoryStep")
                // 数据会累积到一定量再提交到writer
                .<Article, ArticleDetail>chunk(10)
                .reader(articleReader)
                .processor(articleProcessor)
                .writer(articleWriter)
                // 默认为false(如果参数未发生变化的话,任务不会重复执行)
                .allowStartIfComplete(true)
                .build();
    }
}

2.9.1 说明

  • @EnableBatchProcessing是必须的
  • 每个Step中,并不是每处理一条数据都提交到Writer的,需要配置chunkSize,合理的chunkSize对于数据采集效率的提升效果很明显
  • Job如果执行成功一次,下次任务启动时如果参数没有变化的话,默认情况下是不会重复执行的,如果想要执行可以传一个时间参数或者设置allowStartIfComplete(true)

2.10 集成Quartz实现定时启动

Springboot如何集成Quartz可以看 《实战代码(一):SpringBoot集成Quartz》

2.10.1 QuartzJob

@Component
@Slf4j
@DisallowConcurrentExecution
public class ArticleQuartzJob extends QuartzJobBean {

    @Autowired
    private JobLauncher jobLauncher;
    @Autowired
    private JobLocator jobLocator;

    @Override
    protected void executeInternal(JobExecutionContext jobExecutionContext) throws JobExecutionException {
        try {
            Job job = jobLocator.getJob("articleJob");
            jobLauncher.run(job, new JobParametersBuilder()
                    .addString("executedTime", "2020-11-10 16:21:01")
                    .toJobParameters());
        } catch (Exception e) {
            e.printStackTrace();
            log.error("任务[articleJob]启动失败,错误信息:{}", e.getMessage());
        }
    }
}

2.10.2 初始化QuartzJob

@Component
public class QuartzJobInit implements CommandLineRunner {

    @Autowired
    private QuartzUtils quartzUtils;

    @Override
    public void run(String... args) throws Exception {
        quartzUtils.addSingleJob(ArticleQuartzJob.class, "articleJob", 60);
    }
}

源码地址

https://github.com/lysmile/spring-boot-demo/tree/master/spring-boot-batch-demo

附录 元数据表建表语句(MYSQL)

创建元数据表的SQL文件在org.springframework.batch.core包中可以找到,可以针对不同的数据库进行配置

-- Autogenerated: do not edit this file


CREATE TABLE BATCH_JOB_INSTANCE  (
   JOB_INSTANCE_ID BIGINT  NOT NULL PRIMARY KEY ,
   VERSION BIGINT ,
   JOB_NAME VARCHAR(100) NOT NULL,
   JOB_KEY VARCHAR(32) NOT NULL,
   constraint JOB_INST_UN unique (JOB_NAME, JOB_KEY)
) ENGINE=InnoDB;


CREATE TABLE BATCH_JOB_EXECUTION  (
   JOB_EXECUTION_ID BIGINT  NOT NULL PRIMARY KEY ,
   VERSION BIGINT  ,
   JOB_INSTANCE_ID BIGINT NOT NULL,
   CREATE_TIME DATETIME NOT NULL,
   START_TIME DATETIME DEFAULT NULL ,
   END_TIME DATETIME DEFAULT NULL ,
   STATUS VARCHAR(10) ,
   EXIT_CODE VARCHAR(2500) ,
   EXIT_MESSAGE VARCHAR(2500) ,
   LAST_UPDATED DATETIME,
   JOB_CONFIGURATION_LOCATION VARCHAR(2500) NULL,
   constraint JOB_INST_EXEC_FK foreign key (JOB_INSTANCE_ID)
   references BATCH_JOB_INSTANCE(JOB_INSTANCE_ID)
) ENGINE=InnoDB;


CREATE TABLE BATCH_JOB_EXECUTION_PARAMS  (
   JOB_EXECUTION_ID BIGINT NOT NULL ,
   TYPE_CD VARCHAR(6) NOT NULL ,
   KEY_NAME VARCHAR(100) NOT NULL ,
   STRING_VAL VARCHAR(250) ,
   DATE_VAL DATETIME DEFAULT NULL ,
   LONG_VAL BIGINT ,
   DOUBLE_VAL DOUBLE PRECISION ,
   IDENTIFYING CHAR(1) NOT NULL ,
   constraint JOB_EXEC_PARAMS_FK foreign key (JOB_EXECUTION_ID)
   references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
) ENGINE=InnoDB;


CREATE TABLE BATCH_STEP_EXECUTION  (
   STEP_EXECUTION_ID BIGINT  NOT NULL PRIMARY KEY ,
   VERSION BIGINT NOT NULL,
   STEP_NAME VARCHAR(100) NOT NULL,
   JOB_EXECUTION_ID BIGINT NOT NULL,
   START_TIME DATETIME NOT NULL ,
   END_TIME DATETIME DEFAULT NULL ,
   STATUS VARCHAR(10) ,
   COMMIT_COUNT BIGINT ,
   READ_COUNT BIGINT ,
   FILTER_COUNT BIGINT ,
   WRITE_COUNT BIGINT ,
   READ_SKIP_COUNT BIGINT ,
   WRITE_SKIP_COUNT BIGINT ,
   PROCESS_SKIP_COUNT BIGINT ,
   ROLLBACK_COUNT BIGINT ,
   EXIT_CODE VARCHAR(2500) ,
   EXIT_MESSAGE VARCHAR(2500) ,
   LAST_UPDATED DATETIME,
   constraint JOB_EXEC_STEP_FK foreign key (JOB_EXECUTION_ID)
   references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
) ENGINE=InnoDB;


CREATE TABLE BATCH_STEP_EXECUTION_CONTEXT  (
   STEP_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY,
   SHORT_CONTEXT VARCHAR(2500) NOT NULL,
   SERIALIZED_CONTEXT TEXT ,
   constraint STEP_EXEC_CTX_FK foreign key (STEP_EXECUTION_ID)
   references BATCH_STEP_EXECUTION(STEP_EXECUTION_ID)
) ENGINE=InnoDB;


CREATE TABLE BATCH_JOB_EXECUTION_CONTEXT  (
   JOB_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY,
   SHORT_CONTEXT VARCHAR(2500) NOT NULL,
   SERIALIZED_CONTEXT TEXT ,
   constraint JOB_EXEC_CTX_FK foreign key (JOB_EXECUTION_ID)
   references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
) ENGINE=InnoDB;


CREATE TABLE BATCH_STEP_EXECUTION_SEQ (
   ID BIGINT NOT NULL,
   UNIQUE_KEY CHAR(1) NOT NULL,
   constraint UNIQUE_KEY_UN unique (UNIQUE_KEY)
) ENGINE=InnoDB;


INSERT INTO BATCH_STEP_EXECUTION_SEQ (ID, UNIQUE_KEY) select * from (select 0 as ID, '0' as UNIQUE_KEY) as tmp where not exists(select * from BATCH_STEP_EXECUTION_SEQ);


CREATE TABLE BATCH_JOB_EXECUTION_SEQ (
   ID BIGINT NOT NULL,
   UNIQUE_KEY CHAR(1) NOT NULL,
   constraint UNIQUE_KEY_UN unique (UNIQUE_KEY)
) ENGINE=InnoDB;


INSERT INTO BATCH_JOB_EXECUTION_SEQ (ID, UNIQUE_KEY) select * from (select 0 as ID, '0' as UNIQUE_KEY) as tmp where not exists(select * from BATCH_JOB_EXECUTION_SEQ);


CREATE TABLE BATCH_JOB_SEQ (
   ID BIGINT NOT NULL,
   UNIQUE_KEY CHAR(1) NOT NULL,
   constraint UNIQUE_KEY_UN unique (UNIQUE_KEY)
) ENGINE=InnoDB;


INSERT INTO BATCH_JOB_SEQ (ID, UNIQUE_KEY) select * from (select 0 as ID, '0' as UNIQUE_KEY) as tmp where not exists(select * from BATCH_JOB_SEQ);

参考

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,088评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,715评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,361评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,099评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 60,987评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,063评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,486评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,175评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,440评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,518评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,305评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,190评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,550评论 3 298
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,880评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,152评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,451评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,637评论 2 335

推荐阅读更多精彩内容