Skip to content

以下是根据要求编写的 Spring Batch Item Processing 教程,采用 Kotlin 实现,使用注解配置,并优化了内容结构和可视化展示:

🔄 Spring Batch 项目处理(Item Processing)精解

核心概念:ItemProcessor 是 Spring Batch 中用于在读取和写入之间执行业务逻辑处理的中间件,可进行数据转换、过滤、验证等操作

一、ItemProcessor 基础

1.1 核心作用

  • 数据转换:将读取的数据类型转换为写入所需类型(如 FooBar
  • 业务逻辑:执行验证、计算、数据增强等操作
  • 数据过滤:通过返回 null 排除不需要写入的记录

1.2 接口定义(Kotlin 实现)

kotlin
interface ItemProcessor<I, O> {
    fun process(item: I): O?
}

1.3 基础使用示例

kotlin
// 1. 数据模型
class Foo(val id: Long)
class Bar(val fooId: Long, val processed: Boolean = true)

// 2. 处理器实现
class FooProcessor : ItemProcessor<Foo, Bar> {
    override fun process(item: Foo): Bar? {
        //  // 重点:执行转换逻辑
        return Bar(item.id)
    }
}

// 3. 写入器
class BarWriter : ItemWriter<Bar> {
    override fun write(items: List<Bar>) {
        items.forEach { println("Writing: $it") }
    }
}

1.4 配置步骤(注解驱动)

kotlin
@Configuration
class BatchConfig {

    @Bean
    fun job(jobRepository: JobRepository, step: Step): Job {
        return JobBuilder("itemProcessJob", jobRepository)
            .start(step)
            .build()
    }

    @Bean
    fun step(
        jobRepository: JobRepository,
        transactionManager: PlatformTransactionManager
    ): Step {
        return StepBuilder("processStep", jobRepository)
            .chunk<Foo, Bar>(10, transactionManager) // [!code highlight] // 关键:定义处理块大小
            .reader(fooReader())
            .processor(fooProcessor())
            .writer(barWriter())
            .build()
    }

    @Bean
    fun fooReader(): ItemReader<Foo> {
        // 实现读取逻辑(示例)
        return ListItemReader(listOf(Foo(1), Foo(2)))
    }

    @Bean
    fun fooProcessor() = FooProcessor()

    @Bean
    fun barWriter() = BarWriter()
}

TIP

处理器是可选的:当不需要中间处理时,可直接连接 Reader 和 Writer

二、🔗 链式处理器(Chaining Processors)

2.1 应用场景

当需要多阶段处理时(如:数据清洗 → 数据转换 → 数据验证)

2.2 链式处理实现

kotlin
// 1. 定义处理阶段
class CleanProcessor : ItemProcessor<RawData, CleanData> {
    override fun process(item: RawData) = CleanData(item.id, item.value.trim())
}

class TransformProcessor : ItemProcessor<CleanData, TransformedData> {
    override fun process(item: CleanData) = TransformedData(item.id, item.value.uppercase())
}

class ValidateProcessor : ItemProcessor<TransformedData, ValidData> {
    override fun process(item: TransformedData) = if (item.isValid()) ValidData(item) else null
}

// 2. 组合处理器
@Bean
fun compositeProcessor(): CompositeItemProcessor<RawData, ValidData> {
    val processor = CompositeItemProcessor<RawData, ValidData>()
    processor.setDelegates(listOf(
        CleanProcessor(),
        TransformProcessor(),
        ValidateProcessor()
    ))
    return processor
}

// 3. 步骤配置
@Bean
fun processingStep(
    jobRepository: JobRepository,
    transactionManager: PlatformTransactionManager
): Step {
    return StepBuilder("chainedStep", jobRepository)
        .chunk<RawData, ValidData>(5, transactionManager)
        .reader(rawDataReader())
        .processor(compositeProcessor()) // [!code highlight] // 使用组合处理器
        .writer(validDataWriter())
        .build()
}

三、🚫 记录过滤(Filtering Records)

3.1 过滤机制

  • process() 方法中返回 null 实现过滤
  • 过滤 ≠ 跳过:过滤是主动排除,跳过是错误处理

[!EXAMPLE] 超市收银场景类比

  • 正常商品:放入购物车(处理)
  • 过期商品:直接丢弃(过滤)
  • 无法扫码商品:要求收银员处理(跳过)

3.2 过滤实现

kotlin
class OrderFilterProcessor : ItemProcessor<Order, Order> {
    override fun process(item: Order): Order? {
        //  // 重点过滤逻辑
        return when {
            item.status == Status.DELETED -> null  // 过滤已删除订单
            item.amount <= 0 -> null               // 过滤无效金额
            else -> item.apply { validate() }      // 正常处理
        }
    }
}

四、✅ 输入验证(Validating Input)

4.1 验证方式对比

验证方式实现类特点
自定义验证ValidatingItemProcessor灵活实现业务规则
JSR-303 注解验证BeanValidatingItemProcessor声明式验证

4.2 JSR-303 注解验证示例

kotlin
// 1. 使用验证注解
data class User(
    @field:NotBlank
    val name: String,

    @field:Email
    val email: String,

    @field:Min(18)
    val age: Int
)

// 2. 配置验证处理器
@Bean
fun validatingProcessor(): BeanValidatingItemProcessor<User> {
    return BeanValidatingItemProcessor<User>().apply {
        setFilter(true)  // [!code highlight] // 过滤无效记录
    }
}

// 3. 步骤配置
@Bean
fun validationStep(
    jobRepository: JobRepository,
    transactionManager: PlatformTransactionManager
): Step {
    return StepBuilder("validationStep", jobRepository)
        .chunk<User, User>(10, transactionManager)
        .reader(userReader())
        .processor(validatingProcessor())  // 注入验证处理器
        .writer(userWriter())
        .build()
}

WARNING

验证注意事项

  1. 确保添加 javax.validation:validation-api 依赖
  2. 实现 Validator 接口可自定义验证逻辑
  3. 验证失败默认抛出 ValidationException

五、⚡ 容错处理(Fault Tolerance)

5.1 幂等性设计原则

kotlin
class IdempotentProcessor : ItemProcessor<Input, Output> {
    //  // 幂等处理示例
    override fun process(item: Input): Output {
        // ✅ 正确:不修改原始对象
        return Output(item.data.copy(processed = true))

        // ❌ 错误:修改原始对象状态
        // item.processed = true // 会导致重试时状态错乱
    }
}

5.2 事务配置要点

kotlin
@Bean
fun resilientStep(
    jobRepository: JobRepository,
    transactionManager: PlatformTransactionManager
): Step {
    return StepBuilder("safeStep", jobRepository)
        .chunk<Data, Result>(10, transactionManager)
        .reader(dataReader())
        .processor(idempotentProcessor())
        .writer(resultWriter())
        .faultTolerant()
        .skipLimit(10)              // [!code ++] // 允许跳过10条错误
        .skip(ValidationException::class.java)
        .retryLimit(3)              // [!code ++] // 最大重试3次
        .retay(DeadlockLoserDataAccessException::class.java)
        .build()
}
完整容错配置示例(点击展开)
kotlin
@Bean
fun faultTolerantStep(
    jobRepository: JobRepository,
    transactionManager: PlatformTransactionManager
): Step {
    return StepBuilder("tolerantStep", jobRepository)
        .chunk<Data, Result>(5, transactionManager)
        .reader(dataReader())
        .processor(compositeProcessor())
        .writer(resultWriter())
        .faultTolerant()
        .skipPolicy { t, skipCount ->
            when {
                t is ValidationException -> true
                t is DataIntegrityViolationException && skipCount < 3 -> true
                else -> false
            }
        }
        .retryLimit(2)
        .retry(OptimisticLockingFailureException::class.java)
        .noRollback(DataFormatException::class.java)  // [!code highlight] // 指定不回滚的异常
        .build()
}

六、💎 最佳实践总结

场景实现方案注意事项
简单转换单一 ItemProcessor保持处理逻辑纯净
多阶段处理CompositeItemProcessor注意处理顺序
数据过滤返回 null区别于异常跳过
复杂验证ValidatingItemProcessor结合 Spring Validation
声明式验证BeanValidatingItemProcessor需 JSR-303 实现
容错处理配置 skip/retry确保处理器幂等性

⚠️ 关键原则:ItemProcessor 必须实现幂等性,因为失败重试时可能重复处理相同数据项