Spring Batch Integration Sub-elements Explained
Simplify batch-processing integration and improve system scalability and maintainability
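1. Providing Informational Feedback ⚡️
Batch jobs often run for a long time, so timely progress feedback matters. Spring Batch offers two ways to collect progress information:
- Active polling (of the JobRepository): inefficient, not recommended
- Event-driven listeners: efficient, the recommended approach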
Listener types
- StepExecutionListener: events before and after a step
- ChunkListener: chunk-processing events
- JobExecutionListener: job-level events
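To make the three listener levels concrete, here is a minimal sketch with one listener per level. The class names and the `BATCH_PROGRESS` logger name are hypothetical; the callback signatures follow the Spring Batch 5 interfaces, and SLF4J is assumed to be on the classpath.

```kotlin
import org.slf4j.LoggerFactory
import org.springframework.batch.core.ChunkListener
import org.springframework.batch.core.ExitStatus
import org.springframework.batch.core.JobExecution
import org.springframework.batch.core.JobExecutionListener
import org.springframework.batch.core.StepExecution
import org.springframework.batch.core.StepExecutionListener
import org.springframework.batch.core.scope.context.ChunkContext

// Hypothetical logger name shared by the three sketches
private val log = LoggerFactory.getLogger("BATCH_PROGRESS")

// Job-level events: called once before and once after the whole job
class JobProgressListener : JobExecutionListener {
    override fun beforeJob(jobExecution: JobExecution) {
        log.info("Job {} started", jobExecution.jobInstance.jobName)
    }

    override fun afterJob(jobExecution: JobExecution) {
        log.info("Job {} ended with status {}", jobExecution.jobInstance.jobName, jobExecution.status)
    }
}

// Step-level events: called before and after each step
class StepProgressListener : StepExecutionListener {
    override fun beforeStep(stepExecution: StepExecution) {
        log.info("Step {} started", stepExecution.stepName)
    }

    override fun afterStep(stepExecution: StepExecution): ExitStatus? {
        log.info("Step {} read {} and wrote {} items",
            stepExecution.stepName, stepExecution.readCount, stepExecution.writeCount)
        return stepExecution.exitStatus // keep the exit status the step already computed
    }
}

// Chunk-level events: called after every committed chunk transaction
class ChunkProgressListener : ChunkListener {
    override fun afterChunk(context: ChunkContext) {
        val step = context.stepContext.stepExecution
        log.info("Chunk committed in step {}, {} items written so far", step.stepName, step.writeCount)
    }
}
```

Each listener implements a single interface on purpose: a class implementing both `StepExecutionListener` and `ChunkListener` would make the overloaded `listener(...)` call on the step builder ambiguous in Kotlin. Register the job listener on `JobBuilder` and the step and chunk listeners on the step builder.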
Event-driven implementation
Kotlin configuration example
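```kotlin
// Step listener gateway: forwards every StepExecution callback to stepExecutionsChannel
@MessagingGateway(name = "notificationListener", defaultRequestChannel = "stepExecutionsChannel")
interface NotificationExecutionListener : StepExecutionListener

// Logging handler: logs each StepExecution received on stepExecutionsChannel
@Bean
@ServiceActivator(inputChannel = "stepExecutionsChannel")
fun loggingHandler(): LoggingHandler {
    return LoggingHandler(LoggingHandler.Level.WARN).apply {
        setLoggerName("BATCH_LOGGER")
        setLogExpressionString("headers.id + ': ' + payload")
    }
}

// Job configuration (warning: the gateway above is only registered when @IntegrationComponentScan is present)
@Bean
fun importPaymentsJob(
    jobRepository: JobRepository,
    transactionManager: PlatformTransactionManager,
    notificationListener: NotificationExecutionListener
): Job {
    return JobBuilder("importPayments", jobRepository)
        .start(
            StepBuilder("step1", jobRepository)
                .chunk<Person, Person>(200, transactionManager)
                .listener(notificationListener)
                .reader(personReader())
                .writer(personWriter())
                .build()
        )
        .build()
}
```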
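The job configuration notes that `@IntegrationComponentScan` is required: it is what registers the `@MessagingGateway` interface as a bean. A minimal sketch of such a configuration class, assuming the hypothetical name `BatchNotificationConfig` and that it sits in the same package as `NotificationExecutionListener` (the scan defaults to the annotated class's package):

```kotlin
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.integration.annotation.IntegrationComponentScan
import org.springframework.integration.channel.DirectChannel
import org.springframework.integration.config.EnableIntegration

@Configuration(proxyBeanMethods = false)
@EnableIntegration
// Scans this class's package for @MessagingGateway interfaces such as NotificationExecutionListener
@IntegrationComponentScan
class BatchNotificationConfig {
    // Channel the gateway publishes StepExecution messages to; loggingHandler subscribes to it
    @Bean
    fun stepExecutionsChannel(): DirectChannel = DirectChannel()
}
```

Declaring `stepExecutionsChannel` explicitly is optional, since Spring Integration can auto-create a `DirectChannel` for a `@ServiceActivator` input channel that does not exist yet; declaring it simply makes the wiring visible.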
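2. Asynchronous Processors 🚀
AsyncItemProcessor and AsyncItemWriter work together to process items in parallel and raise throughput:
Execution flow
- AsyncItemProcessor dispatches the processing logic for each item to a thread pool and returns a Future to the AsyncItemWriter
- AsyncItemWriter waits for all Futures in the chunk to complete, then writes the results in one batch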
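The fan-out/fan-in flow above can be illustrated without Spring at all. This is a plain-Kotlin sketch, not the Spring Batch classes: the hypothetical `processAsync` plays the role of AsyncItemProcessor and `writeWhenComplete` the role of AsyncItemWriter.

```kotlin
import java.util.concurrent.Callable
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.Future

// Stand-in for AsyncItemProcessor: submits each item to the pool and immediately returns a Future
fun <I, O> processAsync(items: List<I>, pool: ExecutorService, process: (I) -> O): List<Future<O>> =
    items.map { item -> pool.submit(Callable { process(item) }) }

// Stand-in for AsyncItemWriter: waits for every Future, then writes the whole chunk in one call
fun <O> writeWhenComplete(futures: List<Future<O>>, write: (List<O>) -> Unit) {
    // get() blocks until the item is done; a failed item surfaces here as an ExecutionException
    val results = futures.map { it.get() }
    write(results)
}

// Demo: three items processed on a 4-thread pool; returns what the "writer" received
fun demo(): List<Int> {
    val pool = Executors.newFixedThreadPool(4)
    try {
        val written = mutableListOf<Int>()
        val futures = processAsync(listOf(1, 2, 3), pool) { it * 10 }
        writeWhenComplete(futures) { chunk -> written.addAll(chunk) }
        return written
    } finally {
        pool.shutdown()
    }
}
```

Because the Futures are collected in input order, the writer sees the results in the original item order even though the processing itself ran concurrently.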
```kotlin
// Async processor: runs the delegate ItemProcessor on the task executor and returns a Future per item
@Bean
fun asyncProcessor(
    delegate: ItemProcessor<Person, Person>,
    taskExecutor: TaskExecutor
): AsyncItemProcessor<Person, Person> {
    return AsyncItemProcessor<Person, Person>().apply {
        setDelegate(delegate)
        setTaskExecutor(taskExecutor)
    }
}

// Async writer: unwraps the Futures and passes the results to the delegate ItemWriter
@Bean
fun asyncWriter(delegate: ItemWriter<Person>): AsyncItemWriter<Person> {
    return AsyncItemWriter<Person>().apply {
        setDelegate(delegate)
    }
}

// Thread pool (warning: tune the thread counts to your workload)
@Bean
fun taskExecutor(): TaskExecutor {
    return ThreadPoolTaskExecutor().apply {
        corePoolSize = 5
        maxPoolSize = 10
        setQueueCapacity(100)
    }
}
```
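The `asyncProcessor` and `asyncWriter` beans only take effect once they are wired into a step. A minimal sketch, assuming a `reader` bean of type `ItemReader<Person>` exists and that `Person` is defined elsewhere; the class and step names are hypothetical. Because the processor emits `Future<Person>`, the step's output type is `Future<Person>`:

```kotlin
import java.util.concurrent.Future
import org.springframework.batch.core.Step
import org.springframework.batch.core.repository.JobRepository
import org.springframework.batch.core.step.builder.StepBuilder
import org.springframework.batch.integration.async.AsyncItemProcessor
import org.springframework.batch.integration.async.AsyncItemWriter
import org.springframework.batch.item.ItemReader
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.transaction.PlatformTransactionManager

@Configuration(proxyBeanMethods = false)
class AsyncStepConfig {
    @Bean
    fun asyncStep(
        jobRepository: JobRepository,
        transactionManager: PlatformTransactionManager,
        reader: ItemReader<Person>,
        asyncProcessor: AsyncItemProcessor<Person, Person>,
        asyncWriter: AsyncItemWriter<Person>
    ): Step =
        StepBuilder("asyncStep", jobRepository)
            // input type Person, output type Future<Person> (what AsyncItemProcessor returns)
            .chunk<Person, Future<Person>>(100, transactionManager)
            .reader(reader)
            .processor(asyncProcessor)
            .writer(asyncWriter)
            .build()
}
```

Reading and writing stay on the step's thread; only the processor work is handed to the thread pool, so this pattern helps most when processing dominates the runtime.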
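3. Externalizing Batch Process Execution 🌐
3.1 Remote Chunking
Key constraint
Workers keep no step state: the items of each chunk travel through the middleware to the workers, so the items (and the chunk requests and replies that carry them) must be serializable.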
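Because every item in a chunk crosses the middleware, a single non-serializable item breaks the whole chunk. The sketch below assumes plain Java serialization (what a JMS object message uses); the `Person` fields and the `canSendOverWire` helper are hypothetical.

```kotlin
import java.io.ByteArrayOutputStream
import java.io.NotSerializableException
import java.io.ObjectOutputStream
import java.io.Serializable

// Hypothetical item type: implementing Serializable lets it travel inside a chunk request
data class Person(val firstName: String, val lastName: String) : Serializable

// Returns true if Java serialization can encode the value, i.e. it could be shipped to a worker
fun canSendOverWire(value: Any): Boolean =
    try {
        ObjectOutputStream(ByteArrayOutputStream()).use { it.writeObject(value) }
        true
    } catch (e: NotSerializableException) {
        false
    }
```

A list of strings passes this check, while a list that contains a single plain `Any()` fails, which mirrors how one bad item makes the entire chunk request unsendable.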
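Manager configuration
```kotlin
// Chunk message writer: sends each chunk to the workers and waits for their replies
@Bean
fun chunkWriter(): ChunkMessageChannelItemWriter<Person> {
    val template = MessagingTemplate().apply {
        setDefaultChannel(outboundRequests())
        setReceiveTimeout(2000)
    }
    return ChunkMessageChannelItemWriter<Person>().apply {
        setMessagingOperations(template)
        // replies are polled, so inboundReplies() must return a PollableChannel such as QueueChannel
        setReplyChannel(inboundReplies())
    }
}

// JMS request flow: forwards messages from outboundRequests to a JMS queue
@Bean
fun outboundFlow(connectionFactory: ConnectionFactory): IntegrationFlow {
    return IntegrationFlow.from("outboundRequests")
        .handle(Jms.outboundAdapter(connectionFactory).destination("requests")) // queue name is an assumption
        .get()
}
```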
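The manager configuration sends requests but does not show how worker replies get back into `inboundReplies`. A minimal sketch of that return path, assuming a JMS queue named `replies` (hypothetical), a Jakarta JMS `ConnectionFactory` bean, and the Spring Integration JMS DSL on the classpath:

```kotlin
import jakarta.jms.ConnectionFactory
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.integration.channel.QueueChannel
import org.springframework.integration.dsl.IntegrationFlow
import org.springframework.integration.jms.dsl.Jms

@Configuration(proxyBeanMethods = false)
class ManagerRepliesConfig {
    // Pollable channel from which ChunkMessageChannelItemWriter reads worker replies
    @Bean
    fun inboundReplies(): QueueChannel = QueueChannel()

    // Listens on the hypothetical "replies" JMS queue and moves each reply into inboundReplies
    @Bean
    fun inboundFlow(connectionFactory: ConnectionFactory, inboundReplies: QueueChannel): IntegrationFlow =
        IntegrationFlow.from(Jms.messageDrivenChannelAdapter(connectionFactory).destination("replies"))
            .channel(inboundReplies)
            .get()
}
```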
Worker configuration
```kotlin
// Chunk handler: runs each received chunk through the processor and writer, then replies on "replies"
@Bean
@ServiceActivator(inputChannel = "requests", outputChannel = "replies")
fun chunkHandler(): ChunkProcessorChunkHandler<Person> {
    val processor = SimpleChunkProcessor<Person, Person>(
        personProcessor(),
        personWriter()
    )
    return ChunkProcessorChunkHandler<Person>().apply {
        setChunkProcessor(processor)
    }
}

// Reply channel
@Bean
fun replies(): DirectChannel = DirectChannel()
```
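On the worker side, the `requests` and `replies` channels still have to be bridged to the broker. A sketch mirroring the manager's queues, with the JMS queue names `requests` and `replies` assumed and the class name hypothetical:

```kotlin
import jakarta.jms.ConnectionFactory
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.integration.dsl.IntegrationFlow
import org.springframework.integration.jms.dsl.Jms

@Configuration(proxyBeanMethods = false)
class WorkerJmsConfig {
    // JMS queue "requests" -> channel "requests" (consumed by the ChunkProcessorChunkHandler)
    @Bean
    fun workerInboundFlow(connectionFactory: ConnectionFactory): IntegrationFlow =
        IntegrationFlow.from(Jms.messageDrivenChannelAdapter(connectionFactory).destination("requests"))
            .channel("requests")
            .get()

    // Channel "replies" -> JMS queue "replies" (read by the manager)
    @Bean
    fun workerOutboundFlow(connectionFactory: ConnectionFactory): IntegrationFlow =
        IntegrationFlow.from("replies")
            .handle(Jms.outboundAdapter(connectionFactory).destination("replies"))
            .get()
}
```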
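3.2 Remote Partitioning
When to use
When reading the input or writing the results (I/O), rather than processing the items, is the performance bottleneck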
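In remote partitioning the manager only sends a description of each partition (for example an ID range), and each worker reads and writes its own slice, which is why it helps when I/O is the bottleneck. The hypothetical helper below sketches the splitting in plain Kotlin for ordinary ID ranges; in Spring Batch this logic would live in a `Partitioner` that stores each range in an `ExecutionContext`.

```kotlin
// Hypothetical helper: splits [minId, maxId] into at most gridSize contiguous partitions
fun splitRange(minId: Long, maxId: Long, gridSize: Int): Map<String, LongRange> {
    require(gridSize > 0 && maxId >= minId) { "need gridSize > 0 and a non-empty range" }
    val total = maxId - minId + 1
    val size = (total + gridSize - 1) / gridSize // ceiling division so no IDs are dropped
    val partitions = linkedMapOf<String, LongRange>()
    var start = minId
    var index = 0
    while (start <= maxId) {
        val end = minOf(start + size - 1, maxId)
        partitions["partition$index"] = start..end
        start = end + 1
        index++
    }
    return partitions
}
```

With `gridSize = 3` and IDs 1 to 10, this yields the ranges 1..4, 5..8 and 9..10; when the range is smaller than the grid, fewer partitions are produced rather than empty ones.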
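```kotlin
// Partition handler: sends one StepExecutionRequest per partition to the workers
@Bean
fun partitionHandler(): MessageChannelPartitionHandler {
    val template = MessagingTemplate().apply {
        setDefaultChannel(outboundRequests())
        setReceiveTimeout(100000)
    }
    return MessageChannelPartitionHandler().apply {
        setStepName("processPartition")
        setGridSize(5)
        setMessagingOperations(template)
    }
}

// Worker request handler: locates the partition step by name, executes it and replies with its StepExecution
@Bean
@ServiceActivator(inputChannel = "inboundRequests", outputChannel = "outboundReplies")
fun stepRequestHandler(jobExplorer: JobExplorer, beanFactory: BeanFactory): StepExecutionRequestHandler {
    return StepExecutionRequestHandler().apply {
        setJobExplorer(jobExplorer)
        setStepLocator(BeanFactoryStepLocator().apply { setBeanFactory(beanFactory) })
    }
}
```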
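The partition handler is used by a manager step that also needs a `Partitioner`. A minimal sketch, assuming a `Partitioner` bean exists (for example one that splits the input by ID range) and that the `PartitionHandler` bean is the `MessageChannelPartitionHandler` defined above; the class and step names are hypothetical:

```kotlin
import org.springframework.batch.core.Step
import org.springframework.batch.core.partition.PartitionHandler
import org.springframework.batch.core.partition.support.Partitioner
import org.springframework.batch.core.repository.JobRepository
import org.springframework.batch.core.step.builder.StepBuilder
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration

@Configuration(proxyBeanMethods = false)
class PartitionManagerConfig {
    @Bean
    fun managerStep(
        jobRepository: JobRepository,
        partitioner: Partitioner,          // hypothetical bean that splits the input
        partitionHandler: PartitionHandler // sends each partition to the workers
    ): Step =
        StepBuilder("managerStep", jobRepository)
            // "processPartition" must match the stepName configured on the partition handler
            .partitioner("processPartition", partitioner)
            .partitionHandler(partitionHandler)
            .build()
}
```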
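4. Best Practices ✅
- Thread pool tuning: size corePoolSize/maxPoolSize according to data volume and workload
- Error handling: use a @ServiceActivator subscribed to the errorChannel to capture exceptions raised in asynchronous flows
- Serialization: use Jackson instead of native Java serialization
- Monitoring: integrate Micrometer to expose execution metrics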
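For the error-handling practice, a minimal sketch of a handler subscribed to Spring Integration's global `errorChannel`, which typically receives exceptions thrown on asynchronous threads of a flow (executor channels, message-driven adapters). The class name and the `BATCH_ERRORS` logger name are hypothetical.

```kotlin
import org.slf4j.LoggerFactory
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.integration.annotation.ServiceActivator
import org.springframework.messaging.MessageHandler
import org.springframework.messaging.MessagingException

@Configuration(proxyBeanMethods = false)
class AsyncErrorConfig {
    private val log = LoggerFactory.getLogger("BATCH_ERRORS")

    // Subscribes to the global errorChannel alongside Spring Integration's default logging subscriber
    @Bean
    @ServiceActivator(inputChannel = "errorChannel")
    fun asyncErrorHandler(): MessageHandler = MessageHandler { message ->
        val error = message.payload as Throwable
        // MessagingException carries the message whose handling failed, when it is known
        val failedId = (error as? MessagingException)?.failedMessage?.headers?.id
        log.error("Async flow failed (message id {})", failedId, error)
    }
}
```

Failures inside an AsyncItemProcessor delegate behave differently: they generally surface in the step itself when AsyncItemWriter resolves the Future, so they follow the step's normal skip/retry handling rather than this channel.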
Complete remote chunking example
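```kotlin
import org.apache.activemq.ActiveMQConnectionFactory
import org.springframework.batch.core.Step
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing
import org.springframework.batch.integration.chunk.RemoteChunkingManagerStepBuilderFactory
import org.springframework.batch.integration.chunk.RemoteChunkingWorkerBuilder
import org.springframework.batch.integration.config.annotation.EnableBatchIntegration
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.integration.channel.DirectChannel
import org.springframework.integration.channel.QueueChannel
import org.springframework.integration.dsl.IntegrationFlow

// Manager and worker are shown together for brevity; in practice they usually run as separate applications
@Configuration
@EnableBatchProcessing
@EnableBatchIntegration
class RemoteChunkingConfig {
    // Manager: reads items and sends chunks to the workers
    @Bean
    fun managerStep(builder: RemoteChunkingManagerStepBuilderFactory): Step {
        return builder.get<Person, Person>("managerStep")
            .chunk(100)
            .reader(personReader())
            .outputChannel(outboundRequests()) // requests sent to workers
            .inputChannel(inboundReplies())    // replies received from workers
            .build()
    }

    // Worker: processes and writes the chunks it receives
    @Bean
    fun workerFlow(builder: RemoteChunkingWorkerBuilder<Person, Person>): IntegrationFlow {
        return builder
            .itemProcessor(personProcessor())
            .itemWriter(personWriter())
            .inputChannel(inboundRequests())
            .outputChannel(outboundReplies())
            .build()
    }

    // Channels must be declared explicitly; the JMS adapters bridging them to the broker are omitted
    @Bean fun outboundRequests() = DirectChannel()
    @Bean fun inboundReplies() = QueueChannel()
    @Bean fun inboundRequests() = DirectChannel()
    @Bean fun outboundReplies() = DirectChannel()

    // ActiveMQ connection
    @Bean
    fun connectionFactory() = ActiveMQConnectionFactory("tcp://localhost:61616")
}
```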
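The example defines the manager step but no job to run it. A minimal sketch of the wrapping job, with hypothetical class and job names; the `@Qualifier` pins the injection to the `managerStep` bean in case other `Step` beans exist:

```kotlin
import org.springframework.batch.core.Job
import org.springframework.batch.core.Step
import org.springframework.batch.core.job.builder.JobBuilder
import org.springframework.batch.core.repository.JobRepository
import org.springframework.beans.factory.annotation.Qualifier
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration

@Configuration(proxyBeanMethods = false)
class RemoteChunkingJobConfig {
    // Runs the manager step; workers consume chunk requests independently through workerFlow
    @Bean
    fun remoteChunkingJob(
        jobRepository: JobRepository,
        @Qualifier("managerStep") managerStep: Step
    ): Job =
        JobBuilder("remoteChunkingJob", jobRepository)
            .start(managerStep)
            .build()
}
```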
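CAUTION
Key considerations:
- In remote processing, network latency must stay well below the batch timeouts (such as the MessagingTemplate receive timeout)
- Restarting a worker can cause the same data to be processed more than once
- When using @EnableBatchIntegration to simplify configuration, the input and output channels must still be declared explicitly by name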
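In suitable workloads, applying these sub-elements appropriately can:
✅ Cut processing time from hours to minutes
✅ Raise resource utilization 3-5×
✅ Significantly improve fault tolerance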