Skip to content

Spring 任务执行与调度:让你的应用"分身有术" 🚀

引言:为什么需要任务执行与调度?

想象一下这样的场景:你正在开发一个电商系统,需要处理用户订单、发送邮件通知、定期清理过期数据、生成报表等任务。如果所有这些操作都在主线程中同步执行,用户体验会变得非常糟糕——点击下单按钮后要等待很久才能看到响应。

IMPORTANT

Spring 的任务执行与调度框架解决了现代应用中的两个核心问题:

  • 异步执行:让耗时操作不阻塞主线程
  • 定时调度:让系统能够自动执行周期性任务

这就是 Spring Framework 提供 TaskExecutorTaskScheduler 抽象的原因——它们让你的应用能够"分身有术",同时处理多个任务。

1. TaskExecutor:异步执行的艺术 🎭

1.1 核心概念与设计哲学

TaskExecutor 是 Spring 对 JDK java.util.concurrent.Executor 接口的抽象封装。它的设计哲学很简单:统一异步执行的接口,隐藏底层实现细节

1.2 TaskExecutor 类型详解

Spring 提供了多种 TaskExecutor 实现,每种都有其特定的使用场景:

kotlin
@Configuration
class TaskConfig {
    
    @Bean
    fun taskExecutor(): ThreadPoolTaskExecutor {
        return ThreadPoolTaskExecutor().apply {
            corePoolSize = 5        // 核心线程数
            maxPoolSize = 10        // 最大线程数
            queueCapacity = 25      // 队列容量
            threadNamePrefix = "async-task-"
            setRejectedExecutionHandler(ThreadPoolExecutor.CallerRunsPolicy()) 
            initialize()
        }
    }
}
kotlin
@Bean
fun simpleTaskExecutor(): SimpleAsyncTaskExecutor {
    return SimpleAsyncTaskExecutor().apply {
        concurrencyLimit = 10   // 并发限制
        threadNamePrefix = "simple-async-"
        // 启用虚拟线程(JDK 21+)
        setVirtualThreads(true) 
    }
}

TIP

选择建议

  • 生产环境推荐使用 ThreadPoolTaskExecutor,它提供了完整的线程池管理
  • 测试环境可以使用 SyncTaskExecutor,所有任务同步执行,便于调试
  • JDK 21+ 环境可以考虑启用虚拟线程的 SimpleAsyncTaskExecutor

1.3 实际应用示例

让我们看一个电商订单处理的实际例子:

kotlin
@Service
class OrderService(
    private val taskExecutor: TaskExecutor,
    private val emailService: EmailService,
    private val inventoryService: InventoryService
) {
    
    fun processOrder(order: Order): String {
        // 主线程:快速响应用户
        val orderId = saveOrder(order)
        
        // 异步任务:耗时操作不阻塞响应
        taskExecutor.execute {
            try {
                // 发送确认邮件
                emailService.sendOrderConfirmation(order) 
                
                // 更新库存
                inventoryService.updateStock(order.items) 
                
                // 记录日志
                logger.info("订单 $orderId 处理完成")
            } catch (e: Exception) {
                logger.error("订单 $orderId 处理失败", e) 
            }
        }
        
        return orderId // 立即返回,不等待异步任务完成
    }
    
    private fun saveOrder(order: Order): String {
        // 保存订单到数据库
        return UUID.randomUUID().toString()
    }
}

1.4 任务装饰器:增强功能的利器

Spring 提供了 TaskDecorator 机制,让你可以在任务执行前后添加自定义逻辑:

kotlin
class LoggingTaskDecorator : TaskDecorator {
    private val logger = LoggerFactory.getLogger(LoggingTaskDecorator::class.java)
    
