Skip to content

🚀 Job-Launching Gateway 可用属性详解

Spring Batch Integration 的消息驱动任务启动网关,让你通过消息触发批处理任务。以下是控制网关行为的核心属性配置指南:


🔑 1. id 属性

作用:标识网关的 Spring Bean
实现类型

  • EventDrivenConsumer(输入通道为 SubscribableChannel 时)
  • PollingConsumer(输入通道为 PollableChannel 时)

通道类型选择

✅ 实时响应场景 → 用 SubscribableChannel (如 DirectChannel)
✅ 异步轮询场景 → 用 PollableChannel (如 QueueChannel)


⚙️ 2. auto-startup 属性

作用:控制网关是否随应用启动
默认值true(自动启动)

kotlin
@Bean
fun jobLaunchingGateway(): IntegrationFlow {
    return IntegrationFlow.from("requestChannel")
        .handle(JobLaunchingGateway(jobLauncher())) {
            it.autoStartup(false) // 手动控制启动
        }
}

📨 3. request-channel 属性

作用:定义接收启动消息的输入通道

kotlin
@Bean
fun jobFlow() = IntegrationFlow.from(MessageChannels.direct("jobRequest")) 
    .handle(JobLaunchingGateway(jobLauncher()))
    .channel("replyChannel")
    .get()

NOTE

消息格式要求:

  • 消息体必须是 JobLaunchRequest 对象
  • 包含 Job 实例和 JobParameters

🔙 4. reply-channel 属性

作用:定义返回任务执行结果(JobExecution)的通道

kotlin
@Bean
fun resultFlow() = IntegrationFlow.from("replyChannel") 
    .transform { jobExecution: JobExecution ->
        "任务状态: ${jobExecution.status}"
    }
    .get()

⏱️ 5. reply-timeout 属性

作用:设置等待结果返回的超时时间(毫秒)
默认值-1(无限等待)

阻塞风险警告

使用有界队列时(如 QueueChannel 容量满),超时设置可防止线程死锁!

kotlin
.handle(JobLaunchingGateway(jobLauncher())) {
    it.replyTimeout(5000) // 5秒超时
}

🚦 6. job-launcher 属性

作用:自定义任务启动器(覆盖默认 jobLauncher Bean)

kotlin
@Bean
fun customLauncher(): JobLauncher {
    val launcher = SimpleJobLauncher()
    launcher.setTaskExecutor(SimpleAsyncTaskExecutor()) // 异步执行
    return launcher
}

@Bean
fun gateway() = JobLaunchingGateway(customLauncher()) // 注入自定义启动器

WARNING

未显式配置且无默认 jobLauncher Bean 时 → 抛出 BeanCreationException


🔢 7. order 属性

作用:当网关订阅 SubscribableChannel 时,指定消费顺序

kotlin
.handle(JobLaunchingGateway(jobLauncher())) {
    it.order(3) // 优先级数值越小越先执行
}

🔄 消息驱动任务启动流程

通过 Mermaid 时序图理解完整工作流:


💡 最佳实践总结

  1. 通道选择
    kotlin
    // 实时处理
    MessageChannels.direct("requestChannel") 
    
    // 异步缓冲
    MessageChannels.queue(100, "requestChannel")
  2. 超时设置:生产环境务必配置 reply-timeout 避免阻塞
  3. 错误处理:添加错误通道捕获异常
    kotlin
    .handle(JobLaunchingGateway(jobLauncher())) {
        it.errorChannel("errorChannel") 
    }

TIP

完整配置示例:

查看 Kotlin DSL 配置
kotlin
@Configuration
class BatchIntegrationConfig {

    @Bean
    fun jobFlow(job: Job) = IntegrationFlow.from("jobRequests")
        .handle(JobLaunchingGateway(jobLauncher())) {
            it.replyTimeout(3000)
               .autoStartup(true)
        }
        .channel("jobResults")
        .get()

    @Bean
    fun jobLauncher() = SimpleJobLauncher().apply {
        setJobRepository(jobRepository)
        setTaskExecutor(SimpleAsyncTaskExecutor())
    }
}

通过合理配置这些属性,可实现灵活可靠的消息驱动批处理任务调度!⚡️