Skip to content

Spring Batch 5.2 新特性详解

引言

Spring Batch 5.2 带来了多项重要改进,让批处理开发更高效灵活!无论你是处理大数据导入、报表生成还是系统间数据同步,本次更新都有惊喜。本教程将用通俗易懂的方式解读每个新特性,配合 Kotlin 代码示例,助你快速掌握升级要点。

🚀 核心新特性

1. MongoDB 作业仓库支持

解决的问题:终于支持 NoSQL 存储作业元数据了!不再局限于关系型数据库。

kotlin
@Configuration
class MongoJobConfig {

    @Bean
    fun jobRepository(
        mongoTemplate: MongoTemplate,
        transactionManager: MongoTransactionManager
    ): JobRepository {
        val factory = MongoJobRepositoryFactoryBean().apply {
            setMongoOperations(mongoTemplate)
            setTransactionManager(transactionManager)
            afterPropertiesSet()  
        }
        return factory.getObject()
    }
}

使用场景

  • 当你的系统已使用 MongoDB
  • 需要避免维护两种数据库类型
  • 处理非结构化元数据时

IMPORTANT

要求 MongoDB 4.0+ 版本,确保添加 Spring Data MongoDB 依赖

2. 轻量级无资源作业仓库

解决的问题:简单任务不想配数据库?这个轻量级方案来了!

kotlin
@Bean
fun resourcelessJobRepository(): JobRepository {
    return ResourcelessJobRepository().apply {
        transactionManager = ResourcelessTransactionManager()  
    }
}

适用场景

  • 仅适用于无需重启的任务
  • 执行上下文不参与数据传输
  • 单次运行的独立任务

CAUTION

不支持分布式环境!线程不安全!

3. 组合式条目读取器

解决的问题:多数据源合并读取不再需要自定义代码!

kotlin
@Bean
fun compositePersonReader(): CompositeItemReader<Person> {
    val fileReader = FlatFileItemReaderBuilder<Person>()
        .name("fileReader")
        .resource(ClassPathResource("persons.csv"))
        .delimited()
        .names("id", "name")
        .targetType(Person::class.java)
        .build()
    
    val dbReader = JdbcCursorItemReaderBuilder<Person>()
        .name("dbReader")
        .dataSource(dataSource)
        .sql("SELECT id, name FROM persons")
        .rowMapper(BeanPropertyRowMapper(Person::class.java))
        .build()
    
    return CompositeItemReader(listOf(fileReader, dbReader))  
}

4. Java 函数式接口适配器

解决的问题:用 Lambda 简化批处理组件开发

kotlin
@Bean
fun lambdaReader(): ItemReader<String> {
    return SupplierItemReader { 
        // 从任意来源生成数据
        listOf("A", "B", "C").iterator().next()
    }
}

@Bean
fun lambdaProcessor(): ItemProcessor<String, String> {
    return PredicateFilteringItemProcessor { item ->
        item != "B" // 过滤掉"B"
    }
}

@Bean
fun lambdaWriter(): ItemWriter<String> {
    return ConsumerItemWriter { items ->
        items.forEach { println("写入: $it") }  
    }
}

5. 阻塞队列并发处理

解决的问题:实现高效流水线式并发处理

kotlin
@Bean
fun concurrentJob(): Job {
    val queue = LinkedBlockingQueue<Data>(100)  

    return JobBuilder("concurrentJob", jobRepository)
        .start(producerStep(queue))
        .next(consumerStep(queue))
        .build()
}

@Bean
fun producerStep(queue: BlockingQueue<Data>): Step {
    return stepBuilderFactory.get("producerStep")
        .chunk<Data, Data>(10)
        .reader(fileReader())
        .writer(BlockingQueueItemWriter(queue))  
        .build()
}

@Bean
fun consumerStep(queue: BlockingQueue<Data>): Step {
    return stepBuilderFactory.get("consumerStep")
        .chunk<Data, Data>(10)
        .reader(BlockingQueueItemReader(queue))  
        .writer(databaseWriter())
        .build()
}

6. JPA 查询提示支持

解决的问题:优化 JPA 查询性能

kotlin
@Bean
fun jpaReader(entityManagerFactory: EntityManagerFactory): JpaPagingItemReader<Order> {
    return JpaPagingItemReaderBuilder<Order>()
        .name("orderReader")
        .entityManagerFactory(entityManagerFactory)
        .queryString("SELECT o FROM Order o WHERE o.status = 'NEW'")
        .parameterValues(mapOf("status" to "NEW"))
        .queryHints(mapOf(
            "javax.persistence.query.timeout" to 1000,  
            "org.hibernate.fetchSize" to 100
        ))
        .build()
}

7. JDBC 数据类支持

解决的问题:完美支持 Java Record 和 Kotlin 数据类

kotlin
// Kotlin 数据类
data class Person(val id: Int, val name: String)

