Spring Batch真是個優(yōu)秀的批處理框架,用完愛不釋手!
1 前言
Spring Batch是一個輕量級的、完善的批處理框架,作為Spring體系中的一員,它擁有靈活、方便、生產(chǎn)可用的特點(diǎn)。在應(yīng)對高效處理大量信息、定時處理大量數(shù)據(jù)等場景十分簡便。
結(jié)合調(diào)度框架能更大地發(fā)揮Spring Batch的作用。
2 Spring Batch的概念知識
2.1 分層架構(gòu)
Spring Batch的分層架構(gòu)圖如下:
通過例子講解Spring Batch入門,優(yōu)秀的批處理框架
可以看到它分為三層,分別是:
- Application應(yīng)用層:包含了所有任務(wù)batch jobs和開發(fā)人員自定義的代碼,主要是根據(jù)項(xiàng)目需要開發(fā)的業(yè)務(wù)流程等。
 - Batch Core核心層:包含啟動和管理任務(wù)的運(yùn)行環(huán)境類,如JobLauncher等。
 - Batch Infrastructure基礎(chǔ)層:上面兩層是建立在基礎(chǔ)層之上的,包含基礎(chǔ)的讀入reader和寫出writer、重試框架等。
 
2.2 關(guān)鍵概念
理解下圖所涉及的概念至關(guān)重要,不然很難進(jìn)行后續(xù)開發(fā)和問題分析。
通過例子講解Spring Batch入門,優(yōu)秀的批處理框架
2.2.1 JobRepository
專門負(fù)責(zé)與數(shù)據(jù)庫打交道,對整個批處理的新增、更新、執(zhí)行進(jìn)行記錄。所以Spring Batch是需要依賴數(shù)據(jù)庫來管理的。
2.2.2 任務(wù)啟動器JobLauncher
負(fù)責(zé)啟動任務(wù)Job。
2.2.3 任務(wù)Job
Job是封裝整個批處理過程的單位,跑一個批處理任務(wù),就是跑一個Job所定義的內(nèi)容。
通過例子講解Spring Batch入門,優(yōu)秀的批處理框架
上圖介紹了Job的一些相關(guān)概念:
- Job:封裝處理實(shí)體,定義過程邏輯。
 - JobInstance:Job的運(yùn)行實(shí)例,不同的實(shí)例,參數(shù)不同,所以定義好一個Job后可以通過不同參數(shù)運(yùn)行多次。
 - JobParameters:與JobInstance相關(guān)聯(lián)的參數(shù)。
 - JobExecution:代表Job的一次實(shí)際執(zhí)行,可能成功、可能失敗。
 
所以,開發(fā)人員要做的事情,就是定義Job。
2.2.4 步驟Step
Step是對Job某個過程的封裝,一個Job可以包含一個或多個Step,一步步的Step按特定邏輯執(zhí)行,才代表Job執(zhí)行完成。
通過例子講解Spring Batch入門,優(yōu)秀的批處理框架
通過定義Step來組裝Job可以更靈活地實(shí)現(xiàn)復(fù)雜的業(yè)務(wù)邏輯。
2.2.5 輸入——處理——輸出
所以,定義一個Job關(guān)鍵是定義好一個或多個Step,然后把它們組裝好即可。而定義Step有多種方法,但有一種常用的模型就是輸入——處理——輸出,即Item Reader、Item Processor和Item Writer。比如通過Item Reader從文件輸入數(shù)據(jù),然后通過Item Processor進(jìn)行業(yè)務(wù)處理和數(shù)據(jù)轉(zhuǎn)換,最后通過Item Writer寫到數(shù)據(jù)庫中去。
Spring Batch為我們提供了許多開箱即用的Reader和Writer,非常方便。
3 代碼實(shí)例
理解了基本概念后,就直接通過代碼來感受一下吧。整個項(xiàng)目的功能是從多個csv文件中讀數(shù)據(jù),處理后輸出到一個csv文件。
3.1 基本框架
添加依賴:
- <dependency>
 - <groupId>org.springframework.boot</groupId>
 - <artifactId>spring-boot-starter-batch</artifactId>
 - </dependency>
 - <dependency>
 - <groupId>com.h2database</groupId>
 - <artifactId>h2</artifactId>
 - <scope>runtime</scope>
 - </dependency>
 
需要添加Spring Batch的依賴,同時使用H2作為內(nèi)存數(shù)據(jù)庫比較方便,實(shí)際生產(chǎn)肯定是要使用外部的數(shù)據(jù)庫,如Oracle、PostgreSQL。
入口主類:
- @SpringBootApplication
 - @EnableBatchProcessing
 - public class PkslowBatchJobMain {
 - public static void main(String[] args) {
 - SpringApplication.run(PkslowBatchJobMain.class, args);
 - }
 - }
 
也很簡單,只是在Springboot的基礎(chǔ)上添加注解@EnableBatchProcessing。
領(lǐng)域?qū)嶓w類Employee:
- package com.pkslow.batch.entity;
 - public class Employee {
 - String id;
 - String firstName;
 - String lastName;
 - }
 
對應(yīng)的csv文件內(nèi)容如下:
- id,firstName,lastName
 - 1,Lokesh,Gupta
 - 2,Amit,Mishra
 - 3,Pankaj,Kumar
 - 4,David,Miller
 
