Appearance
Spring Batch 重复操作机制指南 🌀
本文用通俗方式解析 Spring Batch 的核心重复执行框架,通过可视化图表+实战代码演示批处理任务的迭代控制
🔄 一、重复操作的核心价值
批处理的核心是重复执行相同逻辑(如数据处理/文件读写)。Spring Batch 通过 RepeatOperations
接口提供标准化迭代框架:
⚙️ 二、RepeatTemplate 实战
控制重复执行的核心引擎,通过回调函数执行业务逻辑:
kotlin
val template = RepeatTemplate().apply {
// 设置最大执行次数为5
setCompletionPolicy(SimpleCompletionPolicy(5))
}
template.iterate { context ->
// 业务逻辑(如读取文件/处理数据)
println("执行第 ${context.iterationCount} 次操作")
// 控制循环行为:
// CONTINUABLE=继续, FINISHED=终止
if (shouldStop()) RepeatStatus.FINISHED
else RepeatStatus.CONTINUABLE
}
TIP
回调函数本质:RepeatCallback
是包含业务逻辑的 lambda,Spring Batch 自动管理其重复调用
🧩 三、关键组件解析
1. RepeatContext 上下文对象
kotlin
fun doInIteration(context: RepeatContext) {
// 存储跨迭代的共享数据
context.setAttribute("total", context.getAttribute("total") + 1)
// 获取父级迭代上下文(嵌套时有用)
val parent = context.parent
}
方法 | 用途 |
---|---|
setAttribute() | 存储迭代间共享数据 |
getAttribute() | 读取跨迭代数据 |
parent | 获取父级迭代上下文(支持嵌套) |
2. RepeatStatus 状态枚举
状态值 | 含义 | 使用场景示例 |
---|---|---|
CONTINUABLE | 继续下一次迭代 | 默认返回,持续执行任务 |
FINISHED | 终止迭代 | 达到业务目标时主动停止 |
⚠️ 状态组合规则:
FINISHED.and(CONTINUABLE) = FINISHED
任一状态为 FINISHED 即终止整个迭代链
🎯 四、完成策略 (CompletionPolicy)
控制迭代何时终止的内置决策器:
kotlin
// 自定义完成策略(如:当处理超过100条数据时停止)
class RecordCountPolicy : CompletionPolicy {
override fun isComplete(context: RepeatContext): Boolean {
return (context.getAttribute("recordCount") as Int) > 100
}
}
// 应用策略
template.setCompletionPolicy(RecordCountPolicy())
常用内置策略:
SimpleCompletionPolicy
:固定次数后停止TimeoutTerminationPolicy
:超时自动终止
🚨 五、异常处理机制
通过 ExceptionHandler
定制错误响应:
kotlin
template.setExceptionHandler { context, throwable ->
when (throwable) {
is DataFormatException -> {
// 特定异常忽略
log.warn("数据格式异常,跳过记录")
}
else -> throw throwable // 其他异常抛出
}
}
CAUTION
分布式环境注意:
使用 RethrowOnThresholdExceptionHandler
时,
设置 useParent=true
使异常计数在嵌套迭代间共享
🔊 六、监听器 (Listeners)
在迭代生命周期插入监控逻辑:
kotlin
class MetricsListener : RepeatListener {
override fun before(context: RepeatContext) {
// 迭代开始前执行(如初始化计时器)
}
override fun after(context: RepeatContext, result: RepeatStatus) {
// 每次迭代后执行(如记录指标)
}
}
// 注册监听器
template.registerListener(MetricsListener())
⚡ 七、并行处理加速
利用多线程提升批量任务效率:
kotlin
val parallelTemplate = TaskExecutorRepeatTemplate().apply {
// 使用线程池执行任务
setTaskExecutor(ThreadPoolTaskExecutor().apply {
corePoolSize = 4
initialize()
})
}
✨ 八、声明式迭代 (AOP 模式)
通过注解实现无侵入式重复控制:
kotlin
@Configuration
class DeclarativeRepeatConfig {
@Bean
fun messageProcessor(): MessageService {
val proxy = ProxyFactory(MessageService::class.java).apply {
// 目标对象
setTarget(DefaultMessageService())
// 添加拦截器
addAdvice(repeatInterceptor())
}
return proxy.getProxy() as MessageService
}
fun repeatInterceptor(): RepeatOperationsInterceptor {
return RepeatOperationsInterceptor().apply {
// 自定义重试模板
setRepeatTemplate(customRepeatTemplate())
}
}
fun customRepeatTemplate() = RepeatTemplate().apply {
setCompletionPolicy(SimpleCompletionPolicy(3))
}
}
// 业务接口
interface MessageService {
fun processMessage(payload: String)
}
✅ 声明式优势:
- 业务代码零污染
- 通过 AOP 动态添加重试能力
- 策略配置与业务逻辑解耦
关键总结 🚦
组件 | 核心作用 | 使用场景 |
---|---|---|
RepeatTemplate | 迭代执行引擎 | 所有批处理任务基础 |
CompletionPolicy | 终止条件决策器 | 控制循环次数/超时等 |
ExceptionHandler | 异常处理管道 | 定制错误恢复策略 |
声明式迭代 | AOP 无侵入扩展 | 消息处理/API 调用重试 |
⚡ 性能提示:
对 I/O 密集型任务(如数据库读写)优先使用TaskExecutorRepeatTemplate
并行处理,可提升 300%+ 吞吐量
通过本指南,您已掌握 Spring Batch 重复操作的核心机制。接下来可继续探索 Spring Batch 并行处理 或 重试机制详解 等进阶主题!