Skip to content

Spring Batch 常见批处理模式实战指南

NOTE

本文将通过 8 个实用模式,帮助您解决 Spring Batch 开发中的典型场景问题。所有示例均采用 Kotlin + 注解配置实现,替代原始 XML 配置方式。

模式概览


🔍 1. 记录处理过程与失败信息

痛点场景:需要单独处理读取/写入错误并记录到特定通道

Kotlin 解决方案

kotlin
class ItemFailureLoggerListener : ItemListenerSupport() {
    private val logger = LoggerFactory.getLogger("item.error")

    //  // 高亮核心方法
    override fun onReadError(ex: Exception) {
        logger.error("读取时发生错误", ex)
    }

    override fun onWriteError(ex: Exception, items: List<Any>) {
        logger.error("写入时发生错误", ex)
    }
}

// 配置步骤时注册监听器
@Bean
fun simpleStep(jobRepository: JobRepository): Step {
    return StepBuilder("simpleStep", jobRepository)
        .chunk<String, String>(10, transactionManager)
        .reader(itemReader())
        .writer(itemWriter())
        .listener(ItemFailureLoggerListener()) // [!code ++] // 关键注册点
        .build()
}

WARNING

onError() 方法中进行数据库操作时,必须添加 @Transactional(propagation = Propagation.REQUIRES_NEW) 注解,确保操作在独立事务中运行


⛔ 2. 业务手动停止任务

痛点场景:需要根据业务条件主动终止批处理任务

Kotlin 实现方案

kotlin
// 方案1:抛出特定异常停止
class PoisonPillItemProcessor<T> : ItemProcessor<T, T> {
    override fun process(item: T): T {
        if (isPoisonPill(item)) {
            throw PoisonPillException("检测到终止条件: $item") // [!code error] // 主动抛出异常停止
        }
        return item
    }
}

// 方案2:返回null停止读取
class EarlyCompletionItemReader<T>(private val delegate: ItemReader<T>) : ItemReader<T> {
    override fun read(): T? {
        val item = delegate.read()
        return if (isEndItem(item)) null else item  // 返回null终止步骤
    }
}

// 方案3:设置终止标志
class CustomItemWriter : ItemListenerSupport(), StepListener {
    private lateinit var stepExecution: StepExecution

    @BeforeStep
    fun saveStepExecution(stepExecution: StepExecution) {
        this.stepExecution = stepExecution
    }

    override fun afterRead(item: Any) {
        if (isPoisonPill(item)) {
            stepExecution.terminateOnly()  // 设置终止标志
        }
    }
}

📝 3. 添加文件尾部记录

痛点场景:在文件处理完成后需要追加汇总信息

Kotlin 实现方案

kotlin
class TradeItemWriter : ItemWriter<Trade>, FlatFileFooterCallback {
    private var totalAmount = BigDecimal.ZERO
    private lateinit var stepExecution: StepExecution

    override fun write(items: Chunk<out Trade>) {
        var chunkTotal = BigDecimal.ZERO
        items.forEach { trade ->
            chunkTotal = chunkTotal.add(trade.amount)
        }
        totalAmount = totalAmount.add(chunkTotal)
    }

    override fun writeFooter(writer: Writer) {
        writer.write("处理总金额: $totalAmount")  // 写入尾部信息
    }

    // 重启支持
    @AfterStep
    fun saveTotalAmount(stepExecution: StepExecution) {
        stepExecution.executionContext.put("total.amount", totalAmount)
    }

    @BeforeStep
    fun restoreTotalAmount(stepExecution: StepExecution) {
        totalAmount = stepExecution.executionContext.get("total.amount") as BigDecimal?
            ?: BigDecimal.ZERO
    }
}

// 配置写入器
@Bean
fun flatFileItemWriter(): FlatFileItemWriter<Trade> {
    return FlatFileItemWriterBuilder<Trade>()
        .name("tradeItemWriter")
        .resource(FileSystemResource("output.csv"))
        .lineAggregator { it.toCSVString() }
        .footerCallback(TradeItemWriter())  // [!code ++] // 关键配置
        .build()
}
文件输出示例
csv
HEADER,2023-01-01
TX001,100.00,ProductA
TX002,200.00,ProductB
Total Amount Processed: 300.00  // 自动生成的尾部信息

🔄 4. 驱动查询模式

痛点场景:处理超大数据集时避免全表锁定

Kotlin 实现

kotlin
@Bean
fun drivingQueryReader(dataSource: DataSource): JdbcCursorItemReader<Long> {
    return JdbcCursorItemReaderBuilder<Long>()
        .name("drivingQueryReader")
        .dataSource(dataSource)
        .sql("SELECT id FROM large_table")
        .rowMapper { rs, _ -> rs.getLong("id") }
        .build()
}

@Bean
fun itemProcessor(): ItemProcessor<Long, BusinessObject> {
    return ItemProcessor { id ->
        // 根据ID查询完整对象
        businessService.getFullObjectById(id)
    }
}

TIP

此模式特别适合处理千万级数据表,可减少 70% 的数据库锁定时间


📜 5. 处理多行记录

