
A Hands-On Guide to Parallel Processing in Spring Batch

Overview

Performance is one of the core challenges in batch processing. Spring Batch offers several parallel-processing options, so you can pick the one that fits your workload. Think of a supermarket checkout: single-threaded processing is one cashier, a multi-threaded step is several checkout lanes, and partitioning sorts the goods into separate areas that are processed in parallel.

Start with a simple test!

Before reaching for anything complex, process your data with a single thread first:

kotlin
// Minimal single-threaded example; the step bean is injected by type,
// avoiding the nullable-parameter workaround
@Bean
fun simpleJob(jobRepository: JobRepository, simpleStep: Step) = JobBuilder("simpleJob", jobRepository)
    .start(simpleStep)
    .build()

@Bean
fun simpleStep(jobRepository: JobRepository, transactionManager: PlatformTransactionManager) =
    StepBuilder("simpleStep", jobRepository)
        .chunk<String, String>(100, transactionManager)
        .reader(fileReader())
        .writer(databaseWriter())
        .build()

If a few hundred megabytes take only seconds to process, you probably do not need parallelism at all!

I. Single-Process Parallelism

1. Multi-threaded Step

Attach a thread pool to the step, like opening extra checkout lanes:

kotlin
@Bean
fun taskExecutor() = ThreadPoolTaskExecutor().apply {
    corePoolSize = 5
    maxPoolSize = 10
    setThreadNamePrefix("batch-thread-")
    initialize()
}

@Bean
fun multiThreadStep(jobRepository: JobRepository, transactionManager: PlatformTransactionManager) = 
    StepBuilder("multiThreadStep", jobRepository)
        .chunk<String, String>(50, transactionManager)
        .reader(synchronizedReader()) // [!code warning: must be thread-safe!]
        .writer(threadSafeWriter())
        .taskExecutor(taskExecutor())
        .build()

// Thread-safety decorator
fun synchronizedReader() = SynchronizedItemStreamReader<Item>().apply {
    setDelegate(rawItemReader()) 
}

Important limitations

  • ⚠️ Most built-in readers/writers are not thread-safe
  • ✅ Mitigations:
    1. Wrap the reader in a SynchronizedItemStreamReader
    2. Keep the processor stateless
    3. Size the database connection pool to match the thread count
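The multi-threaded step above only names a threadSafeWriter(); a minimal sketch of one way to build it, using Spring Batch's SynchronizedItemStreamWriter decorator. The rawItemWriter() bean is a hypothetical delegate assumed to exist elsewhere in the configuration:

```kotlin
import org.springframework.batch.item.support.SynchronizedItemStreamWriter
import org.springframework.context.annotation.Bean

// Decorator that serializes all writes to a non-thread-safe delegate.
// rawItemWriter() is an assumed bean defined elsewhere in this config.
@Bean
fun threadSafeWriter(): SynchronizedItemStreamWriter<Item> =
    SynchronizedItemStreamWriter<Item>().apply {
        setDelegate(rawItemWriter())
    }
```

The same pattern as the reader: all calls funnel through one lock, so correctness is preserved at the cost of some write-side parallelism.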

WARNING

throttleLimit is deprecated as of Spring Batch 5.0! Alternative:

kotlin
@Bean
fun customRepeatOperations() = TaskExecutorRepeatTemplate().apply {
    setTaskExecutor(taskExecutor())
    setThrottleLimit(20) // custom throttling
}

@Bean
fun safeStep(jobRepository: JobRepository, transactionManager: PlatformTransactionManager) =
    StepBuilder("safeStep", jobRepository)
        .chunk<Any, Any>(10, transactionManager)
        .reader(reader())
        .writer(writer())
        .stepOperations(customRepeatOperations())
        .build()

2. Parallel Steps

Run several steps at the same time, like multiple production lines working in parallel:

kotlin
// Define the parallel split
@Bean
fun parallelFlow() = FlowBuilder<SimpleFlow>("parallelFlow")
    .split(taskExecutor())
    .add(flow1(), flow2())
    .build()

// Flow 1: step1 -> step2
@Bean
fun flow1() = FlowBuilder<SimpleFlow>("flow1")
    .start(step1())
    .next(step2())
    .build()

// Flow 2: step3
@Bean
fun flow2() = FlowBuilder<SimpleFlow>("flow2")
    .start(step3())
    .build()

// Main job configuration; a flow-based job needs .end() before .build()
@Bean
fun mainJob(jobRepository: JobRepository) = JobBuilder("mainJob", jobRepository)
    .start(parallelFlow())
    .next(step4())  // join point: runs after both flows complete
    .end()
    .build()

II. Distributed Processing

1. Remote Chunking

A manager step reads and distributes items; remote workers process and write them (similar to MapReduce).

When it fits

  • Processing the data is far more expensive than reading it
  • You need to scale out across multiple machines
  • A reliable message broker is available (e.g. RabbitMQ/Kafka)
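This mode has no code in this guide, so here is a minimal, hedged sketch using the spring-batch-integration builders. The channel beans, the Order/Report types, and fileReader()/orderProcessor/reportWriter are assumptions standing in for your broker wiring:

```kotlin
import org.springframework.batch.core.Step
import org.springframework.batch.integration.chunk.RemoteChunkingManagerStepBuilderFactory
import org.springframework.batch.integration.chunk.RemoteChunkingWorkerBuilder
import org.springframework.context.annotation.Bean
import org.springframework.integration.channel.DirectChannel
import org.springframework.integration.channel.QueueChannel

// Manager side: reads items locally, ships chunks to workers over `requests`,
// and waits for acknowledgements on `replies`. Both channels are assumed to be
// bridged to a durable broker (RabbitMQ/Kafka) elsewhere in the configuration.
@Bean
fun managerStep(
    factory: RemoteChunkingManagerStepBuilderFactory,
    requests: DirectChannel,
    replies: QueueChannel
): Step = factory.get<Order, Order>("managerStep")
    .chunk(100)
    .reader(fileReader())      // reading stays on the manager
    .outputChannel(requests)   // chunks go out to the workers
    .inputChannel(replies)     // workers report completion here
    .build()

// Worker side (separate process): processing and writing happen here,
// driven by chunks arriving on the request channel.
@Bean
fun workerIntegrationFlow(requests: DirectChannel, replies: DirectChannel) =
    RemoteChunkingWorkerBuilder<Order, Report>()
        .itemProcessor(orderProcessor)
        .itemWriter(reportWriter)
        .inputChannel(requests)
        .outputChannel(replies)
        .build()
```

The key design point: only serialized items cross the wire, so the reader's position (and restartability) stays on the manager while the expensive processing fans out.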

2. Partitioning

Split the data into independent partitions and process them in parallel:

kotlin
@Bean
fun partitionStep(jobRepository: JobRepository) = StepBuilder("partitionStep", jobRepository)
    .partitioner("workerStep", columnPartitioner()) 
    .partitionHandler(partitionHandler())
    .build()

@Bean
fun partitionHandler() = TaskExecutorPartitionHandler().apply {
    setTaskExecutor(taskExecutor())
    step = workerStep()
    gridSize = 8 // number of partitions
}

// ColumnRangePartitioner comes from the Spring Batch samples, not the core jar
@Bean
fun columnPartitioner(ds: DataSource): Partitioner = ColumnRangePartitioner().apply {
    dataSource = ds
    table = "orders"
    column = "id"
}

Core components explained

kotlin
// Partitioning strategy: one calendar day per partition
class DatePartitioner : Partitioner {
    override fun partition(gridSize: Int): Map<String, ExecutionContext> {
        return (0 until gridSize).associate { i ->
            "partition$i" to ExecutionContext().apply {
                put("startDate", LocalDate.now().minusDays(i.toLong()))
                put("endDate", LocalDate.now().minusDays(i - 1L))
            }
        }
    }
}
kotlin
@Bean
fun workerStep(jobRepository: JobRepository, transactionManager: PlatformTransactionManager) =
    StepBuilder("workerStep", jobRepository)
        .chunk<Order, Report>(100, transactionManager)
        .reader(partitionAwareReader(null, null)) // null placeholders; real values injected per partition
        .processor(orderProcessor)
        .writer(reportWriter)
        .build()

// Late binding of partition parameters (step-scoped proxy, hence nullable params)
@Bean
@StepScope
fun partitionAwareReader(
    @Value("#{stepExecutionContext['startDate']}") start: LocalDate?,
    @Value("#{stepExecutionContext['endDate']}") end: LocalDate?
) = JdbcCursorItemReader<Order>().apply {
    // Prefer a parameterized query in real code; interpolation keeps the example short
    sql = "SELECT * FROM orders WHERE date BETWEEN '$start' AND '$end'"
    // ...
}

III. Comparing the Options

| Approach | Process model | Best for | Complexity |
| --- | --- | --- | --- |
| Multi-threaded step | Single process | I/O-bound work, quick local scaling | ⭐⭐ |
| Parallel steps | Single process | Independent steps running concurrently | ⭐⭐⭐ |
| Remote chunking | Multiple processes | CPU-bound work that must scale across machines | ⭐⭐⭐⭐ |
| Partitioning | Single or multiple processes | Divide-and-conquer on large datasets; most flexible | ⭐⭐⭐⭐ |

How to choose

  1. Does a single thread already meet your requirements? Stop there.
  2. Enough local resources? → Multi-threaded step / parallel steps
  3. Must scale across machines? → Remote chunking / partitioning
  4. Does the data split naturally? → Partitioning is the best fit

IV. Common Problems and Fixes

CAUTION

Skewed partitions

kotlin
// Anti-pattern: partitioning by ID modulo yields uneven partitions
// when IDs are sparse or unevenly distributed
class BadPartitioner : Partitioner {
    override fun partition(gridSize: Int) =
        (0 until gridSize).associate { i ->
            "p$i" to ExecutionContext(mapOf("mod" to i))
        }
}

// Better: size the partitions from the actual ID distribution
class SmartPartitioner(private val jdbcTemplate: JdbcTemplate) : Partitioner {
    override fun partition(gridSize: Int): Map<String, ExecutionContext> {
        val minMax = jdbcTemplate.queryForObject(
            "SELECT MIN(id), MAX(id) FROM orders",
            RowMapper { rs, _ -> rs.getLong(1) to rs.getLong(2) }
        )!!
        val range = (minMax.second - minMax.first) / gridSize
        return (0 until gridSize).associate { i ->
            val start = minMax.first + range * i
            val end = if (i == gridSize - 1) minMax.second else start + range - 1
            "partition$i" to ExecutionContext(mapOf(
                "startId" to start,
                "endId" to end
            ))
        }
    }
}
Full partitioning configuration
kotlin
@Configuration
@EnableBatchProcessing
class PartitionConfig {

    @Autowired lateinit var jobRepository: JobRepository
    @Autowired lateinit var transactionManager: PlatformTransactionManager

    // 1. The partitioned job
    @Bean
    fun partitionedJob() = JobBuilder("partitionedJob", jobRepository)
        .start(masterStep())
        .build()

    // 2. Manager (master) step
    @Bean
    fun masterStep() = StepBuilder("masterStep", jobRepository)
        .partitioner("workerStep", datePartitioner())
        .partitionHandler(partitionHandler())
        .build()

    // 3. Partitioning strategy (by date)
    @Bean
    fun datePartitioner(): Partitioner = object : Partitioner {
        override fun partition(gridSize: Int): Map<String, ExecutionContext> {
            val today = LocalDate.now()
            return (0 until gridSize).associate { i ->
                val date = today.minusDays(i.toLong())
                "partition$i" to ExecutionContext().apply {
                    put("processDate", date)
                }
            }
        }
    }

    // 4. Partition handler
    @Bean
    fun partitionHandler() = TaskExecutorPartitionHandler().apply {
        setTaskExecutor(threadPoolTaskExecutor())
        step = workerStep()
        gridSize = 7 // one partition per day for the last 7 days
    }

    // 5. Worker step (date bound at runtime)
    @Bean
    fun workerStep() = StepBuilder("workerStep", jobRepository)
        .chunk<Order, Report>(100, transactionManager)
        .reader(partitionedReader(null)) // null placeholder; real value injected per partition
        .processor(orderProcessor)
        .writer(reportWriter)
        .build()

    // 6. Step-scoped reader (nullable param so the placeholder call compiles)
    @Bean
    @StepScope
    fun partitionedReader(
        @Value("#{stepExecutionContext['processDate']}") date: LocalDate?
    ) = JdbcCursorItemReader<Order>().apply {
        // Use a parameterized query in production; interpolation keeps the example short
        sql = "SELECT * FROM orders WHERE order_date = '$date'"
        rowMapper = OrderRowMapper()
    }

    // 7. Thread pool
    @Bean
    fun threadPoolTaskExecutor() = ThreadPoolTaskExecutor().apply {
        corePoolSize = 4
        maxPoolSize = 8
        setQueueCapacity(100)
        initialize()
    }
}

Summary

Spring Batch offers parallelism at several levels, from a simple thread pool up to distributed processing. Key principles:

  1. Assess first: for small datasets, single-threaded is simpler and often enough
  2. ⚡️ Optimize incrementally: multi-threaded step → parallel steps → partitioning
  3. 📊 Measure: record each step's duration and optimize the actual bottleneck
  4. 🔒 Mind thread safety: use SynchronizedItemStreamReader or stateless components
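The "measure each step" advice can be sketched with a StepExecutionListener that logs wall-clock time per step. The listener name is ours; this assumes Spring Batch 5, where startTime/endTime are LocalDateTime:

```kotlin
import java.time.Duration
import org.slf4j.LoggerFactory
import org.springframework.batch.core.ExitStatus
import org.springframework.batch.core.StepExecution
import org.springframework.batch.core.StepExecutionListener

// Logs how long each step ran; attach it to the steps you want to profile.
class TimingStepListener : StepExecutionListener {
    private val log = LoggerFactory.getLogger(TimingStepListener::class.java)

    override fun afterStep(stepExecution: StepExecution): ExitStatus? {
        val start = stepExecution.startTime
        val end = stepExecution.endTime
        if (start != null && end != null) {
            log.info("step {} took {} ms", stepExecution.stepName,
                Duration.between(start, end).toMillis())
        }
        return stepExecution.exitStatus
    }
}
```

Register it with `.listener(TimingStepListener())` on any StepBuilder; comparing these timings before and after a change tells you whether a parallel option actually paid off.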

"Parallelism is not everything, but batch processing that cannot go parallel will get you nowhere." (Spring Batch best practices)