Appearance
🚀 Job-Launching Gateway 可用属性详解
Spring Batch Integration 的消息驱动任务启动网关,让你通过消息触发批处理任务。以下是控制网关行为的核心属性配置指南:
🔑 1. id 属性
作用:标识网关的 Spring Bean
实现类型:
EventDrivenConsumer
(输入通道为SubscribableChannel
时)PollingConsumer
(输入通道为PollableChannel
时)
通道类型选择
✅ 实时响应场景 → 用 SubscribableChannel
(如 DirectChannel
)
✅ 异步轮询场景 → 用 PollableChannel
(如 QueueChannel
)
⚙️ 2. auto-startup 属性
作用:控制网关是否随应用启动
默认值:true
(自动启动)
kotlin
@Bean
fun jobLaunchingGateway(): IntegrationFlow {
return IntegrationFlow.from("requestChannel")
.handle(JobLaunchingGateway(jobLauncher())) {
it.autoStartup(false) // 手动控制启动
}
}
📨 3. request-channel 属性
作用:定义接收启动消息的输入通道
kotlin
@Bean
fun jobFlow() = IntegrationFlow.from(MessageChannels.direct("jobRequest"))
.handle(JobLaunchingGateway(jobLauncher()))
.channel("replyChannel")
.get()
NOTE
消息格式要求:
- 消息体必须是
JobLaunchRequest
对象 - 包含
Job
实例和JobParameters
🔙 4. reply-channel 属性
作用:定义返回任务执行结果(JobExecution
)的通道
kotlin
@Bean
fun resultFlow() = IntegrationFlow.from("replyChannel")
.transform { jobExecution: JobExecution ->
"任务状态: ${jobExecution.status}"
}
.get()
⏱️ 5. reply-timeout 属性
作用:设置等待结果返回的超时时间(毫秒)
默认值:-1
(无限等待)
阻塞风险警告
使用有界队列时(如 QueueChannel
容量满),超时设置可防止线程死锁!
kotlin
.handle(JobLaunchingGateway(jobLauncher())) {
it.replyTimeout(5000) // 5秒超时
}
🚦 6. job-launcher 属性
作用:自定义任务启动器(覆盖默认 jobLauncher
Bean)
kotlin
@Bean
fun customLauncher(): JobLauncher {
val launcher = SimpleJobLauncher()
launcher.setTaskExecutor(SimpleAsyncTaskExecutor()) // 异步执行
return launcher
}
@Bean
fun gateway() = JobLaunchingGateway(customLauncher()) // 注入自定义启动器
WARNING
未显式配置且无默认 jobLauncher
Bean 时 → 抛出 BeanCreationException
🔢 7. order 属性
作用:当网关订阅 SubscribableChannel
时,指定消费顺序
kotlin
.handle(JobLaunchingGateway(jobLauncher())) {
it.order(3) // 优先级数值越小越先执行
}
🔄 消息驱动任务启动流程
通过 Mermaid 时序图理解完整工作流:
💡 最佳实践总结
- 通道选择:kotlin
// 实时处理 MessageChannels.direct("requestChannel") // 异步缓冲 MessageChannels.queue(100, "requestChannel")
- 超时设置:生产环境务必配置
reply-timeout
避免阻塞 - 错误处理:添加错误通道捕获异常kotlin
.handle(JobLaunchingGateway(jobLauncher())) { it.errorChannel("errorChannel") }
TIP
完整配置示例:
查看 Kotlin DSL 配置
kotlin
@Configuration
class BatchIntegrationConfig {
@Bean
fun jobFlow(job: Job) = IntegrationFlow.from("jobRequests")
.handle(JobLaunchingGateway(jobLauncher())) {
it.replyTimeout(3000)
.autoStartup(true)
}
.channel("jobResults")
.get()
@Bean
fun jobLauncher() = SimpleJobLauncher().apply {
setJobRepository(jobRepository)
setTaskExecutor(SimpleAsyncTaskExecutor())
}
}
通过合理配置这些属性,可实现灵活可靠的消息驱动批处理任务调度!⚡️