Appearance
Spring Batch 批处理框架
Spring Batch 是 Spring 生态系统中专门用于批量数据处理的强大框架。它提供了可重用的功能,用于处理大量记录,包括日志记录/跟踪、事务管理、作业处理统计、作业重启、跳过和资源管理等企业级批处理应用程序所必需的功能。
什么是 Spring Batch?
Spring Batch 是一个轻量级、全面的批处理框架,旨在支持开发对企业系统日常运营至关重要的强大批处理应用程序。Spring Batch 建立在人们期望的 Spring Framework 特性(生产力、基于 POJO 的开发方法和一般易用性)之上,同时使开发人员在必要时可以轻松访问和利用更高级的企业服务。
核心概念
Spring Batch 的设计围绕几个核心概念展开:
主要组件
- Job(作业):整个批处理过程的封装,由一个或多个 Step 组成
- Step(步骤):作业的独立、顺序阶段,包含实际的批处理逻辑
- ItemReader:从数据源读取数据的接口
- ItemProcessor:处理业务逻辑的接口
- ItemWriter:将处理后的数据写入目标的接口
- JobLauncher:启动作业的接口
- JobRepository:存储作业执行元数据的存储库
为什么选择 Spring Batch?
Spring Batch 特别适合以下场景:
- 大量数据的定期批量处理
- 并发批处理:作业的并行处理
- 分阶段的企业消息驱动处理
- 大规模并行批处理
- 失败后手动或计划重启
- 依赖步骤的顺序处理(使用工作流驱动的扩展)
- 部分处理:跳过记录(例如,在回滚时)
- 整个批次事务:适用于批次大小较小或现有存储过程/脚本的情况
实际业务场景
让我们通过几个实际的业务场景来理解 Spring Batch 的应用:
场景 1:银行对账单生成 💰
每月需要为所有客户生成银行对账单:
kotlin
@Configuration
@EnableBatchProcessing
class BankStatementJobConfig {
@Bean
fun bankStatementJob(
jobBuilderFactory: JobBuilderFactory,
bankStatementStep: Step
): Job {
return jobBuilderFactory.get("bankStatementJob")
.incrementer(RunIdIncrementer())
.flow(bankStatementStep)
.end()
.build()
}
}
场景 2:电商订单数据同步 🛒
定期将订单数据从主系统同步到数据仓库:
kotlin
@Component
class OrderProcessor : ItemProcessor<Order, OrderDto> {
override fun process(item: Order): OrderDto? {
// 业务逻辑:数据转换和验证
return if (item.isValid()) {
OrderDto(
orderId = item.id,
customerName = item.customer.name,
totalAmount = item.calculateTotal(),
processedDate = LocalDateTime.now()
)
} else {
null // 跳过无效订单
}
}
}
场景 3:日志文件分析 📊
分析大量的访问日志文件并生成报告:
kotlin
@StepScope
@Bean
fun logFileReader(@Value("#{jobParameters['inputFile']}") inputFile: String): FlatFileItemReader<LogEntry> {
return FlatFileItemReaderBuilder<LogEntry>()
.name("logFileReader")
.resource(FileSystemResource(inputFile))
.delimited()
.names("timestamp", "ip", "method", "url", "status", "size")
.targetType(LogEntry::class.java)
.build()
}
架构概览
Spring Batch 的架构基于分层设计:
基本配置
Maven 依赖
xml
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
Gradle 依赖
kotlin
dependencies {
implementation("org.springframework.boot:spring-boot-starter-batch")
implementation("org.springframework.boot:spring-boot-starter-data-jpa")
}
简单示例:数据导入任务
让我们创建一个简单的 CSV 文件导入数据库的批处理任务:
kotlin
@Configuration
@EnableBatchProcessing
class BatchConfig {
@Bean
fun csvToDataBaseJob(
jobBuilderFactory: JobBuilderFactory,
csvToDataBaseStep: Step
): Job {
return jobBuilderFactory.get("csvToDataBaseJob")
.incrementer(RunIdIncrementer())
.start(csvToDataBaseStep)
.build()
}
@Bean
fun csvToDataBaseStep(
stepBuilderFactory: StepBuilderFactory,
csvReader: ItemReader<Customer>,
customerProcessor: ItemProcessor<Customer, Customer>,
databaseWriter: ItemWriter<Customer>
): Step {
return stepBuilderFactory.get("csvToDataBaseStep")
.<Customer, Customer>chunk(100)
.reader(csvReader)
.processor(customerProcessor)
.writer(databaseWriter)
.build()
}
}
java
@Configuration
@EnableBatchProcessing
public class BatchConfig {
@Bean
public Job csvToDataBaseJob(JobBuilderFactory jobBuilderFactory,
Step csvToDataBaseStep) {
return jobBuilderFactory.get("csvToDataBaseJob")
.incrementer(new RunIdIncrementer())
.start(csvToDataBaseStep)
.build();
}
@Bean
public Step csvToDataBaseStep(StepBuilderFactory stepBuilderFactory,
ItemReader<Customer> csvReader,
ItemProcessor<Customer, Customer> customerProcessor,
ItemWriter<Customer> databaseWriter) {
return stepBuilderFactory.get("csvToDataBaseStep")
.<Customer, Customer>chunk(100)
.reader(csvReader)
.processor(customerProcessor)
.writer(databaseWriter)
.build();
}
}
数据模型
kotlin
@Entity
@Table(name = "customers")
data class Customer(
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
val id: Long = 0,
@Column(name = "first_name")
val firstName: String,
@Column(name = "last_name")
val lastName: String,
@Column(name = "email")
val email: String,
@Column(name = "created_date")
val createdDate: LocalDateTime = LocalDateTime.now()
)
CSV 读取器
kotlin
@Bean
@StepScope
fun csvReader(): FlatFileItemReader<Customer> {
return FlatFileItemReaderBuilder<Customer>()
.name("csvReader")
.resource(ClassPathResource("customers.csv"))
.delimited()
.names("firstName", "lastName", "email")
.targetType(Customer::class.java)
.build()
}
数据处理器
kotlin
@Component
class CustomerProcessor : ItemProcessor<Customer, Customer> {
private val logger = LoggerFactory.getLogger(CustomerProcessor::class.java)
override fun process(item: Customer): Customer? {
return if (isValidEmail(item.email)) {
logger.info("处理客户: ${item.firstName} ${item.lastName}")
item.copy(
firstName = item.firstName.uppercase(),
lastName = item.lastName.uppercase()
)
} else {
logger.warn("跳过无效邮箱的客户: ${item.email}")
null // 跳过无效数据
}
}
private fun isValidEmail(email: String): Boolean {
return email.contains("@") && email.contains(".")
}
}
数据库写入器
kotlin
@Bean
fun databaseWriter(entityManagerFactory: EntityManagerFactory): JpaItemWriter<Customer> {
return JpaItemWriterBuilder<Customer>()
.entityManagerFactory(entityManagerFactory)
.build()
}
监控和管理
Spring Batch 提供了丰富的监控功能:
Job 执行监听器
kotlin
@Component
class JobExecutionListener : JobExecutionListenerSupport() {
private val logger = LoggerFactory.getLogger(JobExecutionListener::class.java)
override fun beforeJob(jobExecution: JobExecution) {
logger.info("作业开始执行: ${jobExecution.jobInstance.jobName}")
}
override fun afterJob(jobExecution: JobExecution) {
val duration = Duration.between(
jobExecution.startTime?.toInstant(),
jobExecution.endTime?.toInstant()
)
logger.info("作业执行完成: ${jobExecution.jobInstance.jobName}")
logger.info("执行状态: ${jobExecution.status}")
logger.info("执行时长: ${duration.toMinutes()} 分钟")
if (jobExecution.status == BatchStatus.FAILED) {
logger.error("作业执行失败: ${jobExecution.allFailureExceptions}")
}
}
}
Step 执行监听器
kotlin
@Component
class StepExecutionListener : StepExecutionListenerSupport() {
private val logger = LoggerFactory.getLogger(StepExecutionListener::class.java)
override fun afterStep(stepExecution: StepExecution): ExitStatus? {
logger.info("步骤 '${stepExecution.stepName}' 执行完成")
logger.info("读取记录数: ${stepExecution.readCount}")
logger.info("处理记录数: ${stepExecution.writeCount}")
logger.info("跳过记录数: ${stepExecution.skipCount}")
return null
}
}
错误处理和重试
Spring Batch 提供了强大的错误处理机制:
配置重试和跳过
kotlin
@Bean
fun resilientStep(
stepBuilderFactory: StepBuilderFactory,
csvReader: ItemReader<Customer>,
customerProcessor: ItemProcessor<Customer, Customer>,
databaseWriter: ItemWriter<Customer>
): Step {
return stepBuilderFactory.get("resilientStep")
.<Customer, Customer>chunk(100)
.reader(csvReader)
.processor(customerProcessor)
.writer(databaseWriter)
// 重试配置
.faultTolerant()
.retry(DataAccessException::class.java)
.retryLimit(3)
// 跳过配置
.skip(ValidationException::class.java)
.skipLimit(10)
.build()
}
自定义异常处理
kotlin
@Component
class CustomSkipPolicy : SkipPolicy {
private val logger = LoggerFactory.getLogger(CustomSkipPolicy::class.java)
override fun shouldSkip(t: Throwable, skipCount: Int): Boolean {
return when {
t is ValidationException -> {
logger.warn("跳过验证异常 (第 ${skipCount + 1} 次): ${t.message}")
skipCount < 10
}
t is DataIntegrityViolationException -> {
logger.warn("跳过数据完整性异常: ${t.message}")
true
}
else -> false
}
}
}
性能优化
并行处理
kotlin
@Bean
fun parallelStep(
stepBuilderFactory: StepBuilderFactory,
taskExecutor: TaskExecutor
): Step {
return stepBuilderFactory.get("parallelStep")
.<Customer, Customer>chunk(1000)
.reader(csvReader())
.processor(customerProcessor)
.writer(databaseWriter)
.taskExecutor(taskExecutor)
.throttleLimit(4) // 并行线程数
.build()
}
@Bean
fun taskExecutor(): TaskExecutor {
val executor = ThreadPoolTaskExecutor()
executor.corePoolSize = 4
executor.maxPoolSize = 8
executor.setQueueCapacity(100)
executor.setThreadNamePrefix("batch-")
executor.initialize()
return executor
}
分区处理
kotlin
@Bean
fun partitionerStep(
stepBuilderFactory: StepBuilderFactory,
partitioner: Partitioner,
workerStep: Step
): Step {
return stepBuilderFactory.get("partitionerStep")
.partitioner("workerStep", partitioner)
.step(workerStep)
.gridSize(4)
.taskExecutor(taskExecutor())
.build()
}
最佳实践
性能优化建议
- 合理设置 chunk 大小(通常在 100-1000 之间)
- 使用合适的事务边界
- 考虑使用分区处理大数据集
- 监控内存使用情况
- 为长时间运行的任务配置适当的超时
常见陷阱
- 避免在 processor 中进行数据库操作
- 注意 Step Scope 的正确使用
- 小心处理大对象,可能导致内存溢出
- 确保数据库连接池配置合理
总结
Spring Batch 是一个功能强大的批处理框架,提供了:
- 可靠性:事务管理、错误处理、重试机制
- 可扩展性:分区、并行处理、远程执行
- 可维护性:清晰的架构、丰富的监控功能
- 灵活性:支持多种数据源和目标
通过合理使用 Spring Batch,可以构建高效、稳定的企业级批处理应用程序,处理大规模数据处理任务。
TIP
在实际项目中,建议先从简单的批处理任务开始,逐步添加错误处理、监控和性能优化功能。Spring Batch 的学习曲线相对平缓,但要充分利用其强大功能需要深入理解其核心概念和最佳实践。