    override fun decorate(runnable: Runnable): Runnable {
        return Runnable {
            val startTime = System.currentTimeMillis()
            val threadName = Thread.currentThread().name
            
            logger.debug("任务开始执行 - 线程: $threadName") 
            
            try {
                runnable.run()
                logger.debug("任务执行成功 - 耗时: ${System.currentTimeMillis() - startTime}ms") 
            } catch (e: Exception) {
                logger.error("任务执行失败 - 线程: $threadName", e) 
                throw e
            }
        }
    }
}

@Configuration
class TaskConfig {
    @Bean
    fun decoratedTaskExecutor(): ThreadPoolTaskExecutor {
        return ThreadPoolTaskExecutor().apply {
            corePoolSize = 5
            maxPoolSize = 10
            setTaskDecorator(LoggingTaskDecorator()) 
        }
    }
}

2. TaskScheduler:定时任务的指挥家 ⏰

2.1 设计理念与核心价值

TaskScheduler 解决的是"什么时候执行"的问题。它的设计理念是:提供灵活的调度策略,支持从简单的延时执行到复杂的 Cron 表达式

2.2 调度策略详解

Spring 提供了多种调度策略:

kotlin
@Service
class DataCleanupService(private val taskScheduler: TaskScheduler) {
    
    fun startCleanupTask() {
        // 每次执行完成后,等待5秒再执行下一次
        taskScheduler.scheduleWithFixedDelay(
            { cleanupExpiredData() },
            Duration.ofSeconds(5) 
        )
    }
    
    private fun cleanupExpiredData() {
        logger.info("开始清理过期数据...")
        // 清理逻辑
        Thread.sleep(2000) // 模拟耗时操作
        logger.info("过期数据清理完成")
    }
}
kotlin
@Service
class ReportService(private val taskScheduler: TaskScheduler) {
    
    fun startReportGeneration() {
        // 每10秒执行一次,不管上次是否完成
        taskScheduler.scheduleAtFixedRate(
            { generateReport() },
            Duration.ofSeconds(10) 
        )
    }
    
    private fun generateReport() {
        logger.info("生成报表中...")
        // 报表生成逻辑
    }
}
kotlin
@Service
class BackupService(private val taskScheduler: TaskScheduler) {
    
    fun startBackupTask() {
        // 每天凌晨2点执行备份
        taskScheduler.schedule(
            { performBackup() },
            CronTrigger("0 0 2 * * ?") 
        )
    }
    
    private fun performBackup() {
        logger.info("开始执行数据备份...")
        // 备份逻辑
    }
}

2.3 自定义触发器

对于复杂的调度需求,你可以实现自定义触发器:

kotlin
class BusinessHoursTrigger : Trigger {
    
    override fun nextExecution(triggerContext: TriggerContext): Instant? {
        val now = Instant.now()
        val zonedNow = now.atZone(ZoneId.systemDefault())
        
        // 只在工作日的9-17点执行
        val nextExecution = if (isBusinessHours(zonedNow)) {
            // 如果当前是工作时间,5分钟后执行
            now.plus(Duration.ofMinutes(5)) 
        } else {
            // 如果不是工作时间,计算下一个工作日的9点
            calculateNextBusinessDay(zonedNow) 
        }
        
        return nextExecution
    }
    
    private fun isBusinessHours(time: ZonedDateTime): Boolean {
        val dayOfWeek = time.dayOfWeek
        val hour = time.hour
        return dayOfWeek in DayOfWeek.MONDAY..DayOfWeek.FRIDAY && hour in 9..16
    }
    
    private fun calculateNextBusinessDay(time: ZonedDateTime): Instant {
        var next = time.plusDays(1).withHour(9).withMinute(0).withSecond(0)
        while (next.dayOfWeek == DayOfWeek.SATURDAY || next.dayOfWeek == DayOfWeek.SUNDAY) {
            next = next.plusDays(1)
        }
        return next.toInstant()
    }
}

3. 注解驱动:简化配置的魔法 ✨

3.1 启用调度支持

首先,需要在配置类上启用调度支持:

