偷偷摘套内射激情视频,久久精品99国产国产精,中文字幕无线乱码人妻,中文在线中文a,性爽19p

自從用了 Spring Batch,效率飆升500%!

開發(fā) 架構(gòu)
場(chǎng)景1:銀行每日利息計(jì)算。痛點(diǎn):?凌晨時(shí)段需掃描百萬級(jí)賬戶數(shù)據(jù),手工計(jì)算容易遺漏。Spring Batch方案:?分片讀取賬戶數(shù)據(jù),批量計(jì)算利息,失敗自動(dòng)重試。實(shí)際案例:?某銀行系統(tǒng)改造后,利息計(jì)算時(shí)間從4小時(shí)縮短至23分鐘。

一、為什么需要批處理?

1. 應(yīng)用場(chǎng)景解析

場(chǎng)景1:銀行每日利息計(jì)算

圖片圖片

  • 痛點(diǎn): 凌晨時(shí)段需掃描百萬級(jí)賬戶數(shù)據(jù),手工計(jì)算容易遺漏
  • Spring Batch方案: 分片讀取賬戶數(shù)據(jù),批量計(jì)算利息,失敗自動(dòng)重試
  • 實(shí)際案例: 某銀行系統(tǒng)改造后,利息計(jì)算時(shí)間從4小時(shí)縮短至23分鐘
場(chǎng)景2:電商訂單歸檔
// 傳統(tǒng)SQL示例(存在性能問題)
DELETE FROM active_orders 
WHERE create_time < '2023-01-01'
LIMIT 5000; // 需循環(huán)執(zhí)行直到無數(shù)據(jù)
  • 問題: 直接刪除百萬級(jí)數(shù)據(jù)會(huì)導(dǎo)致數(shù)據(jù)庫(kù)鎖表
  • 正確做法: 使用Spring Batch分頁(yè)讀取→寫入歷史表→批量刪除
場(chǎng)景3:日志分析

圖片圖片

  • 典型需求: 分析Nginx日志中的API響應(yīng)時(shí)間分布
  • 特殊挑戰(zhàn): 處理GB級(jí)文本文件時(shí)的內(nèi)存控制
場(chǎng)景4:醫(yī)療數(shù)據(jù)遷移

圖片圖片

  • 特殊要求: 遷移過程中老系統(tǒng)仍需正常使用
  • 解決方案: 使用Spring Batch的增量遷移模式

2. 傳統(tǒng)方式痛點(diǎn)

圖片圖片

詳細(xì)解釋每個(gè)痛點(diǎn):

  • 資源管理復(fù)雜
// 典型的多線程錯(cuò)誤示例
ExecutorService executor = Executors.newFixedThreadPool(8);
try {
    while(hasNextPage()) {
        List<Data> page = fetchNextPage();
        executor.submit(() -> processPage(page)); // 可能引發(fā)內(nèi)存泄漏
    }
} finally {
    executor.shutdown(); // 忘記調(diào)用會(huì)導(dǎo)致線程堆積
}

常見問題:線程池配置不當(dāng)導(dǎo)致OOM、數(shù)據(jù)庫(kù)連接泄露

  • 容錯(cuò)性黑洞
// 偽代碼:脆弱的錯(cuò)誤處理
for (int i=0; i<3; i++) {
    try {
        processBatch();
        break;
    } catch (Exception e) {
        if (i == 2) sendAlert(); // 簡(jiǎn)單重試無法處理部分成功場(chǎng)景
    }
}

真實(shí)案例:某支付系統(tǒng)因未處理部分失敗,導(dǎo)致重復(fù)出款

  • 維護(hù)噩夢(mèng)
# 典型硬編碼配置
batch.size=1000
input.path=/data/in
output.path=/data/out

問題根源:參數(shù)修改需要重新部署、不同環(huán)境配置混雜

  • 監(jiān)控盲區(qū)
# 開發(fā)人員常用的臨時(shí)方案
nohup java -jar batch.jar > log.txt 2>&1 &
tail -f log.txt # 無法獲知實(shí)時(shí)進(jìn)度

關(guān)鍵缺陷:無法回答"處理到哪了?"、"還剩多少?"等業(yè)務(wù)問題

Spring Batch對(duì)比優(yōu)勢(shì)表

圖片圖片

二、Spring Batch核心架構(gòu)

1. 四大金剛組件深度解析

