Skip to content

Spring Batch 重复操作机制指南 🌀

本文用通俗方式解析 Spring Batch 的核心重复执行框架,通过可视化图表+实战代码演示批处理任务的迭代控制


🔄 一、重复操作的核心价值

批处理的核心是重复执行相同逻辑(如数据处理/文件读写)。Spring Batch 通过 RepeatOperations 接口提供标准化迭代框架:


⚙️ 二、RepeatTemplate 实战

控制重复执行的核心引擎,通过回调函数执行业务逻辑:

kotlin
val template = RepeatTemplate().apply {
    // 设置最大执行次数为5
    setCompletionPolicy(SimpleCompletionPolicy(5))
}

template.iterate { context ->
    // 业务逻辑(如读取文件/处理数据)
    println("执行第 ${context.iterationCount} 次操作")

    // 控制循环行为:
    // CONTINUABLE=继续, FINISHED=终止
    if (shouldStop()) RepeatStatus.FINISHED
    else RepeatStatus.CONTINUABLE
}

TIP

回调函数本质
RepeatCallback 是包含业务逻辑的 lambda,Spring Batch 自动管理其重复调用


🧩 三、关键组件解析

1. RepeatContext 上下文对象
kotlin
fun doInIteration(context: RepeatContext) {
    // 存储跨迭代的共享数据
    context.setAttribute("total", context.getAttribute("total") + 1)

    // 获取父级迭代上下文(嵌套时有用)
    val parent = context.parent
}
方法用途
setAttribute()存储迭代间共享数据
getAttribute()读取跨迭代数据
parent获取父级迭代上下文(支持嵌套)
2. RepeatStatus 状态枚举
状态值含义使用场景示例
CONTINUABLE继续下一次迭代默认返回,持续执行任务
FINISHED终止迭代达到业务目标时主动停止

⚠️ 状态组合规则
FINISHED.and(CONTINUABLE) = FINISHED
任一状态为 FINISHED 即终止整个迭代链


🎯 四、完成策略 (CompletionPolicy)

控制迭代何时终止的内置决策器:

kotlin
// 自定义完成策略(如:当处理超过100条数据时停止)
class RecordCountPolicy : CompletionPolicy {
    override fun isComplete(context: RepeatContext): Boolean {
        return (context.getAttribute("recordCount") as Int) > 100
    }
}

// 应用策略
template.setCompletionPolicy(RecordCountPolicy())

常用内置策略:

  • SimpleCompletionPolicy:固定次数后停止
  • TimeoutTerminationPolicy:超时自动终止

🚨 五、异常处理机制

通过 ExceptionHandler 定制错误响应:

kotlin
template.setExceptionHandler { context, throwable ->
    when (throwable) {
        is DataFormatException -> {
            // 特定异常忽略
            log.warn("数据格式异常,跳过记录")
        }
        else -> throw throwable // 其他异常抛出
    }
}

CAUTION

分布式环境注意
使用 RethrowOnThresholdExceptionHandler 时,
设置 useParent=true 使异常计数在嵌套迭代间共享


🔊 六、监听器 (Listeners)

在迭代生命周期插入监控逻辑:

kotlin
class MetricsListener : RepeatListener {
    override fun before(context: RepeatContext) {
        // 迭代开始前执行(如初始化计时器)
    }

    override fun after(context: RepeatContext, result: RepeatStatus) {
        // 每次迭代后执行(如记录指标)
    }
}

// 注册监听器
template.registerListener(MetricsListener())

⚡ 七、并行处理加速

利用多线程提升批量任务效率:

kotlin
val parallelTemplate = TaskExecutorRepeatTemplate().apply {
    // 使用线程池执行任务
    setTaskExecutor(ThreadPoolTaskExecutor().apply {
        corePoolSize = 4
        initialize()
    })
}

✨ 八、声明式迭代 (AOP 模式)

通过注解实现无侵入式重复控制:

kotlin
@Configuration
class DeclarativeRepeatConfig {

    @Bean
    fun messageProcessor(): MessageService {
        val proxy = ProxyFactory(MessageService::class.java).apply {
            // 目标对象
            setTarget(DefaultMessageService())
            // 添加拦截器
            addAdvice(repeatInterceptor())
        }
        return proxy.getProxy() as MessageService
    }

    fun repeatInterceptor(): RepeatOperationsInterceptor {
        return RepeatOperationsInterceptor().apply {
            // 自定义重试模板
            setRepeatTemplate(customRepeatTemplate())
        }
    }

    fun customRepeatTemplate() = RepeatTemplate().apply {
        setCompletionPolicy(SimpleCompletionPolicy(3))
    }
}

// 业务接口
interface MessageService {
    fun processMessage(payload: String)
}

声明式优势

  1. 业务代码零污染
  2. 通过 AOP 动态添加重试能力
  3. 策略配置与业务逻辑解耦

关键总结 🚦

组件核心作用使用场景
RepeatTemplate迭代执行引擎所有批处理任务基础
CompletionPolicy终止条件决策器控制循环次数/超时等
ExceptionHandler异常处理管道定制错误恢复策略
声明式迭代AOP 无侵入扩展消息处理/API 调用重试

性能提示
对 I/O 密集型任务(如数据库读写)优先使用 TaskExecutorRepeatTemplate 并行处理,可提升 300%+ 吞吐量

通过本指南,您已掌握 Spring Batch 重复操作的核心机制。接下来可继续探索 Spring Batch 并行处理重试机制详解 等进阶主题!