kotlin
@Configuration
@EnableScheduling  // 启用 @Scheduled 支持
@EnableAsync      // 启用 @Async 支持
class SchedulingConfig : SchedulingConfigurer, AsyncConfigurer {
    
    override fun configureTasks(taskRegistrar: ScheduledTaskRegistrar) {
        taskRegistrar.setScheduler(taskScheduler())
    }
    
    override fun getAsyncExecutor(): Executor {
        return taskExecutor()
    }
    
    @Bean
    fun taskScheduler(): TaskScheduler {
        return ThreadPoolTaskScheduler().apply {
            poolSize = 10
            threadNamePrefix = "scheduled-task-"
            initialize()
        }
    }
    
    @Bean
    fun taskExecutor(): TaskExecutor {
        return ThreadPoolTaskExecutor().apply {
            corePoolSize = 5
            maxPoolSize = 20
            queueCapacity = 100
            threadNamePrefix = "async-task-"
            initialize()
        }
    }
}

3.2 @Scheduled 注解详解

@Scheduled 注解提供了多种调度方式:

kotlin
@Component
class ScheduledTasks {
    private val logger = LoggerFactory.getLogger(ScheduledTasks::class.java)
    
    // 固定延迟:上次执行完成后等待5秒
    @Scheduled(fixedDelay = 5, timeUnit = TimeUnit.SECONDS)
    fun taskWithFixedDelay() {
        logger.info("固定延迟任务执行 - ${LocalDateTime.now()}")
        Thread.sleep(2000) // 模拟耗时操作
    }
    
    // 固定频率:每10秒执行一次
    @Scheduled(fixedRate = 10, timeUnit = TimeUnit.SECONDS)
    fun taskWithFixedRate() {
        logger.info("固定频率任务执行 - ${LocalDateTime.now()}")
    }
    
    // 初始延迟:启动后等待30秒再开始执行
    @Scheduled(initialDelay = 30, fixedRate = 60, timeUnit = TimeUnit.SECONDS)
    fun taskWithInitialDelay() {
        logger.info("带初始延迟的任务执行 - ${LocalDateTime.now()}")
    }
    
    // Cron 表达式:每个工作日的9点执行
    @Scheduled(cron = "0 0 9 * * MON-FRI", zone = "Asia/Shanghai")
    fun taskWithCronExpression() {
        logger.info("Cron 任务执行 - ${LocalDateTime.now()}")
    }
    
    // 使用宏:每小时执行
    @Scheduled(cron = "@hourly")
    fun hourlyTask() {
        logger.info("每小时任务执行 - ${LocalDateTime.now()}")
    }
}

WARNING

注意事项

  • @Scheduled 方法必须无参数且返回 void
  • 默认情况下,所有定时任务在同一个线程中执行,长时间运行的任务会阻塞其他任务
  • 可以通过配置多个线程来避免阻塞问题

3.3 @Async 注解:异步执行的简单方式

kotlin
@Service
class UserService {
    private val logger = LoggerFactory.getLogger(UserService::class.java)
    
    // 简单异步方法
    @Async
    fun sendWelcomeEmail(userId: String) {
        logger.info("开始发送欢迎邮件给用户: $userId")
        Thread.sleep(3000) // 模拟邮件发送耗时
        logger.info("欢迎邮件发送完成: $userId")
    }
    
    // 带返回值的异步方法
    @Async
    fun calculateUserScore(userId: String): CompletableFuture<Int> {
        logger.info("开始计算用户积分: $userId")
        Thread.sleep(2000) // 模拟复杂计算
        val score = (1..100).random()
        logger.info("用户积分计算完成: $userId, 积分: $score")
        return CompletableFuture.completedFuture(score) 
    }
    
    // 指定执行器
    @Async("taskExecutor")
    fun processUserData(userId: String) {
        logger.info("使用指定执行器处理用户数据: $userId")
        // 处理逻辑
    }
}

3.4 异常处理

