Skip to content

Spring Batch 领域语言指南 🧩

将复杂的批处理概念转化为清晰易懂的实践知识


🌟 核心概念全景图


🚀 批处理作业(Job)

批处理作业是最高层次的执行单元,相当于一个完整的业务流程容器。

关键特性:

  1. 多步骤组合:串联多个处理步骤(如:加载→处理→汇总)
  2. 全局配置:统一管理重启策略等公共配置
  3. 两种定义方式(Kotlin DSL 示例):
kotlin
@Bean
fun footballJob(jobRepository: JobRepository): Job {
    return JobBuilder("footballJob", jobRepository)
        .start(playerLoadStep())
        .next(gameLoadStep())
        .next(playerSummarizationStep())
        .build()
}
kotlin
@Configuration
class BatchConfig {
    @Bean
    fun job(jobRepository: JobRepository): Job {
        return job("footballJob", jobRepository) {
            step("playerLoad") { 
                tasklet { _, _ -> RepeatStatus.FINISHED }
            }
            step("gameLoad") { 
                tasklet { _, _ -> RepeatStatus.FINISHED }
            }
        }
    }
}

TIP

推荐使用 Kotlin DSL 配置,比 XML 更简洁且类型安全


📦 作业实例(JobInstance)

代表逻辑上的作业执行,由Job+JobParameters唯一确定:

场景JobInstance 数量说明
每日对账作业每日1个2023-01-01 vs 2023-01-02 不同
失败后重跑相同参数仍为1个视为同一次逻辑执行
kotlin
// 获取当前作业参数
jobExecution.jobParameters.getString("processDate")

⚙️ 作业执行(JobExecution)

记录单次物理执行的详细状态:

属性说明示例值
status执行状态STARTED, COMPLETED
startTime/endTime执行起止时间2023-01-01T09:00:00
exitStatus退出码(用于外部系统感知)COMPLETED_WITH_SKIPS
executionContext作业级共享数据统计信息/断点记录

CAUTION

相同JobInstance的多次执行会产生多个JobExecution记录


🧩 处理步骤(Step)

作业的最小执行单元,标准处理流程:

步骤执行(StepExecution)关键指标:

kotlin
stepExecution.run {
    println("已读: $readCount") 
    println("已写: $writeCount")
    println("跳过: $skipCount")
}

📦 执行上下文(ExecutionContext)

跨步骤/作业的数据共享容器,支持重启续作:

kotlin
// 写入位置状态(文件处理场景)
executionContext.putLong("file.position", reader.currentPosition)

// 重启时恢复位置
val lastPosition = executionContext.getLong("file.position")
reader.seek(lastPosition)
作用域生命周期典型用途
Job级整个作业运行期间全局计数器/汇总数据
Step级单步骤内有效文件读取位置/事务状态

IMPORTANT

存储对象必须实现Serializable接口,否则重启时无法恢复状态!


🏗️ 基础设施组件

1. JobRepository

批处理的元数据中心,自动记录:

  • 作业/步骤执行状态
  • 上下文数据持久化
  • 重启元数据管理
kotlin
@Configuration
@EnableBatchProcessing
class BatchConfig {
    // 自动配置JobRepository
}

2. JobLauncher

作业启动器,触发执行入口:

kotlin
jobLauncher.run(job, JobParametersBuilder()
    .addString("processDate", "20230101")
    .toJobParameters())

🔌 数据处理三剑客

1. ItemReader - 数据提取

kotlin
@Bean
fun reader(): ItemReader<User> {
    return JdbcCursorItemReader<User>().apply {
        setDataSource(dataSource)
        setSql("SELECT * FROM users")
        setRowMapper(BeanPropertyRowMapper(User::class.java))
    }
}

2. ItemProcessor - 业务处理

kotlin
class BonusCalculator : ItemProcessor<User, User>() {
    override fun process(user: User): User? {
        return user.takeIf { it.active }.apply { 
            bonus = salary * 0.1 
        }
    }
}

3. ItemWriter - 结果输出

kotlin
@Bean
fun writer(): ItemWriter<User> {
    return JdbcBatchItemWriter<User>().apply {
        setDataSource(dataSource)
        setSql("UPDATE users SET bonus = ? WHERE id = ?")
        setItemPreparedStatementSetter { user, ps ->
            ps.setBigDecimal(1, user.bonus)
            ps.setLong(2, user.id)
        }
    }
}

🚀 最佳实践建议

  1. 重启防护

    kotlin
    @Bean
    fun job(): Job {
        return jobBuilder("safeJob")
            .preventRestart() // 明确设置是否允许重启
            .start(step())
            .build()
    }
  2. 参数验证

    kotlin
    @BeforeStep
    fun validateParams(stepExecution: StepExecution) {
        require(stepExecution.jobParameters.getString("inputFile") != null) {
            "必须指定inputFile参数" 
        }
    }
  3. 监控指标暴露

    kotlin
    @AfterStep
    fun logMetrics(execution: StepExecution) {
        metrics.gauge("skip_count", execution.skipCount)
    }

TIP

使用@EnableBatchProcessing注解可自动配置90%的基础设施组件

通过以上结构化的领域概念解析,结合Kotlin DSL的现代配置方式,您可快速构建健壮的企业级批处理应用!🎯