Skip to content

Spring Batch 拦截 Step 执行监听器指南

引言:理解 Step 监听器的重要性

在 Spring Batch 中,Step 是批处理作业的核心执行单元。就像作业(Job)执行过程中有各种事件一样,在 Step 执行的不同阶段,我们也需要执行特定逻辑。例如:

  • ✅ 在文件导出任务结束时添加文件尾部信息
  • ✅ 记录跳过处理的异常数据项
  • ✅ 在事务提交前执行数据校验
  • ✅ 监控每个 chunk 的处理性能

监听器的本质

Spring Batch 通过 StepListener 接口体系提供了细粒度的生命周期回调,让我们可以在批处理的关键节点注入自定义逻辑。

监听器配置指南

注册监听器的推荐方式

kotlin
@Configuration
class BatchConfig {

    @Bean
    fun step1(jobRepository: JobRepository, transactionManager: PlatformTransactionManager): Step {
        return StepBuilder("step1", jobRepository)
            .chunk<String, String>(10, transactionManager)
            .reader(reader())
            .writer(writer())
            .listener(chunkListener()) // 注册自定义监听器
            .build()
    }

    // 自定义 Chunk 监听器实现
    @Bean
    fun chunkListener(): ChunkListener = object : ChunkListener {
        override fun beforeChunk(context: ChunkContext) {
            println("🚀 事务开始,准备读取数据...")
        }
    }
}

重要原则

监听器应注册在最贴近其功能作用域的层级

  • 如果监听器只与 chunk 处理相关 → 注册在 chunk 级别
  • 如果监听器涉及整个 step → 注册在 step 级别

自动注册机制

ItemReader/ItemWriter/ItemProcessor 自身实现了监听器接口时,Spring Batch 会自动注册:

kotlin
@Component
class CustomItemReader : ItemReader<String>, ItemReadListener<String> {

    override fun read(): String? {
        // 读取逻辑
    }

    // 实现监听器方法会被自动调用
    override fun afterRead(item: String) {
        println("✅ 成功读取: $item")
    }
}

核心监听器详解

1. StepExecutionListener - 最通用的监听器

接口定义与使用

kotlin
interface StepExecutionListener : StepListener {
    fun beforeStep(stepExecution: StepExecution) // Step开始前
    fun afterStep(stepExecution: StepExecution): ExitStatus // Step结束后
}

// 注解方式实现
@Component
class CustomStepListener {
    @BeforeStep
    fun beforeStep(stepExecution: StepExecution) {
        println("⏰ Step [${stepExecution.stepName}] 开始执行")
    }

    @AfterStep
    fun afterStep(stepExecution: StepExecution): ExitStatus {
        val status = if (stepExecution.exitStatus.exitCode == "COMPLETED") "✅" else "❌"
        println("$status Step [${stepExecution.stepName}] 执行完成")
        return stepExecution.exitStatus
    }
}

TIP

afterStep 可以修改退出状态码,适合用于根据执行结果动态决定后续流程

2. ChunkListener - 事务块监听器

关键方法

kotlin
interface ChunkListener : StepListener {
    fun beforeChunk(context: ChunkContext) // 事务开始后,读取前
    fun afterChunk(context: ChunkContext)  // Chunk成功提交后
    fun afterChunkError(context: ChunkContext) // Chunk处理出错时
}

// 示例:监控Chunk处理时间
@Component
class PerformanceMonitor : ChunkListener {
    private lateinit var startTime: Instant

    @BeforeChunk
    override fun beforeChunk(context: ChunkContext) {
        startTime = Instant.now()
    }

    @AfterChunk
    override fun afterChunk(context: ChunkContext) {
        val duration = Duration.between(startTime, Instant.now())
        println("⏱️ Chunk处理耗时: ${duration.toMillis()}ms")
    }
}

CAUTION

ChunkListener 不适合抛出受检异常,错误应在实现内部处理

3. 数据处理监听器三剑客

ItemReadListener - 读操作监听

kotlin
interface ItemReadListener<T> : StepListener {
    fun beforeRead() // 每次read()调用前
    fun afterRead(item: T) // 成功读取后
    fun onReadError(ex: Exception) // 读取异常时
}

// 使用示例
@Component
class ReadLogger : ItemReadListener<String> {
    @BeforeRead
    override fun beforeRead() = println("📖 准备读取下一条记录...")

    @AfterRead
    override fun afterRead(item: String) = println("✅ 成功读取: $item")

    @OnReadError
    override fun onReadError(ex: Exception) = println("❌ 读取失败: ${ex.message}")
}

ItemProcessListener - 处理过程监听