对于异步方法的异常处理,Spring 提供了专门的机制:

kotlin
@Component
class AsyncExceptionHandler : AsyncUncaughtExceptionHandler {
    private val logger = LoggerFactory.getLogger(AsyncExceptionHandler::class.java)
    
    override fun handleUncaughtException(ex: Throwable, method: Method, vararg params: Any?) {
        logger.error("异步方法执行异常 - 方法: ${method.name}, 参数: ${params.contentToString()}", ex) 
        
        // 可以在这里添加告警、重试等逻辑
        when (ex) {
            is IllegalArgumentException -> {
                logger.warn("参数异常,跳过处理") 
            }
            is RuntimeException -> {
                logger.error("运行时异常,需要人工介入") 
                // 发送告警
            }
        }
    }
}

@Configuration
@EnableAsync
class AsyncConfig : AsyncConfigurer {
    
    override fun getAsyncUncaughtExceptionHandler(): AsyncUncaughtExceptionHandler {
        return AsyncExceptionHandler() 
    }
}

4. Cron 表达式:时间的语言 📅

4.1 Cron 表达式结构

Spring 的 Cron 表达式由6个字段组成:

 ┌───────────── 秒 (0-59)
 │ ┌───────────── 分钟 (0-59)
 │ │ ┌───────────── 小时 (0-23)
 │ │ │ ┌───────────── 日 (1-31)
 │ │ │ │ ┌───────────── 月 (1-12 或 JAN-DEC)
 │ │ │ │ │ ┌───────────── 星期 (0-7,0和7都表示周日,或 MON-SUN)
 │ │ │ │ │ │
 * * * * * *

4.2 实用的 Cron 表达式示例

常用 Cron 表达式

表达式说明使用场景
0 0 2 * * ?每天凌晨2点数据备份
0 */15 * * * ?每15分钟健康检查
0 0 9-17 * * MON-FRI工作日9-17点每小时业务监控
0 0 0 L * ?每月最后一天午夜月度报表
0 0 0 ? * 2L每月最后一个周一午夜特殊维护

4.3 业务场景中的 Cron 应用

kotlin
@Component
class BusinessScheduledTasks {
    
    // 每个工作日早上9点发送日报
    @Scheduled(cron = "0 0 9 * * MON-FRI")
    fun sendDailyReport() {
        // 发送日报逻辑
    }
    
    // 每月1号凌晨1点生成月度统计
    @Scheduled(cron = "0 0 1 1 * ?")
    fun generateMonthlyStatistics() {
        // 月度统计逻辑
    }
    
    // 每周日凌晨3点清理日志
    @Scheduled(cron = "0 0 3 ? * SUN")
    fun cleanupLogs() {
        // 日志清理逻辑
    }
    
    // 每天每隔2小时检查系统状态
    @Scheduled(cron = "0 0 */2 * * ?")
    fun systemHealthCheck() {
        // 系统健康检查逻辑
    }
}

5. 最佳实践与性能优化 🚀

5.1 线程池配置最佳实践

kotlin
@Configuration
class OptimizedTaskConfig {
    
    @Bean("ioTaskExecutor")
    fun ioTaskExecutor(): ThreadPoolTaskExecutor {
        return ThreadPoolTaskExecutor().apply {
            // IO密集型任务:线程数可以设置较大
            corePoolSize = Runtime.getRuntime().availableProcessors() * 2
            maxPoolSize = Runtime.getRuntime().availableProcessors() * 4
            queueCapacity = 200
            threadNamePrefix = "io-task-"
            
            // 优雅关闭配置
            setWaitForTasksToCompleteOnShutdown(true) 
            setAwaitTerminationSeconds(60)            
            
            // 拒绝策略:调用者运行
            setRejectedExecutionHandler(ThreadPoolExecutor.CallerRunsPolicy())
        }
    }
    
