Skip to content

Spring Batch 配置 Step 详解教程

什么是 Spring Batch 的 Step? 🧩

在 Spring Batch 中,Step 是批处理作业的核心构建块,代表作业中的一个独立、顺序执行的处理阶段。每个 Step 包含处理逻辑所需的所有配置信息,开发者可以根据需求设计简单或复杂的处理流程:

关键特性

  • 独立性:每个 Step 拥有自己的处理逻辑和状态管理
  • 顺序性:Step 按配置顺序依次执行
  • 可配置性:支持事务控制、错误处理、跳过机制等
  • 可组合性:多个 Step 可组合成复杂工作流

TIP

Step 就像工厂的生产线,每个工位(Step)负责特定工序,共同完成产品(批处理作业)的生产


配置 Step 的四种核心方式

1. 面向块的块处理 (Chunk-oriented Processing)

最常用的处理模式,适用于大数据量处理。数据被分成固定大小的"块"进行处理,每处理完一个块就提交事务。

kotlin
@Bean
fun chunkStep(jobRepository: JobRepository, transactionManager: PlatformTransactionManager): Step {
    return StepBuilder("userImportStep", jobRepository)
        .chunk<User, User>(100, transactionManager)  // [!code highlight] // 每100条数据提交一次
        .reader(flatFileItemReader())
        .processor(userProcessor())
        .writer(jdbcBatchItemWriter())
        .faultTolerant()
        .skipLimit(10)  // 最多允许跳过10条错误数据
        .skip(ValidationException::class.java)  // [!code warning] // 注意:跳过特定异常
        .build()
}

// 文件读取器示例
fun flatFileItemReader(): FlatFileItemReader<User> {
    return FlatFileItemReaderBuilder<User>()
        .name("userReader")
        .resource(ClassPathResource("users.csv"))
        .delimited()
        .names("id", "name", "email")
        .fieldSetMapper { fieldSet ->
            User(
                id = fieldSet.readInt("id"),
                name = fieldSet.readString("name"),
                email = fieldSet.readString("email")
            )
        }
        .build()
}
块处理工作流程详解

CAUTION

块大小需要根据数据量和系统资源平衡选择:过小会导致频繁提交降低性能,过大会增加内存压力


2. 使用 TaskletStep 实现自定义逻辑

适合非标准数据处理场景,如文件清理、调用外部服务等原子操作。

kotlin
@Bean
fun cleanupStep(jobRepository: JobRepository): Step {
    return StepBuilder("tempCleanupStep", jobRepository)
        .tasklet(tempCleanupTasklet(), transactionManager)  
        .build()
}

// 自定义Tasklet实现
@Component
class TempCleanupTasklet : Tasklet {
    override fun execute(contribution: StepContribution, chunkContext: ChunkContext): RepeatStatus {
        val tempDir = File(System.getProperty("java.io.tmpdir"))
        
        tempDir.listFiles()?.forEach { file ->
            if (file.isFile && file.name.startsWith("batch_temp_")) {
                if (!file.delete()) {  // [!code error] // 删除失败处理
                    logger.error("Failed to delete temp file: ${file.absolutePath}")
                }
            }
        }
        
        return RepeatStatus.FINISHED
    }
}

适用场景

✅ 单次执行的操作
✅ 不需要分块处理的场景
✅ 系统维护任务(如清理、备份)


3. 控制步骤流程控制

实现条件跳转决策逻辑,构建灵活的工作流。

kotlin
@Bean
fun jobFlowStep(jobRepository: JobRepository): Step {
    return StepBuilder("decisionStep", jobRepository)
        .partitioner("fileProcessingPartitioner", filePartitioner())
        .step(fileProcessingStep())
        .gridSize(4)  // 使用4个线程并行处理
        .build()
}

// 条件跳转示例
@Bean
fun conditionalSteps(): Job {
    return JobBuilder("conditionalJob", jobRepository)
        .start(stepA())
        .next(decision())
        .from(decision())
            .on("FAILED").to(stepB())  // [!code highlight] // 条件路由
            .on("*").to(stepC())
        .end()
        .build()
}

