Appearance
📚 Spring Batch 批处理与事务深度解析
核心概念:批处理的核心是高效处理大量数据,事务管理确保数据一致性。二者结合时需解决事务边界、重试机制、资源正交性等关键问题。
⚙️ 无重试的简单批处理模式
kotlin
@Transactional
fun processChunk() {
repeat(5) { // 处理5个数据块
val input = readInput()
processOutput(input) // 必须幂等或事务性
}
}
while (hasMoreData()) { // 持续处理直到数据耗尽
processChunk() // 每次提交一个完整数据块
}
关键特性
- 事务边界:每个数据块(chunk)在独立事务中处理
- 错误处理:块内任意步骤失败将导致整个事务回滚
- 必备条件:输入/输出操作需具备事务性或幂等性
🔁 无状态重试模式
kotlin
@Transactional
fun processItem() {
val input = readInput()
val output = process(input)
retry(maxAttempts = 3) {
callRemoteService(output) // 远程调用(非事务操作)
}
}
IMPORTANT
适用场景:
远程服务调用等非事务性操作,失败后可通过重试恢复。事务成功与否仅取决于最终重试结果
🔄 典型重复-重试模式(企业级方案)
kotlin
@Bean
fun chunkBasedJob(): Job {
return jobBuilderFactory.get("retryJob")
.start(stepBuilderFactory.get("retryStep")
.chunk<Input, Output>(5)
.reader(itemReader())
.processor(itemProcessor())
.writer(itemWriter())
.faultTolerant()
.retry(DeadlockLoserException::class.java)
.retryLimit(3)
.skipLimit(10)
.skip(OptimisticLockingFailureException::class.java)
.build())
.build()
}
注意
状态性重试必须记录处理历史,需启用Spring Batch的RetryContext
:
kotlin
@Bean
fun retryTemplate(): RetryTemplate {
return RetryTemplate.builder()
.maxAttempts(3)
.retryOn(DeadlockLoserException::class.java)
.withStatefulRetry()
.build()
}
⚡️ 异步处理优化
异步块处理
kotlin
@Bean
fun asyncChunkStep(): Step {
return stepBuilderFactory.get("asyncStep")
.chunk<Input, Output>(100)
.reader(reader())
.writer(writer())
.taskExecutor(SimpleAsyncTaskExecutor())
.throttleLimit(10) // 并发线程数
.build()
}
异步项处理
kotlin
@Bean
fun asyncItemStep(): Step {
return stepBuilderFactory.get("itemStep")
.chunk<Input, Output>(1) // 每个事务处理单个项
.reader(reader())
.processor(asyncItemProcessor()) // 返回ListenableFuture
.writer(writer())
.build()
}
CAUTION
性能权衡:
项级异步牺牲事务优化优势,仅当处理耗时 >> 事务管理耗时时适用
🔗 事务传播机制深度解析
嵌套事务问题
kotlin
@Transactional
fun outerMethod() {
repeat(5) {
retryTemplate.execute { _ ->
@Transactional(propagation = Propagation.NESTED)
fun innerMethod() {
sensitiveDatabaseOperation()
}
}
}
}
传播行为 | 重试时是否安全 | 支持数据库 |
---|---|---|
REQUIRED | ❌ 污染外层事务 | 所有 |
REQUIRES_NEW | ⚠️ 可能违反原子性 | 所有 |
NESTED | ✅ 完美支持回滚 | 仅部分(如Oracle) |
关键限制
无状态重试无法与不支持NESTED
传播的事务管理器(如JTA)协同工作
⚖️ 正交资源事务处理
kotlin
jmsTemplate.execute { session -> // JMS事务
val message = receiveMessage(session)
retryTemplate.execute {
jdbcTemplate.update("INSERT...") // DB事务
}
}
TIP
此模式实现最大努力单阶段提交:
- JMS和DB事务相互独立
- 最坏情况可能产生重复消息
- 优于XA两阶段提交的性能
🚫 无状态重试的恢复限制
kotlin
retryTemplate.execute(
RetryCallback { _ ->
@Transactional(propagation = Propagation.NESTED) // 必须有嵌套事务
fun tryProcess() {
val item = itemReader.read()
itemWriter.write(item)
}
},
RecoveryCallback { _ ->
skipItem() // 仅当启用NESTED传播时有效
}
)
设计建议
实际解决方案:
优先采用状态性重试 + 跳过策略的组合方案,兼容所有事务管理器
kotlin
.skipPolicy { throwable, _ ->
throwable is DataIntegrityViolationException
}
✅ 最佳实践总结
- 事务划分:以数据块(chunk)为事务边界
- 重试策略:
- 远程调用 → 无状态重试
- 数据库操作 → 状态性重试 + 跳过策略
- 并发优化:
- 传播控制:优先使用
NESTED
传播(支持时) - 错误恢复:结合
SkipPolicy
实现弹性处理
最终的批处理系统应像精密的钟表:每个事务是齿轮,重试机制是发条,异步处理是加速器⚙️⏱️⚡️