    @Bean("cpuTaskExecutor")
    fun cpuTaskExecutor(): ThreadPoolTaskExecutor {
        return ThreadPoolTaskExecutor().apply {
            // CPU密集型任务:线程数接近CPU核心数
            val processors = Runtime.getRuntime().availableProcessors()
            corePoolSize = processors     
            maxPoolSize = processors + 1
            queueCapacity = 100
            threadNamePrefix = "cpu-task-"
        }
    }
}

5.2 监控与指标

kotlin
@Component
class TaskExecutorMonitor {
    private val meterRegistry = Metrics.globalRegistry
    
    @EventListener
    fun handleTaskExecutorMetrics(taskExecutor: ThreadPoolTaskExecutor) {
        // 注册线程池指标
        Gauge.builder("thread.pool.active")
            .description("活跃线程数")
            .register(meterRegistry) { taskExecutor.activeCount.toDouble() }
        
        Gauge.builder("thread.pool.queue.size")
            .description("队列大小")
            .register(meterRegistry) { taskExecutor.threadPoolExecutor.queue.size.toDouble() }
        
        Gauge.builder("thread.pool.completed.tasks")
            .description("已完成任务数")
            .register(meterRegistry) { taskExecutor.threadPoolExecutor.completedTaskCount.toDouble() }
    }
}

5.3 错误处理与重试机制

kotlin
@Service
class ResilientTaskService {
    private val logger = LoggerFactory.getLogger(ResilientTaskService::class.java)
    
    @Async
    @Retryable(
        value = [Exception::class],
        maxAttempts = 3,
        backoff = Backoff(delay = 1000, multiplier = 2.0) 
    )
    fun processWithRetry(data: String) {
        try {
            // 可能失败的业务逻辑
            processData(data)
        } catch (e: Exception) {
            logger.warn("任务执行失败,将进行重试: ${e.message}") 
            throw e
        }
    }
    
    @Recover
    fun recover(ex: Exception, data: String) {
        logger.error("任务最终失败,执行恢复逻辑: $data", ex) 
        // 恢复逻辑,如发送告警、记录失败日志等
    }
    
    private fun processData(data: String) {
        // 模拟可能失败的操作
        if (Math.random() < 0.7) {
            throw RuntimeException("模拟处理失败")
        }
        logger.info("数据处理成功: $data")
    }
}

6. 实战案例:构建一个完整的任务调度系统 💼

让我们构建一个电商系统的订单处理和数据分析任务调度系统:

完整的电商任务调度系统实现
kotlin
// 1. 配置类
@Configuration
@EnableScheduling
@EnableAsync
class ECommerceTaskConfig : SchedulingConfigurer, AsyncConfigurer {
    
    override fun configureTasks(taskRegistrar: ScheduledTaskRegistrar) {
        taskRegistrar.setScheduler(taskScheduler())
    }
    
    override fun getAsyncExecutor(): Executor = asyncTaskExecutor()
    
    @Bean
    fun taskScheduler(): TaskScheduler {
        return ThreadPoolTaskScheduler().apply {
            poolSize = 10
            threadNamePrefix = "scheduled-"
            setWaitForTasksToCompleteOnShutdown(true)
            setAwaitTerminationSeconds(60)
        }
    }
    
    @Bean
    fun asyncTaskExecutor(): TaskExecutor {
        return ThreadPoolTaskExecutor().apply {
            corePoolSize = 10
            maxPoolSize = 50
            queueCapacity = 200
            threadNamePrefix = "async-"
            setTaskDecorator(LoggingTaskDecorator())
        }
    }
}