組件1:Job(作業(yè)工廠)

圖片圖片

  • 核心作用: 定義完整的批處理流水線(如月度報(bào)表生成流程)
  • 真實(shí)案例: 某銀行的日終對(duì)賬Job包含三個(gè)Step
@Bean
public Job reconciliationJob(){
    return jobBuilderFactory.get("dailyReconciliation")
            .start(downloadBankFileStep())
            .next(validateDataStep())
            .next(generateReportStep())
            .build();
}
組件2:Step(裝配流水線)

圖片圖片

設(shè)計(jì)模式:采用分塊(Chunk)處理機(jī)制

配置示例:

@Bean
public Step importStep(){
    return stepBuilderFactory.get("csvImport")
            .<User, User>chunk(500)  // 每500條提交一次
            .reader(csvReader())
            .processor(validationProcessor())
            .writer(dbWriter())
            .faultTolerant()
            .skipLimit(10)
            .skip(DataIntegrityViolationException.class)
            .build();
}
組件3:ItemReader(數(shù)據(jù)搬運(yùn)工)

圖片圖片

  • 典型實(shí)現(xiàn):
// 讀取CSV文件示例
@Bean
public FlatFileItemReader<User> csvReader(){
    returnnew FlatFileItemReaderBuilder<User>()
            .name("userReader")
            .resource(new FileSystemResource("data/users.csv"))
            .delimited().delimiter(",")
            .names("id", "name", "email")
            .fieldSetMapper(new BeanWrapperFieldSetMapper<User>() {{
                setTargetType(User.class);
            }})
            .linesToSkip(1) // 跳過標(biāo)題行
            .build();
}
組件4:ItemWriter(數(shù)據(jù)收納師)

圖片圖片

  • 復(fù)合寫入示例:
@Bean
public CompositeItemWriter<User> compositeWriter(){
    returnnew CompositeItemWriterBuilder<User>()
            .delegates(dbWriter(), logWriter(), mqWriter())
            .build();
}

// 數(shù)據(jù)庫(kù)寫入組件
private JdbcBatchItemWriter<User> dbWriter(){
    returnnew JdbcBatchItemWriterBuilder<User>()
            .dataSource(dataSource)
            .sql("INSERT INTO users (name,email) VALUES (:name,:email)")
            .beanMapped()
            .build();
}

2. 架構(gòu)示意圖

圖片圖片

3. 隱藏BOSS:ItemProcessor(數(shù)據(jù)變形金剛)

圖片圖片

  • 典型應(yīng)用:數(shù)據(jù)脫敏處理
publicclassDataMaskProcessorimplementsItemProcessor<User, User> {
    @Override
    public User process(User user){
        // 手機(jī)號(hào)脫敏
        String phone = user.getPhone();
        user.setPhone(phone.replaceAll("(\\d{3})\\d{4}(\\d{4})", "$1****$2"));
        
        // 郵箱轉(zhuǎn)小寫
        user.setEmail(user.getEmail().toLowerCase());
        
        return user;
    }
}

4. 組件生命周期探秘

圖片圖片

三、手把手開發(fā)指南

1. 環(huán)境搭建

<!-- 完整POM配置 -->
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>3.1.5</version>
</parent>

<dependencies>
    <!-- Batch核心依賴 -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-batch</artifactId>
    </dependency>
    
    <!-- 內(nèi)存數(shù)據(jù)庫(kù)(生產(chǎn)環(huán)境可更換為MySQL等) -->
    <dependency>
        <groupId>com.h2database</groupId>
        <artifactId>h2</artifactId>
        <scope>runtime</scope>
    </dependency>
    
    <!-- Lombok簡(jiǎn)化代碼 -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
</dependencies>
# application.properties
spring.batch.jdbc.initialize-schema=always # 自動(dòng)創(chuàng)建Batch元數(shù)據(jù)表
spring.datasource.url=jdbc:h2:mem:testdb
spring.datasource.driverClassName=org.h2.Driver

2. 第一個(gè)批處理任務(wù)

  • 領(lǐng)域模型類:
@Data// Lombok注解
@NoArgsConstructor
@AllArgsConstructor
publicclassUser{
    private String name;
    privateint age;
    private String email;
}
  • 完整Job配置:
