Skip to content

🌟 Spring Batch 步骤流程控制详解

本教程将深入讲解 Spring Batch 中步骤(Step)流程控制的机制与实现方式,通过通俗易懂的示例帮助您掌握作业流程编排的核心技巧。

一、流程控制基础

为什么需要控制步骤流程?

在 Spring Batch 中,作业(Job)由多个步骤(Step)组成,但这些步骤的执行并非总是线性的

  • 某个步骤失败不一定导致整个作业失败
  • 步骤的成功可能有多种状态,需要不同处理
  • 某些步骤可能根据条件被跳过不执行

TIP

理解步骤流程控制是构建复杂批处理作业的关键,它让您的作业具备智能决策能力,而不仅仅是简单的顺序执行。

核心概念说明

二、顺序流控制

基本顺序执行

这是最简单的流程 - 所有步骤按顺序依次执行:

kotlin
@Bean
fun job(jobRepository: JobRepository, stepA: Step, stepB: Step, stepC: Step): Job {
    return JobBuilder("sequentialJob", jobRepository)
        .start(stepA)
        .next(stepB)
        .next(stepC)
        .build()
}

NOTE

在顺序流中:

  • 如果 stepA 失败,整个作业立即失败,stepBstepC 不会执行
  • 所有步骤必须成功作业才会成功

三、条件流控制

基本条件分支

根据步骤执行结果决定下一步路径:

kotlin
@Bean
fun conditionalJob(jobRepository: JobRepository, stepA: Step, stepB: Step, stepC: Step): Job {
    return JobBuilder("conditionalJob", jobRepository)
        .start(stepA)
        .on("*").to(stepB)
        .from(stepA)
        .on("FAILED").to(stepC)
        .end()
        .build()
}
代码说明
  • on("*"):匹配任意退出状态(除了明确指定的状态)
  • on("FAILED"):当退出状态为 FAILED 时触发
  • to(stepC):转移到指定步骤

状态码匹配规则

Spring Batch 使用简单的模式匹配:

模式说明示例匹配
*匹配零个或多个字符FAILED, COMPLETED
?匹配单个字符A, B (不匹配 AB)
C*T匹配C开头T结尾的状态COUNT, CAT
C?T匹配三个字符C开头T结尾CAT (不匹配 COUNT)

WARNING

每个步骤的所有可能退出状态必须被明确定义,否则框架会抛出异常导致作业失败!

批处理状态 vs 退出状态

状态类型说明
BatchStatus框架内部状态(COMPLETED, STARTED, FAILED 等)
ExitStatus步骤执行后的自定义退出状态,决定流程走向的关键
kotlin
class SkipCheckingListener : StepExecutionListener {
    override fun afterStep(stepExecution: StepExecution): ExitStatus? {
        return if (stepExecution.skipCount > 0) {
            ExitStatus("COMPLETED WITH SKIPS")
        } else {
            null
        }
    }
}

在作业配置中使用自定义状态:

kotlin
@Bean
fun customStatusJob(jobRepository: JobRepository, step1: Step, errorPrint: Step): Job {
    return JobBuilder("customStatusJob", jobRepository)
        .start(step1)
        .on("FAILED").end()
        .from(step1)
        .on("COMPLETED WITH SKIPS").to(errorPrint)
        .from(step1)
        .on("*").to(step2)
        .end()
        .build()
}

四、配置停止行为

1. 正常结束作业

使用 end() 方法正常终止作业:

kotlin
@Bean
fun endJob(jobRepository: JobRepository, step1: Step, step2: Step): Job {
    return JobBuilder("endJob", jobRepository)
        .start(step1)
        .next(step2)
        .on("FAILED").end()
        .from(step2).on("*").to(step3)
        .end()
        .build()
}

IMPORTANT

使用 end() 终止的作业状态为 COMPLETED,无法重新启动!

2. 使作业失败

使用 fail() 方法使作业失败:

kotlin
@Bean
fun failJob(jobRepository: JobRepository, step1: Step, step2: Step): Job {
    return JobBuilder("failJob", jobRepository)
        .start(step1)
        .next(step2).on("FAILED").fail()
        .from(step2).on("*").to(step3)
        .end()
        .build()
}

TIP

失败状态(FAILED)的作业可以重新启动,适合需要重试的场景。

3. 暂停作业