// 2. 订单处理服务
@Service
class OrderProcessingService(
    private val emailService: EmailService,
    private val inventoryService: InventoryService,
    private val paymentService: PaymentService
) {
    private val logger = LoggerFactory.getLogger(OrderProcessingService::class.java)
    
    @Async
    fun processOrderAsync(order: Order): CompletableFuture<Boolean> {
        return try {
            // 处理支付
            val paymentResult = paymentService.processPayment(order)
            if (!paymentResult.success) {
                throw PaymentException("支付失败: ${paymentResult.message}")
            }
            
            // 更新库存
            inventoryService.updateInventory(order.items)
            
            // 发送确认邮件
            emailService.sendOrderConfirmation(order)
            
            logger.info("订单处理完成: ${order.id}")
            CompletableFuture.completedFuture(true)
        } catch (e: Exception) {
            logger.error("订单处理失败: ${order.id}", e)
            CompletableFuture.completedFuture(false)
        }
    }
    
    // 每5分钟检查待处理订单
    @Scheduled(fixedRate = 5, timeUnit = TimeUnit.MINUTES)
    fun processPendingOrders() {
        val pendingOrders = findPendingOrders()
        logger.info("发现 ${pendingOrders.size} 个待处理订单")
        
        pendingOrders.forEach { order ->
            processOrderAsync(order)
        }
    }
    
    private fun findPendingOrders(): List<Order> {
        // 查询待处理订单的逻辑
        return emptyList()
    }
}

// 3. 数据分析服务
@Service
class DataAnalysisService {
    private val logger = LoggerFactory.getLogger(DataAnalysisService::class.java)
    
    // 每天凌晨2点生成销售报表
    @Scheduled(cron = "0 0 2 * * ?")
    fun generateDailySalesReport() {
        logger.info("开始生成日销售报表")
        
        try {
            val salesData = collectSalesData()
            val report = generateReport(salesData)
            saveReport(report)
            
            logger.info("日销售报表生成完成")
        } catch (e: Exception) {
            logger.error("日销售报表生成失败", e)
        }
    }
    
    // 每周一早上8点生成周报
    @Scheduled(cron = "0 0 8 ? * MON")
    fun generateWeeklyReport() {
        logger.info("开始生成周报")
        // 周报生成逻辑
    }
    
    // 每月1号生成月度分析
    @Scheduled(cron = "0 0 6 1 * ?")
    fun generateMonthlyAnalysis() {
        logger.info("开始生成月度分析")
        // 月度分析逻辑
    }
    
    private fun collectSalesData(): SalesData {
        // 收集销售数据
        return SalesData()
    }
    
    private fun generateReport(salesData: SalesData): Report {
        // 生成报表
        return Report()
    }
    
    private fun saveReport(report: Report) {
        // 保存报表
    }
}

// 4. 系统维护服务
@Service
class SystemMaintenanceService {
    private val logger = LoggerFactory.getLogger(SystemMaintenanceService::class.java)
    
    // 每天凌晨4点清理临时文件
    @Scheduled(cron = "0 0 4 * * ?")
    fun cleanupTempFiles() {
        logger.info("开始清理临时文件")
        // 清理逻辑
    }
    
    // 每周日凌晨3点备份数据库
    @Scheduled(cron = "0 0 3 ? * SUN")
    fun backupDatabase() {
        logger.info("开始数据库备份")
        // 备份逻辑
    }
    
    // 每小时检查系统健康状态
    @Scheduled(cron = "0 0 * * * ?")
    fun healthCheck() {
        logger.debug("执行系统健康检查")
        // 健康检查逻辑
    }
}

// 5. 数据类
data class Order(
    val id: String,
    val userId: String,
    val items: List<OrderItem>,
    val totalAmount: BigDecimal,
    val status: OrderStatus
)

data class OrderItem(
    val productId: String,
    val quantity: Int,
    val price: BigDecimal
)

enum class OrderStatus {
    PENDING, PROCESSING, COMPLETED, CANCELLED
}

data class SalesData(
    val totalSales: BigDecimal = BigDecimal.ZERO,
    val orderCount: Int = 0,
    val topProducts: List<String> = emptyList()
)

data class Report(
    val id: String = UUID.randomUUID().toString(),
    val type: String = "DAILY_SALES",
    val data: Map<String, Any> = emptyMap(),
    val