Appearance
Spring Batch 5.2 新特性详解
引言
Spring Batch 5.2 带来了多项重要改进,让批处理开发更高效灵活!无论你是处理大数据导入、报表生成还是系统间数据同步,本次更新都有惊喜。本教程将用通俗易懂的方式解读每个新特性,配合 Kotlin 代码示例,助你快速掌握升级要点。
🚀 核心新特性
1. MongoDB 作业仓库支持
解决的问题:终于支持 NoSQL 存储作业元数据了!不再局限于关系型数据库。
kotlin
@Configuration
class MongoJobConfig {
@Bean
fun jobRepository(
mongoTemplate: MongoTemplate,
transactionManager: MongoTransactionManager
): JobRepository {
val factory = MongoJobRepositoryFactoryBean().apply {
setMongoOperations(mongoTemplate)
setTransactionManager(transactionManager)
afterPropertiesSet()
}
return factory.getObject()
}
}
使用场景
- 当你的系统已使用 MongoDB
- 需要避免维护两种数据库类型
- 处理非结构化元数据时
IMPORTANT
要求 MongoDB 4.0+ 版本,确保添加 Spring Data MongoDB 依赖
2. 轻量级无资源作业仓库
解决的问题:简单任务不想配数据库?这个轻量级方案来了!
kotlin
@Bean
fun resourcelessJobRepository(): JobRepository {
return ResourcelessJobRepository().apply {
transactionManager = ResourcelessTransactionManager()
}
}
适用场景
- 仅适用于无需重启的任务
- 执行上下文不参与数据传输
- 单次运行的独立任务
CAUTION
不支持分布式环境!线程不安全!
3. 组合式条目读取器
解决的问题:多数据源合并读取不再需要自定义代码!
kotlin
@Bean
fun compositePersonReader(): CompositeItemReader<Person> {
val fileReader = FlatFileItemReaderBuilder<Person>()
.name("fileReader")
.resource(ClassPathResource("persons.csv"))
.delimited()
.names("id", "name")
.targetType(Person::class.java)
.build()
val dbReader = JdbcCursorItemReaderBuilder<Person>()
.name("dbReader")
.dataSource(dataSource)
.sql("SELECT id, name FROM persons")
.rowMapper(BeanPropertyRowMapper(Person::class.java))
.build()
return CompositeItemReader(listOf(fileReader, dbReader))
}
4. Java 函数式接口适配器
解决的问题:用 Lambda 简化批处理组件开发
kotlin
@Bean
fun lambdaReader(): ItemReader<String> {
return SupplierItemReader {
// 从任意来源生成数据
listOf("A", "B", "C").iterator().next()
}
}
@Bean
fun lambdaProcessor(): ItemProcessor<String, String> {
return PredicateFilteringItemProcessor { item ->
item != "B" // 过滤掉"B"
}
}
@Bean
fun lambdaWriter(): ItemWriter<String> {
return ConsumerItemWriter { items ->
items.forEach { println("写入: $it") }
}
}
5. 阻塞队列并发处理
解决的问题:实现高效流水线式并发处理
kotlin
@Bean
fun concurrentJob(): Job {
val queue = LinkedBlockingQueue<Data>(100)
return JobBuilder("concurrentJob", jobRepository)
.start(producerStep(queue))
.next(consumerStep(queue))
.build()
}
@Bean
fun producerStep(queue: BlockingQueue<Data>): Step {
return stepBuilderFactory.get("producerStep")
.chunk<Data, Data>(10)
.reader(fileReader())
.writer(BlockingQueueItemWriter(queue))
.build()
}
@Bean
fun consumerStep(queue: BlockingQueue<Data>): Step {
return stepBuilderFactory.get("consumerStep")
.chunk<Data, Data>(10)
.reader(BlockingQueueItemReader(queue))
.writer(databaseWriter())
.build()
}
6. JPA 查询提示支持
解决的问题:优化 JPA 查询性能
kotlin
@Bean
fun jpaReader(entityManagerFactory: EntityManagerFactory): JpaPagingItemReader<Order> {
return JpaPagingItemReaderBuilder<Order>()
.name("orderReader")
.entityManagerFactory(entityManagerFactory)
.queryString("SELECT o FROM Order o WHERE o.status = 'NEW'")
.parameterValues(mapOf("status" to "NEW"))
.queryHints(mapOf(
"javax.persistence.query.timeout" to 1000,
"org.hibernate.fetchSize" to 100
))
.build()
}
7. JDBC 数据类支持
解决的问题:完美支持 Java Record 和 Kotlin 数据类
kotlin
// Kotlin 数据类
data class Person(val id: Int, val name: String)
@Bean
fun recordReader(dataSource: DataSource): JdbcCursorItemReader<Person> {
return JdbcCursorItemReaderBuilder<Person>()
.name("personReader")
.dataSource(dataSource)
.sql("SELECT id, name FROM persons")
.dataRowMapper(Person::class.java)
.build()
}
8. 可配置行分隔符
解决的问题:灵活控制文件生成格式
kotlin
@Bean
fileWriter(): FlatFileItemWriter<Data> {
val aggregator = RecursiveCollectionLineAggregator<Data>().apply {
lineSeparator = "\r\n" // 明确指定Windows换行
}
return FlatFileItemWriterBuilder<Data>()
.name("fileWriter")
.resource(FileSystemResource("output.txt"))
.lineAggregator(aggregator)
.build()
}
9. 作业注册改进
解决的问题:消除启动时的烦人警告日志
kotlin
// 旧方式 (已废弃)
// @Bean
// fun jobRegistryPostProcessor(registry: JobRegistry): JobRegistryBeanPostProcessor {
// return JobRegistryBeanPostProcessor().apply {
// setJobRegistry(registry)
// }
// }
// 新方式 ✅
@Bean
fun jobRegistryInitializer(registry: JobRegistry, context: ApplicationContext): JobRegistrySmartInitializingSingleton {
return JobRegistrySmartInitializingSingleton(registry, context)
}
10. 依赖升级清单
Spring Batch 5.2 同步更新了关键依赖:
依赖项 | 新版本 | 重要性 |
---|---|---|
Spring Framework | 6.2.0 | ⭐⭐⭐⭐ |
Spring Data | 3.4.0 | ⭐⭐⭐⭐ |
Spring Kafka | 3.3.0 | ⭐⭐⭐ |
Micrometer | 1.14.1 | ⭐⭐⭐ |
Spring AMQP | 3.2.0 | ⭐⭐ |
升级建议
运行 ./mvnw dependency:tree
检查依赖冲突,特别注意:
gradle
// Gradle 推荐配置
dependencies {
implementation("org.springframework.boot:spring-boot-starter-batch")
implementation("org.springframework.batch:spring-batch-integration:5.2.0") // [!code highlight]
}
💡 最佳实践建议
组合新特性示例
点击查看完整配置示例
kotlin
@Configuration
@EnableBatchProcessing
class AdvancedBatchConfig {
@Autowired lateinit var jobBuilderFactory: JobBuilderFactory
@Autowired lateinit var stepBuilderFactory: StepBuilderFactory
// 1. 使用MongoDB存储元数据
@Bean
fun mongoJobRepo(mongo: MongoTemplate, tx: MongoTransactionManager) =
MongoJobRepositoryFactoryBean().apply {
setMongoOperations(mongo)
setTransactionManager(tx)
}.getObject()
// 2. 组合读取器 + 函数式处理
@Bean
fun customReader() = CompositeItemReader(listOf(dbReader(), apiReader()))
@Bean
fun filterProcessor() = PredicateFilteringItemProcessor<Data> { it.isValid }
// 3. 并发步骤配置
@Bean
fun concurrentJob(): Job {
val queue = LinkedBlockingQueue<Data>(500)
return jobBuilderFactory.get("concurrentJob")
.start(producerStep(queue))
.next(consumerStep(queue))
.build()
}
private fun producerStep(queue: BlockingQueue<Data>) =
stepBuilderFactory.get("producer")
.chunk<Data, Data>(100)
.reader(customReader())
.writer(BlockingQueueItemWriter(queue))
.build()
private fun consumerStep(queue: BlockingQueue<Data>) =
stepBuilderFactory.get("consumer")
.chunk<Data, Result>(50)
.reader(BlockingQueueItemReader(queue))
.processor(filterProcessor())
.writer(mongoWriter())
.build()
}
升级迁移指南
必须更改项:
kotlin// 5.1 旧方式 // @Bean // fun postProcessor(registry: JobRegistry) = JobRegistryBeanPostProcessor(registry) // 5.2 新方式 @Bean fun jobInitializer(registry: JobRegistry, ctx: ApplicationContext) = JobRegistrySmartInitializingSingleton(registry, ctx)
推荐改进项:
kotlin// 旧:普通类映射 .beanRowMapper(Person::class.java) // 新:数据类专用映射 .dataRowMapper(Person::class.java)
结语
Spring Batch 5.2 通过 MongoDB 支持、函数式编程增强和并发处理优化,显著提升了批处理开发的灵活性和效率。建议重点尝试:
- 在 NoSQL 环境中使用 MongoDB 存储作业元数据
- 用
CompositeItemReader
简化多数据源读取 - 通过阻塞队列实现高效流水线
- 将现有作业注册升级到
JobRegistrySmartInitializingSingleton
学习资源
"好的工具让复杂任务变简单 —— Spring Batch 5.2 正是这样的工具" 🛠️💪