Skip to content

Spring Batch 批处理框架

Spring Batch 是 Spring 生态系统中专门用于批量数据处理的强大框架。它提供了可重用的功能,用于处理大量记录,包括日志记录/跟踪、事务管理、作业处理统计、作业重启、跳过和资源管理等企业级批处理应用程序所必需的功能。

什么是 Spring Batch?

Spring Batch 是一个轻量级、全面的批处理框架,旨在支持开发对企业系统日常运营至关重要的强大批处理应用程序。Spring Batch 建立在人们期望的 Spring Framework 特性(生产力、基于 POJO 的开发方法和一般易用性)之上,同时使开发人员在必要时可以轻松访问和利用更高级的企业服务。

核心概念

Spring Batch 的设计围绕几个核心概念展开:

主要组件

  1. Job(作业):整个批处理过程的封装,由一个或多个 Step 组成
  2. Step(步骤):作业的独立、顺序阶段,包含实际的批处理逻辑
  3. ItemReader:从数据源读取数据的接口
  4. ItemProcessor:处理业务逻辑的接口
  5. ItemWriter:将处理后的数据写入目标的接口
  6. JobLauncher:启动作业的接口
  7. JobRepository:存储作业执行元数据的存储库

为什么选择 Spring Batch?

Spring Batch 特别适合以下场景:

  • 大量数据的定期批量处理
  • 并发批处理:作业的并行处理
  • 分阶段的企业消息驱动处理
  • 大规模并行批处理
  • 失败后手动或计划重启
  • 依赖步骤的顺序处理(使用工作流驱动的扩展)
  • 部分处理:跳过记录(例如,在回滚时)
  • 整个批次事务:适用于批次大小较小或现有存储过程/脚本的情况

实际业务场景

让我们通过几个实际的业务场景来理解 Spring Batch 的应用:

场景 1:银行对账单生成 💰

每月需要为所有客户生成银行对账单:

kotlin
@Configuration
@EnableBatchProcessing
class BankStatementJobConfig {

    @Bean
    fun bankStatementJob(
        jobBuilderFactory: JobBuilderFactory,
        bankStatementStep: Step
    ): Job {
        return jobBuilderFactory.get("bankStatementJob")
            .incrementer(RunIdIncrementer())
            .flow(bankStatementStep)
            .end()
            .build()
    }
}

场景 2:电商订单数据同步 🛒

定期将订单数据从主系统同步到数据仓库:

kotlin
@Component
class OrderProcessor : ItemProcessor<Order, OrderDto> {

    override fun process(item: Order): OrderDto? {
        // 业务逻辑:数据转换和验证
        return if (item.isValid()) {
            OrderDto(
                orderId = item.id,
                customerName = item.customer.name,
                totalAmount = item.calculateTotal(),
                processedDate = LocalDateTime.now()
            )
        } else {
            null // 跳过无效订单
        }
    }
}

场景 3:日志文件分析 📊

分析大量的访问日志文件并生成报告:

kotlin
@StepScope
@Bean
fun logFileReader(@Value("#{jobParameters['inputFile']}") inputFile: String): FlatFileItemReader<LogEntry> {
    return FlatFileItemReaderBuilder<LogEntry>()
        .name("logFileReader")
        .resource(FileSystemResource(inputFile))
        .delimited()
        .names("timestamp", "ip", "method", "url", "status", "size")
        .targetType(LogEntry::class.java)
        .build()
}

架构概览

Spring Batch 的架构基于分层设计:

基本配置

Maven 依赖

xml
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>

Gradle 依赖

kotlin
dependencies {
    implementation("org.springframework.boot:spring-boot-starter-batch")
    implementation("org.springframework.boot:spring-boot-starter-data-jpa")
}

简单示例:数据导入任务

让我们创建一个简单的 CSV 文件导入数据库的批处理任务:

kotlin
@Configuration
@EnableBatchProcessing
class BatchConfig {

    @Bean
    fun csvToDataBaseJob(
        jobBuilderFactory: JobBuilderFactory,
        csvToDataBaseStep: Step
    ): Job {
        return jobBuilderFactory.get("csvToDataBaseJob")
            .incrementer(RunIdIncrementer())
            .start(csvToDataBaseStep)
            .build()
    }

