Appearance
Spring 任务执行与调度:让你的应用"分身有术" 🚀
引言:为什么需要任务执行与调度?
想象一下这样的场景:你正在开发一个电商系统,需要处理用户订单、发送邮件通知、定期清理过期数据、生成报表等任务。如果所有这些操作都在主线程中同步执行,用户体验会变得非常糟糕——点击下单按钮后要等待很久才能看到响应。
IMPORTANT
Spring 的任务执行与调度框架解决了现代应用中的两个核心问题:
- 异步执行:让耗时操作不阻塞主线程
- 定时调度:让系统能够自动执行周期性任务
这就是 Spring Framework 提供 TaskExecutor
和 TaskScheduler
抽象的原因——它们让你的应用能够"分身有术",同时处理多个任务。
1. TaskExecutor:异步执行的艺术 🎭
1.1 核心概念与设计哲学
TaskExecutor
是 Spring 对 JDK java.util.concurrent.Executor
接口的抽象封装。它的设计哲学很简单:统一异步执行的接口,隐藏底层实现细节。
1.2 TaskExecutor 类型详解
Spring 提供了多种 TaskExecutor
实现,每种都有其特定的使用场景:
kotlin
@Configuration
class TaskConfig {
@Bean
fun taskExecutor(): ThreadPoolTaskExecutor {
return ThreadPoolTaskExecutor().apply {
corePoolSize = 5 // 核心线程数
maxPoolSize = 10 // 最大线程数
queueCapacity = 25 // 队列容量
threadNamePrefix = "async-task-"
setRejectedExecutionHandler(ThreadPoolExecutor.CallerRunsPolicy())
initialize()
}
}
}
kotlin
@Bean
fun simpleTaskExecutor(): SimpleAsyncTaskExecutor {
return SimpleAsyncTaskExecutor().apply {
concurrencyLimit = 10 // 并发限制
threadNamePrefix = "simple-async-"
// 启用虚拟线程(JDK 21+)
setVirtualThreads(true)
}
}
TIP
选择建议:
- 生产环境推荐使用
ThreadPoolTaskExecutor
,它提供了完整的线程池管理 - 测试环境可以使用
SyncTaskExecutor
,所有任务同步执行,便于调试 - JDK 21+ 环境可以考虑启用虚拟线程的
SimpleAsyncTaskExecutor
1.3 实际应用示例
让我们看一个电商订单处理的实际例子:
kotlin
@Service
class OrderService(
private val taskExecutor: TaskExecutor,
private val emailService: EmailService,
private val inventoryService: InventoryService
) {
fun processOrder(order: Order): String {
// 主线程:快速响应用户
val orderId = saveOrder(order)
// 异步任务:耗时操作不阻塞响应
taskExecutor.execute {
try {
// 发送确认邮件
emailService.sendOrderConfirmation(order)
// 更新库存
inventoryService.updateStock(order.items)
// 记录日志
logger.info("订单 $orderId 处理完成")
} catch (e: Exception) {
logger.error("订单 $orderId 处理失败", e)
}
}
return orderId // 立即返回,不等待异步任务完成
}
private fun saveOrder(order: Order): String {
// 保存订单到数据库
return UUID.randomUUID().toString()
}
}
1.4 任务装饰器:增强功能的利器
Spring 提供了 TaskDecorator
机制,让你可以在任务执行前后添加自定义逻辑:
kotlin
class LoggingTaskDecorator : TaskDecorator {
private val logger = LoggerFactory.getLogger(LoggingTaskDecorator::class.java)
override fun decorate(runnable: Runnable): Runnable {
return Runnable {
val startTime = System.currentTimeMillis()
val threadName = Thread.currentThread().name
logger.debug("任务开始执行 - 线程: $threadName")
try {
runnable.run()
logger.debug("任务执行成功 - 耗时: ${System.currentTimeMillis() - startTime}ms")
} catch (e: Exception) {
logger.error("任务执行失败 - 线程: $threadName", e)
throw e
}
}
}
}
@Configuration
class TaskConfig {
@Bean
fun decoratedTaskExecutor(): ThreadPoolTaskExecutor {
return ThreadPoolTaskExecutor().apply {
corePoolSize = 5
maxPoolSize = 10
setTaskDecorator(LoggingTaskDecorator())
}
}
}
2. TaskScheduler:定时任务的指挥家 ⏰
2.1 设计理念与核心价值
TaskScheduler
解决的是"什么时候执行"的问题。它的设计理念是:提供灵活的调度策略,支持从简单的延时执行到复杂的 Cron 表达式。
2.2 调度策略详解
Spring 提供了多种调度策略:
kotlin
@Service
class DataCleanupService(private val taskScheduler: TaskScheduler) {
fun startCleanupTask() {
// 每次执行完成后,等待5秒再执行下一次
taskScheduler.scheduleWithFixedDelay(
{ cleanupExpiredData() },
Duration.ofSeconds(5)
)
}
private fun cleanupExpiredData() {
logger.info("开始清理过期数据...")
// 清理逻辑
Thread.sleep(2000) // 模拟耗时操作
logger.info("过期数据清理完成")
}
}
kotlin
@Service
class ReportService(private val taskScheduler: TaskScheduler) {
fun startReportGeneration() {
// 每10秒执行一次,不管上次是否完成
taskScheduler.scheduleAtFixedRate(
{ generateReport() },
Duration.ofSeconds(10)
)
}
private fun generateReport() {
logger.info("生成报表中...")
// 报表生成逻辑
}
}
kotlin
@Service
class BackupService(private val taskScheduler: TaskScheduler) {
fun startBackupTask() {
// 每天凌晨2点执行备份
taskScheduler.schedule(
{ performBackup() },
CronTrigger("0 0 2 * * ?")
)
}
private fun performBackup() {
logger.info("开始执行数据备份...")
// 备份逻辑
}
}
2.3 自定义触发器
对于复杂的调度需求,你可以实现自定义触发器:
kotlin
class BusinessHoursTrigger : Trigger {
override fun nextExecution(triggerContext: TriggerContext): Instant? {
val now = Instant.now()
val zonedNow = now.atZone(ZoneId.systemDefault())
// 只在工作日的9-17点执行
val nextExecution = if (isBusinessHours(zonedNow)) {
// 如果当前是工作时间,5分钟后执行
now.plus(Duration.ofMinutes(5))
} else {
// 如果不是工作时间,计算下一个工作日的9点
calculateNextBusinessDay(zonedNow)
}
return nextExecution
}
private fun isBusinessHours(time: ZonedDateTime): Boolean {
val dayOfWeek = time.dayOfWeek
val hour = time.hour
return dayOfWeek in DayOfWeek.MONDAY..DayOfWeek.FRIDAY && hour in 9..16
}
private fun calculateNextBusinessDay(time: ZonedDateTime): Instant {
var next = time.plusDays(1).withHour(9).withMinute(0).withSecond(0)
while (next.dayOfWeek == DayOfWeek.SATURDAY || next.dayOfWeek == DayOfWeek.SUNDAY) {
next = next.plusDays(1)
}
return next.toInstant()
}
}
3. 注解驱动:简化配置的魔法 ✨
3.1 启用调度支持
首先,需要在配置类上启用调度支持:
kotlin
@Configuration
@EnableScheduling // 启用 @Scheduled 支持
@EnableAsync // 启用 @Async 支持
class SchedulingConfig : SchedulingConfigurer, AsyncConfigurer {
override fun configureTasks(taskRegistrar: ScheduledTaskRegistrar) {
taskRegistrar.setScheduler(taskScheduler())
}
override fun getAsyncExecutor(): Executor {
return taskExecutor()
}
@Bean
fun taskScheduler(): TaskScheduler {
return ThreadPoolTaskScheduler().apply {
poolSize = 10
threadNamePrefix = "scheduled-task-"
initialize()
}
}
@Bean
fun taskExecutor(): TaskExecutor {
return ThreadPoolTaskExecutor().apply {
corePoolSize = 5
maxPoolSize = 20
queueCapacity = 100
threadNamePrefix = "async-task-"
initialize()
}
}
}
3.2 @Scheduled 注解详解
@Scheduled
注解提供了多种调度方式:
kotlin
@Component
class ScheduledTasks {
private val logger = LoggerFactory.getLogger(ScheduledTasks::class.java)
// 固定延迟:上次执行完成后等待5秒
@Scheduled(fixedDelay = 5, timeUnit = TimeUnit.SECONDS)
fun taskWithFixedDelay() {
logger.info("固定延迟任务执行 - ${LocalDateTime.now()}")
Thread.sleep(2000) // 模拟耗时操作
}
// 固定频率:每10秒执行一次
@Scheduled(fixedRate = 10, timeUnit = TimeUnit.SECONDS)
fun taskWithFixedRate() {
logger.info("固定频率任务执行 - ${LocalDateTime.now()}")
}
// 初始延迟:启动后等待30秒再开始执行
@Scheduled(initialDelay = 30, fixedRate = 60, timeUnit = TimeUnit.SECONDS)
fun taskWithInitialDelay() {
logger.info("带初始延迟的任务执行 - ${LocalDateTime.now()}")
}
// Cron 表达式:每个工作日的9点执行
@Scheduled(cron = "0 0 9 * * MON-FRI", zone = "Asia/Shanghai")
fun taskWithCronExpression() {
logger.info("Cron 任务执行 - ${LocalDateTime.now()}")
}
// 使用宏:每小时执行
@Scheduled(cron = "@hourly")
fun hourlyTask() {
logger.info("每小时任务执行 - ${LocalDateTime.now()}")
}
}
WARNING
注意事项:
@Scheduled
方法必须无参数且返回 void- 默认情况下,所有定时任务在同一个线程中执行,长时间运行的任务会阻塞其他任务
- 可以通过配置多个线程来避免阻塞问题
3.3 @Async 注解:异步执行的简单方式
kotlin
@Service
class UserService {
private val logger = LoggerFactory.getLogger(UserService::class.java)
// 简单异步方法
@Async
fun sendWelcomeEmail(userId: String) {
logger.info("开始发送欢迎邮件给用户: $userId")
Thread.sleep(3000) // 模拟邮件发送耗时
logger.info("欢迎邮件发送完成: $userId")
}
// 带返回值的异步方法
@Async
fun calculateUserScore(userId: String): CompletableFuture<Int> {
logger.info("开始计算用户积分: $userId")
Thread.sleep(2000) // 模拟复杂计算
val score = (1..100).random()
logger.info("用户积分计算完成: $userId, 积分: $score")
return CompletableFuture.completedFuture(score)
}
// 指定执行器
@Async("taskExecutor")
fun processUserData(userId: String) {
logger.info("使用指定执行器处理用户数据: $userId")
// 处理逻辑
}
}
3.4 异常处理
对于异步方法的异常处理,Spring 提供了专门的机制:
kotlin
@Component
class AsyncExceptionHandler : AsyncUncaughtExceptionHandler {
private val logger = LoggerFactory.getLogger(AsyncExceptionHandler::class.java)
override fun handleUncaughtException(ex: Throwable, method: Method, vararg params: Any?) {
logger.error("异步方法执行异常 - 方法: ${method.name}, 参数: ${params.contentToString()}", ex)
// 可以在这里添加告警、重试等逻辑
when (ex) {
is IllegalArgumentException -> {
logger.warn("参数异常,跳过处理")
}
is RuntimeException -> {
logger.error("运行时异常,需要人工介入")
// 发送告警
}
}
}
}
@Configuration
@EnableAsync
class AsyncConfig : AsyncConfigurer {
override fun getAsyncUncaughtExceptionHandler(): AsyncUncaughtExceptionHandler {
return AsyncExceptionHandler()
}
}
4. Cron 表达式:时间的语言 📅
4.1 Cron 表达式结构
Spring 的 Cron 表达式由6个字段组成:
┌───────────── 秒 (0-59)
│ ┌───────────── 分钟 (0-59)
│ │ ┌───────────── 小时 (0-23)
│ │ │ ┌───────────── 日 (1-31)
│ │ │ │ ┌───────────── 月 (1-12 或 JAN-DEC)
│ │ │ │ │ ┌───────────── 星期 (0-7,0和7都表示周日,或 MON-SUN)
│ │ │ │ │ │
* * * * * *
4.2 实用的 Cron 表达式示例
常用 Cron 表达式
表达式 | 说明 | 使用场景 |
---|---|---|
0 0 2 * * ? | 每天凌晨2点 | 数据备份 |
0 */15 * * * ? | 每15分钟 | 健康检查 |
0 0 9-17 * * MON-FRI | 工作日9-17点每小时 | 业务监控 |
0 0 0 L * ? | 每月最后一天午夜 | 月度报表 |
0 0 0 ? * 2L | 每月最后一个周一午夜 | 特殊维护 |
4.3 业务场景中的 Cron 应用
kotlin
@Component
class BusinessScheduledTasks {
// 每个工作日早上9点发送日报
@Scheduled(cron = "0 0 9 * * MON-FRI")
fun sendDailyReport() {
// 发送日报逻辑
}
// 每月1号凌晨1点生成月度统计
@Scheduled(cron = "0 0 1 1 * ?")
fun generateMonthlyStatistics() {
// 月度统计逻辑
}
// 每周日凌晨3点清理日志
@Scheduled(cron = "0 0 3 ? * SUN")
fun cleanupLogs() {
// 日志清理逻辑
}
// 每天每隔2小时检查系统状态
@Scheduled(cron = "0 0 */2 * * ?")
fun systemHealthCheck() {
// 系统健康检查逻辑
}
}
5. 最佳实践与性能优化 🚀
5.1 线程池配置最佳实践
kotlin
@Configuration
class OptimizedTaskConfig {
@Bean("ioTaskExecutor")
fun ioTaskExecutor(): ThreadPoolTaskExecutor {
return ThreadPoolTaskExecutor().apply {
// IO密集型任务:线程数可以设置较大
corePoolSize = Runtime.getRuntime().availableProcessors() * 2
maxPoolSize = Runtime.getRuntime().availableProcessors() * 4
queueCapacity = 200
threadNamePrefix = "io-task-"
// 优雅关闭配置
setWaitForTasksToCompleteOnShutdown(true)
setAwaitTerminationSeconds(60)
// 拒绝策略:调用者运行
setRejectedExecutionHandler(ThreadPoolExecutor.CallerRunsPolicy())
}
}
@Bean("cpuTaskExecutor")
fun cpuTaskExecutor(): ThreadPoolTaskExecutor {
return ThreadPoolTaskExecutor().apply {
// CPU密集型任务:线程数接近CPU核心数
val processors = Runtime.getRuntime().availableProcessors()
corePoolSize = processors
maxPoolSize = processors + 1
queueCapacity = 100
threadNamePrefix = "cpu-task-"
}
}
}
5.2 监控与指标
kotlin
@Component
class TaskExecutorMonitor {
private val meterRegistry = Metrics.globalRegistry
@EventListener
fun handleTaskExecutorMetrics(taskExecutor: ThreadPoolTaskExecutor) {
// 注册线程池指标
Gauge.builder("thread.pool.active")
.description("活跃线程数")
.register(meterRegistry) { taskExecutor.activeCount.toDouble() }
Gauge.builder("thread.pool.queue.size")
.description("队列大小")
.register(meterRegistry) { taskExecutor.threadPoolExecutor.queue.size.toDouble() }
Gauge.builder("thread.pool.completed.tasks")
.description("已完成任务数")
.register(meterRegistry) { taskExecutor.threadPoolExecutor.completedTaskCount.toDouble() }
}
}
5.3 错误处理与重试机制
kotlin
@Service
class ResilientTaskService {
private val logger = LoggerFactory.getLogger(ResilientTaskService::class.java)
@Async
@Retryable(
value = [Exception::class],
maxAttempts = 3,
backoff = Backoff(delay = 1000, multiplier = 2.0)
)
fun processWithRetry(data: String) {
try {
// 可能失败的业务逻辑
processData(data)
} catch (e: Exception) {
logger.warn("任务执行失败,将进行重试: ${e.message}")
throw e
}
}
@Recover
fun recover(ex: Exception, data: String) {
logger.error("任务最终失败,执行恢复逻辑: $data", ex)
// 恢复逻辑,如发送告警、记录失败日志等
}
private fun processData(data: String) {
// 模拟可能失败的操作
if (Math.random() < 0.7) {
throw RuntimeException("模拟处理失败")
}
logger.info("数据处理成功: $data")
}
}
6. 实战案例:构建一个完整的任务调度系统 💼
让我们构建一个电商系统的订单处理和数据分析任务调度系统:
完整的电商任务调度系统实现
kotlin
// 1. 配置类
@Configuration
@EnableScheduling
@EnableAsync
class ECommerceTaskConfig : SchedulingConfigurer, AsyncConfigurer {
override fun configureTasks(taskRegistrar: ScheduledTaskRegistrar) {
taskRegistrar.setScheduler(taskScheduler())
}
override fun getAsyncExecutor(): Executor = asyncTaskExecutor()
@Bean
fun taskScheduler(): TaskScheduler {
return ThreadPoolTaskScheduler().apply {
poolSize = 10
threadNamePrefix = "scheduled-"
setWaitForTasksToCompleteOnShutdown(true)
setAwaitTerminationSeconds(60)
}
}
@Bean
fun asyncTaskExecutor(): TaskExecutor {
return ThreadPoolTaskExecutor().apply {
corePoolSize = 10
maxPoolSize = 50
queueCapacity = 200
threadNamePrefix = "async-"
setTaskDecorator(LoggingTaskDecorator())
}
}
}
// 2. 订单处理服务
@Service
class OrderProcessingService(
private val emailService: EmailService,
private val inventoryService: InventoryService,
private val paymentService: PaymentService
) {
private val logger = LoggerFactory.getLogger(OrderProcessingService::class.java)
@Async
fun processOrderAsync(order: Order): CompletableFuture<Boolean> {
return try {
// 处理支付
val paymentResult = paymentService.processPayment(order)
if (!paymentResult.success) {
throw PaymentException("支付失败: ${paymentResult.message}")
}
// 更新库存
inventoryService.updateInventory(order.items)
// 发送确认邮件
emailService.sendOrderConfirmation(order)
logger.info("订单处理完成: ${order.id}")
CompletableFuture.completedFuture(true)
} catch (e: Exception) {
logger.error("订单处理失败: ${order.id}", e)
CompletableFuture.completedFuture(false)
}
}
// 每5分钟检查待处理订单
@Scheduled(fixedRate = 5, timeUnit = TimeUnit.MINUTES)
fun processPendingOrders() {
val pendingOrders = findPendingOrders()
logger.info("发现 ${pendingOrders.size} 个待处理订单")
pendingOrders.forEach { order ->
processOrderAsync(order)
}
}
private fun findPendingOrders(): List<Order> {
// 查询待处理订单的逻辑
return emptyList()
}
}
// 3. 数据分析服务
@Service
class DataAnalysisService {
private val logger = LoggerFactory.getLogger(DataAnalysisService::class.java)
// 每天凌晨2点生成销售报表
@Scheduled(cron = "0 0 2 * * ?")
fun generateDailySalesReport() {
logger.info("开始生成日销售报表")
try {
val salesData = collectSalesData()
val report = generateReport(salesData)
saveReport(report)
logger.info("日销售报表生成完成")
} catch (e: Exception) {
logger.error("日销售报表生成失败", e)
}
}
// 每周一早上8点生成周报
@Scheduled(cron = "0 0 8 ? * MON")
fun generateWeeklyReport() {
logger.info("开始生成周报")
// 周报生成逻辑
}
// 每月1号生成月度分析
@Scheduled(cron = "0 0 6 1 * ?")
fun generateMonthlyAnalysis() {
logger.info("开始生成月度分析")
// 月度分析逻辑
}
private fun collectSalesData(): SalesData {
// 收集销售数据
return SalesData()
}
private fun generateReport(salesData: SalesData): Report {
// 生成报表
return Report()
}
private fun saveReport(report: Report) {
// 保存报表
}
}
// 4. 系统维护服务
@Service
class SystemMaintenanceService {
private val logger = LoggerFactory.getLogger(SystemMaintenanceService::class.java)
// 每天凌晨4点清理临时文件
@Scheduled(cron = "0 0 4 * * ?")
fun cleanupTempFiles() {
logger.info("开始清理临时文件")
// 清理逻辑
}
// 每周日凌晨3点备份数据库
@Scheduled(cron = "0 0 3 ? * SUN")
fun backupDatabase() {
logger.info("开始数据库备份")
// 备份逻辑
}
// 每小时检查系统健康状态
@Scheduled(cron = "0 0 * * * ?")
fun healthCheck() {
logger.debug("执行系统健康检查")
// 健康检查逻辑
}
}
// 5. 数据类
data class Order(
val id: String,
val userId: String,
val items: List<OrderItem>,
val totalAmount: BigDecimal,
val status: OrderStatus
)
data class OrderItem(
val productId: String,
val quantity: Int,
val price: BigDecimal
)
enum class OrderStatus {
PENDING, PROCESSING, COMPLETED, CANCELLED
}
data class SalesData(
val totalSales: BigDecimal = BigDecimal.ZERO,
val orderCount: Int = 0,
val topProducts: List<String> = emptyList()
)
data class Report(
val id: String = UUID.randomUUID().toString(),
val type: String = "DAILY_SALES",
val data: Map<String, Any> = emptyMap(),
val