Appearance
以下是根据要求编写的 Spring Batch Item Processing 教程,采用 Kotlin 实现,使用注解配置,并优化了内容结构和可视化展示:
🔄 Spring Batch 项目处理(Item Processing)精解
核心概念:ItemProcessor 是 Spring Batch 中用于在读取和写入之间执行业务逻辑处理的中间件,可进行数据转换、过滤、验证等操作
一、ItemProcessor 基础
1.1 核心作用
- 数据转换:将读取的数据类型转换为写入所需类型(如
Foo
→Bar
) - 业务逻辑:执行验证、计算、数据增强等操作
- 数据过滤:通过返回
null
排除不需要写入的记录
1.2 接口定义(Kotlin 实现)
kotlin
interface ItemProcessor<I, O> {
fun process(item: I): O?
}
1.3 基础使用示例
kotlin
// 1. 数据模型
class Foo(val id: Long)
class Bar(val fooId: Long, val processed: Boolean = true)
// 2. 处理器实现
class FooProcessor : ItemProcessor<Foo, Bar> {
override fun process(item: Foo): Bar? {
// // 重点:执行转换逻辑
return Bar(item.id)
}
}
// 3. 写入器
class BarWriter : ItemWriter<Bar> {
override fun write(items: List<Bar>) {
items.forEach { println("Writing: $it") }
}
}
1.4 配置步骤(注解驱动)
kotlin
@Configuration
class BatchConfig {
@Bean
fun job(jobRepository: JobRepository, step: Step): Job {
return JobBuilder("itemProcessJob", jobRepository)
.start(step)
.build()
}
@Bean
fun step(
jobRepository: JobRepository,
transactionManager: PlatformTransactionManager
): Step {
return StepBuilder("processStep", jobRepository)
.chunk<Foo, Bar>(10, transactionManager) // [!code highlight] // 关键:定义处理块大小
.reader(fooReader())
.processor(fooProcessor())
.writer(barWriter())
.build()
}
@Bean
fun fooReader(): ItemReader<Foo> {
// 实现读取逻辑(示例)
return ListItemReader(listOf(Foo(1), Foo(2)))
}
@Bean
fun fooProcessor() = FooProcessor()
@Bean
fun barWriter() = BarWriter()
}
TIP
处理器是可选的:当不需要中间处理时,可直接连接 Reader 和 Writer
二、🔗 链式处理器(Chaining Processors)
2.1 应用场景
当需要多阶段处理时(如:数据清洗 → 数据转换 → 数据验证)
2.2 链式处理实现
kotlin
// 1. 定义处理阶段
class CleanProcessor : ItemProcessor<RawData, CleanData> {
override fun process(item: RawData) = CleanData(item.id, item.value.trim())
}
class TransformProcessor : ItemProcessor<CleanData, TransformedData> {
override fun process(item: CleanData) = TransformedData(item.id, item.value.uppercase())
}
class ValidateProcessor : ItemProcessor<TransformedData, ValidData> {
override fun process(item: TransformedData) = if (item.isValid()) ValidData(item) else null
}
// 2. 组合处理器
@Bean
fun compositeProcessor(): CompositeItemProcessor<RawData, ValidData> {
val processor = CompositeItemProcessor<RawData, ValidData>()
processor.setDelegates(listOf(
CleanProcessor(),
TransformProcessor(),
ValidateProcessor()
))
return processor
}
// 3. 步骤配置
@Bean
fun processingStep(
jobRepository: JobRepository,
transactionManager: PlatformTransactionManager
): Step {
return StepBuilder("chainedStep", jobRepository)
.chunk<RawData, ValidData>(5, transactionManager)
.reader(rawDataReader())
.processor(compositeProcessor()) // [!code highlight] // 使用组合处理器
.writer(validDataWriter())
.build()
}
三、🚫 记录过滤(Filtering Records)
3.1 过滤机制
- 在
process()
方法中返回null
实现过滤 - 过滤 ≠ 跳过:过滤是主动排除,跳过是错误处理
[!EXAMPLE] 超市收银场景类比:
- 正常商品:放入购物车(处理)
- 过期商品:直接丢弃(过滤)
- 无法扫码商品:要求收银员处理(跳过)
3.2 过滤实现
kotlin
class OrderFilterProcessor : ItemProcessor<Order, Order> {
override fun process(item: Order): Order? {
// // 重点过滤逻辑
return when {
item.status == Status.DELETED -> null // 过滤已删除订单
item.amount <= 0 -> null // 过滤无效金额
else -> item.apply { validate() } // 正常处理
}
}
}
四、✅ 输入验证(Validating Input)
4.1 验证方式对比
验证方式 | 实现类 | 特点 |
---|---|---|
自定义验证 | ValidatingItemProcessor | 灵活实现业务规则 |
JSR-303 注解验证 | BeanValidatingItemProcessor | 声明式验证 |
4.2 JSR-303 注解验证示例
kotlin
// 1. 使用验证注解
data class User(
@field:NotBlank
val name: String,
@field:Email
val email: String,
@field:Min(18)
val age: Int
)
// 2. 配置验证处理器
@Bean
fun validatingProcessor(): BeanValidatingItemProcessor<User> {
return BeanValidatingItemProcessor<User>().apply {
setFilter(true) // [!code highlight] // 过滤无效记录
}
}
// 3. 步骤配置
@Bean
fun validationStep(
jobRepository: JobRepository,
transactionManager: PlatformTransactionManager
): Step {
return StepBuilder("validationStep", jobRepository)
.chunk<User, User>(10, transactionManager)
.reader(userReader())
.processor(validatingProcessor()) // 注入验证处理器
.writer(userWriter())
.build()
}
WARNING
验证注意事项:
- 确保添加
javax.validation:validation-api
依赖 - 实现
Validator
接口可自定义验证逻辑 - 验证失败默认抛出
ValidationException
五、⚡ 容错处理(Fault Tolerance)
5.1 幂等性设计原则
kotlin
class IdempotentProcessor : ItemProcessor<Input, Output> {
// // 幂等处理示例
override fun process(item: Input): Output {
// ✅ 正确:不修改原始对象
return Output(item.data.copy(processed = true))
// ❌ 错误:修改原始对象状态
// item.processed = true // 会导致重试时状态错乱
}
}
5.2 事务配置要点
kotlin
@Bean
fun resilientStep(
jobRepository: JobRepository,
transactionManager: PlatformTransactionManager
): Step {
return StepBuilder("safeStep", jobRepository)
.chunk<Data, Result>(10, transactionManager)
.reader(dataReader())
.processor(idempotentProcessor())
.writer(resultWriter())
.faultTolerant()
.skipLimit(10) // [!code ++] // 允许跳过10条错误
.skip(ValidationException::class.java)
.retryLimit(3) // [!code ++] // 最大重试3次
.retay(DeadlockLoserDataAccessException::class.java)
.build()
}
完整容错配置示例(点击展开)
kotlin
@Bean
fun faultTolerantStep(
jobRepository: JobRepository,
transactionManager: PlatformTransactionManager
): Step {
return StepBuilder("tolerantStep", jobRepository)
.chunk<Data, Result>(5, transactionManager)
.reader(dataReader())
.processor(compositeProcessor())
.writer(resultWriter())
.faultTolerant()
.skipPolicy { t, skipCount ->
when {
t is ValidationException -> true
t is DataIntegrityViolationException && skipCount < 3 -> true
else -> false
}
}
.retryLimit(2)
.retry(OptimisticLockingFailureException::class.java)
.noRollback(DataFormatException::class.java) // [!code highlight] // 指定不回滚的异常
.build()
}
六、💎 最佳实践总结
场景 | 实现方案 | 注意事项 |
---|---|---|
简单转换 | 单一 ItemProcessor | 保持处理逻辑纯净 |
多阶段处理 | CompositeItemProcessor | 注意处理顺序 |
数据过滤 | 返回 null | 区别于异常跳过 |
复杂验证 | ValidatingItemProcessor | 结合 Spring Validation |
声明式验证 | BeanValidatingItemProcessor | 需 JSR-303 实现 |
容错处理 | 配置 skip/retry | 确保处理器幂等性 |
⚠️ 关键原则:ItemProcessor 必须实现幂等性,因为失败重试时可能重复处理相同数据项