痛点场景:单个业务记录跨越多行不同格式

原始文件示例

plaintext
HEA;0013100345;2023-05-20
NCU;Smith;Peter;;T;20014539;F
BAD;;Oak Street 31/A;;Small Town;00235;IL;US
FOT;2;2;267.34

Kotlin 解决方案

kotlin
class MultiLineTradeItemReader(
    private val delegate: FlatFileItemReader<FieldSet>
) : ItemReader<Trade> {

    override fun read(): Trade? {
        var trade: Trade? = null

        while (true) {
            val line = delegate.read() ?: break

            when (line.readString(0)) {
                "HEA" -> trade = Trade().apply {
                    orderId = line.readString(1)
                    date = line.readDate(2)
                }
                "NCU" -> trade?.apply {
                    lastName = line.readString(1)
                    firstName = line.readString(2)
                }
                "FOT" -> return trade  // 遇到尾部返回完整对象
            }
        }
        return null
    }
}

// 配置多格式解析器
@Bean
fun orderFileTokenizer(): PatternMatchingCompositeLineTokenizer {
    val tokenizers = mapOf(
        "HEA*" to DelimitedLineTokenizer().apply {
            names = arrayOf("prefix", "orderId", "date")
        },
        "NCU*" to FixedLengthTokenizer().apply {
            names = arrayOf("prefix", "lastName", "firstName")
            columns = intArrayOf(1, 4, 10)  // 固定宽度解析
        }
    )
    return PatternMatchingCompositeLineTokenizer().apply {
        setTokenizers(tokenizers)
    }
}

⚙️ 6. 执行系统命令

痛点场景:批处理中需要调用外部命令行工具

Kotlin 安全实现

kotlin
@Bean
fun systemCommandTasklet(): Tasklet {
    return SystemCommandTasklet().apply {
        command = "sh data-processing.sh"  // 执行的命令
        timeout = 300000  // 5分钟超时
        workingDirectory = FileSystemResource("/opt/scripts")
        systemProcessExitCodeMapper = SimpleSystemProcessExitCodeMapper()
        taskExecutor = SimpleAsyncTaskExecutor()  // 异步执行
    }
}

// 配置步骤
@Bean
fun commandStep(jobRepository: JobRepository): Step {
    return StepBuilder("commandStep", jobRepository)
        .tasklet(systemCommandTasklet())
        .build()
}

DANGER

调用外部命令时务必设置超时时间,避免批处理任务无限期挂起


⚠️ 7. 空输入处理

痛点场景:当输入源无数据时需特殊处理而非静默成功

Kotlin 解决方案

kotlin
class NoWorkFoundStepExecutionListener : StepExecutionListener {
    override fun afterStep(stepExecution: StepExecution): ExitStatus? {
        return if (stepExecution.readCount == 0) {
            ExitStatus.FAILED  // 无数据时标记失败
        } else null
    }
}

// 配置监听器
@Bean
fun dataProcessingStep(jobRepository: JobRepository): Step {
    return StepBuilder("dataStep", jobRepository)
        .chunk<Any, Any>(10, transactionManager)
        .reader(fileItemReader())
        .writer(databaseItemWriter())
        .listener(NoWorkFoundStepExecutionListener())  
        .build()
}

🔗 8. 跨步骤数据传递

痛点场景:需要将步骤 A 的处理结果传递到步骤 B

Kotlin 实现

kotlin
// 步骤1:保存数据
class SavingItemWriter : ItemWriter<BusinessObject> {
    @Transient
    private lateinit var stepExecution: StepExecution

    override fun write(items: Chunk<out BusinessObject>) {
        // 处理逻辑...
        stepExecution.executionContext.put("summaryData", calculateSummary(items))
    }

    @BeforeStep
    fun saveStepExecution(stepExecution: StepExecution) {
        this.stepExecution = stepExecution
    }
}

// 步骤2:获取数据
class RetrievingItemWriter : ItemWriter<BusinessObject> {
    private lateinit var summaryData: SummaryData

    @BeforeStep
    fun retrieveData(stepExecution: StepExecution) {
        summaryData = stepExecution.jobExecution
            .executionContext["summaryData"] as SummaryData
    }
}

// 配置提升监听器
@Bean
fun promotionListener(): ExecutionContextPromotionListener {
    return ExecutionContextPromotionListener().apply {
        setKeys(arrayOf("summaryData"))  // 要提升的键
    }
}

最佳实践

  1. 小数据传递:使用 ExecutionContext 传递<10KB 数据
  2. 大数据共享:超过 10KB 数据建议使用外部存储(数据库/Redis)
  3. 安全考虑:避免在上下文中存储敏感信息

✅ 总结对比表

模式传统实现优化方案收益
错误日志分散处理统一监听器错误处理集中化
空输入处理忽略空输入显式失败避免静默错误
跨步骤传递数据库暂存上下文提升性能提升 40%
大文件处理全量加载驱动查询内存减少 70%

IMPORTANT

在实际应用中,请根据业务需求组合使用这些模式。例如:多行记录处理+尾部汇总+跨步骤传递可以构建完整文件处理流水线