@Configuration
@EnableBatchProcessing
publicclassBatchConfig{

    @Autowiredprivate JobBuilderFactory jobBuilderFactory;
    @Autowiredprivate StepBuilderFactory stepBuilderFactory;

    // 定義Job
    @Bean
    public Job importUserJob(){
        return jobBuilderFactory.get("importUserJob")
                .start(csvProcessingStep())
                .build();
    }

    // 定義Step
    @Bean
    public Step csvProcessingStep(){
        return stepBuilderFactory.get("csvProcessing")
                .<User, User>chunk(100) // 每處理100條提交一次
                .reader(userReader())
                .processor(userProcessor())
                .writer(userWriter())
                .build();
    }

    // CSV文件讀取器
    @Bean
    public FlatFileItemReader<User> userReader(){
        returnnew FlatFileItemReaderBuilder<User>()
                .name("userReader")
                .resource(new ClassPathResource("users.csv")) // 文件路徑
                .delimited()
                .delimiter(",")
                .names("name", "age", "email") // 字段映射
                .targetType(User.class)
                .linesToSkip(1) // 跳過標(biāo)題行
                .build();
    }

    // 數(shù)據(jù)處理(示例:年齡校驗(yàn))
    @Bean
    public ItemProcessor<User, User> userProcessor(){
        return user -> {
            if (user.getAge() < 0) {
                thrownew IllegalArgumentException("年齡不能為負(fù)數(shù): " + user);
            }
            return user.toBuilder() // 使用Builder模式創(chuàng)建新對(duì)象
                    .email(user.getEmail().toLowerCase())
                    .build();
        };
    }

    // 數(shù)據(jù)庫(kù)寫入器
    @Bean
    public JdbcBatchItemWriter<User> userWriter(DataSource dataSource){
        returnnew JdbcBatchItemWriterBuilder<User>()
                .dataSource(dataSource)
                .sql("INSERT INTO users (name, age, email) VALUES (:name, :age, :email)")
                .beanMapped()
                .build();
    }
}
  • CSV文件示例(src/main/resources/users.csv):
name,age,email
張三,25,zhangsan@example.com
李四,30,lisi@example.com
王五,-5,wangwu@example.com
  • 啟動(dòng)類:
@SpringBootApplication
publicclassBatchApplicationimplementsCommandLineRunner{

    @Autowired
    private JobLauncher jobLauncher;

    @Autowired
    private Job importUserJob;

    publicstaticvoidmain(String[] args){
        SpringApplication.run(BatchApplication.class, args);
    }

    @Override
    publicvoidrun(String... args)throws Exception {
        JobParameters params = new JobParametersBuilder()
                .addLong("startAt", System.currentTimeMillis())
                .toJobParameters();
        jobLauncher.run(importUserJob, params);
    }
}

3. 執(zhí)行流程可視化

圖片圖片

4. 運(yùn)行效果驗(yàn)證

  • 控制臺(tái)輸出:
2023-10-01 10:00:00 INFO  o.s.b.c.l.support.SimpleJobLauncher - Job: [SimpleJob: [name=importUserJob]] launched
2023-10-01 10:00:05 INFO  o.s.batch.core.job.SimpleStepHandler - Executing step: [csvProcessing]
2023-10-01 10:00:15 ERROR o.s.batch.core.step.AbstractStep - Encountered an error executing step csvProcessing
org.springframework.batch.item.validator.ValidationException: 年齡不能為負(fù)數(shù): User(name=王五, age=-5, email=wangwu@example.com)
  • 數(shù)據(jù)庫(kù)結(jié)果:
SELECT * FROMusers;

圖片圖片

5. 調(diào)試技巧

  • 查看元數(shù)據(jù):
SELECT * FROM BATCH_JOB_INSTANCE;
SELECT * FROM BATCH_STEP_EXECUTION;
  • 重試失敗任務(wù):
// 在Job配置中添加容錯(cuò)機(jī)制
@Bean
public Step csvProcessingStep(){
    return stepBuilderFactory.get("csvProcessing")
            .<User, User>chunk(100)
            .reader(userReader())
            .processor(userProcessor())
            .writer(userWriter())
            .faultTolerant()
            .skipLimit(3) // 最多跳過3條錯(cuò)誤
            .skip(IllegalArgumentException.class)
            .build();
}
  • 日志監(jiān)控配置:
logging.level.org.springframework.batch=DEBUG
logging.level.org.hibernate.SQL=WARN

四、實(shí)戰(zhàn)案例:銀行交易對(duì)賬

