Appearance
Spring Batch 配置 Step
详解教程
什么是 Spring Batch 的 Step
? 🧩
在 Spring Batch 中,Step
是批处理作业的核心构建块,代表作业中的一个独立、顺序执行的处理阶段。每个 Step
包含处理逻辑所需的所有配置信息,开发者可以根据需求设计简单或复杂的处理流程:
关键特性
- 独立性:每个 Step 拥有自己的处理逻辑和状态管理
- 顺序性:Step 按配置顺序依次执行
- 可配置性:支持事务控制、错误处理、跳过机制等
- 可组合性:多个 Step 可组合成复杂工作流
TIP
Step
就像工厂的生产线,每个工位(Step)负责特定工序,共同完成产品(批处理作业)的生产
配置 Step
的四种核心方式
1. 面向块的块处理 (Chunk-oriented Processing)
最常用的处理模式,适用于大数据量处理。数据被分成固定大小的"块"进行处理,每处理完一个块就提交事务。
kotlin
@Bean
fun chunkStep(jobRepository: JobRepository, transactionManager: PlatformTransactionManager): Step {
return StepBuilder("userImportStep", jobRepository)
.chunk<User, User>(100, transactionManager) // [!code highlight] // 每100条数据提交一次
.reader(flatFileItemReader())
.processor(userProcessor())
.writer(jdbcBatchItemWriter())
.faultTolerant()
.skipLimit(10) // 最多允许跳过10条错误数据
.skip(ValidationException::class.java) // [!code warning] // 注意:跳过特定异常
.build()
}
// 文件读取器示例
fun flatFileItemReader(): FlatFileItemReader<User> {
return FlatFileItemReaderBuilder<User>()
.name("userReader")
.resource(ClassPathResource("users.csv"))
.delimited()
.names("id", "name", "email")
.fieldSetMapper { fieldSet ->
User(
id = fieldSet.readInt("id"),
name = fieldSet.readString("name"),
email = fieldSet.readString("email")
)
}
.build()
}
块处理工作流程详解
CAUTION
块大小需要根据数据量和系统资源平衡选择:过小会导致频繁提交降低性能,过大会增加内存压力
2. 使用 TaskletStep
实现自定义逻辑
适合非标准数据处理场景,如文件清理、调用外部服务等原子操作。
kotlin
@Bean
fun cleanupStep(jobRepository: JobRepository): Step {
return StepBuilder("tempCleanupStep", jobRepository)
.tasklet(tempCleanupTasklet(), transactionManager)
.build()
}
// 自定义Tasklet实现
@Component
class TempCleanupTasklet : Tasklet {
override fun execute(contribution: StepContribution, chunkContext: ChunkContext): RepeatStatus {
val tempDir = File(System.getProperty("java.io.tmpdir"))
tempDir.listFiles()?.forEach { file ->
if (file.isFile && file.name.startsWith("batch_temp_")) {
if (!file.delete()) { // [!code error] // 删除失败处理
logger.error("Failed to delete temp file: ${file.absolutePath}")
}
}
}
return RepeatStatus.FINISHED
}
}
适用场景
✅ 单次执行的操作
✅ 不需要分块处理的场景
✅ 系统维护任务(如清理、备份)
3. 控制步骤流程控制
实现条件跳转和决策逻辑,构建灵活的工作流。
kotlin
@Bean
fun jobFlowStep(jobRepository: JobRepository): Step {
return StepBuilder("decisionStep", jobRepository)
.partitioner("fileProcessingPartitioner", filePartitioner())
.step(fileProcessingStep())
.gridSize(4) // 使用4个线程并行处理
.build()
}
// 条件跳转示例
@Bean
fun conditionalSteps(): Job {
return JobBuilder("conditionalJob", jobRepository)
.start(stepA())
.next(decision())
.from(decision())
.on("FAILED").to(stepB()) // [!code highlight] // 条件路由
.on("*").to(stepC())
.end()
.build()
}
// 决策器实现
@Bean
fun decision(): JobExecutionDecider {
return { execution, _ ->
val exitStatus = execution.stepExecutions
.find { it.stepName == "stepA" }
?.exitStatus ?: ExitStatus.FAILED
FlowExecutionStatus(if (exitStatus == ExitStatus.COMPLETED) "COMPLETED" else "FAILED")
}
}
注意事项
⚠️ 复杂流程可能导致状态管理困难
⚠️ 确保所有分支都有明确的结束点
⚠️ 避免循环跳转导致无限循环
4. 延迟绑定 Job
和 Step
属性
实现运行时动态配置,提高作业灵活性。
kotlin
@Bean
@StepScope // [!code highlight] // 关键注解
fun reader(
@Value("#{jobParameters['input.file']}") file: String
): FlatFileItemReader<User> {
return FlatFileItemReaderBuilder<User>()
.name("userReader")
.resource(FileSystemResource(file)) // 动态文件路径
// ...其他配置
}
// 使用环境变量配置
@Bean
@StepScope
fun processor(
@Value("#{jobExecutionContext['processing.mode']}") mode: String
): ItemProcessor<User, User> {
return when (mode) {
"VALIDATE" -> ValidationProcessor()
"ENRICH" -> EnrichmentProcessor()
else -> DefaultProcessor()
}
}
kotlin
@Bean
@StepScope
fun writer(
@Value("#{jobParameters['db.table']}") table: String
): JdbcBatchItemWriter<User> {
// 根据运行时参数动态选择写入表
}
kotlin
@Bean
fun writer(): JdbcBatchItemWriter<User> {
// 表名硬编码在代码中
writer.setSql("INSERT INTO fixed_table ...")
}
IMPORTANT
使用 @StepScope
时,Spring 会为每个 Step 执行创建新的 Bean 实例,避免状态冲突
最佳实践与常见陷阱
✅ 推荐实践
kotlin
// 1. 使用监听器增强监控
stepBuilder.listener(StepExecutionListener {
beforeStep = {
logger.info("Starting step: ${it.stepName}")
}
})
// 2. 合理配置跳过策略
.faultTolerant()
.skip(DataIntegrityViolationException::class.java)
.skip(ValidationException::class.java)
.skipLimit(100)
// 3. 设置重试机制
.retry(DeadlockLoserDataAccessException::class.java)
.retryLimit(3)
❌ 避免这些错误
kotlin
// 错误1:未设置事务管理器
.chunk<User, User>(100) // [!code error] // 缺少TransactionManager参数
// 错误2:在Processor中修改原始数据
override fun process(item: User): User {
item.name = item.name.uppercase() // [!code error] // 修改了原始对象
return item.copy(name = item.name.uppercase()) // ✅ 正确做法
}
// 错误3:未处理上下文清理
@Bean
@StepScope
fun resourceHolder(): ResourceHolder {
return ResourceHolder().apply {
// 未实现DisposableBean导致资源泄漏
}
}
关键警告
- 事务边界不清晰:确保每个事务包含完整的业务操作
- 状态管理混乱:避免在 ItemProcessor 中保存状态
- 资源泄漏:所有 @StepScope Bean 必须实现清理逻辑
总结回顾
配置方式 | 适用场景 | 关键注解/API |
---|---|---|
块处理 | 大数据量处理 | .chunk() + ItemReader/Processor/Writer |
Tasklet | 原子操作 | Tasklet 接口 + .tasklet() |
流程控制 | 复杂工作流 | .next() + .on() + JobExecutionDecider |
延迟绑定 | 运行时动态配置 | @StepScope + @Value("#{...}") |
通过本教程,您应该能够:
- 👉 理解
Step
在 Spring Batch 中的核心作用 - ⚡️ 掌握四种配置
Step
的实践方法 - 🛡️ 避免常见配置陷阱和性能问题
- 🔧 根据业务需求选择合适的处理模式
NOTE
实际应用中,建议结合 Spring Batch Admin 或监控系统实时跟踪 Step 执行状态,及时发现问题优化处理流程