kotlin
interface ItemProcessListener<T, S> : StepListener {
    fun beforeProcess(item: T) // 处理前
    fun afterProcess(item: T, result: S?) // 成功处理后
    fun onProcessError(item: T, ex: Exception) // 处理异常时
}

// 注解方式实现
@Component
class ProcessMonitor {
    @BeforeProcess
    fun <T> beforeProcess(item: T) {
        println("⚙️ 开始处理: $item")
    }

    @AfterProcess
    fun <T, S> afterProcess(item: T, result: S?) {
        println("✨ 处理完成: $item$result")
    }
}

ItemWriteListener - 写操作监听

kotlin
interface ItemWriteListener<S> : StepListener {
    fun beforeWrite(items: List<S>) // 写入前
    fun afterWrite(items: List<S>) // 成功写入后
    fun onWriteError(ex: Exception, items: List<S>) // 写入异常时
}

// 示例:批量写入前校验
@Component
class WriteValidator : ItemWriteListener<Any> {
    @BeforeWrite
    override fun beforeWrite(items: List<Any>) {
        if (items.size > 100) {
            throw ValidationException("单次写入不能超过100条记录")
        }
    }

    @OnWriteError
    override fun onWriteError(ex: Exception, items: List<Any>) {
        println("⚠️ 写入失败: ${items.size}条记录. 原因: ${ex.message}")
    }
}

4. SkipListener - 跳过记录追踪

接口实现

kotlin
interface SkipListener<T, S> : StepListener {
    fun onSkipInRead(t: Throwable) // 读取时跳过
    fun onSkipInProcess(item: T, t: Throwable) // 处理时跳过
    fun onSkipInWrite(item: S, t: Throwable) // 写入时跳过
}

// 跳过记录日志
@Component
class SkipLogger : SkipListener<Any, Any> {
    @OnSkipInRead
    override fun onSkipInRead(t: Throwable) {
        println("📝 [读取跳过] 原因: ${t.message}")
    }

    @OnSkipInProcess
    override fun onSkipInProcess(item: Any, t: Throwable) {
        println("📝 [处理跳过] 条目: $item, 原因: ${t.message}")
    }

    @OnSkipInWrite
    override fun onSkipInWrite(item: Any, t: Throwable) {
        println("📝 [写入跳过] 条目: $item, 原因: ${t.message}")
    }
}

IMPORTANT

SkipListener 的两个关键保证:

  1. 每个跳过的项目只会触发一次回调
  2. 回调总是在事务提交前执行,确保日志不会回滚

监听器最佳实践

组合使用多个监听器

kotlin
@Bean
fun comprehensiveStep(jobRepository: JobRepository): Step {
    return StepBuilder("dataProcessingStep", jobRepository)
        .chunk<Input, Output>(100, transactionManager)
        .reader(reader())
        .processor(processor())
        .writer(writer())
        .listener(StepPerformanceMonitor()) 
        .listener(DataValidationListener()) 
        .listener(SkipHandler()) 
        .faultTolerant()
        .skipLimit(100)
        .skip(ValidationException::class.java)
        .build()
}

注解驱动的监听器配置

kotlin
// 使用注解配置监听方法
class CompositeListener {
    // Step级别监听
    @AfterStep
    fun finalReport(execution: StepExecution) {
        println("生成最终报告: 处理${execution.writeCount}条记录")
    }

    // Chunk级别监听
    @AfterChunk
    fun chunkCompleted() {
        println("--- Chunk处理完成 ---")
    }

    // Item处理监听
    @AfterProcess
    fun <I, O> afterProcessing(item: I, result: O) {
        println("处理转换: $item$result")
    }
}

性能考虑

避免在监听器中执行耗时操作,尤其是高频调用的方法(如 afterRead/afterProcess),否则会显著降低批处理性能。

总结与核心要点

监听器类型主要用途关键方法
StepExecutionListener监控整个Step生命周期beforeStep, afterStep
ChunkListener事务块控制beforeChunk, afterChunk
ItemReadListener读操作监控afterRead, onReadError
ItemProcessListener处理过程监控afterProcess, onProcessError
ItemWriteListener写操作监控beforeWrite, onWriteError
SkipListener跳过记录跟踪onSkipInRead/Process/Write

实践建议

  1. 优先使用注解方式配置监听器,更简洁清晰
  2. 为每个监听器保持单一职责,避免功能混杂
  3. 在监听器中添加充分日志,便于问题排查
  4. 对跳过记录实施持久化存储,方便后续修复

“Spring Batch 的监听器体系就像给批处理装上了'传感器',让每个处理阶段都变得可见、可控、可观测。” —— 批处理架构师实践心得

通过合理使用这些监听器,您将能构建出健壮、可观测性强的批处理系统,轻松应对各种复杂业务场景。