3.2 輸入——處理——輸出
3.2.1 讀取ItemReader
因?yàn)橛卸鄠€輸入文件,所以定義如下:
- @Value("input/inputData*.csv")
 - private Resource[] inputResources;
 - @Bean
 - public MultiResourceItemReader<Employee> multiResourceItemReader()
 - {
 - MultiResourceItemReader<Employee> resourceItemReader = new MultiResourceItemReader<Employee>();
 - resourceItemReader.setResources(inputResources);
 - resourceItemReader.setDelegate(reader());
 - return resourceItemReader;
 - }
 - @Bean
 - public FlatFileItemReader<Employee> reader()
 - {
 - FlatFileItemReader<Employee> reader = new FlatFileItemReader<Employee>();
 - //跳過csv文件第一行,為表頭
 - reader.setLinesToSkip(1);
 - reader.setLineMapper(new DefaultLineMapper() {
 - {
 - setLineTokenizer(new DelimitedLineTokenizer() {
 - {
 - //字段名
 - setNames(new String[] { "id", "firstName", "lastName" });
 - }
 - });
 - setFieldSetMapper(new BeanWrapperFieldSetMapper<Employee>() {
 - {
 - //轉(zhuǎn)換化后的目標(biāo)類
 - setTargetType(Employee.class);
 - }
 - });
 - }
 - });
 - return reader;
 - }
 
這里使用了FlatFileItemReader,方便我們從文件讀取數(shù)據(jù)。
3.2.2 處理ItemProcessor
為了簡單演示,處理很簡單,就是把最后一列轉(zhuǎn)為大寫:
- public ItemProcessor<Employee, Employee> itemProcessor() {
 - return employee -> {
 - employee.setLastName(employee.getLastName().toUpperCase());
 - return employee;
 - };
 - }
 
3.2.3 輸出ItremWriter
比較簡單,代碼及注釋如下:
- private Resource outputResource = new FileSystemResource("output/outputData.csv");
 - @Bean
 - public FlatFileItemWriter<Employee> writer()
 - {
 - FlatFileItemWriter<Employee> writer = new FlatFileItemWriter<>();
 - writer.setResource(outputResource);
 - //是否為追加模式
 - writer.setAppendAllowed(true);
 - writer.setLineAggregator(new DelimitedLineAggregator<Employee>() {
 - {
 - //設(shè)置分割符
 - setDelimiter(",");
 - setFieldExtractor(new BeanWrapperFieldExtractor<Employee>() {
 - {
 - //設(shè)置字段
 - setNames(new String[] { "id", "firstName", "lastName" });
 - }
 - });
 - }
 - });
 - return writer;
 - }
 
3.3 Step
有了Reader-Processor-Writer后,就可以定義Step了:
- @Bean
 - public Step csvStep() {
 - return stepBuilderFactory.get("csvStep").<Employee, Employee>chunk(5)
 - .reader(multiResourceItemReader())
 - .processor(itemProcessor())
 - .writer(writer())
 - .build();
 - }
 
這里有一個chunk的設(shè)置,值為5,意思是5條記錄后再提交輸出,可以根據(jù)自己需求定義。
3.4 Job
完成了Step的編碼,定義Job就容易了:
- @Bean
 - public Job pkslowCsvJob() {
 - return jobBuilderFactory
 - .get("pkslowCsvJob")
 - .incrementer(new RunIdIncrementer())
 - .start(csvStep())
 - .build();
 - }
 
3.5 運(yùn)行
完成以上編碼后,執(zhí)行程序,結(jié)果如下:
通過例子講解Spring Batch入門,優(yōu)秀的批處理框架
成功讀取數(shù)據(jù),并將最后字段轉(zhuǎn)為大寫,并輸出到outputData.csv文件。
4 監(jiān)聽Listener
可以通過Listener接口對特定事件進(jìn)行監(jiān)聽,以實(shí)現(xiàn)更多業(yè)務(wù)功能。比如如果處理失敗,就記錄一條失敗日志;處理完成,就通知下游拿數(shù)據(jù)等。
我們分別對Read、Process和Write事件進(jìn)行監(jiān)聽,對應(yīng)分別要實(shí)現(xiàn)ItemReadListener接口、ItemProcessListener接口和ItemWriteListener接口。因?yàn)榇a比較簡單,就是打印一下日志,這里只貼出ItemWriteListener的實(shí)現(xiàn)代碼:
- public class PkslowWriteListener implements ItemWriteListener<Employee> {
 - private static final Log logger = LogFactory.getLog(PkslowWriteListener.class);
 - @Override
 - public void beforeWrite(List<? extends Employee> list) {
 - logger.info("beforeWrite: " + list);
 - }
 - @Override
 - public void afterWrite(List<? extends Employee> list) {
 - logger.info("afterWrite: " + list);
 - }
 - @Override
 - public void onWriteError(Exception e, List<? extends Employee> list) {
 - logger.info("onWriteError: " + list);
 - }
 - }
 
把實(shí)現(xiàn)的監(jiān)聽器listener整合到Step中去:
- @Bean
 - public Step csvStep() {
 - return stepBuilderFactory.get("csvStep").<Employee, Employee>chunk(5)
 - .reader(multiResourceItemReader())
 - .listener(new PkslowReadListener())
 - .processor(itemProcessor())
 - .listener(new PkslowProcessListener())
 - .writer(writer())
 - .listener(new PkslowWriteListener())
 - .build();
 - }
 
執(zhí)行后看一下日志:
通過例子講解Spring Batch入門,優(yōu)秀的批處理框架
這里就能明顯看到之前設(shè)置的chunk的作用了。Writer每次是處理5條記錄,如果一條輸出一次,會對IO造成壓力。
5 總結(jié)
Spring Batch還有許多優(yōu)秀的特性,如面對大量數(shù)據(jù)時的并行處理。本文主要入門介紹為主,不一一介紹,后續(xù)會專門講解。





















 
 
 








 
 
 
 