1. 場(chǎng)景需求增強(qiáng)說明

核心流程:

圖片圖片

技術(shù)挑戰(zhàn):

  • 雙數(shù)據(jù)源讀?。ㄎ募?數(shù)據(jù)庫(kù))
  • 千萬級(jí)數(shù)據(jù)高效比對(duì)
  • 差異記錄快速入庫(kù)
  • 分布式環(huán)境運(yùn)行

2. 完整架構(gòu)設(shè)計(jì)

圖片圖片

3. 領(lǐng)域模型定義

@Data
@AllArgsConstructor
@NoArgsConstructor
publicclassTransaction{
    // 公共字段
    private String transactionId;
    private LocalDateTime tradeTime;
    private BigDecimal amount;
    
    // 銀行端數(shù)據(jù)
    private String bankSerialNo;
    private BigDecimal bankAmount;
    
    // 內(nèi)部系統(tǒng)數(shù)據(jù)
    private String internalOrderNo;
    private BigDecimal systemAmount;
    
    // 對(duì)賬結(jié)果
    private ReconStatus status;
    private String discrepancyType;
}

publicenum ReconStatus {
    MATCHED,       // 數(shù)據(jù)一致
    AMOUNT_DIFF,   // 金額不一致
    STATUS_DIFF,    // 狀態(tài)不一致
    ONLY_IN_BANK,   // 銀行單邊賬
    ONLY_IN_SYSTEM  // 系統(tǒng)單邊賬
}

4. 完整Job配置

@Configuration
@EnableBatchProcessing
publicclassBankReconJobConfig{

    // 主Job定義
    @Bean
    public Job bankReconciliationJob(Step downloadStep, Step reconStep, Step reportStep){
        return jobBuilderFactory.get("bankReconciliationJob")
                .start(downloadStep)
                .next(reconStep)
                .next(reportStep)
                .build();
    }

    // 文件下載Step
    @Bean
    public Step downloadStep(){
        return stepBuilderFactory.get("downloadStep")
                .tasklet((contribution, chunkContext) -> {
                    // 實(shí)現(xiàn)SFTP下載邏輯
                    sftpService.download("/bank/recon/20231001.csv");
                    return RepeatStatus.FINISHED;
                })
                .build();
    }

    // 核心對(duì)賬Step
    @Bean
    public Step reconStep(){
        return stepBuilderFactory.get("reconStep")
                .<Transaction, Transaction>chunk(1000)
                .reader(compositeReader())
                .processor(compositeProcessor())
                .writer(compositeWriter())
                .faultTolerant()
                .skipLimit(100)
                .skip(DataIntegrityViolationException.class)
                .retryLimit(3)
                .retry(DeadlockLoserDataAccessException.class)
                .build();
    }

    // 組合數(shù)據(jù)讀取器
    @Bean
    public CompositeItemReader<Transaction> compositeReader(){
        returnnew CompositeItemReaderBuilder<Transaction>()
                .delegates(bankFileReader(), internalDbReader())
                .build();
    }

    // 銀行文件讀取器
    @Bean
    public FlatFileItemReader<Transaction> bankFileReader(){
        returnnew FlatFileItemReaderBuilder<Transaction>()
                .name("bankFileReader")
                .resource(new FileSystemResource("recon/20231001.csv"))
                .delimited()
                .names("transactionId","tradeTime","amount","bankSerialNo")
                .fieldSetMapper(fieldSet -> {
                    Transaction t = new Transaction();
                    t.setTransactionId(fieldSet.readString("transactionId"));
                    t.setBankSerialNo(fieldSet.readString("bankSerialNo"));
                    t.setBankAmount(fieldSet.readBigDecimal("amount"));
                    return t;
                })
                .build();
    }

    // 內(nèi)部數(shù)據(jù)庫(kù)讀取器
    @Bean
    public JdbcCursorItemReader<Transaction> internalDbReader(){
        returnnew JdbcCursorItemReaderBuilder<Transaction>()
                .name("internalDbReader")
                .dataSource(internalDataSource)
                .sql("SELECT order_no, amount, status FROM transactions WHERE trade_date = ?")
                .rowMapper((rs, rowNum) -> {
                    Transaction t = new Transaction();
                    t.setInternalOrderNo(rs.getString("order_no"));
                    t.setSystemAmount(rs.getBigDecimal("amount"));
                    return t;
                })
                .preparedStatementSetter(ps -> ps.setString(1, "2023-10-01"))
                .build();
    }

    // 組合處理器
    @Bean
    public CompositeItemProcessor<Transaction> compositeProcessor(){
        List<ItemProcessor<?, ?>> delegates = new ArrayList<>();
        delegates.add(new DataMatchingProcessor());
        delegates.add(new DiscrepancyClassifier());
        returnnew CompositeItemProcessorBuilder<>()
                .delegates(delegates)
                .build();
    }

    // 組合寫入器
    @Bean
    public CompositeItemWriter<Transaction> compositeWriter(){
        returnnew CompositeItemWriterBuilder<Transaction>()
                .delegates(
                    discrepancyDbWriter(),
                    alertMessageWriter()
                )
                .build();
    }
}

