Appearance
Spring Batch 领域语言指南 🧩
将复杂的批处理概念转化为清晰易懂的实践知识
🌟 核心概念全景图
🚀 批处理作业(Job)
批处理作业是最高层次的执行单元,相当于一个完整的业务流程容器。
关键特性:
- 多步骤组合:串联多个处理步骤(如:加载→处理→汇总)
- 全局配置:统一管理重启策略等公共配置
- 两种定义方式(Kotlin DSL 示例):
kotlin
@Bean
fun footballJob(jobRepository: JobRepository): Job {
return JobBuilder("footballJob", jobRepository)
.start(playerLoadStep())
.next(gameLoadStep())
.next(playerSummarizationStep())
.build()
}
kotlin
@Configuration
class BatchConfig {
@Bean
fun job(jobRepository: JobRepository): Job {
return job("footballJob", jobRepository) {
step("playerLoad") {
tasklet { _, _ -> RepeatStatus.FINISHED }
}
step("gameLoad") {
tasklet { _, _ -> RepeatStatus.FINISHED }
}
}
}
}
TIP
推荐使用 Kotlin DSL 配置,比 XML 更简洁且类型安全
📦 作业实例(JobInstance)
代表逻辑上的作业执行,由Job
+JobParameters
唯一确定:
场景 | JobInstance 数量 | 说明 |
---|---|---|
每日对账作业 | 每日1个 | 2023-01-01 vs 2023-01-02 不同 |
失败后重跑相同参数 | 仍为1个 | 视为同一次逻辑执行 |
kotlin
// 获取当前作业参数
jobExecution.jobParameters.getString("processDate")
⚙️ 作业执行(JobExecution)
记录单次物理执行的详细状态:
属性 | 说明 | 示例值 |
---|---|---|
status | 执行状态 | STARTED , COMPLETED |
startTime/endTime | 执行起止时间 | 2023-01-01T09:00:00 |
exitStatus | 退出码(用于外部系统感知) | COMPLETED_WITH_SKIPS |
executionContext | 作业级共享数据 | 统计信息/断点记录 |
CAUTION
相同JobInstance
的多次执行会产生多个JobExecution
记录
🧩 处理步骤(Step)
作业的最小执行单元,标准处理流程:
步骤执行(StepExecution)关键指标:
kotlin
stepExecution.run {
println("已读: $readCount")
println("已写: $writeCount")
println("跳过: $skipCount")
}
📦 执行上下文(ExecutionContext)
跨步骤/作业的数据共享容器,支持重启续作:
kotlin
// 写入位置状态(文件处理场景)
executionContext.putLong("file.position", reader.currentPosition)
// 重启时恢复位置
val lastPosition = executionContext.getLong("file.position")
reader.seek(lastPosition)
作用域 | 生命周期 | 典型用途 |
---|---|---|
Job级 | 整个作业运行期间 | 全局计数器/汇总数据 |
Step级 | 单步骤内有效 | 文件读取位置/事务状态 |
IMPORTANT
存储对象必须实现Serializable
接口,否则重启时无法恢复状态!
🏗️ 基础设施组件
1. JobRepository
批处理的元数据中心,自动记录:
- 作业/步骤执行状态
- 上下文数据持久化
- 重启元数据管理
kotlin
@Configuration
@EnableBatchProcessing
class BatchConfig {
// 自动配置JobRepository
}
2. JobLauncher
作业启动器,触发执行入口:
kotlin
jobLauncher.run(job, JobParametersBuilder()
.addString("processDate", "20230101")
.toJobParameters())
🔌 数据处理三剑客
1. ItemReader - 数据提取
kotlin
@Bean
fun reader(): ItemReader<User> {
return JdbcCursorItemReader<User>().apply {
setDataSource(dataSource)
setSql("SELECT * FROM users")
setRowMapper(BeanPropertyRowMapper(User::class.java))
}
}
2. ItemProcessor - 业务处理
kotlin
class BonusCalculator : ItemProcessor<User, User>() {
override fun process(user: User): User? {
return user.takeIf { it.active }.apply {
bonus = salary * 0.1
}
}
}
3. ItemWriter - 结果输出
kotlin
@Bean
fun writer(): ItemWriter<User> {
return JdbcBatchItemWriter<User>().apply {
setDataSource(dataSource)
setSql("UPDATE users SET bonus = ? WHERE id = ?")
setItemPreparedStatementSetter { user, ps ->
ps.setBigDecimal(1, user.bonus)
ps.setLong(2, user.id)
}
}
}
🚀 最佳实践建议
重启防护:
kotlin@Bean fun job(): Job { return jobBuilder("safeJob") .preventRestart() // 明确设置是否允许重启 .start(step()) .build() }
参数验证:
kotlin@BeforeStep fun validateParams(stepExecution: StepExecution) { require(stepExecution.jobParameters.getString("inputFile") != null) { "必须指定inputFile参数" } }
监控指标暴露:
kotlin@AfterStep fun logMetrics(execution: StepExecution) { metrics.gauge("skip_count", execution.skipCount) }
TIP
使用@EnableBatchProcessing
注解可自动配置90%的基础设施组件
通过以上结构化的领域概念解析,结合Kotlin DSL的现代配置方式,您可快速构建健壮的企业级批处理应用!🎯