Appearance
🌟 Spring Batch 步骤流程控制详解
本教程将深入讲解 Spring Batch 中步骤(Step)流程控制的机制与实现方式,通过通俗易懂的示例帮助您掌握作业流程编排的核心技巧。
一、流程控制基础
为什么需要控制步骤流程?
在 Spring Batch 中,作业(Job)由多个步骤(Step)组成,但这些步骤的执行并非总是线性的:
- 某个步骤失败不一定导致整个作业失败
- 步骤的成功可能有多种状态,需要不同处理
- 某些步骤可能根据条件被跳过不执行
TIP
理解步骤流程控制是构建复杂批处理作业的关键,它让您的作业具备智能决策能力,而不仅仅是简单的顺序执行。
核心概念说明
二、顺序流控制
基本顺序执行
这是最简单的流程 - 所有步骤按顺序依次执行:
kotlin
@Bean
fun job(jobRepository: JobRepository, stepA: Step, stepB: Step, stepC: Step): Job {
return JobBuilder("sequentialJob", jobRepository)
.start(stepA)
.next(stepB)
.next(stepC)
.build()
}
NOTE
在顺序流中:
- 如果
stepA
失败,整个作业立即失败,stepB
和stepC
不会执行 - 所有步骤必须成功作业才会成功
三、条件流控制
基本条件分支
根据步骤执行结果决定下一步路径:
kotlin
@Bean
fun conditionalJob(jobRepository: JobRepository, stepA: Step, stepB: Step, stepC: Step): Job {
return JobBuilder("conditionalJob", jobRepository)
.start(stepA)
.on("*").to(stepB)
.from(stepA)
.on("FAILED").to(stepC)
.end()
.build()
}
代码说明
on("*")
:匹配任意退出状态(除了明确指定的状态)on("FAILED")
:当退出状态为 FAILED 时触发to(stepC)
:转移到指定步骤
状态码匹配规则
Spring Batch 使用简单的模式匹配:
模式 | 说明 | 示例匹配 |
---|---|---|
* | 匹配零个或多个字符 | FAILED, COMPLETED |
? | 匹配单个字符 | A, B (不匹配 AB) |
C*T | 匹配C开头T结尾的状态 | COUNT, CAT |
C?T | 匹配三个字符C开头T结尾 | CAT (不匹配 COUNT) |
WARNING
每个步骤的所有可能退出状态必须被明确定义,否则框架会抛出异常导致作业失败!
批处理状态 vs 退出状态
状态类型 | 说明 |
---|---|
BatchStatus | 框架内部状态(COMPLETED, STARTED, FAILED 等) |
ExitStatus | 步骤执行后的自定义退出状态,决定流程走向的关键 |
kotlin
class SkipCheckingListener : StepExecutionListener {
override fun afterStep(stepExecution: StepExecution): ExitStatus? {
return if (stepExecution.skipCount > 0) {
ExitStatus("COMPLETED WITH SKIPS")
} else {
null
}
}
}
在作业配置中使用自定义状态:
kotlin
@Bean
fun customStatusJob(jobRepository: JobRepository, step1: Step, errorPrint: Step): Job {
return JobBuilder("customStatusJob", jobRepository)
.start(step1)
.on("FAILED").end()
.from(step1)
.on("COMPLETED WITH SKIPS").to(errorPrint)
.from(step1)
.on("*").to(step2)
.end()
.build()
}
四、配置停止行为
1. 正常结束作业
使用 end()
方法正常终止作业:
kotlin
@Bean
fun endJob(jobRepository: JobRepository, step1: Step, step2: Step): Job {
return JobBuilder("endJob", jobRepository)
.start(step1)
.next(step2)
.on("FAILED").end()
.from(step2).on("*").to(step3)
.end()
.build()
}
IMPORTANT
使用 end()
终止的作业状态为 COMPLETED,无法重新启动!
2. 使作业失败
使用 fail()
方法使作业失败:
kotlin
@Bean
fun failJob(jobRepository: JobRepository, step1: Step, step2: Step): Job {
return JobBuilder("failJob", jobRepository)
.start(step1)
.next(step2).on("FAILED").fail()
.from(step2).on("*").to(step3)
.end()
.build()
}
TIP
失败状态(FAILED)的作业可以重新启动,适合需要重试的场景。
3. 暂停作业
使用 stopAndRestart()
暂停作业:
kotlin
@Bean
fun stopJob(jobRepository: JobRepository, step1: Step, step2: Step): Job {
return JobBuilder("stopJob", jobRepository)
.start(step1)
.on("COMPLETED").stopAndRestart(step2)
.end()
.build()
}
暂停作业特点
- 状态为 STOPPED
- 可恢复执行(从指定步骤继续)
- 适合需要人工干预的场景
五、编程式流程决策
自定义决策器
当简单状态匹配无法满足需求时,可实现 JobExecutionDecider
:
kotlin
class CustomDecider : JobExecutionDecider {
override fun decide(
jobExecution: JobExecution,
stepExecution: StepExecution?
): FlowExecutionStatus {
return if (someComplexCondition()) {
FlowExecutionStatus("SPECIAL_CASE")
} else {
FlowExecutionStatus("NORMAL")
}
}
}
在作业配置中使用决策器:
kotlin
@Bean
fun deciderJob(
jobRepository: JobRepository,
decider: JobExecutionDecider,
stepA: Step,
stepB: Step
): Job {
return JobBuilder("deciderJob", jobRepository)
.start(stepA)
.next(decider).on("SPECIAL_CASE").to(stepB)
.from(decider).on("NORMAL").to(stepC)
.end()
.build()
}
六、分流处理
并行执行多个流程
使用 split()
实现并行执行:
kotlin
@Bean
fun flow1(step1: Step, step2: Step): Flow {
return FlowBuilder<SimpleFlow>("flow1")
.start(step1)
.next(step2)
.build()
}
@Bean
fun flow2(step3: Step): Flow {
return FlowBuilder<SimpleFlow>("flow2")
.start(step3)
.build()
}
@Bean
fun splitJob(
jobRepository: JobRepository,
flow1: Flow,
flow2: Flow,
finalStep: Step
): Job {
return JobBuilder("splitJob", jobRepository)
.start(flow1)
.split(SimpleAsyncTaskExecutor())
.add(flow2)
.next(finalStep)
.end()
.build()
}
CAUTION
并行流程需要谨慎设计资源竞争和事务边界问题!
七、外部化流程定义
1. 流程复用
将通用流程定义为独立Bean:
kotlin
@Bean
fun commonFlow(stepA: Step, stepB: Step): Flow {
return FlowBuilder<SimpleFlow>("commonFlow")
.start(stepA)
.next(stepB)
.build()
}
@Bean
fun jobWithExternalFlow(
jobRepository: JobRepository,
commonFlow: Flow,
stepC: Step
): Job {
return JobBuilder("externalFlowJob", jobRepository)
.start(commonFlow)
.next(stepC)
.end()
.build()
}
2. 作业嵌套(JobStep)
使用 JobStep
实现作业间的依赖:
kotlin
@Bean
fun parentJob(jobRepository: JobRepository, childJobStep: Step): Job {
return JobBuilder("parentJob", jobRepository)
.start(childJobStep)
.build()
}
@Bean
fun childJobStep(
jobRepository: JobRepository,
jobLauncher: JobLauncher,
childJob: Job
): Step {
return StepBuilder("childJobStep", jobRepository)
.job(childJob)
.launcher(jobLauncher)
.parametersExtractor { stepExecution ->
mapOf(
"parentId" to stepExecution.jobExecution.id.toString()
)
}
.build()
}
最佳实践
- 保持步骤单一职责:每个步骤只做一件事
- 合理设置事务边界:避免长事务影响性能
- 使用监听器增强控制:StepExecutionListener提供额外控制点
- 充分测试流程分支:确保所有分支逻辑都被覆盖
总结
通过本教程,您已经掌握了 Spring Batch 中控制步骤流程的核心技术:
✅ 顺序流 - 线性执行步骤链
✅ 条件流 - 基于状态码分支路由
✅ 停止控制 - 灵活结束/失败/暂停作业
✅ 编程决策 - 复杂逻辑的定制化处理
✅ 分流处理 - 并行执行提高效率
✅ 流程复用 - 模块化设计提高可维护性
⚡️ 立即应用这些技术,让您的批处理作业更加智能和健壮!