# Spring Batch Parallel Processing in Practice

## Overview

Performance is one of the core challenges in batch processing. Spring Batch offers several parallel-processing options, so you can pick whatever fits your workload. Think of a supermarket checkout: single-threaded processing is a single cashier, a multi-threaded step opens several checkout lanes, and partitioning sorts the goods into separate areas that are worked through in parallel!

### Run a simple test first!

Before reaching for anything complex, process your data with a single thread:
```kotlin
// A simple single-threaded baseline
@Bean
fun simpleJob(jobRepository: JobRepository, simpleStep: Step): Job =
    JobBuilder("simpleJob", jobRepository)
        .start(simpleStep)
        .build()

@Bean
fun simpleStep(jobRepository: JobRepository, transactionManager: PlatformTransactionManager): Step =
    StepBuilder("simpleStep", jobRepository)
        .chunk<String, String>(100, transactionManager)
        .reader(fileReader())
        .writer(databaseWriter())
        .build()
```
If a few hundred MB of input is processed in a few seconds, you may not need parallel processing at all!
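
If you are not sure how long the single-threaded version really takes, record the step timings before optimizing anything. A minimal sketch, assuming Spring Batch 5 (where step start/end times are `LocalDateTime`); the `StepTimingListener` name is illustrative and the listener is attached with `.listener(...)` on the `JobBuilder`:

```kotlin
import java.time.Duration
import org.springframework.batch.core.JobExecution
import org.springframework.batch.core.JobExecutionListener

// Prints how long each step of the finished job took
class StepTimingListener : JobExecutionListener {
    override fun afterJob(jobExecution: JobExecution) {
        jobExecution.stepExecutions.forEach { step ->
            val took = Duration.between(step.startTime, step.endTime)
            println("Step '${step.stepName}' wrote ${step.writeCount} items in ${took.toMillis()} ms")
        }
    }
}
```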

## I. Single-Process Parallelism

### 1. Multi-threaded Step

Attach a thread pool to a step, like opening several checkout lanes at once.
```kotlin
@Bean
fun taskExecutor() = ThreadPoolTaskExecutor().apply {
    corePoolSize = 5
    maxPoolSize = 10
    setThreadNamePrefix("batch-thread-")
    initialize()
}

@Bean
fun multiThreadStep(jobRepository: JobRepository, transactionManager: PlatformTransactionManager) =
    StepBuilder("multiThreadStep", jobRepository)
        .chunk<Item, Item>(50, transactionManager)
        .reader(synchronizedReader()) // the reader MUST be thread-safe!
        .writer(threadSafeWriter())
        .taskExecutor(taskExecutor())
        .build()

// Thread-safety decorator around a non-thread-safe delegate reader
fun synchronizedReader() = SynchronizedItemStreamReader<Item>().apply {
    setDelegate(rawItemReader())
}
```
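
The item processor used in such a step must hold no mutable shared state, otherwise concurrent worker threads corrupt each other's data. A minimal sketch of a stateless processor (shown with `String` items for simplicity; `UppercaseProcessor` is an illustrative name, not part of the configuration above):

```kotlin
import org.springframework.batch.item.ItemProcessor

// Stateless: no fields are read or written, so any worker thread may call process() concurrently
class UppercaseProcessor : ItemProcessor<String, String> {
    override fun process(item: String): String = item.uppercase()
}
```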

**Important limitations**

- ⚠️ Most of the built-in readers/writers are not thread-safe
- ✅ Workarounds:
  - Wrap the reader in `SynchronizedItemStreamReader`
  - Keep item processors stateless
  - Size the database connection pool to match the thread count (see the pool sketch after the next code block)

**WARNING:** `throttleLimit` is deprecated since Spring Batch 5.0! The replacement:
```kotlin
@Bean
fun customRepeatOperations() = TaskExecutorRepeatTemplate().apply {
    setTaskExecutor(taskExecutor())
    setThrottleLimit(20) // custom concurrency throttle
}

@Bean
fun safeStep(jobRepository: JobRepository, transactionManager: PlatformTransactionManager) =
    StepBuilder("safeStep", jobRepository)
        .chunk<Any, Any>(10, transactionManager)
        .reader(reader())
        .writer(writer())
        .stepOperations(customRepeatOperations())
        .build()
```
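
The connection-pool caveat above bites quickly: with a multi-threaded step every worker thread holds its own connection and transaction, and the JobRepository needs connections too. A minimal sketch, assuming HikariCP and placeholder connection settings:

```kotlin
import com.zaxxer.hikari.HikariDataSource
import javax.sql.DataSource
import org.springframework.context.annotation.Bean

// Pool sized to cover maxPoolSize = 10 worker threads plus JobRepository traffic
@Bean
fun batchDataSource(): DataSource = HikariDataSource().apply {
    jdbcUrl = "jdbc:postgresql://localhost:5432/batch" // placeholder URL
    username = "batch"                                 // placeholder credentials
    password = "secret"
    maximumPoolSize = 12
}
```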

### 2. Parallel Steps

Run several steps at the same time, like multiple production lines working in parallel.
```kotlin
// Define the split (parallel) flow
@Bean
fun parallelFlow() = FlowBuilder<SimpleFlow>("parallelFlow")
    .split(taskExecutor())
    .add(flow1(), flow2())
    .build()

// Flow 1: step1 -> step2
@Bean
fun flow1() = FlowBuilder<SimpleFlow>("flow1")
    .start(step1())
    .next(step2())
    .build()

// Flow 2: step3
@Bean
fun flow2() = FlowBuilder<SimpleFlow>("flow2")
    .start(step3())
    .build()

// Main job configuration
@Bean
fun mainJob(jobRepository: JobRepository) = JobBuilder("mainJob", jobRepository)
    .start(parallelFlow())
    .next(step4()) // aggregation point after the parallel flows finish
    .build()       // builds the FlowJobBuilder
    .build()       // builds the Job
```
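
The flow above references `step1()` through `step4()` without defining them. A minimal tasklet-based placeholder, assuming it sits in the same `@Configuration` class with `jobRepository` and `transactionManager` available as injected fields (the tasklet body is illustrative):

```kotlin
import org.springframework.batch.core.Step
import org.springframework.batch.core.step.builder.StepBuilder
import org.springframework.batch.repeat.RepeatStatus
import org.springframework.context.annotation.Bean

@Bean
fun step1(): Step =
    StepBuilder("step1", jobRepository)
        .tasklet({ _, _ ->
            // do the real work here; logging the thread shows the split actually runs in parallel
            println("step1 running on ${Thread.currentThread().name}")
            RepeatStatus.FINISHED
        }, transactionManager)
        .build()
```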

## II. Distributed Processing

### 1. Remote Chunking

A manager step dispatches the work over a message channel and worker processes handle the items (similar in spirit to MapReduce).

**When to use it**

- Processing an item costs noticeably more than reading it
- You need to scale out across several machines
- A reliable message broker is available (e.g. RabbitMQ/Kafka)
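
Spring Batch ships the building blocks for this in the `spring-batch-integration` module. A minimal sketch of the manager side, assuming `@EnableBatchIntegration` is active and that the `requests`/`replies` channels are bridged to your broker; the channel beans, the `String` item type and the injected reader are placeholders, not part of the original text:

```kotlin
import org.springframework.batch.core.step.tasklet.TaskletStep
import org.springframework.batch.integration.chunk.RemoteChunkingManagerStepBuilderFactory
import org.springframework.batch.item.ItemReader
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.integration.channel.DirectChannel
import org.springframework.integration.channel.QueueChannel

@Configuration
class RemoteChunkingManagerConfig(
    private val managerStepBuilderFactory: RemoteChunkingManagerStepBuilderFactory
) {
    @Bean fun requests() = DirectChannel() // manager -> workers (bridge this to RabbitMQ/Kafka)
    @Bean fun replies() = QueueChannel()   // workers -> manager

    @Bean
    fun managerStep(reader: ItemReader<String>): TaskletStep =
        managerStepBuilderFactory.get<String, String>("managerStep")
            .chunk(100)
            .reader(reader)            // only the manager reads the input
            .outputChannel(requests()) // chunks of items go out to the workers
            .inputChannel(replies())   // workers acknowledge results back here
            .build()
}
```

The worker side is assembled analogously with `RemoteChunkingWorkerBuilder`, wiring an item processor and item writer between the same two channels.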

### 2. Partitioning

Split the data set into independent partitions and process them in parallel.
```kotlin
@Bean
fun partitionStep(jobRepository: JobRepository, columnPartitioner: Partitioner, partitionHandler: PartitionHandler) =
    StepBuilder("partitionStep", jobRepository)
        .partitioner("workerStep", columnPartitioner)
        .partitionHandler(partitionHandler)
        .build()

@Bean
fun partitionHandler(workerStep: Step) = TaskExecutorPartitionHandler().apply {
    setTaskExecutor(taskExecutor())
    setStep(workerStep)
    gridSize = 8 // number of partitions
}

// ColumnRangePartitioner is the range-based partitioner from the Spring Batch samples
@Bean
fun columnPartitioner(dataSource: DataSource): Partitioner = ColumnRangePartitioner().apply {
    setDataSource(dataSource)
    setTable("orders")
    setColumn("id")
}
```
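
On the worker side, a step-scoped reader then pulls the range boundaries out of the step execution context. The key names depend on the partitioner; the sketch below assumes it stores them as `minValue`/`maxValue`, as the sample `ColumnRangePartitioner` does, and `Order`/`OrderRowMapper` are this article's placeholder domain types:

```kotlin
import javax.sql.DataSource
import org.springframework.batch.core.configuration.annotation.StepScope
import org.springframework.batch.item.database.JdbcCursorItemReader
import org.springframework.beans.factory.annotation.Value
import org.springframework.context.annotation.Bean
import org.springframework.jdbc.core.ArgumentPreparedStatementSetter

@Bean
@StepScope
fun idRangeReader(
    dataSource: DataSource,
    @Value("#{stepExecutionContext['minValue']}") minId: Long?,
    @Value("#{stepExecutionContext['maxValue']}") maxId: Long?
) = JdbcCursorItemReader<Order>().apply {
    setDataSource(dataSource)
    setSql("SELECT * FROM orders WHERE id BETWEEN ? AND ?")
    setPreparedStatementSetter(ArgumentPreparedStatementSetter(arrayOf(minId, maxId)))
    setRowMapper(OrderRowMapper())
}
```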

**Core components in detail**
```kotlin
// Partitioning strategy: one partition per day
class DatePartitioner : Partitioner {
    override fun partition(gridSize: Int): Map<String, ExecutionContext> {
        return (0 until gridSize).associate { i ->
            // partition i covers the one-day window [today - i, today - i + 1)
            "partition$i" to ExecutionContext().apply {
                put("startDate", LocalDate.now().minusDays(i.toLong()))
                put("endDate", LocalDate.now().minusDays(i - 1L))
            }
        }
    }
}
```
```kotlin
@Bean
fun workerStep(jobRepository: JobRepository, transactionManager: PlatformTransactionManager) =
    StepBuilder("workerStep", jobRepository)
        .chunk<Order, Report>(100, transactionManager)
        .reader(partitionAwareReader(null, null)) // the @StepScope proxy supplies the real values at runtime
        .processor(orderProcessor)
        .writer(reportWriter)
        .build()

// Reader bound to its partition's date range at runtime
@Bean
@StepScope
fun partitionAwareReader(
    @Value("#{stepExecutionContext['startDate']}") start: LocalDate?,
    @Value("#{stepExecutionContext['endDate']}") end: LocalDate?
) = JdbcCursorItemReader<Order>().apply {
    setSql("SELECT * FROM orders WHERE date BETWEEN ? AND ?")
    setPreparedStatementSetter(ArgumentPreparedStatementSetter(arrayOf(start, end)))
    // dataSource, rowMapper, etc. omitted
}
```

## III. Comparing the Approaches

| Approach | Process model | Best suited for | Complexity |
|---|---|---|---|
| Multi-threaded step | Single process | IO-bound work, quick local scaling | ⭐⭐ |
| Parallel steps | Single process | Independent steps that can run concurrently | ⭐⭐⭐ |
| Remote chunking | Multiple processes | CPU-bound work that must scale across machines | ⭐⭐⭐⭐ |
| Partitioning | Single or multiple processes | Divide and conquer over large data sets, most flexible | ⭐⭐⭐⭐ |

**Choosing a strategy**

- Does a single thread already meet the requirement? Then stop there.
- Local resources are enough? → multi-threaded step / parallel steps
- Need to scale across machines? → remote chunking / partitioning
- Does the data split naturally? → partitioning is usually the best choice

## IV. Common Problems and Solutions

**CAUTION: skewed partitions**
```kotlin
// Bad example: partitioning by ID modulo ignores how the data is actually distributed
class BadPartitioner : Partitioner {
    override fun partition(gridSize: Int) =
        (0 until gridSize).associate { i ->
            "p$i" to ExecutionContext(mapOf("mod" to i))
        }
}

// Better: derive the ranges dynamically from the real data distribution
class SmartPartitioner(private val jdbcTemplate: JdbcTemplate) : Partitioner {
    override fun partition(gridSize: Int): Map<String, ExecutionContext> {
        val minMax = jdbcTemplate.queryForObject(
            "SELECT MIN(id), MAX(id) FROM orders",
            RowMapper { rs, _ -> rs.getLong(1) to rs.getLong(2) }
        )!!
        val range = (minMax.second - minMax.first) / gridSize
        return (0 until gridSize).associate { i ->
            val start = minMax.first + range * i
            val end = if (i == gridSize - 1) minMax.second else start + range - 1
            "partition$i" to ExecutionContext(mapOf(
                "startId" to start,
                "endId" to end
            ))
        }
    }
}
```
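
Swapping the smarter partitioner into the manager step is then a small change; a sketch reusing the beans from the complete example below, with the `JdbcTemplate` injected:

```kotlin
@Bean
fun masterStep(jobRepository: JobRepository, jdbcTemplate: JdbcTemplate) =
    StepBuilder("masterStep", jobRepository)
        .partitioner("workerStep", SmartPartitioner(jdbcTemplate))
        .partitionHandler(partitionHandler())
        .build()
```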

**Complete partitioning configuration**
```kotlin
import java.time.LocalDate
import javax.sql.DataSource
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing
import org.springframework.batch.core.configuration.annotation.StepScope
import org.springframework.batch.core.job.builder.JobBuilder
import org.springframework.batch.core.partition.support.Partitioner
import org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler
import org.springframework.batch.core.repository.JobRepository
import org.springframework.batch.core.step.builder.StepBuilder
import org.springframework.batch.item.ExecutionContext
import org.springframework.batch.item.ItemProcessor
import org.springframework.batch.item.ItemWriter
import org.springframework.batch.item.database.JdbcCursorItemReader
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.beans.factory.annotation.Value
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.jdbc.core.ArgumentPreparedStatementSetter
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor
import org.springframework.transaction.PlatformTransactionManager

@Configuration
@EnableBatchProcessing
class PartitionConfig {

    @Autowired lateinit var jobRepository: JobRepository
    @Autowired lateinit var transactionManager: PlatformTransactionManager
    @Autowired lateinit var dataSource: DataSource
    @Autowired lateinit var orderProcessor: ItemProcessor<Order, Report>
    @Autowired lateinit var reportWriter: ItemWriter<Report>

    // 1. The partitioned job
    @Bean
    fun partitionedJob() = JobBuilder("partitionedJob", jobRepository)
        .start(masterStep())
        .build()

    // 2. Manager step
    @Bean
    fun masterStep() = StepBuilder("masterStep", jobRepository)
        .partitioner("workerStep", datePartitioner())
        .partitionHandler(partitionHandler())
        .build()

    // 3. Partitioning strategy (by date)
    @Bean
    fun datePartitioner(): Partitioner = object : Partitioner {
        override fun partition(gridSize: Int): Map<String, ExecutionContext> {
            val today = LocalDate.now()
            return (0 until gridSize).associate { i ->
                val date = today.minusDays(i.toLong())
                "partition$i" to ExecutionContext().apply {
                    put("processDate", date)
                }
            }
        }
    }

    // 4. Partition handler
    @Bean
    fun partitionHandler() = TaskExecutorPartitionHandler().apply {
        setTaskExecutor(threadPoolTaskExecutor())
        setStep(workerStep())
        gridSize = 7 // process the last 7 days
    }

    // 5. Worker step (the reader is bound to its partition's date at runtime)
    @Bean
    fun workerStep() = StepBuilder("workerStep", jobRepository)
        .chunk<Order, Report>(100, transactionManager)
        .reader(partitionedReader(null))
        .processor(orderProcessor)
        .writer(reportWriter)
        .build()

    // 6. Step-scoped reader
    @Bean
    @StepScope
    fun partitionedReader(
        @Value("#{stepExecutionContext['processDate']}") date: LocalDate?
    ) = JdbcCursorItemReader<Order>().apply {
        setDataSource(dataSource)
        setSql("SELECT * FROM orders WHERE order_date = ?")
        setPreparedStatementSetter(ArgumentPreparedStatementSetter(arrayOf(date)))
        setRowMapper(OrderRowMapper())
    }

    // 7. Thread pool
    @Bean
    fun threadPoolTaskExecutor() = ThreadPoolTaskExecutor().apply {
        corePoolSize = 4
        maxPoolSize = 8
        setQueueCapacity(100)
        initialize()
    }
}
```
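
To see the seven partitions run, launch the job through the `JobLauncher` that `@EnableBatchProcessing` registers. A minimal sketch; the `launch` helper and the `runAt` parameter are illustrative:

```kotlin
import java.time.LocalDateTime
import org.springframework.batch.core.Job
import org.springframework.batch.core.JobParametersBuilder
import org.springframework.batch.core.launch.JobLauncher

// A unique job parameter makes every launch a fresh JobInstance
fun launch(jobLauncher: JobLauncher, partitionedJob: Job) {
    val params = JobParametersBuilder()
        .addString("runAt", LocalDateTime.now().toString())
        .toJobParameters()
    val execution = jobLauncher.run(partitionedJob, params)
    println("Job finished with status ${execution.status}")
}
```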

## Summary

Spring Batch offers parallelism at several levels, from a simple thread pool to fully distributed processing. The key principles when choosing:

- ✅ Assess the need first: for small data sets a single thread is simpler
- ⚡️ Optimize incrementally: multi-threaded step → parallel steps → partitioning
- 📊 Measure: record how long each step takes and optimize where it actually hurts
- 🔒 Mind thread safety: use `SynchronizedItemStreamReader` or stateless components

"Parallelism is not a cure-all, but batch processing at scale can hardly do without it." (a Spring Batch best-practice maxim)