Appearance
Spring Batch 拦截 Step 执行监听器指南
引言:理解 Step 监听器的重要性
在 Spring Batch 中,Step
是批处理作业的核心执行单元。就像作业(Job
)执行过程中有各种事件一样,在 Step 执行的不同阶段,我们也需要执行特定逻辑。例如:
- ✅ 在文件导出任务结束时添加文件尾部信息
- ✅ 记录跳过处理的异常数据项
- ✅ 在事务提交前执行数据校验
- ✅ 监控每个 chunk 的处理性能
监听器的本质
Spring Batch 通过 StepListener 接口体系提供了细粒度的生命周期回调,让我们可以在批处理的关键节点注入自定义逻辑。
监听器配置指南
注册监听器的推荐方式
kotlin
@Configuration
class BatchConfig {
@Bean
fun step1(jobRepository: JobRepository, transactionManager: PlatformTransactionManager): Step {
return StepBuilder("step1", jobRepository)
.chunk<String, String>(10, transactionManager)
.reader(reader())
.writer(writer())
.listener(chunkListener()) // 注册自定义监听器
.build()
}
// 自定义 Chunk 监听器实现
@Bean
fun chunkListener(): ChunkListener = object : ChunkListener {
override fun beforeChunk(context: ChunkContext) {
println("🚀 事务开始,准备读取数据...")
}
}
}
重要原则
监听器应注册在最贴近其功能作用域的层级:
- 如果监听器只与 chunk 处理相关 → 注册在 chunk 级别
- 如果监听器涉及整个 step → 注册在 step 级别
自动注册机制
当 ItemReader
/ItemWriter
/ItemProcessor
自身实现了监听器接口时,Spring Batch 会自动注册:
kotlin
@Component
class CustomItemReader : ItemReader<String>, ItemReadListener<String> {
override fun read(): String? {
// 读取逻辑
}
// 实现监听器方法会被自动调用
override fun afterRead(item: String) {
println("✅ 成功读取: $item")
}
}
核心监听器详解
1. StepExecutionListener - 最通用的监听器
接口定义与使用
kotlin
interface StepExecutionListener : StepListener {
fun beforeStep(stepExecution: StepExecution) // Step开始前
fun afterStep(stepExecution: StepExecution): ExitStatus // Step结束后
}
// 注解方式实现
@Component
class CustomStepListener {
@BeforeStep
fun beforeStep(stepExecution: StepExecution) {
println("⏰ Step [${stepExecution.stepName}] 开始执行")
}
@AfterStep
fun afterStep(stepExecution: StepExecution): ExitStatus {
val status = if (stepExecution.exitStatus.exitCode == "COMPLETED") "✅" else "❌"
println("$status Step [${stepExecution.stepName}] 执行完成")
return stepExecution.exitStatus
}
}
TIP
afterStep
可以修改退出状态码,适合用于根据执行结果动态决定后续流程
2. ChunkListener - 事务块监听器
关键方法
kotlin
interface ChunkListener : StepListener {
fun beforeChunk(context: ChunkContext) // 事务开始后,读取前
fun afterChunk(context: ChunkContext) // Chunk成功提交后
fun afterChunkError(context: ChunkContext) // Chunk处理出错时
}
// 示例:监控Chunk处理时间
@Component
class PerformanceMonitor : ChunkListener {
private lateinit var startTime: Instant
@BeforeChunk
override fun beforeChunk(context: ChunkContext) {
startTime = Instant.now()
}
@AfterChunk
override fun afterChunk(context: ChunkContext) {
val duration = Duration.between(startTime, Instant.now())
println("⏱️ Chunk处理耗时: ${duration.toMillis()}ms")
}
}
CAUTION
ChunkListener 不适合抛出受检异常,错误应在实现内部处理
3. 数据处理监听器三剑客
ItemReadListener - 读操作监听
kotlin
interface ItemReadListener<T> : StepListener {
fun beforeRead() // 每次read()调用前
fun afterRead(item: T) // 成功读取后
fun onReadError(ex: Exception) // 读取异常时
}
// 使用示例
@Component
class ReadLogger : ItemReadListener<String> {
@BeforeRead
override fun beforeRead() = println("📖 准备读取下一条记录...")
@AfterRead
override fun afterRead(item: String) = println("✅ 成功读取: $item")
@OnReadError
override fun onReadError(ex: Exception) = println("❌ 读取失败: ${ex.message}")
}
ItemProcessListener - 处理过程监听
kotlin
interface ItemProcessListener<T, S> : StepListener {
fun beforeProcess(item: T) // 处理前
fun afterProcess(item: T, result: S?) // 成功处理后
fun onProcessError(item: T, ex: Exception) // 处理异常时
}
// 注解方式实现
@Component
class ProcessMonitor {
@BeforeProcess
fun <T> beforeProcess(item: T) {
println("⚙️ 开始处理: $item")
}
@AfterProcess
fun <T, S> afterProcess(item: T, result: S?) {
println("✨ 处理完成: $item → $result")
}
}
ItemWriteListener - 写操作监听
kotlin
interface ItemWriteListener<S> : StepListener {
fun beforeWrite(items: List<S>) // 写入前
fun afterWrite(items: List<S>) // 成功写入后
fun onWriteError(ex: Exception, items: List<S>) // 写入异常时
}
// 示例:批量写入前校验
@Component
class WriteValidator : ItemWriteListener<Any> {
@BeforeWrite
override fun beforeWrite(items: List<Any>) {
if (items.size > 100) {
throw ValidationException("单次写入不能超过100条记录")
}
}
@OnWriteError
override fun onWriteError(ex: Exception, items: List<Any>) {
println("⚠️ 写入失败: ${items.size}条记录. 原因: ${ex.message}")
}
}
4. SkipListener - 跳过记录追踪
接口实现
kotlin
interface SkipListener<T, S> : StepListener {
fun onSkipInRead(t: Throwable) // 读取时跳过
fun onSkipInProcess(item: T, t: Throwable) // 处理时跳过
fun onSkipInWrite(item: S, t: Throwable) // 写入时跳过
}
// 跳过记录日志
@Component
class SkipLogger : SkipListener<Any, Any> {
@OnSkipInRead
override fun onSkipInRead(t: Throwable) {
println("📝 [读取跳过] 原因: ${t.message}")
}
@OnSkipInProcess
override fun onSkipInProcess(item: Any, t: Throwable) {
println("📝 [处理跳过] 条目: $item, 原因: ${t.message}")
}
@OnSkipInWrite
override fun onSkipInWrite(item: Any, t: Throwable) {
println("📝 [写入跳过] 条目: $item, 原因: ${t.message}")
}
}
IMPORTANT
SkipListener 的两个关键保证:
- 每个跳过的项目只会触发一次回调
- 回调总是在事务提交前执行,确保日志不会回滚
监听器最佳实践
组合使用多个监听器
kotlin
@Bean
fun comprehensiveStep(jobRepository: JobRepository): Step {
return StepBuilder("dataProcessingStep", jobRepository)
.chunk<Input, Output>(100, transactionManager)
.reader(reader())
.processor(processor())
.writer(writer())
.listener(StepPerformanceMonitor())
.listener(DataValidationListener())
.listener(SkipHandler())
.faultTolerant()
.skipLimit(100)
.skip(ValidationException::class.java)
.build()
}
注解驱动的监听器配置
kotlin
// 使用注解配置监听方法
class CompositeListener {
// Step级别监听
@AfterStep
fun finalReport(execution: StepExecution) {
println("生成最终报告: 处理${execution.writeCount}条记录")
}
// Chunk级别监听
@AfterChunk
fun chunkCompleted() {
println("--- Chunk处理完成 ---")
}
// Item处理监听
@AfterProcess
fun <I, O> afterProcessing(item: I, result: O) {
println("处理转换: $item → $result")
}
}
性能考虑
避免在监听器中执行耗时操作,尤其是高频调用的方法(如 afterRead/afterProcess),否则会显著降低批处理性能。
总结与核心要点
监听器类型 | 主要用途 | 关键方法 |
---|---|---|
StepExecutionListener | 监控整个Step生命周期 | beforeStep , afterStep |
ChunkListener | 事务块控制 | beforeChunk , afterChunk |
ItemReadListener | 读操作监控 | afterRead , onReadError |
ItemProcessListener | 处理过程监控 | afterProcess , onProcessError |
ItemWriteListener | 写操作监控 | beforeWrite , onWriteError |
SkipListener | 跳过记录跟踪 | onSkipInRead/Process/Write |
实践建议
- 优先使用注解方式配置监听器,更简洁清晰
- 为每个监听器保持单一职责,避免功能混杂
- 在监听器中添加充分日志,便于问题排查
- 对跳过记录实施持久化存储,方便后续修复
“Spring Batch 的监听器体系就像给批处理装上了'传感器',让每个处理阶段都变得可见、可控、可观测。” —— 批处理架构师实践心得
通过合理使用这些监听器,您将能构建出健壮、可观测性强的批处理系统,轻松应对各种复杂业务场景。