    @Bean
    fun csvToDataBaseStep(
        stepBuilderFactory: StepBuilderFactory,
        csvReader: ItemReader<Customer>,
        customerProcessor: ItemProcessor<Customer, Customer>,
        databaseWriter: ItemWriter<Customer>
    ): Step {
        return stepBuilderFactory.get("csvToDataBaseStep")
            .<Customer, Customer>chunk(100)
            .reader(csvReader)
            .processor(customerProcessor)
            .writer(databaseWriter)
            .build()
    }
}
java
@Configuration
@EnableBatchProcessing
public class BatchConfig {

    @Bean
    public Job csvToDataBaseJob(JobBuilderFactory jobBuilderFactory,
                                Step csvToDataBaseStep) {
        return jobBuilderFactory.get("csvToDataBaseJob")
                .incrementer(new RunIdIncrementer())
                .start(csvToDataBaseStep)
                .build();
    }

    @Bean
    public Step csvToDataBaseStep(StepBuilderFactory stepBuilderFactory,
                                  ItemReader<Customer> csvReader,
                                  ItemProcessor<Customer, Customer> customerProcessor,
                                  ItemWriter<Customer> databaseWriter) {
        return stepBuilderFactory.get("csvToDataBaseStep")
                .<Customer, Customer>chunk(100)
                .reader(csvReader)
                .processor(customerProcessor)
                .writer(databaseWriter)
                .build();
    }
}

数据模型

kotlin
@Entity
@Table(name = "customers")
data class Customer(
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    val id: Long = 0,

    @Column(name = "first_name")
    val firstName: String,

    @Column(name = "last_name")
    val lastName: String,

    @Column(name = "email")
    val email: String,

    @Column(name = "created_date")
    val createdDate: LocalDateTime = LocalDateTime.now()
)

CSV 读取器

kotlin
@Bean
@StepScope
fun csvReader(): FlatFileItemReader<Customer> {
    return FlatFileItemReaderBuilder<Customer>()
        .name("csvReader")
        .resource(ClassPathResource("customers.csv"))
        .delimited()
        .names("firstName", "lastName", "email")
        .targetType(Customer::class.java)
        .build()
}

数据处理器

kotlin
@Component
class CustomerProcessor : ItemProcessor<Customer, Customer> {

    private val logger = LoggerFactory.getLogger(CustomerProcessor::class.java)

    override fun process(item: Customer): Customer? {
        return if (isValidEmail(item.email)) {
            logger.info("处理客户: ${item.firstName} ${item.lastName}")
            item.copy(
                firstName = item.firstName.uppercase(),
                lastName = item.lastName.uppercase()
            )
        } else {
            logger.warn("跳过无效邮箱的客户: ${item.email}")
            null // 跳过无效数据
        }
    }

    private fun isValidEmail(email: String): Boolean {
        return email.contains("@") && email.contains(".")
    }
}

数据库写入器

kotlin
@Bean
fun databaseWriter(entityManagerFactory: EntityManagerFactory): JpaItemWriter<Customer> {
    return JpaItemWriterBuilder<Customer>()
        .entityManagerFactory(entityManagerFactory)
        .build()
}

监控和管理

Spring Batch 提供了丰富的监控功能:

Job 执行监听器

kotlin
@Component
class JobExecutionListener : JobExecutionListenerSupport() {

    private val logger = LoggerFactory.getLogger(JobExecutionListener::class.java)

    override fun beforeJob(jobExecution: JobExecution) {
        logger.info("作业开始执行: ${jobExecution.jobInstance.jobName}")
    }

    override fun afterJob(jobExecution: JobExecution) {
        val duration = Duration.between(
            jobExecution.startTime?.toInstant(),
            jobExecution.endTime?.toInstant()
        )

        logger.info("作业执行完成: ${jobExecution.jobInstance.jobName}")
        logger.info("执行状态: ${jobExecution.status}")
        logger.info("执行时长: ${duration.toMinutes()} 分钟")

        if (jobExecution.status == BatchStatus.FAILED) {
            logger.error("作业执行失败: ${jobExecution.allFailureExceptions}")
        }
    }
}

Step 执行监听器

kotlin
@Component
class StepExecutionListener : StepExecutionListenerSupport() {