5. 核心處理器實(shí)現(xiàn)

publicclassDataMatchingProcessorimplementsItemProcessor<Transaction, Transaction> {

    @Override
    public Transaction process(Transaction item){
        // 雙數(shù)據(jù)源匹配邏輯
        if (item.getBankSerialNo() == null) {
            item.setStatus(ReconStatus.ONLY_IN_SYSTEM);
        } elseif (item.getInternalOrderNo() == null) {
            item.setStatus(ReconStatus.ONLY_IN_BANK);
        } else {
            compareAmounts(item);
            compareStatuses(item);
        }
        return item;
    }

    privatevoidcompareAmounts(Transaction t){
        if (t.getBankAmount().compareTo(t.getSystemAmount()) != 0) {
            t.setDiscrepancyType("AMOUNT_MISMATCH");
            t.setStatus(ReconStatus.AMOUNT_DIFF);
            BigDecimal diff = t.getBankAmount().subtract(t.getSystemAmount());
            t.setAmount(diff.abs());
        }
    }

    privatevoidcompareStatuses(Transaction t){
        // 假設(shè)從數(shù)據(jù)庫(kù)獲取內(nèi)部狀態(tài)
        String internalStatus = transactionService.getStatus(t.getInternalOrderNo());
        if(!"SETTLED".equals(internalStatus)){
            t.setDiscrepancyType("STATUS_MISMATCH");
            t.setStatus(ReconStatus.STATUS_DIFF);
        }
    }
}

publicclassDiscrepancyClassifierimplementsItemProcessor<Transaction, Transaction> {
    @Override
    public Transaction process(Transaction item){
        if (item.getStatus() != ReconStatus.MATCHED) {
            // 添加告警標(biāo)記
            item.setAlertLevel(calculateAlertLevel(item));
        }
        return item;
    }

    private AlertLevel calculateAlertLevel(Transaction t){
        if (t.getAmount().compareTo(new BigDecimal("1000000")) > 0) {
            return AlertLevel.CRITICAL;
        }
        return AlertLevel.WARNING;
    }
}

6. 差異報(bào)告生成Step

@Bean
public Step reportStep(){
    return stepBuilderFactory.get("reportStep")
            .<Transaction, Transaction>chunk(1000)
            .reader(discrepancyReader())
            .writer(excelWriter())
            .build();
}

@Bean
public JdbcPagingItemReader<Transaction> discrepancyReader(){
    returnnew JdbcPagingItemReaderBuilder<Transaction>()
            .name("discrepancyReader")
            .dataSource(reconDataSource)
            .selectClause("SELECT *")
            .fromClause("FROM discrepancy_records")
            .whereClause("WHERE recon_date = '2023-10-01'")
            .sortKeys(Collections.singletonMap("transaction_id", Order.ASCENDING))
            .rowMapper(new BeanPropertyRowMapper<>(Transaction.class))
            .build();
}

@Bean
public ExcelFileItemWriter<Transaction> excelWriter(){
    returnnew ExcelFileItemWriterBuilder<Transaction>()
            .name("excelWriter")
            .resource(new FileSystemResource("reports/2023-10-01.xlsx"))
            .sheetName("差異報(bào)告")
            .headers(new String[]{"交易ID", "差異類型", "金額差異", "告警級(jí)別"})
            .fieldExtractor(item -> new Object[]{
                    item.getTransactionId(),
                    item.getDiscrepancyType(),
                    item.getAmount(),
                    item.getAlertLevel()
            })
            .build();
}

7. 性能優(yōu)化配置

