自從用了 Spring Batch,效率飆升500%!
一、為什么需要批處理?
1. 應(yīng)用場(chǎng)景解析
場(chǎng)景1:銀行每日利息計(jì)算
圖片
- 痛點(diǎn): 凌晨時(shí)段需掃描百萬級(jí)賬戶數(shù)據(jù),手工計(jì)算容易遺漏
- Spring Batch方案: 分片讀取賬戶數(shù)據(jù),批量計(jì)算利息,失敗自動(dòng)重試
- 實(shí)際案例: 某銀行系統(tǒng)改造后,利息計(jì)算時(shí)間從4小時(shí)縮短至23分鐘
場(chǎng)景2:電商訂單歸檔
// 傳統(tǒng)SQL示例(存在性能問題)
DELETE FROM active_orders
WHERE create_time < '2023-01-01'
LIMIT 5000; // 需循環(huán)執(zhí)行直到無數(shù)據(jù)- 問題: 直接刪除百萬級(jí)數(shù)據(jù)會(huì)導(dǎo)致數(shù)據(jù)庫(kù)鎖表
- 正確做法: 使用Spring Batch分頁(yè)讀取→寫入歷史表→批量刪除
場(chǎng)景3:日志分析
圖片
- 典型需求: 分析Nginx日志中的API響應(yīng)時(shí)間分布
- 特殊挑戰(zhàn): 處理GB級(jí)文本文件時(shí)的內(nèi)存控制
場(chǎng)景4:醫(yī)療數(shù)據(jù)遷移
圖片
- 特殊要求: 遷移過程中老系統(tǒng)仍需正常使用
- 解決方案: 使用Spring Batch的增量遷移模式
2. 傳統(tǒng)方式痛點(diǎn)
圖片
詳細(xì)解釋每個(gè)痛點(diǎn):
- 資源管理復(fù)雜
// 典型的多線程錯(cuò)誤示例
ExecutorService executor = Executors.newFixedThreadPool(8);
try {
while(hasNextPage()) {
List<Data> page = fetchNextPage();
executor.submit(() -> processPage(page)); // 可能引發(fā)內(nèi)存泄漏
}
} finally {
executor.shutdown(); // 忘記調(diào)用會(huì)導(dǎo)致線程堆積
}常見問題:線程池配置不當(dāng)導(dǎo)致OOM、數(shù)據(jù)庫(kù)連接泄露
- 容錯(cuò)性黑洞
// 偽代碼:脆弱的錯(cuò)誤處理
for (int i=0; i<3; i++) {
try {
processBatch();
break;
} catch (Exception e) {
if (i == 2) sendAlert(); // 簡(jiǎn)單重試無法處理部分成功場(chǎng)景
}
}真實(shí)案例:某支付系統(tǒng)因未處理部分失敗,導(dǎo)致重復(fù)出款
- 維護(hù)噩夢(mèng)
# 典型硬編碼配置
batch.size=1000
input.path=/data/in
output.path=/data/out問題根源:參數(shù)修改需要重新部署、不同環(huán)境配置混雜
- 監(jiān)控盲區(qū)
# 開發(fā)人員常用的臨時(shí)方案
nohup java -jar batch.jar > log.txt 2>&1 &
tail -f log.txt # 無法獲知實(shí)時(shí)進(jìn)度關(guān)鍵缺陷:無法回答"處理到哪了?"、"還剩多少?"等業(yè)務(wù)問題
Spring Batch對(duì)比優(yōu)勢(shì)表
圖片
二、Spring Batch核心架構(gòu)
1. 四大金剛組件深度解析
組件1:Job(作業(yè)工廠)
圖片
- 核心作用: 定義完整的批處理流水線(如月度報(bào)表生成流程)
- 真實(shí)案例: 某銀行的日終對(duì)賬Job包含三個(gè)Step
@Bean
public Job reconciliationJob(){
return jobBuilderFactory.get("dailyReconciliation")
.start(downloadBankFileStep())
.next(validateDataStep())
.next(generateReportStep())
.build();
}組件2:Step(裝配流水線)
圖片
設(shè)計(jì)模式:采用分塊(Chunk)處理機(jī)制
配置示例:
@Bean
public Step importStep(){
return stepBuilderFactory.get("csvImport")
.<User, User>chunk(500) // 每500條提交一次
.reader(csvReader())
.processor(validationProcessor())
.writer(dbWriter())
.faultTolerant()
.skipLimit(10)
.skip(DataIntegrityViolationException.class)
.build();
}組件3:ItemReader(數(shù)據(jù)搬運(yùn)工)
圖片
- 典型實(shí)現(xiàn):
// 讀取CSV文件示例
@Bean
public FlatFileItemReader<User> csvReader(){
returnnew FlatFileItemReaderBuilder<User>()
.name("userReader")
.resource(new FileSystemResource("data/users.csv"))
.delimited().delimiter(",")
.names("id", "name", "email")
.fieldSetMapper(new BeanWrapperFieldSetMapper<User>() {{
setTargetType(User.class);
}})
.linesToSkip(1) // 跳過標(biāo)題行
.build();
}組件4:ItemWriter(數(shù)據(jù)收納師)
圖片
- 復(fù)合寫入示例:
@Bean
public CompositeItemWriter<User> compositeWriter(){
returnnew CompositeItemWriterBuilder<User>()
.delegates(dbWriter(), logWriter(), mqWriter())
.build();
}
// 數(shù)據(jù)庫(kù)寫入組件
private JdbcBatchItemWriter<User> dbWriter(){
returnnew JdbcBatchItemWriterBuilder<User>()
.dataSource(dataSource)
.sql("INSERT INTO users (name,email) VALUES (:name,:email)")
.beanMapped()
.build();
}2. 架構(gòu)示意圖
圖片
3. 隱藏BOSS:ItemProcessor(數(shù)據(jù)變形金剛)
圖片
- 典型應(yīng)用:數(shù)據(jù)脫敏處理
publicclassDataMaskProcessorimplementsItemProcessor<User, User> {
@Override
public User process(User user){
// 手機(jī)號(hào)脫敏
String phone = user.getPhone();
user.setPhone(phone.replaceAll("(\\d{3})\\d{4}(\\d{4})", "$1****$2"));
// 郵箱轉(zhuǎn)小寫
user.setEmail(user.getEmail().toLowerCase());
return user;
}
}4. 組件生命周期探秘
圖片
三、手把手開發(fā)指南
1. 環(huán)境搭建
<!-- 完整POM配置 -->
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.1.5</version>
</parent>
<dependencies>
<!-- Batch核心依賴 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<!-- 內(nèi)存數(shù)據(jù)庫(kù)(生產(chǎn)環(huán)境可更換為MySQL等) -->
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<scope>runtime</scope>
</dependency>
<!-- Lombok簡(jiǎn)化代碼 -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
</dependencies># application.properties
spring.batch.jdbc.initialize-schema=always # 自動(dòng)創(chuàng)建Batch元數(shù)據(jù)表
spring.datasource.url=jdbc:h2:mem:testdb
spring.datasource.driverClassName=org.h2.Driver2. 第一個(gè)批處理任務(wù)
- 領(lǐng)域模型類:
@Data// Lombok注解
@NoArgsConstructor
@AllArgsConstructor
publicclassUser{
private String name;
privateint age;
private String email;
}- 完整Job配置:
@Configuration
@EnableBatchProcessing
publicclassBatchConfig{
@Autowiredprivate JobBuilderFactory jobBuilderFactory;
@Autowiredprivate StepBuilderFactory stepBuilderFactory;
// 定義Job
@Bean
public Job importUserJob(){
return jobBuilderFactory.get("importUserJob")
.start(csvProcessingStep())
.build();
}
// 定義Step
@Bean
public Step csvProcessingStep(){
return stepBuilderFactory.get("csvProcessing")
.<User, User>chunk(100) // 每處理100條提交一次
.reader(userReader())
.processor(userProcessor())
.writer(userWriter())
.build();
}
// CSV文件讀取器
@Bean
public FlatFileItemReader<User> userReader(){
returnnew FlatFileItemReaderBuilder<User>()
.name("userReader")
.resource(new ClassPathResource("users.csv")) // 文件路徑
.delimited()
.delimiter(",")
.names("name", "age", "email") // 字段映射
.targetType(User.class)
.linesToSkip(1) // 跳過標(biāo)題行
.build();
}
// 數(shù)據(jù)處理(示例:年齡校驗(yàn))
@Bean
public ItemProcessor<User, User> userProcessor(){
return user -> {
if (user.getAge() < 0) {
thrownew IllegalArgumentException("年齡不能為負(fù)數(shù): " + user);
}
return user.toBuilder() // 使用Builder模式創(chuàng)建新對(duì)象
.email(user.getEmail().toLowerCase())
.build();
};
}
// 數(shù)據(jù)庫(kù)寫入器
@Bean
public JdbcBatchItemWriter<User> userWriter(DataSource dataSource){
returnnew JdbcBatchItemWriterBuilder<User>()
.dataSource(dataSource)
.sql("INSERT INTO users (name, age, email) VALUES (:name, :age, :email)")
.beanMapped()
.build();
}
}- CSV文件示例(
src/main/resources/users.csv):
name,age,email
張三,25,zhangsan@example.com
李四,30,lisi@example.com
王五,-5,wangwu@example.com- 啟動(dòng)類:
@SpringBootApplication
publicclassBatchApplicationimplementsCommandLineRunner{
@Autowired
private JobLauncher jobLauncher;
@Autowired
private Job importUserJob;
publicstaticvoidmain(String[] args){
SpringApplication.run(BatchApplication.class, args);
}
@Override
publicvoidrun(String... args)throws Exception {
JobParameters params = new JobParametersBuilder()
.addLong("startAt", System.currentTimeMillis())
.toJobParameters();
jobLauncher.run(importUserJob, params);
}
}3. 執(zhí)行流程可視化
圖片
4. 運(yùn)行效果驗(yàn)證
- 控制臺(tái)輸出:
2023-10-01 10:00:00 INFO o.s.b.c.l.support.SimpleJobLauncher - Job: [SimpleJob: [name=importUserJob]] launched
2023-10-01 10:00:05 INFO o.s.batch.core.job.SimpleStepHandler - Executing step: [csvProcessing]
2023-10-01 10:00:15 ERROR o.s.batch.core.step.AbstractStep - Encountered an error executing step csvProcessing
org.springframework.batch.item.validator.ValidationException: 年齡不能為負(fù)數(shù): User(name=王五, age=-5, email=wangwu@example.com)- 數(shù)據(jù)庫(kù)結(jié)果:
SELECT * FROMusers;
圖片
5. 調(diào)試技巧
- 查看元數(shù)據(jù):
SELECT * FROM BATCH_JOB_INSTANCE;
SELECT * FROM BATCH_STEP_EXECUTION;- 重試失敗任務(wù):
// 在Job配置中添加容錯(cuò)機(jī)制
@Bean
public Step csvProcessingStep(){
return stepBuilderFactory.get("csvProcessing")
.<User, User>chunk(100)
.reader(userReader())
.processor(userProcessor())
.writer(userWriter())
.faultTolerant()
.skipLimit(3) // 最多跳過3條錯(cuò)誤
.skip(IllegalArgumentException.class)
.build();
}- 日志監(jiān)控配置:
logging.level.org.springframework.batch=DEBUG
logging.level.org.hibernate.SQL=WARN四、實(shí)戰(zhàn)案例:銀行交易對(duì)賬
1. 場(chǎng)景需求增強(qiáng)說明
核心流程:
圖片
技術(shù)挑戰(zhàn):
- 雙數(shù)據(jù)源讀?。ㄎ募?數(shù)據(jù)庫(kù))
- 千萬級(jí)數(shù)據(jù)高效比對(duì)
- 差異記錄快速入庫(kù)
- 分布式環(huán)境運(yùn)行
2. 完整架構(gòu)設(shè)計(jì)
圖片
3. 領(lǐng)域模型定義
@Data
@AllArgsConstructor
@NoArgsConstructor
publicclassTransaction{
// 公共字段
private String transactionId;
private LocalDateTime tradeTime;
private BigDecimal amount;
// 銀行端數(shù)據(jù)
private String bankSerialNo;
private BigDecimal bankAmount;
// 內(nèi)部系統(tǒng)數(shù)據(jù)
private String internalOrderNo;
private BigDecimal systemAmount;
// 對(duì)賬結(jié)果
private ReconStatus status;
private String discrepancyType;
}
publicenum ReconStatus {
MATCHED, // 數(shù)據(jù)一致
AMOUNT_DIFF, // 金額不一致
STATUS_DIFF, // 狀態(tài)不一致
ONLY_IN_BANK, // 銀行單邊賬
ONLY_IN_SYSTEM // 系統(tǒng)單邊賬
}4. 完整Job配置
@Configuration
@EnableBatchProcessing
publicclassBankReconJobConfig{
// 主Job定義
@Bean
public Job bankReconciliationJob(Step downloadStep, Step reconStep, Step reportStep){
return jobBuilderFactory.get("bankReconciliationJob")
.start(downloadStep)
.next(reconStep)
.next(reportStep)
.build();
}
// 文件下載Step
@Bean
public Step downloadStep(){
return stepBuilderFactory.get("downloadStep")
.tasklet((contribution, chunkContext) -> {
// 實(shí)現(xiàn)SFTP下載邏輯
sftpService.download("/bank/recon/20231001.csv");
return RepeatStatus.FINISHED;
})
.build();
}
// 核心對(duì)賬Step
@Bean
public Step reconStep(){
return stepBuilderFactory.get("reconStep")
.<Transaction, Transaction>chunk(1000)
.reader(compositeReader())
.processor(compositeProcessor())
.writer(compositeWriter())
.faultTolerant()
.skipLimit(100)
.skip(DataIntegrityViolationException.class)
.retryLimit(3)
.retry(DeadlockLoserDataAccessException.class)
.build();
}
// 組合數(shù)據(jù)讀取器
@Bean
public CompositeItemReader<Transaction> compositeReader(){
returnnew CompositeItemReaderBuilder<Transaction>()
.delegates(bankFileReader(), internalDbReader())
.build();
}
// 銀行文件讀取器
@Bean
public FlatFileItemReader<Transaction> bankFileReader(){
returnnew FlatFileItemReaderBuilder<Transaction>()
.name("bankFileReader")
.resource(new FileSystemResource("recon/20231001.csv"))
.delimited()
.names("transactionId","tradeTime","amount","bankSerialNo")
.fieldSetMapper(fieldSet -> {
Transaction t = new Transaction();
t.setTransactionId(fieldSet.readString("transactionId"));
t.setBankSerialNo(fieldSet.readString("bankSerialNo"));
t.setBankAmount(fieldSet.readBigDecimal("amount"));
return t;
})
.build();
}
// 內(nèi)部數(shù)據(jù)庫(kù)讀取器
@Bean
public JdbcCursorItemReader<Transaction> internalDbReader(){
returnnew JdbcCursorItemReaderBuilder<Transaction>()
.name("internalDbReader")
.dataSource(internalDataSource)
.sql("SELECT order_no, amount, status FROM transactions WHERE trade_date = ?")
.rowMapper((rs, rowNum) -> {
Transaction t = new Transaction();
t.setInternalOrderNo(rs.getString("order_no"));
t.setSystemAmount(rs.getBigDecimal("amount"));
return t;
})
.preparedStatementSetter(ps -> ps.setString(1, "2023-10-01"))
.build();
}
// 組合處理器
@Bean
public CompositeItemProcessor<Transaction> compositeProcessor(){
List<ItemProcessor<?, ?>> delegates = new ArrayList<>();
delegates.add(new DataMatchingProcessor());
delegates.add(new DiscrepancyClassifier());
returnnew CompositeItemProcessorBuilder<>()
.delegates(delegates)
.build();
}
// 組合寫入器
@Bean
public CompositeItemWriter<Transaction> compositeWriter(){
returnnew CompositeItemWriterBuilder<Transaction>()
.delegates(
discrepancyDbWriter(),
alertMessageWriter()
)
.build();
}
}5. 核心處理器實(shí)現(xiàn)
publicclassDataMatchingProcessorimplementsItemProcessor<Transaction, Transaction> {
@Override
public Transaction process(Transaction item){
// 雙數(shù)據(jù)源匹配邏輯
if (item.getBankSerialNo() == null) {
item.setStatus(ReconStatus.ONLY_IN_SYSTEM);
} elseif (item.getInternalOrderNo() == null) {
item.setStatus(ReconStatus.ONLY_IN_BANK);
} else {
compareAmounts(item);
compareStatuses(item);
}
return item;
}
privatevoidcompareAmounts(Transaction t){
if (t.getBankAmount().compareTo(t.getSystemAmount()) != 0) {
t.setDiscrepancyType("AMOUNT_MISMATCH");
t.setStatus(ReconStatus.AMOUNT_DIFF);
BigDecimal diff = t.getBankAmount().subtract(t.getSystemAmount());
t.setAmount(diff.abs());
}
}
privatevoidcompareStatuses(Transaction t){
// 假設(shè)從數(shù)據(jù)庫(kù)獲取內(nèi)部狀態(tài)
String internalStatus = transactionService.getStatus(t.getInternalOrderNo());
if(!"SETTLED".equals(internalStatus)){
t.setDiscrepancyType("STATUS_MISMATCH");
t.setStatus(ReconStatus.STATUS_DIFF);
}
}
}
publicclassDiscrepancyClassifierimplementsItemProcessor<Transaction, Transaction> {
@Override
public Transaction process(Transaction item){
if (item.getStatus() != ReconStatus.MATCHED) {
// 添加告警標(biāo)記
item.setAlertLevel(calculateAlertLevel(item));
}
return item;
}
private AlertLevel calculateAlertLevel(Transaction t){
if (t.getAmount().compareTo(new BigDecimal("1000000")) > 0) {
return AlertLevel.CRITICAL;
}
return AlertLevel.WARNING;
}
}6. 差異報(bào)告生成Step
@Bean
public Step reportStep(){
return stepBuilderFactory.get("reportStep")
.<Transaction, Transaction>chunk(1000)
.reader(discrepancyReader())
.writer(excelWriter())
.build();
}
@Bean
public JdbcPagingItemReader<Transaction> discrepancyReader(){
returnnew JdbcPagingItemReaderBuilder<Transaction>()
.name("discrepancyReader")
.dataSource(reconDataSource)
.selectClause("SELECT *")
.fromClause("FROM discrepancy_records")
.whereClause("WHERE recon_date = '2023-10-01'")
.sortKeys(Collections.singletonMap("transaction_id", Order.ASCENDING))
.rowMapper(new BeanPropertyRowMapper<>(Transaction.class))
.build();
}
@Bean
public ExcelFileItemWriter<Transaction> excelWriter(){
returnnew ExcelFileItemWriterBuilder<Transaction>()
.name("excelWriter")
.resource(new FileSystemResource("reports/2023-10-01.xlsx"))
.sheetName("差異報(bào)告")
.headers(new String[]{"交易ID", "差異類型", "金額差異", "告警級(jí)別"})
.fieldExtractor(item -> new Object[]{
item.getTransactionId(),
item.getDiscrepancyType(),
item.getAmount(),
item.getAlertLevel()
})
.build();
}7. 性能優(yōu)化配置
# 應(yīng)用配置
spring.batch.job.enabled=false# 禁止自動(dòng)啟動(dòng)
spring.batch.initialize-schema=never # 生產(chǎn)環(huán)境禁止自動(dòng)建表
# 性能調(diào)優(yōu)參數(shù)
spring.batch.chunk.size=2000 # 根據(jù)內(nèi)存調(diào)整
spring.datasource.hikari.maximum-pool-size=20
spring.jpa.properties.hibernate.jdbc.batch_size=10008. 執(zhí)行監(jiān)控看板
圖片
五、生產(chǎn)級(jí)特性
1. 容錯(cuò)機(jī)制
圖片
- 完整容錯(cuò)配置示例:
@Bean
public Step secureStep(){
return stepBuilderFactory.get("secureStep")
.<Input, Output>chunk(500)
.reader(jdbcReader())
.processor(secureProcessor())
.writer(restApiWriter())
.faultTolerant()
.retryLimit(3)
.retry(ConnectException.class) // 網(wǎng)絡(luò)問題重試
.retry(DeadlockLoserDataAccessException.class) // 數(shù)據(jù)庫(kù)死鎖重試
.skipLimit(100)
.skip(DataIntegrityViolationException.class) // 數(shù)據(jù)問題跳過
.skip(InvalidDataAccessApiUsageException.class)
.noRollback(ValidationException.class) // 驗(yàn)證異常不回滾
.listener(newErrorLogListener()) // 自定義監(jiān)聽器
.build();
}
// 錯(cuò)誤日志監(jiān)聽器示例
publicclassErrorLogListenerimplementsItemProcessListener<Input, Output> {
@Override
publicvoidonProcessError(Input item, Exception e){
ErrorLog log = new ErrorLog();
log.setItemData(item.toString());
log.setErrorMsg(e.getMessage());
errorLogRepository.save(log);
}
}2. 性能優(yōu)化策略(千萬級(jí)數(shù)據(jù)處理)
策略1:并行Step執(zhí)行
圖片
配置代碼:
@Bean
public Job parallelJob(){
return jobBuilderFactory.get("parallelJob")
.start(step1())
.split(new SimpleAsyncTaskExecutor()) // 啟用異步執(zhí)行器
.add(step2(), step3())
.build();
}策略2:分區(qū)處理(Partitioning)
圖片
- 分區(qū)處理器實(shí)現(xiàn):
@Bean
public Step masterStep(){
return stepBuilderFactory.get("masterStep")
.partitioner("slaveStep", partitioner())
.gridSize(10) // 分區(qū)數(shù)量=CPU核心數(shù)*2
.taskExecutor(new ThreadPoolTaskExecutor())
.build();
}
@Bean
public Partitioner partitioner(){
returnnew Partitioner() {
@Override
public Map<String, ExecutionContext> partition(int gridSize){
Map<String, ExecutionContext> result = new HashMap<>();
long total = getTotalRecordCount();
long range = total / gridSize;
for (int i = 0; i < gridSize; i++) {
ExecutionContext context = new ExecutionContext();
context.putLong("min", i * range);
context.putLong("max", (i+1) * range);
result.put("partition"+i, context);
}
return result;
}
};
}
// Slave Step配置
@Bean
public Step slaveStep(){
return stepBuilderFactory.get("slaveStep")
.<Record, Result>chunk(1000)
.reader(rangeReader(null, null))
.processor(processor())
.writer(writer())
.build();
}
@StepScope
@Bean
public ItemReader<Record> rangeReader(
@Value("#{stepExecutionContext[min]}") Long min,
@Value("#{stepExecutionContext[max]}") Long max) {
returnnew JdbcCursorItemReaderBuilder<Record>()
.sql("SELECT * FROM records WHERE id BETWEEN ? AND ?")
.preparedStatementSetter(ps -> {
ps.setLong(1, min);
ps.setLong(2, max);
})
// 其他配置...
.build();
}- 策略3:異步ItemProcessor
圖片
- 異步處理配置:
@Bean
public Step asyncStep(){
return stepBuilderFactory.get("asyncStep")
.<Input, Output>chunk(1000)
.reader(reader())
.processor(asyncItemProcessor())
.writer(writer())
.build();
}
@Bean
public AsyncItemProcessor<Input, Output> asyncItemProcessor(){
AsyncItemProcessor<Input, Output> asyncProcessor = new AsyncItemProcessor<>();
asyncProcessor.setDelegate(syncProcessor()); // 同步處理器
asyncProcessor.setTaskExecutor(new ThreadPoolTaskExecutor());
return asyncProcessor;
}
@Bean
public AsyncItemWriter<Output> asyncItemWriter(){
AsyncItemWriter<Output> asyncWriter = new AsyncItemWriter<>();
asyncWriter.setDelegate(syncWriter()); // 同步寫入器
return asyncWriter;
}3. 性能對(duì)比測(cè)試數(shù)據(jù)
圖片
優(yōu)化技巧:
- 數(shù)據(jù)庫(kù)連接池調(diào)優(yōu):
spring.datasource.hikari.maximum-pool-size=20
spring.datasource.hikari.minimum-idle=5- JVM參數(shù)優(yōu)化:
java -jar -Xmx4g -XX:+UseG1GC -XX:MaxGCPauseMillis=200 ...- 批處理參數(shù)調(diào)整:
.chunk(2000) // 根據(jù)內(nèi)存容量調(diào)整
.setQueryTimeout(60) // 數(shù)據(jù)庫(kù)查詢超時(shí)六、監(jiān)控與管理(生產(chǎn)級(jí)方案)
1. 監(jiān)控方案升級(jí)(Spring Batch Admin替代方案)
圖片
- 現(xiàn)代監(jiān)控棧配置:
// 添加監(jiān)控依賴
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-registry-prometheus</artifactId>
</dependency>// 暴露監(jiān)控端點(diǎn)
@Bean
public MeterRegistryCustomizer<MeterRegistry> metricsCommonTags(){
return registry -> registry.config().commonTags("application", "batch-service");
}
// 自定義Batch指標(biāo)
publicclassBatchMetricsListenerextendsJobExecutionListenerSupport{
privatefinal Counter processedRecords = Counter.builder("batch.records.processed")
.description("Total processed records")
.register(Metrics.globalRegistry);
@Override
publicvoidafterStep(StepExecution stepExecution){
processedRecords.increment(stepExecution.getWriteCount());
}
}2. 元數(shù)據(jù)表結(jié)構(gòu)詳解
圖片
關(guān)鍵表用途:
BATCH_JOB_INSTANCE:作業(yè)指紋庫(kù)(相同參數(shù)只能存在一個(gè)實(shí)例)BATCH_JOB_EXECUTION_PARAMS:存儲(chǔ)每次運(yùn)行的參數(shù)BATCH_STEP_EXECUTION_CONTEXT:保存步驟上下文數(shù)據(jù)(重啟恢復(fù)的關(guān)鍵)
3. 自定義監(jiān)控看板
-- 常用監(jiān)控SQL示例
-- 最近5次作業(yè)執(zhí)行情況
SELECT j.JOB_NAME, e.START_TIME, e.END_TIME,
TIMEDIFF(e.END_TIME, e.START_TIME) ASDURATION,
s.READ_COUNT, s.WRITE_COUNT
FROM BATCH_JOB_EXECUTION e
JOIN BATCH_JOB_INSTANCE j ON e.JOB_INSTANCE_ID = j.JOB_INSTANCE_ID
JOIN BATCH_STEP_EXECUTION s ON e.JOB_EXECUTION_ID = s.JOB_EXECUTION_ID
ORDERBY e.START_TIME DESCLIMIT5;七、常見問題Q&A(終極指南)
1. 內(nèi)存溢出問題深度解決方案
場(chǎng)景:處理10GB CSV文件時(shí)OOM
圖片
- 優(yōu)化代碼示例:
@Bean
@StepScope
public FlatFileItemReader<LargeRecord> largeFileReader(
@Value("#{jobParameters['filePath']}") String filePath) {
returnnew FlatFileItemReaderBuilder<LargeRecord>()
.resource(new FileSystemResource(filePath))
.lineMapper(new DefaultLineMapper<>() {{
setLineTokenizer(new DelimitedLineTokenizer());
setFieldSetMapper(new BeanWrapperFieldSetMapper<>() {{
setTargetType(LargeRecord.class);
}});
}})
.linesToSkip(1)
.strict(false) // 允許文件結(jié)尾空行
.saveState(false) // 禁用狀態(tài)保存
.build();
}
// JVM參數(shù)建議
// -XX:+UseG1GC -Xmx2g -XX:MaxGCPauseMillis=2002. 定時(shí)任務(wù)高級(jí)配置
- 多任務(wù)調(diào)度方案:
@Configuration
@EnableScheduling
publicclassScheduleConfig{
@Autowiredprivate JobLauncher jobLauncher;
@Autowiredprivate Job reportJob;
// 工作日凌晨執(zhí)行
@Scheduled(cron = "0 0 2 * * MON-FRI")
publicvoiddailyJob()throws Exception {
JobParameters params = new JobParametersBuilder()
.addString("date", LocalDate.now().toString())
.toJobParameters();
jobLauncher.run(reportJob, params);
}
// 每小時(shí)輪詢
@Scheduled(fixedRate = 3600000)
publicvoidpollJob(){
if(checkNewDataExists()) {
jobLauncher.run(dataProcessJob, new JobParameters());
}
}
// 優(yōu)雅停止示例
publicvoidstopJob(Long executionId){
JobExecution execution = jobExplorer.getJobExecution(executionId);
if(execution.isRunning()) {
execution.setStatus(BatchStatus.STOPPING);
jobRepository.update(execution);
}
}
}3. 高頻問題集錦
Q:如何重新運(yùn)行失敗的任務(wù)?
-- 步驟1:查詢失敗的任務(wù)ID
SELECT * FROM BATCH_JOB_EXECUTION WHERESTATUS = 'FAILED';
-- 步驟2:使用相同參數(shù)重新啟動(dòng)
JobParameters params = new JobParametersBuilder()
.addLong("restartId", originalExecutionId)
.toJobParameters();
jobLauncher.run(job, params);Q:處理過程中斷電怎么辦?
圖片
Q:如何實(shí)現(xiàn)動(dòng)態(tài)參數(shù)傳遞?
// 命令行啟動(dòng)方式
java -jar batch.jar --spring.batch.job.name=dataImportJob date=2023-10-01
// 編程式參數(shù)構(gòu)建
publicvoidrunJobWithParams(Map<String, Object> params){
JobParameters jobParams = new JobParametersBuilder()
.addString("mode", "forceUpdate")
.addLong("timestamp", System.currentTimeMillis())
.toJobParameters();
jobLauncher.run(importJob, jobParams);
}4. 性能調(diào)優(yōu)檢查清單
數(shù)據(jù)庫(kù)優(yōu)化
- 添加批量處理索引
- 配置連接池參數(shù)
- 啟用JDBC批處理模式
JVM優(yōu)化
-XX:+UseStringDeduplication
-XX:+UseCompressedOops
-XX:MaxMetaspaceSize=512mBatch配置
spring.batch.jdbc.initialize-schema=never
spring.batch.job.enabled=false
spring.jpa.open-in-view=false
