    private val logger = LoggerFactory.getLogger(StepExecutionListener::class.java)

    override fun afterStep(stepExecution: StepExecution): ExitStatus? {
        logger.info("步骤 '${stepExecution.stepName}' 执行完成")
        logger.info("读取记录数: ${stepExecution.readCount}")
        logger.info("处理记录数: ${stepExecution.writeCount}")
        logger.info("跳过记录数: ${stepExecution.skipCount}")

        return null
    }
}

错误处理和重试

Spring Batch 提供了强大的错误处理机制:

配置重试和跳过

kotlin
@Bean
fun resilientStep(
    stepBuilderFactory: StepBuilderFactory,
    csvReader: ItemReader<Customer>,
    customerProcessor: ItemProcessor<Customer, Customer>,
    databaseWriter: ItemWriter<Customer>
): Step {
    return stepBuilderFactory.get("resilientStep")
        .<Customer, Customer>chunk(100)
        .reader(csvReader)
        .processor(customerProcessor)
        .writer(databaseWriter)
        // 重试配置
        .faultTolerant()
        .retry(DataAccessException::class.java)
        .retryLimit(3)
        // 跳过配置
        .skip(ValidationException::class.java)
        .skipLimit(10)
        .build()
}

自定义异常处理

kotlin
@Component
class CustomSkipPolicy : SkipPolicy {

    private val logger = LoggerFactory.getLogger(CustomSkipPolicy::class.java)

    override fun shouldSkip(t: Throwable, skipCount: Int): Boolean {
        return when {
            t is ValidationException -> {
                logger.warn("跳过验证异常 (第 ${skipCount + 1} 次): ${t.message}")
                skipCount < 10
            }
            t is DataIntegrityViolationException -> {
                logger.warn("跳过数据完整性异常: ${t.message}")
                true
            }
            else -> false
        }
    }
}

性能优化

并行处理

kotlin
@Bean
fun parallelStep(
    stepBuilderFactory: StepBuilderFactory,
    taskExecutor: TaskExecutor
): Step {
    return stepBuilderFactory.get("parallelStep")
        .<Customer, Customer>chunk(1000)
        .reader(csvReader())
        .processor(customerProcessor)
        .writer(databaseWriter)
        .taskExecutor(taskExecutor)
        .throttleLimit(4) // 并行线程数
        .build()
}

@Bean
fun taskExecutor(): TaskExecutor {
    val executor = ThreadPoolTaskExecutor()
    executor.corePoolSize = 4
    executor.maxPoolSize = 8
    executor.setQueueCapacity(100)
    executor.setThreadNamePrefix("batch-")
    executor.initialize()
    return executor
}

分区处理

kotlin
@Bean
fun partitionerStep(
    stepBuilderFactory: StepBuilderFactory,
    partitioner: Partitioner,
    workerStep: Step
): Step {
    return stepBuilderFactory.get("partitionerStep")
        .partitioner("workerStep", partitioner)
        .step(workerStep)
        .gridSize(4)
        .taskExecutor(taskExecutor())
        .build()
}

最佳实践

性能优化建议

  1. 合理设置 chunk 大小(通常在 100-1000 之间)
  2. 使用合适的事务边界
  3. 考虑使用分区处理大数据集
  4. 监控内存使用情况
  5. 为长时间运行的任务配置适当的超时

常见陷阱

  • 避免在 processor 中进行数据库操作
  • 注意 Step Scope 的正确使用
  • 小心处理大对象,可能导致内存溢出
  • 确保数据库连接池配置合理

总结

Spring Batch 是一个功能强大的批处理框架,提供了:

  • 可靠性:事务管理、错误处理、重试机制
  • 可扩展性:分区、并行处理、远程执行
  • 可维护性:清晰的架构、丰富的监控功能
  • 灵活性:支持多种数据源和目标

通过合理使用 Spring Batch,可以构建高效、稳定的企业级批处理应用程序,处理大规模数据处理任务。

TIP

在实际项目中,建议先从简单的批处理任务开始,逐步添加错误处理、监控和性能优化功能。Spring Batch 的学习曲线相对平缓,但要充分利用其强大功能需要深入理解其核心概念和最佳实践。