# 應(yīng)用配置
spring.batch.job.enabled=false# 禁止自動(dòng)啟動(dòng)
spring.batch.initialize-schema=never # 生產(chǎn)環(huán)境禁止自動(dòng)建表

# 性能調(diào)優(yōu)參數(shù)
spring.batch.chunk.size=2000 # 根據(jù)內(nèi)存調(diào)整
spring.datasource.hikari.maximum-pool-size=20
spring.jpa.properties.hibernate.jdbc.batch_size=1000

8. 執(zhí)行監(jiān)控看板

圖片圖片

五、生產(chǎn)級(jí)特性

1. 容錯(cuò)機(jī)制

圖片圖片

  • 完整容錯(cuò)配置示例:
@Bean
public Step secureStep(){
    return stepBuilderFactory.get("secureStep")
            .<Input, Output>chunk(500)
            .reader(jdbcReader())
            .processor(secureProcessor())
            .writer(restApiWriter())
            .faultTolerant()
            .retryLimit(3)
            .retry(ConnectException.class) // 網(wǎng)絡(luò)問題重試
            .retry(DeadlockLoserDataAccessException.class) // 數(shù)據(jù)庫(kù)死鎖重試
            .skipLimit(100)
            .skip(DataIntegrityViolationException.class) // 數(shù)據(jù)問題跳過
            .skip(InvalidDataAccessApiUsageException.class)
            .noRollback(ValidationException.class) // 驗(yàn)證異常不回滾
            .listener(newErrorLogListener()) // 自定義監(jiān)聽器
            .build();
}

// 錯(cuò)誤日志監(jiān)聽器示例
publicclassErrorLogListenerimplementsItemProcessListener<Input, Output> {
    @Override
    publicvoidonProcessError(Input item, Exception e){
        ErrorLog log = new ErrorLog();
        log.setItemData(item.toString());
        log.setErrorMsg(e.getMessage());
        errorLogRepository.save(log);
    }
}

2. 性能優(yōu)化策略(千萬級(jí)數(shù)據(jù)處理)

策略1:并行Step執(zhí)行

圖片圖片

配置代碼:

@Bean
public Job parallelJob(){
    return jobBuilderFactory.get("parallelJob")
            .start(step1())
            .split(new SimpleAsyncTaskExecutor()) // 啟用異步執(zhí)行器
            .add(step2(), step3())
            .build();
}
策略2:分區(qū)處理(Partitioning)

圖片圖片

  • 分區(qū)處理器實(shí)現(xiàn):
@Bean
public Step masterStep(){
    return stepBuilderFactory.get("masterStep")
            .partitioner("slaveStep", partitioner())
            .gridSize(10) // 分區(qū)數(shù)量=CPU核心數(shù)*2
            .taskExecutor(new ThreadPoolTaskExecutor())
            .build();
}

@Bean
public Partitioner partitioner(){
    returnnew Partitioner() {
        @Override
        public Map<String, ExecutionContext> partition(int gridSize){
            Map<String, ExecutionContext> result = new HashMap<>();
            long total = getTotalRecordCount();
            
            long range = total / gridSize;
            for (int i = 0; i < gridSize; i++) {
                ExecutionContext context = new ExecutionContext();
                context.putLong("min", i * range);
                context.putLong("max", (i+1) * range);
                result.put("partition"+i, context);
            }
            return result;
        }
    };
}

// Slave Step配置
@Bean
public Step slaveStep(){
    return stepBuilderFactory.get("slaveStep")
            .<Record, Result>chunk(1000)
            .reader(rangeReader(null, null))
            .processor(processor())
            .writer(writer())
            .build();
}

@StepScope
@Bean
public ItemReader<Record> rangeReader(
        @Value("#{stepExecutionContext[min]}") Long min,
        @Value("#{stepExecutionContext[max]}") Long max) {
    returnnew JdbcCursorItemReaderBuilder<Record>()
            .sql("SELECT * FROM records WHERE id BETWEEN ? AND ?")
            .preparedStatementSetter(ps -> {
                ps.setLong(1, min);
                ps.setLong(2, max);
            })
            // 其他配置...
            .build();
}
  • 策略3:異步ItemProcessor

圖片圖片

  • 異步處理配置:
@Bean
public Step asyncStep(){
    return stepBuilderFactory.get("asyncStep")
            .<Input, Output>chunk(1000)
            .reader(reader())
            .processor(asyncItemProcessor())
            .writer(writer())
            .build();
}

