Skip to content

📚 Spring Batch 批处理与事务深度解析

核心概念:批处理的核心是高效处理大量数据,事务管理确保数据一致性。二者结合时需解决事务边界、重试机制、资源正交性等关键问题。

⚙️ 无重试的简单批处理模式

kotlin
@Transactional
fun processChunk() {
    repeat(5) {  // 处理5个数据块
        val input = readInput()
        processOutput(input)     // 必须幂等或事务性
    }
}

while (hasMoreData()) {  // 持续处理直到数据耗尽
    processChunk()       // 每次提交一个完整数据块
}

关键特性

  • 事务边界:每个数据块(chunk)在独立事务中处理
  • 错误处理:块内任意步骤失败将导致整个事务回滚
  • 必备条件:输入/输出操作需具备事务性幂等性

无重试批处理流程图

🔁 无状态重试模式

kotlin
@Transactional
fun processItem() {
    val input = readInput()
    val output = process(input)

    retry(maxAttempts = 3) {
        callRemoteService(output)  // 远程调用(非事务操作)
    }
}

IMPORTANT

适用场景
远程服务调用等非事务性操作,失败后可通过重试恢复。事务成功与否仅取决于最终重试结果

🔄 典型重复-重试模式(企业级方案)

kotlin
@Bean
fun chunkBasedJob(): Job {
    return jobBuilderFactory.get("retryJob")
        .start(stepBuilderFactory.get("retryStep")
            .chunk<Input, Output>(5)
            .reader(itemReader())
            .processor(itemProcessor())
            .writer(itemWriter())
            .faultTolerant()
            .retry(DeadlockLoserException::class.java)
            .retryLimit(3)
            .skipLimit(10)
            .skip(OptimisticLockingFailureException::class.java)
            .build())
        .build()
}

注意

状态性重试必须记录处理历史,需启用Spring Batch的RetryContext

kotlin
@Bean
fun retryTemplate(): RetryTemplate {
    return RetryTemplate.builder()
        .maxAttempts(3)
        .retryOn(DeadlockLoserException::class.java)
        .withStatefulRetry()
        .build()
}

⚡️ 异步处理优化

异步块处理

kotlin
@Bean
fun asyncChunkStep(): Step {
    return stepBuilderFactory.get("asyncStep")
        .chunk<Input, Output>(100)
        .reader(reader())
        .writer(writer())
        .taskExecutor(SimpleAsyncTaskExecutor())
        .throttleLimit(10)  // 并发线程数
        .build()
}

异步项处理

kotlin
@Bean
fun asyncItemStep(): Step {
    return stepBuilderFactory.get("itemStep")
        .chunk<Input, Output>(1)  // 每个事务处理单个项
        .reader(reader())
        .processor(asyncItemProcessor())  // 返回ListenableFuture
        .writer(writer())
        .build()
}

CAUTION

性能权衡
项级异步牺牲事务优化优势,仅当处理耗时 >> 事务管理耗时时适用

🔗 事务传播机制深度解析

嵌套事务问题

kotlin
@Transactional
fun outerMethod() {
    repeat(5) {
        retryTemplate.execute { _ ->
            @Transactional(propagation = Propagation.NESTED)
            fun innerMethod() {
                sensitiveDatabaseOperation()
            }
        }
    }
}
传播行为重试时是否安全支持数据库
REQUIRED❌ 污染外层事务所有
REQUIRES_NEW⚠️ 可能违反原子性所有
NESTED✅ 完美支持回滚仅部分(如Oracle)

关键限制

无状态重试无法与不支持NESTED传播的事务管理器(如JTA)协同工作

⚖️ 正交资源事务处理

kotlin
jmsTemplate.execute { session ->  // JMS事务
    val message = receiveMessage(session)
    retryTemplate.execute {
        jdbcTemplate.update("INSERT...")  // DB事务
    }
}

TIP

此模式实现最大努力单阶段提交

  • JMS和DB事务相互独立
  • 最坏情况可能产生重复消息
  • 优于XA两阶段提交的性能

🚫 无状态重试的恢复限制

kotlin
retryTemplate.execute(
    RetryCallback { _ ->
        @Transactional(propagation = Propagation.NESTED)  // 必须有嵌套事务
        fun tryProcess() {
            val item = itemReader.read()
            itemWriter.write(item)
        }
    },
    RecoveryCallback { _ ->
        skipItem()  // 仅当启用NESTED传播时有效
    }
)

设计建议

实际解决方案
优先采用状态性重试 + 跳过策略的组合方案,兼容所有事务管理器

kotlin
.skipPolicy { throwable, _ ->
    throwable is DataIntegrityViolationException
}

✅ 最佳实践总结

  1. 事务划分:以数据块(chunk)为事务边界
  2. 重试策略
    • 远程调用 → 无状态重试
    • 数据库操作 → 状态性重试 + 跳过策略
  3. 并发优化
  4. 传播控制:优先使用NESTED传播(支持时)
  5. 错误恢复:结合SkipPolicy实现弹性处理

最终的批处理系统应像精密的钟表:每个事务是齿轮,重试机制是发条,异步处理是加速器⚙️⏱️⚡️