使用 stopAndRestart() 暂停作业:

kotlin
@Bean
fun stopJob(jobRepository: JobRepository, step1: Step, step2: Step): Job {
    return JobBuilder("stopJob", jobRepository)
        .start(step1)
        .on("COMPLETED").stopAndRestart(step2)
        .end()
        .build()
}

暂停作业特点

  • 状态为 STOPPED
  • 可恢复执行(从指定步骤继续)
  • 适合需要人工干预的场景

五、编程式流程决策

自定义决策器

当简单状态匹配无法满足需求时,可实现 JobExecutionDecider

kotlin
class CustomDecider : JobExecutionDecider {
    override fun decide(
        jobExecution: JobExecution,
        stepExecution: StepExecution?
    ): FlowExecutionStatus {
        return if (someComplexCondition()) {
            FlowExecutionStatus("SPECIAL_CASE")
        } else {
            FlowExecutionStatus("NORMAL")
        }
    }
}

在作业配置中使用决策器:

kotlin
@Bean
fun deciderJob(
    jobRepository: JobRepository,
    decider: JobExecutionDecider,
    stepA: Step,
    stepB: Step
): Job {
    return JobBuilder("deciderJob", jobRepository)
        .start(stepA)
        .next(decider).on("SPECIAL_CASE").to(stepB)
        .from(decider).on("NORMAL").to(stepC)
        .end()
        .build()
}

六、分流处理

并行执行多个流程

使用 split() 实现并行执行:

kotlin
@Bean
fun flow1(step1: Step, step2: Step): Flow {
    return FlowBuilder<SimpleFlow>("flow1")
        .start(step1)
        .next(step2)
        .build()
}

@Bean
fun flow2(step3: Step): Flow {
    return FlowBuilder<SimpleFlow>("flow2")
        .start(step3)
        .build()
}

@Bean
fun splitJob(
    jobRepository: JobRepository,
    flow1: Flow,
    flow2: Flow,
    finalStep: Step
): Job {
    return JobBuilder("splitJob", jobRepository)
        .start(flow1)
        .split(SimpleAsyncTaskExecutor())
        .add(flow2)
        .next(finalStep)
        .end()
        .build()
}

CAUTION

并行流程需要谨慎设计资源竞争和事务边界问题!

七、外部化流程定义

1. 流程复用

将通用流程定义为独立Bean:

kotlin
@Bean
fun commonFlow(stepA: Step, stepB: Step): Flow {
    return FlowBuilder<SimpleFlow>("commonFlow")
        .start(stepA)
        .next(stepB)
        .build()
}

@Bean
fun jobWithExternalFlow(
    jobRepository: JobRepository,
    commonFlow: Flow,
    stepC: Step
): Job {
    return JobBuilder("externalFlowJob", jobRepository)
        .start(commonFlow)
        .next(stepC)
        .end()
        .build()
}

2. 作业嵌套(JobStep)

使用 JobStep 实现作业间的依赖:

kotlin
@Bean
fun parentJob(jobRepository: JobRepository, childJobStep: Step): Job {
    return JobBuilder("parentJob", jobRepository)
        .start(childJobStep)
        .build()
}

@Bean
fun childJobStep(
    jobRepository: JobRepository,
    jobLauncher: JobLauncher,
    childJob: Job
): Step {
    return StepBuilder("childJobStep", jobRepository)
        .job(childJob)
        .launcher(jobLauncher)
        .parametersExtractor { stepExecution ->
            mapOf(
                "parentId" to stepExecution.jobExecution.id.toString()
            )
        }
        .build()
}

最佳实践

  1. 保持步骤单一职责:每个步骤只做一件事
  2. 合理设置事务边界:避免长事务影响性能
  3. 使用监听器增强控制:StepExecutionListener提供额外控制点
  4. 充分测试流程分支:确保所有分支逻辑都被覆盖

总结

通过本教程,您已经掌握了 Spring Batch 中控制步骤流程的核心技术:

顺序流 - 线性执行步骤链
条件流 - 基于状态码分支路由
停止控制 - 灵活结束/失败/暂停作业
编程决策 - 复杂逻辑的定制化处理
分流处理 - 并行执行提高效率
流程复用 - 模块化设计提高可维护性

⚡️ 立即应用这些技术,让您的批处理作业更加智能和健壮!