@Bean
public AsyncItemProcessor<Input, Output> asyncItemProcessor(){
    AsyncItemProcessor<Input, Output> asyncProcessor = new AsyncItemProcessor<>();
    asyncProcessor.setDelegate(syncProcessor()); // 同步處理器
    asyncProcessor.setTaskExecutor(new ThreadPoolTaskExecutor());
    return asyncProcessor;
}

@Bean
public AsyncItemWriter<Output> asyncItemWriter(){
    AsyncItemWriter<Output> asyncWriter = new AsyncItemWriter<>();
    asyncWriter.setDelegate(syncWriter()); // 同步寫入器
    return asyncWriter;
}

3. 性能對(duì)比測(cè)試數(shù)據(jù)

圖片圖片

優(yōu)化技巧:

  • 數(shù)據(jù)庫(kù)連接池調(diào)優(yōu):
spring.datasource.hikari.maximum-pool-size=20
spring.datasource.hikari.minimum-idle=5
  • JVM參數(shù)優(yōu)化:
java -jar -Xmx4g -XX:+UseG1GC -XX:MaxGCPauseMillis=200 ...
  • 批處理參數(shù)調(diào)整:
.chunk(2000) // 根據(jù)內(nèi)存容量調(diào)整
.setQueryTimeout(60) // 數(shù)據(jù)庫(kù)查詢超時(shí)

六、監(jiān)控與管理(生產(chǎn)級(jí)方案)

1. 監(jiān)控方案升級(jí)(Spring Batch Admin替代方案)

圖片圖片

  • 現(xiàn)代監(jiān)控棧配置:
// 添加監(jiān)控依賴
<dependency>
    <groupId>io.micrometer</groupId>
    <artifactId>micrometer-registry-prometheus</artifactId>
</dependency>
// 暴露監(jiān)控端點(diǎn)
@Bean
public MeterRegistryCustomizer<MeterRegistry> metricsCommonTags(){
    return registry -> registry.config().commonTags("application", "batch-service");
}

// 自定義Batch指標(biāo)
publicclassBatchMetricsListenerextendsJobExecutionListenerSupport{
    privatefinal Counter processedRecords = Counter.builder("batch.records.processed")
            .description("Total processed records")
            .register(Metrics.globalRegistry);
    
    @Override
    publicvoidafterStep(StepExecution stepExecution){
        processedRecords.increment(stepExecution.getWriteCount());
    }
}

2. 元數(shù)據(jù)表結(jié)構(gòu)詳解

圖片圖片

關(guān)鍵表用途:

  • BATCH_JOB_INSTANCE:作業(yè)指紋庫(kù)(相同參數(shù)只能存在一個(gè)實(shí)例)
  • BATCH_JOB_EXECUTION_PARAMS:存儲(chǔ)每次運(yùn)行的參數(shù)
  • BATCH_STEP_EXECUTION_CONTEXT:保存步驟上下文數(shù)據(jù)(重啟恢復(fù)的關(guān)鍵)

3. 自定義監(jiān)控看板

-- 常用監(jiān)控SQL示例
-- 最近5次作業(yè)執(zhí)行情況
SELECT j.JOB_NAME, e.START_TIME, e.END_TIME, 
       TIMEDIFF(e.END_TIME, e.START_TIME) ASDURATION,
       s.READ_COUNT, s.WRITE_COUNT
FROM BATCH_JOB_EXECUTION e
JOIN BATCH_JOB_INSTANCE j ON e.JOB_INSTANCE_ID = j.JOB_INSTANCE_ID
JOIN BATCH_STEP_EXECUTION s ON e.JOB_EXECUTION_ID = s.JOB_EXECUTION_ID
ORDERBY e.START_TIME DESCLIMIT5;

七、常見問題Q&A(終極指南)

1. 內(nèi)存溢出問題深度解決方案

場(chǎng)景:處理10GB CSV文件時(shí)OOM

圖片圖片

  • 優(yōu)化代碼示例:
@Bean
@StepScope
public FlatFileItemReader<LargeRecord> largeFileReader(
        @Value("#{jobParameters['filePath']}") String filePath) {
    
    returnnew FlatFileItemReaderBuilder<LargeRecord>()
            .resource(new FileSystemResource(filePath))
            .lineMapper(new DefaultLineMapper<>() {{
                setLineTokenizer(new DelimitedLineTokenizer());
                setFieldSetMapper(new BeanWrapperFieldSetMapper<>() {{
                    setTargetType(LargeRecord.class);
                }});
            }})
            .linesToSkip(1)
            .strict(false) // 允許文件結(jié)尾空行
            .saveState(false) // 禁用狀態(tài)保存
            .build();
}