// 决策器实现
@Bean
fun decision(): JobExecutionDecider {
    return { execution, _ ->
        val exitStatus = execution.stepExecutions
            .find { it.stepName == "stepA" }
            ?.exitStatus ?: ExitStatus.FAILED
        
        FlowExecutionStatus(if (exitStatus == ExitStatus.COMPLETED) "COMPLETED" else "FAILED")
    }
}

注意事项

⚠️ 复杂流程可能导致状态管理困难
⚠️ 确保所有分支都有明确的结束点
⚠️ 避免循环跳转导致无限循环


4. 延迟绑定 JobStep 属性

实现运行时动态配置,提高作业灵活性。

kotlin
@Bean
@StepScope  // [!code highlight] // 关键注解
fun reader(
    @Value("#{jobParameters['input.file']}") file: String
): FlatFileItemReader<User> {
    return FlatFileItemReaderBuilder<User>()
        .name("userReader")
        .resource(FileSystemResource(file))  // 动态文件路径
        // ...其他配置
}

// 使用环境变量配置
@Bean
@StepScope
fun processor(
    @Value("#{jobExecutionContext['processing.mode']}") mode: String
): ItemProcessor<User, User> {
    return when (mode) {
        "VALIDATE" -> ValidationProcessor()
        "ENRICH" -> EnrichmentProcessor()
        else -> DefaultProcessor()
    }
}
kotlin
@Bean
@StepScope
fun writer(
    @Value("#{jobParameters['db.table']}") table: String
): JdbcBatchItemWriter<User> {
    // 根据运行时参数动态选择写入表
}
kotlin
@Bean
fun writer(): JdbcBatchItemWriter<User> {
    // 表名硬编码在代码中
    writer.setSql("INSERT INTO fixed_table ...")
}

IMPORTANT

使用 @StepScope 时,Spring 会为每个 Step 执行创建新的 Bean 实例,避免状态冲突


最佳实践与常见陷阱

✅ 推荐实践

kotlin
// 1. 使用监听器增强监控
stepBuilder.listener(StepExecutionListener { 
    beforeStep = { 
        logger.info("Starting step: ${it.stepName}") 
    }
})

// 2. 合理配置跳过策略
.faultTolerant()
.skip(DataIntegrityViolationException::class.java)
.skip(ValidationException::class.java)
.skipLimit(100)  

// 3. 设置重试机制
.retry(DeadlockLoserDataAccessException::class.java)
.retryLimit(3)

❌ 避免这些错误

kotlin
// 错误1:未设置事务管理器
.chunk<User, User>(100) // [!code error] // 缺少TransactionManager参数

// 错误2:在Processor中修改原始数据
override fun process(item: User): User {
    item.name = item.name.uppercase() // [!code error] // 修改了原始对象
    return item.copy(name = item.name.uppercase()) // ✅ 正确做法
}

// 错误3:未处理上下文清理
@Bean
@StepScope
fun resourceHolder(): ResourceHolder {
    return ResourceHolder().apply {
        // 未实现DisposableBean导致资源泄漏
    }
}

关键警告

  1. 事务边界不清晰:确保每个事务包含完整的业务操作
  2. 状态管理混乱:避免在 ItemProcessor 中保存状态
  3. 资源泄漏:所有 @StepScope Bean 必须实现清理逻辑

总结回顾

配置方式适用场景关键注解/API
块处理大数据量处理.chunk() + ItemReader/Processor/Writer
Tasklet原子操作Tasklet 接口 + .tasklet()
流程控制复杂工作流.next() + .on() + JobExecutionDecider
延迟绑定运行时动态配置@StepScope + @Value("#{...}")

通过本教程,您应该能够:

  1. 👉 理解 Step 在 Spring Batch 中的核心作用
  2. ⚡️ 掌握四种配置 Step 的实践方法
  3. 🛡️ 避免常见配置陷阱和性能问题
  4. 🔧 根据业务需求选择合适的处理模式

NOTE

实际应用中,建议结合 Spring Batch Admin 或监控系统实时跟踪 Step 执行状态,及时发现问题优化处理流程