@Bean
fun recordReader(dataSource: DataSource): JdbcCursorItemReader<Person> {
    return JdbcCursorItemReaderBuilder<Person>()
        .name("personReader")
        .dataSource(dataSource)
        .sql("SELECT id, name FROM persons")
        .dataRowMapper(Person::class.java)  
        .build()
}

8. 可配置行分隔符

解决的问题:灵活控制文件生成格式

kotlin
@Bean
fileWriter(): FlatFileItemWriter<Data> {
    val aggregator = RecursiveCollectionLineAggregator<Data>().apply {
        lineSeparator = "\r\n"  // 明确指定Windows换行
    }
    
    return FlatFileItemWriterBuilder<Data>()
        .name("fileWriter")
        .resource(FileSystemResource("output.txt"))
        .lineAggregator(aggregator)
        .build()
}

9. 作业注册改进

解决的问题:消除启动时的烦人警告日志

kotlin
// 旧方式 (已废弃)
// @Bean
// fun jobRegistryPostProcessor(registry: JobRegistry): JobRegistryBeanPostProcessor {
//     return JobRegistryBeanPostProcessor().apply {
//         setJobRegistry(registry)
//     }
// }

// 新方式 ✅
@Bean
fun jobRegistryInitializer(registry: JobRegistry, context: ApplicationContext): JobRegistrySmartInitializingSingleton {
    return JobRegistrySmartInitializingSingleton(registry, context)  
}

10. 依赖升级清单

Spring Batch 5.2 同步更新了关键依赖:

依赖项新版本重要性
Spring Framework6.2.0⭐⭐⭐⭐
Spring Data3.4.0⭐⭐⭐⭐
Spring Kafka3.3.0⭐⭐⭐
Micrometer1.14.1⭐⭐⭐
Spring AMQP3.2.0⭐⭐

升级建议

运行 ./mvnw dependency:tree 检查依赖冲突,特别注意:

gradle
// Gradle 推荐配置
dependencies {
    implementation("org.springframework.boot:spring-boot-starter-batch")
    implementation("org.springframework.batch:spring-batch-integration:5.2.0")  // [!code highlight]
}

💡 最佳实践建议

组合新特性示例

点击查看完整配置示例
kotlin
@Configuration
@EnableBatchProcessing
class AdvancedBatchConfig {

    @Autowired lateinit var jobBuilderFactory: JobBuilderFactory
    @Autowired lateinit var stepBuilderFactory: StepBuilderFactory

    // 1. 使用MongoDB存储元数据
    @Bean
    fun mongoJobRepo(mongo: MongoTemplate, tx: MongoTransactionManager) =
        MongoJobRepositoryFactoryBean().apply {
            setMongoOperations(mongo)
            setTransactionManager(tx)
        }.getObject()

    // 2. 组合读取器 + 函数式处理
    @Bean
    fun customReader() = CompositeItemReader(listOf(dbReader(), apiReader()))

    @Bean
    fun filterProcessor() = PredicateFilteringItemProcessor<Data> { it.isValid }

    // 3. 并发步骤配置
    @Bean
    fun concurrentJob(): Job {
        val queue = LinkedBlockingQueue<Data>(500)
        return jobBuilderFactory.get("concurrentJob")
            .start(producerStep(queue))
            .next(consumerStep(queue))
            .build()
    }
    
    private fun producerStep(queue: BlockingQueue<Data>) = 
        stepBuilderFactory.get("producer")
            .chunk<Data, Data>(100)
            .reader(customReader())
            .writer(BlockingQueueItemWriter(queue))
            .build()
    
    private fun consumerStep(queue: BlockingQueue<Data>) = 
        stepBuilderFactory.get("consumer")
            .chunk<Data, Result>(50)
            .reader(BlockingQueueItemReader(queue))
            .processor(filterProcessor())
            .writer(mongoWriter())
            .build()
}

升级迁移指南

  1. 必须更改项

    kotlin
    // 5.1 旧方式
    // @Bean
    // fun postProcessor(registry: JobRegistry) = JobRegistryBeanPostProcessor(registry)
    
    // 5.2 新方式
    @Bean
    fun jobInitializer(registry: JobRegistry, ctx: ApplicationContext) = 
        JobRegistrySmartInitializingSingleton(registry, ctx)  
  2. 推荐改进项

    kotlin
    // 旧:普通类映射
    .beanRowMapper(Person::class.java)
    
    // 新:数据类专用映射
    .dataRowMapper(Person::class.java)  

结语

Spring Batch 5.2 通过 MongoDB 支持、函数式编程增强和并发处理优化,显著提升了批处理开发的灵活性和效率。建议重点尝试:

  1. 在 NoSQL 环境中使用 MongoDB 存储作业元数据
  2. CompositeItemReader 简化多数据源读取
  3. 通过阻塞队列实现高效流水线
  4. 将现有作业注册升级到 JobRegistrySmartInitializingSingleton

"好的工具让复杂任务变简单 —— Spring Batch 5.2 正是这样的工具" 🛠️💪