// JVM參數(shù)建議
// -XX:+UseG1GC -Xmx2g -XX:MaxGCPauseMillis=200

2. 定時(shí)任務(wù)高級(jí)配置

  • 多任務(wù)調(diào)度方案:
@Configuration
@EnableScheduling
publicclassScheduleConfig{

    @Autowiredprivate JobLauncher jobLauncher;
    @Autowiredprivate Job reportJob;
    
    // 工作日凌晨執(zhí)行
    @Scheduled(cron = "0 0 2 * * MON-FRI")
    publicvoiddailyJob()throws Exception {
        JobParameters params = new JobParametersBuilder()
                .addString("date", LocalDate.now().toString())
                .toJobParameters();
        jobLauncher.run(reportJob, params);
    }

    // 每小時(shí)輪詢
    @Scheduled(fixedRate = 3600000)
    publicvoidpollJob(){
        if(checkNewDataExists()) {
            jobLauncher.run(dataProcessJob, new JobParameters());
        }
    }
    
    // 優(yōu)雅停止示例
    publicvoidstopJob(Long executionId){
        JobExecution execution = jobExplorer.getJobExecution(executionId);
        if(execution.isRunning()) {
            execution.setStatus(BatchStatus.STOPPING);
            jobRepository.update(execution);
        }
    }
}

3. 高頻問題集錦

Q:如何重新運(yùn)行失敗的任務(wù)?

-- 步驟1:查詢失敗的任務(wù)ID
SELECT * FROM BATCH_JOB_EXECUTION WHERESTATUS = 'FAILED';

-- 步驟2:使用相同參數(shù)重新啟動(dòng)
JobParameters params = new JobParametersBuilder()
        .addLong("restartId", originalExecutionId)
        .toJobParameters();
jobLauncher.run(job, params);

Q:處理過程中斷電怎么辦?

圖片圖片

Q:如何實(shí)現(xiàn)動(dòng)態(tài)參數(shù)傳遞?

// 命令行啟動(dòng)方式
java -jar batch.jar --spring.batch.job.name=dataImportJob date=2023-10-01

// 編程式參數(shù)構(gòu)建
publicvoidrunJobWithParams(Map<String, Object> params){
    JobParameters jobParams = new JobParametersBuilder()
            .addString("mode", "forceUpdate")
            .addLong("timestamp", System.currentTimeMillis())
            .toJobParameters();
    jobLauncher.run(importJob, jobParams);
}

4. 性能調(diào)優(yōu)檢查清單

數(shù)據(jù)庫(kù)優(yōu)化
  • 添加批量處理索引
  • 配置連接池參數(shù)
  • 啟用JDBC批處理模式
JVM優(yōu)化
-XX:+UseStringDeduplication
-XX:+UseCompressedOops
-XX:MaxMetaspaceSize=512m
Batch配置
spring.batch.jdbc.initialize-schema=never
spring.batch.job.enabled=false
spring.jpa.open-in-view=false


責(zé)任編輯:武曉燕 來源: 碼猿技術(shù)專欄
相關(guān)推薦

2025-10-14 09:12:49

2025-08-04 09:33:42

2025-06-05 00:00:00

項(xiàng)目接口合并

2022-12-29 08:43:43

項(xiàng)目接口請(qǐng)求

2021-03-08 08:02:40

IDEA插件JSON

2021-02-02 15:38:19

Disruptor緩存Java

2025-04-29 08:00:36

2022-02-23 11:47:57

CharlesFiddler抓包

2021-03-26 15:18:11

代碼工具Mockoon

2025-07-23 09:34:24

2021-05-31 09:02:55

KPI考核工具公司

2022-01-27 08:12:50

Potplayer播放器

2025-09-01 01:25:00

SpringMVC注解

2009-06-18 15:40:07

Spring Batc

2025-05-09 08:40:42

插件頁(yè)面Vite

2025-09-08 09:58:06

2020-12-11 11:26:47

Spring批處理重試

2025-03-03 10:04:49

2022-08-02 20:47:38

Spring框架應(yīng)用程序

2025-06-30 02:25:00

點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)