
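Spring Batch Integration Sub-elements Explained

Simplify batch integration and improve system scalability and maintainability

1. Providing Informational Feedback ⚡️

Batch jobs often run for a long time, so timely progress feedback is essential. Spring Batch supports two ways of gathering progress information:

  1. Active polling: inefficient, not recommended
  2. Event-driven listeners: efficient, the recommended approach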

Listener types

  • StepExecutionListener: events before and after a step
  • ChunkListener: chunk-processing events
  • JobExecutionListener: job-level events
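To make the contrast with polling concrete, here is a minimal plain-Kotlin sketch (no Spring dependencies) of the observer pattern these listener interfaces follow: the step runner calls every registered listener at fixed lifecycle points, so nothing has to poll for status. `StepEvents`, `runStep` and `RecordingListener` are hypothetical names for illustration, not Spring Batch API.

```kotlin
// Hypothetical listener contract, loosely modeled on StepExecutionListener/ChunkListener.
interface StepEvents {
    fun beforeStep(stepName: String) {}
    fun afterChunk(stepName: String, itemsWritten: Int) {}
    fun afterStep(stepName: String, status: String) {}
}

// Runs a toy chunk-oriented step and pushes events to every registered listener.
fun runStep(stepName: String, items: List<Int>, chunkSize: Int, listeners: List<StepEvents>): Int {
    listeners.forEach { it.beforeStep(stepName) }
    var total = 0
    for (chunk in items.chunked(chunkSize)) {
        total += chunk.size // a real step would read, process and write the chunk here
        listeners.forEach { it.afterChunk(stepName, chunk.size) }
    }
    listeners.forEach { it.afterStep(stepName, "COMPLETED") }
    return total
}

// A listener that records what it is told, e.g. before forwarding it to a dashboard.
class RecordingListener : StepEvents {
    val events = mutableListOf<String>()
    override fun beforeStep(stepName: String) { events += "before:$stepName" }
    override fun afterChunk(stepName: String, itemsWritten: Int) { events += "chunk:$itemsWritten" }
    override fun afterStep(stepName: String, status: String) { events += "after:$status" }
}

fun main() {
    val listener = RecordingListener()
    runStep("step1", (1..5).toList(), chunkSize = 2, listeners = listOf(listener))
    println(listener.events) // [before:step1, chunk:2, chunk:2, chunk:1, after:COMPLETED]
}
```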
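Event-driven implementation
Kotlin configuration example
```kotlin
import org.springframework.batch.core.Job
import org.springframework.batch.core.StepExecutionListener
import org.springframework.batch.core.job.builder.JobBuilder
import org.springframework.batch.core.repository.JobRepository
import org.springframework.batch.core.step.builder.StepBuilder
import org.springframework.context.annotation.Bean
import org.springframework.integration.annotation.MessagingGateway
import org.springframework.integration.annotation.ServiceActivator
import org.springframework.integration.handler.LoggingHandler
import org.springframework.transaction.PlatformTransactionManager

// Step listener gateway: each listener callback is sent as a message to "stepExecutionsChannel".
// Note: the gateway is only detected when a configuration class carries @IntegrationComponentScan.
@MessagingGateway(name = "notificationListener", defaultRequestChannel = "stepExecutionsChannel")
interface NotificationExecutionListener : StepExecutionListener

// The @Bean methods below belong to a @Configuration class.

// Logging handler: writes every StepExecution arriving on the channel to the log
@Bean
@ServiceActivator(inputChannel = "stepExecutionsChannel")
fun loggingHandler(): LoggingHandler {
    return LoggingHandler(LoggingHandler.Level.WARN).apply {
        setLoggerName("BATCH_LOGGER")
        setLogExpressionString("headers.id + ': ' + payload")
    }
}

// Job configuration (Person, personReader() and personWriter() are defined elsewhere)
@Bean
fun importPaymentsJob(
    jobRepository: JobRepository,
    transactionManager: PlatformTransactionManager,
    notificationListener: NotificationExecutionListener
): Job {
    return JobBuilder("importPayments", jobRepository)
        .start(StepBuilder("step1", jobRepository)
            .chunk<Person, Person>(200, transactionManager)
            .listener(notificationListener)
            .reader(personReader())
            .writer(personWriter())
            .build())
        .build()
}
```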

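2. Asynchronous Processors 🚀

AsyncItemProcessor combined with AsyncItemWriter processes items in parallel to raise throughput:

Execution flow

  1. AsyncItemProcessor submits the processing of each item to a thread pool (a TaskExecutor)
  2. It returns a Future for each item, and these Futures are passed on to the AsyncItemWriter
  3. AsyncItemWriter waits for all Futures to complete, then writes the results in one batch through its delegate writer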
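The three steps above can be sketched with plain `java.util.concurrent` types. This illustrates the pattern with an in-memory writer; it is not the Spring Batch implementation. `processChunkAsync` plays the role of AsyncItemProcessor (one `Future` per item) and `writeChunk` plays AsyncItemWriter (unwrap every `Future`, then hand all results to a delegate in one call); both names are hypothetical.

```kotlin
import java.util.concurrent.Callable
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.Future

// Steps 1 and 2: hand each item to the pool and return its Future right away.
fun <I, O> processChunkAsync(
    items: List<I>,
    executor: ExecutorService,
    delegate: (I) -> O
): List<Future<O>> = items.map { item -> executor.submit(Callable { delegate(item) }) }

// Step 3: block on every Future (in submission order), then write the results as one batch.
fun <O> writeChunk(futures: List<Future<O>>, delegateWriter: (List<O>) -> Unit) {
    val results = futures.map { it.get() } // a failed task surfaces here as ExecutionException
    delegateWriter(results)
}

fun main() {
    val pool = Executors.newFixedThreadPool(4)
    try {
        val written = mutableListOf<String>()
        val futures = processChunkAsync(listOf("a", "b", "c"), pool) { it.uppercase() }
        writeChunk(futures) { batch -> written += batch }
        println(written) // [A, B, C]
    } finally {
        pool.shutdown()
    }
}
```

Because results are collected in submission order, the writer sees items in the same order the reader produced them, even though processing ran concurrently.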
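```kotlin
import org.springframework.batch.integration.async.AsyncItemProcessor
import org.springframework.batch.integration.async.AsyncItemWriter
import org.springframework.batch.item.ItemProcessor
import org.springframework.batch.item.ItemWriter
import org.springframework.context.annotation.Bean
import org.springframework.core.task.TaskExecutor
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor

// Async processor: runs the delegate on the thread pool and returns a Future per item
@Bean
fun asyncProcessor(
    delegate: ItemProcessor<Person, Person>,
    taskExecutor: TaskExecutor
): AsyncItemProcessor<Person, Person> {
    return AsyncItemProcessor<Person, Person>().apply {
        setDelegate(delegate)
        setTaskExecutor(taskExecutor)
    }
}

// Async writer: waits for each Future, then passes the unwrapped items to the delegate
@Bean
fun asyncWriter(delegate: ItemWriter<Person>): AsyncItemWriter<Person> {
    return AsyncItemWriter<Person>().apply {
        setDelegate(delegate)
    }
}

// Thread pool: tune pool sizes and queue capacity to the actual workload
@Bean
fun taskExecutor(): TaskExecutor {
    return ThreadPoolTaskExecutor().apply {
        corePoolSize = 5
        maxPoolSize = 10
        setQueueCapacity(100)
    }
}
```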
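One detail the beans above leave implicit: because AsyncItemProcessor emits a `Future` per item, the step's output type becomes `Future<Person>`. A sketch of the step wiring, assuming Spring Batch 5 (`StepBuilder.chunk(size, transactionManager)`); the `Person` class and the injected `ItemReader<Person>` are placeholders for beans that exist elsewhere in the application.

```kotlin
import java.util.concurrent.Future
import org.springframework.batch.core.Step
import org.springframework.batch.core.repository.JobRepository
import org.springframework.batch.core.step.builder.StepBuilder
import org.springframework.batch.integration.async.AsyncItemProcessor
import org.springframework.batch.integration.async.AsyncItemWriter
import org.springframework.batch.item.ItemReader
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.transaction.PlatformTransactionManager

data class Person(val firstName: String, val lastName: String) // placeholder item type

// proxyBeanMethods = false: no inter-bean calls here, so the Kotlin class may stay final
@Configuration(proxyBeanMethods = false)
class AsyncStepConfig {

    // The processor outputs Future<Person>, so the chunk is typed <Person, Future<Person>>
    @Bean
    fun asyncStep(
        jobRepository: JobRepository,
        transactionManager: PlatformTransactionManager,
        reader: ItemReader<Person>,
        asyncProcessor: AsyncItemProcessor<Person, Person>,
        asyncWriter: AsyncItemWriter<Person>
    ): Step =
        StepBuilder("asyncStep", jobRepository)
            .chunk<Person, Future<Person>>(200, transactionManager)
            .reader(reader)
            .processor(asyncProcessor)
            .writer(asyncWriter)
            .build()
}
```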

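3. Externalizing Batch Process Execution 🌐

3.1 Remote Chunking

Key constraint

Worker nodes keep no step state of their own (the manager owns reading and restart data), and the items of every chunk travel over the middleware, so all items must be serializable.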
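Because chunk items cross the wire, a quick unit-level check is to round-trip the item type through Java serialization. This assumes the messages are sent with Java serialization (for example as a JMS `ObjectMessage`) rather than a JSON converter; `Person` and `roundTrip` are illustrative names.

```kotlin
import java.io.ByteArrayInputStream
import java.io.ByteArrayOutputStream
import java.io.ObjectInputStream
import java.io.ObjectOutputStream
import java.io.Serializable

// Illustrative item type: every field must itself be serializable.
data class Person(val firstName: String, val lastName: String) : Serializable

// Serializes a value to bytes and reads it back, as a sender/receiver pair would.
@Suppress("UNCHECKED_CAST")
fun <T : Serializable> roundTrip(value: T): T {
    val bytes = ByteArrayOutputStream().use { buffer ->
        ObjectOutputStream(buffer).use { it.writeObject(value) } // close() flushes the stream
        buffer.toByteArray()
    }
    return ObjectInputStream(ByteArrayInputStream(bytes)).use { it.readObject() as T }
}

fun main() {
    val chunk = arrayListOf(Person("Ada", "Lovelace"), Person("Alan", "Turing"))
    println(roundTrip(chunk) == chunk) // true
}
```

A chunk that contains a non-serializable object fails with `NotSerializableException` at write time, which is far easier to diagnose in a test than on a remote worker.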

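Manager configuration
```kotlin
import jakarta.jms.ConnectionFactory
import org.springframework.batch.integration.chunk.ChunkMessageChannelItemWriter
import org.springframework.context.annotation.Bean
import org.springframework.integration.channel.DirectChannel
import org.springframework.integration.channel.QueueChannel
import org.springframework.integration.core.MessagingTemplate
import org.springframework.integration.dsl.IntegrationFlow
import org.springframework.integration.jms.dsl.Jms

// Chunk message writer: sends each chunk as a request, then waits for the worker replies
@Bean
fun chunkWriter(): ChunkMessageChannelItemWriter<Person> {
    val template = MessagingTemplate().apply {
        setDefaultChannel(outboundRequests())
        setReceiveTimeout(2000)
    }
    return ChunkMessageChannelItemWriter<Person>().apply {
        setMessagingOperations(template)
        setReplyChannel(inboundReplies()) // must be a PollableChannel
    }
}

@Bean
fun outboundRequests() = DirectChannel()

// Replies are polled by the writer, hence a QueueChannel
@Bean
fun inboundReplies() = QueueChannel()

// JMS request flow: "outboundRequests" channel -> "requests" queue
@Bean
fun outboundFlow(connectionFactory: ConnectionFactory): IntegrationFlow {
    return IntegrationFlow.from("outboundRequests")
        .handle(Jms.outboundAdapter(connectionFactory).destination("requests"))
        .get()
}
// An inbound flow from the JMS "replies" queue into inboundReplies() is also required (not shown).
```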
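Worker configuration
```kotlin
import org.springframework.batch.core.step.item.SimpleChunkProcessor
import org.springframework.batch.integration.chunk.ChunkProcessorChunkHandler
import org.springframework.context.annotation.Bean
import org.springframework.integration.annotation.ServiceActivator
import org.springframework.integration.channel.DirectChannel

// Chunk handler: processes and writes each incoming chunk, replying on "replies"
// (personProcessor() and personWriter() are defined elsewhere)
@Bean
@ServiceActivator(inputChannel = "requests", outputChannel = "replies")
fun chunkHandler(): ChunkProcessorChunkHandler<Person> {
    val processor = SimpleChunkProcessor<Person, Person>(
        personProcessor(),
        personWriter()
    )
    return ChunkProcessorChunkHandler<Person>().apply {
        setChunkProcessor(processor)
    }
}

// Reply channel
@Bean
fun replies(): DirectChannel = DirectChannel()
```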
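Putting the manager and worker halves together, the message exchange can be simulated in one JVM, with two `BlockingQueue`s standing in for the JMS request and reply queues. This is a hypothetical model for understanding the flow, not how the framework is implemented: `ChunkRequestMsg`, `ChunkReplyMsg` and `runRemoteChunking` are invented names that loosely echo Spring Batch's `ChunkRequest`/`ChunkResponse`. The manager reads and sends chunks, the worker processes and writes them, and the manager waits (up to a receive timeout) for one reply per chunk.

```kotlin
import java.util.Collections
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.TimeUnit
import kotlin.concurrent.thread

data class ChunkRequestMsg(val sequence: Int, val items: List<Int>)
data class ChunkReplyMsg(val sequence: Int, val success: Boolean)

// Returns what the worker "wrote"; fails if any reply misses the receive timeout.
fun runRemoteChunking(input: List<Int>, chunkSize: Int, receiveTimeoutMs: Long): List<Int> {
    val requests = LinkedBlockingQueue<ChunkRequestMsg>()
    val replies = LinkedBlockingQueue<ChunkReplyMsg>()
    val written = Collections.synchronizedList(mutableListOf<Int>())
    val poison = ChunkRequestMsg(-1, emptyList()) // identity marker that stops the worker

    // Worker: "process" (x * 2) and "write" each chunk, then acknowledge it.
    val worker = thread(isDaemon = true) {
        while (true) {
            val request = requests.take()
            if (request === poison) break
            written.addAll(request.items.map { it * 2 })
            replies.put(ChunkReplyMsg(request.sequence, success = true))
        }
    }

    // Manager: read the input, send every chunk, then collect one reply per chunk.
    val chunks = input.chunked(chunkSize)
    chunks.forEachIndexed { i, items -> requests.put(ChunkRequestMsg(i, items)) }
    repeat(chunks.size) {
        val reply = replies.poll(receiveTimeoutMs, TimeUnit.MILLISECONDS)
            ?: error("no reply within $receiveTimeoutMs ms")
        check(reply.success) { "chunk ${reply.sequence} failed" }
    }
    requests.put(poison)
    worker.join()
    return written.toList()
}

fun main() {
    println(runRemoteChunking((1..7).toList(), chunkSize = 3, receiveTimeoutMs = 2000))
    // [2, 4, 6, 8, 10, 12, 14]
}
```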
3.2 Remote Partitioning

When to use

When I/O (reading input and writing results), rather than item processing, is the performance bottleneck
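In remote partitioning the manager sends only a description of each slice of data, and each worker reads, processes and writes its slice independently. A minimal sketch of a range-based split, assuming integer keys: it mirrors the idea behind a custom Spring Batch `Partitioner` (one `ExecutionContext` per partition), but a plain `IntRange` stands in for that context here, and `partitionRange` and the `partitionN` keys are illustrative choices.

```kotlin
// Splits [min, max] into at most gridSize contiguous, non-overlapping ranges.
fun partitionRange(min: Int, max: Int, gridSize: Int): Map<String, IntRange> {
    require(gridSize > 0 && max >= min)
    val total = max - min + 1
    val size = (total + gridSize - 1) / gridSize // ceiling division
    val partitions = linkedMapOf<String, IntRange>()
    var start = min
    var index = 0
    while (start <= max) {
        val end = minOf(start + size - 1, max)
        partitions["partition$index"] = start..end
        start = end + 1
        index++
    }
    return partitions
}

fun main() {
    println(partitionRange(1, 10, 3))
    // {partition0=1..4, partition1=5..8, partition2=9..10}
}
```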

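```kotlin
import org.springframework.batch.core.explore.JobExplorer
import org.springframework.batch.core.step.StepLocator
import org.springframework.batch.integration.partition.MessageChannelPartitionHandler
import org.springframework.batch.integration.partition.StepExecutionRequestHandler
import org.springframework.context.annotation.Bean
import org.springframework.integration.annotation.ServiceActivator
import org.springframework.integration.core.MessagingTemplate

// Manager side: partition handler that sends one request per partition
// (outboundRequests() and inboundReplies() are channel beans defined elsewhere)
@Bean
fun partitionHandler(): MessageChannelPartitionHandler {
    val template = MessagingTemplate().apply {
        setDefaultChannel(outboundRequests())
        setReceiveTimeout(100000) // how long to wait for the workers (ms)
    }
    return MessageChannelPartitionHandler().apply {
        setStepName("processPartition") // step each worker executes for its partition
        setGridSize(5)
        setMessagingOperations(template)
        setReplyChannel(inboundReplies()) // PollableChannel receiving the workers' results
    }
}

// Worker side: looks up the requested step and executes it for the given partition
@Bean
@ServiceActivator(inputChannel = "inboundRequests", outputChannel = "outboundReplies")
fun stepRequestHandler(jobExplorer: JobExplorer, stepLocator: StepLocator): StepExecutionRequestHandler {
    return StepExecutionRequestHandler().apply {
        setJobExplorer(jobExplorer)
        setStepLocator(stepLocator)
    }
}
```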
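On the manager side, the partition handler's job boils down to sending `gridSize` requests and then waiting, bounded by the receive timeout, until every worker has reported back. A plain-JDK sketch of that fan-out/gather: a local thread pool stands in for the remote workers and `ExecutorService.invokeAll` with a timeout stands in for reply aggregation. `PartitionResult` and `runPartitions` are hypothetical names.

```kotlin
import java.util.concurrent.Callable
import java.util.concurrent.CancellationException
import java.util.concurrent.ExecutionException
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit

data class PartitionResult(val partition: String, val status: String, val itemCount: Int)

// Runs one task per partition. invokeAll cancels tasks that miss the deadline;
// those, and tasks that threw, are reported as FAILED.
fun runPartitions(
    partitions: Map<String, IntRange>,
    timeoutMs: Long,
    work: (IntRange) -> Int
): List<PartitionResult> {
    val pool = Executors.newFixedThreadPool(partitions.size.coerceAtLeast(1))
    try {
        val tasks = partitions.map { (name, range) ->
            Callable { PartitionResult(name, "COMPLETED", work(range)) }
        }
        val futures = pool.invokeAll(tasks, timeoutMs, TimeUnit.MILLISECONDS)
        // invokeAll returns futures in task order, which matches the map's key order
        return futures.zip(partitions.keys) { future, name ->
            try {
                future.get()
            } catch (e: CancellationException) {
                PartitionResult(name, "FAILED", 0)
            } catch (e: ExecutionException) {
                PartitionResult(name, "FAILED", 0)
            }
        }
    } finally {
        pool.shutdownNow()
    }
}

fun main() {
    val results = runPartitions(mapOf("partition0" to 1..50, "partition1" to 51..100), 5000) { it.count() }
    println(results)
}
```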

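4. Best Practices ✅

  1. Thread pool tuning: size corePoolSize/maxPoolSize according to data volume and workload
  2. Exception handling: subscribe a @ServiceActivator to the error channel to handle exceptions raised in asynchronous message flows
  3. Serialization: use Jackson instead of native Java serialization for messages sent over the middleware
  4. Monitoring: integrate Micrometer to expose execution metrics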
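For the first point, a common starting heuristic from Brian Goetz's *Java Concurrency in Practice* is N_threads = N_cpu × U_cpu × (1 + W/C), where U_cpu is the target CPU utilization and W/C is the ratio of time spent waiting (e.g. on I/O) to time spent computing per item. A small helper, offered as a starting point to validate by measurement rather than a rule; `suggestedPoolSize` is a hypothetical name.

```kotlin
import kotlin.math.max
import kotlin.math.roundToInt

// Heuristic: threads = cpus * targetUtilization * (1 + waitTime / computeTime), at least 1.
fun suggestedPoolSize(
    cpus: Int = Runtime.getRuntime().availableProcessors(),
    targetUtilization: Double = 1.0,
    waitToComputeRatio: Double
): Int {
    require(targetUtilization in 0.0..1.0 && waitToComputeRatio >= 0.0)
    return max(1, (cpus * targetUtilization * (1 + waitToComputeRatio)).roundToInt())
}

fun main() {
    // CPU-bound processing: roughly one thread per core.
    println(suggestedPoolSize(cpus = 8, waitToComputeRatio = 0.0)) // 8
    // Items that spend 4x longer waiting on I/O than computing.
    println(suggestedPoolSize(cpus = 8, waitToComputeRatio = 4.0)) // 40
}
```

The result is a reasonable value for corePoolSize; maxPoolSize and the queue capacity still depend on how bursty the load is.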
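Complete remote chunking example
```kotlin
import org.apache.activemq.ActiveMQConnectionFactory
import org.springframework.batch.core.Step
import org.springframework.batch.integration.chunk.RemoteChunkingManagerStepBuilderFactory
import org.springframework.batch.integration.chunk.RemoteChunkingWorkerBuilder
import org.springframework.batch.integration.config.annotation.EnableBatchIntegration
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.integration.channel.DirectChannel
import org.springframework.integration.channel.QueueChannel
import org.springframework.integration.dsl.IntegrationFlow

// Manager and worker are shown together for brevity; in practice they run as separate applications.
// Person, personReader(), personProcessor(), personWriter() and the JMS flows that bridge
// these channels to the broker are defined elsewhere.
@EnableBatchIntegration
@Configuration
class RemoteChunkingConfig {

    // Manager: reads items, sends chunks to outboundRequests, collects replies from inboundReplies
    @Bean
    fun managerStep(
        builder: RemoteChunkingManagerStepBuilderFactory
    ): Step {
        return builder.get<Person, Person>("managerStep")
            .chunk(100)
            .reader(personReader())
            .outputChannel(outboundRequests())
            .inputChannel(inboundReplies())
            .build()
    }

    // Worker: processes and writes each incoming chunk, then replies to the manager
    @Bean
    fun workerFlow(
        builder: RemoteChunkingWorkerBuilder<Person, Person>
    ): IntegrationFlow {
        return builder
            .itemProcessor(personProcessor())
            .itemWriter(personWriter())
            .inputChannel(inboundRequests())
            .outputChannel(outboundReplies())
            .build()
    }

    @Bean fun outboundRequests() = DirectChannel()
    @Bean fun inboundReplies() = QueueChannel() // the manager polls its replies
    @Bean fun inboundRequests() = DirectChannel()
    @Bean fun outboundReplies() = DirectChannel()

    // ActiveMQ connection
    @Bean
    fun connectionFactory() = ActiveMQConnectionFactory("tcp://localhost:61616")
}
```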

CAUTION

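Key considerations

  1. For remote processing, keep network latency plus worker processing time well within the configured receive timeouts
  2. Restarting a worker node can cause items to be processed more than once
  3. When @EnableBatchIntegration is used to simplify configuration, the input and output channels still have to be declared explicitly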
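Point 2 is usually addressed by making the write side idempotent, so that a redelivered chunk does no harm. A minimal in-memory sketch keyed on an item ID; `Payment`, its `id` field and `IdempotentWriter` are hypothetical. Against a database, the same idea is typically an upsert or a unique constraint.

```kotlin
data class Payment(val id: String, val amount: Long)

// Remembers which IDs were already written and silently skips repeats.
class IdempotentWriter {
    private val store = LinkedHashMap<String, Payment>()

    // Returns how many items in the chunk were actually new.
    @Synchronized
    fun write(chunk: List<Payment>): Int {
        var inserted = 0
        for (payment in chunk) {
            if (store.putIfAbsent(payment.id, payment) == null) inserted++
        }
        return inserted
    }

    @Synchronized
    fun snapshot(): List<Payment> = store.values.toList()
}

fun main() {
    val writer = IdempotentWriter()
    val chunk = listOf(Payment("p1", 100), Payment("p2", 250))
    println(writer.write(chunk)) // 2
    println(writer.write(chunk)) // 0: the redelivered chunk changes nothing
    println(writer.snapshot().size) // 2
}
```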

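Depending on the workload, applying these sub-elements appropriately can:
✅ Cut processing time from hours to minutes
✅ Raise resource utilization by 3-5x
✅ Significantly improve fault tolerance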