Skip to content

Spring MVC 异步请求处理 - 从阻塞到非阻塞的优雅转身 🚀

引言:为什么需要异步请求处理?

想象一下这样的场景:你的 Web 应用需要处理一个耗时的操作,比如调用第三方 API、执行复杂的数据库查询,或者处理大文件上传。在传统的同步处理模式下,每个请求都会占用一个线程,直到处理完成才释放。这就像一个餐厅只有几个服务员,每个服务员必须从接单到上菜全程陪伴一个客人,导致其他客人只能排队等待。

IMPORTANT

Spring MVC 的异步请求处理就是为了解决这个问题而生的!它让我们能够在等待耗时操作完成的同时,释放宝贵的线程资源去处理其他请求,从而大大提升应用的并发处理能力。

核心概念:异步处理的三大法宝

Spring MVC 提供了三种主要的异步处理方式,每种都有其独特的适用场景:

1. DeferredResult - 延迟结果的艺术 🎭

DeferredResult 是最灵活的异步处理方式,它允许你在任何线程中设置结果。

kotlin
@RestController
class AsyncController {
    
    // 模拟一个消息队列或缓存
    private val deferredResults = mutableListOf<DeferredResult<String>>()
    
    @GetMapping("/quotes")
    fun getQuotes(): DeferredResult<String> {
        val deferredResult = DeferredResult<String>(30000L) // 30秒超时
        
        // 设置超时回调
        deferredResult.onTimeout { 
            deferredResult.setErrorResult("请求超时,请稍后重试")
        }
        
        // 设置完成回调
        deferredResult.onCompletion { 
            println("请求处理完成")
        }
        
        // 将 DeferredResult 保存到某个地方,等待异步设置结果
        deferredResults.add(deferredResult) 
        
        return deferredResult
    }
    
    // 模拟从其他线程设置结果(比如消息队列监听器)
    @PostMapping("/publish-quote")
    fun publishQuote(@RequestBody quote: String): String {
        // 为所有等待的请求设置结果
        deferredResults.forEach { result ->
            if (!result.isSetOrExpired) {
                result.setResult("最新报价: $quote") 
            }
        }
        deferredResults.clear()
        return "报价已发布"
    }
}
kotlin
@RestController
class OrderController(
    private val orderService: OrderService,
    private val notificationService: NotificationService
) {
    
    @PostMapping("/orders")
    fun createOrder(@RequestBody orderRequest: OrderRequest): DeferredResult<OrderResponse> {
        val deferredResult = DeferredResult<OrderResponse>(60000L)
        
        // 异步处理订单创建
        CompletableFuture.supplyAsync {
            try {
                // 1. 创建订单(可能涉及库存检查、支付等耗时操作)
                val order = orderService.createOrder(orderRequest) 
                
                // 2. 发送通知(邮件、短信等)
                notificationService.sendOrderConfirmation(order) 
                
                OrderResponse(order.id, "订单创建成功", order.status)
            } catch (e: Exception) {
                throw OrderCreationException("订单创建失败: ${e.message}")
            }
        }.whenComplete { result, throwable ->
            if (throwable != null) {
                deferredResult.setErrorResult(throwable) 
            } else {
                deferredResult.setResult(result) 
            }
        }
        
        return deferredResult
    }
}

TIP

DeferredResult 的最佳使用场景

  • 需要等待外部事件(如消息队列消息)
  • 长轮询(Long Polling)实现
  • 需要在多个不同线程中设置结果的场景

2. Callable - 简单直接的异步执行 ⚡

Callable 是最简单的异步处理方式,Spring MVC 会自动在单独的线程中执行你的代码。

kotlin
@RestController
class FileController(private val fileService: FileService) {
    
    @PostMapping("/upload")
    fun uploadFile(@RequestParam("file") file: MultipartFile): Callable<UploadResponse> {
        return Callable { 
            try {
                // 这段代码会在单独的线程中执行
                val savedFile = fileService.saveFile(file) 
                
                // 模拟耗时的文件处理(如图片压缩、病毒扫描等)
                Thread.sleep(5000) 
                
                val processedFile = fileService.processFile(savedFile) 
                
                UploadResponse(
                    success = true,
                    fileId = processedFile.id,
                    message = "文件上传并处理成功"
                )
            } catch (e: Exception) {
                UploadResponse(
                    success = false,
                    message = "文件处理失败: ${e.message}"
                )
            }
        }
    }
    
    @GetMapping("/reports/{id}")
    fun generateReport(@PathVariable id: Long): Callable<ReportData> {
        return Callable { 
            // 生成复杂报表的耗时操作
            val data = fileService.generateComplexReport(id) 
            ReportData(id, data, "报表生成完成")
        }
    }
}

NOTE

Callable vs DeferredResult 的选择

  • 使用 Callable 当你的异步操作是一个独立的、可以在单独线程中完整执行的任务
  • 使用 DeferredResult 当你需要更细粒度的控制,或者结果需要从多个不同的地方设置

3. WebAsyncTask - 可定制的异步任务 🛠️

WebAsyncTask 提供了比 Callable 更多的定制选项,如自定义超时时间和执行器。

kotlin
@RestController
class CustomAsyncController {
    
    @GetMapping("/heavy-computation")
    fun performHeavyComputation(): WebAsyncTask<ComputationResult> {
        return WebAsyncTask(30000L) { // 30秒超时
            // 模拟复杂计算
            val startTime = System.currentTimeMillis()
            
            // 执行耗时计算
            Thread.sleep(10000) 
            val result = performComplexCalculation()
            
            val endTime = System.currentTimeMillis()
            
            ComputationResult(
                result = result,
                executionTime = endTime - startTime,
                message = "计算完成"
            )
        }.apply {
            // 设置超时回调
            onTimeout { 
                ComputationResult(
                    result = null,
                    executionTime = 30000,
                    message = "计算超时,请稍后重试"
                )
            }
            
            // 设置完成回调
            onCompletion { 
                println("异步任务执行完成")
            }
        }
    }
    
    private fun performComplexCalculation(): String {
        // 模拟复杂计算逻辑
        return "计算结果: ${(1..1000000).sum()}"
    }
}

HTTP 流式传输 - 实时数据的魅力 🌊

Server-Sent Events (SSE) - 服务器推送的利器

SSE 是实现服务器向客户端推送实时数据的绝佳方案:

kotlin
@RestController
class StreamingController {
    
    @GetMapping("/events", produces = [MediaType.TEXT_EVENT_STREAM_VALUE])
    fun streamEvents(): SseEmitter {
        val emitter = SseEmitter(Long.MAX_VALUE) 
        
        // 在单独的线程中发送事件
        CompletableFuture.runAsync {
            try {
                for (i in 1..10) {
                    // 发送事件数据
                    emitter.send( 
                        SseEmitter.event()
                            .id(i.toString())
                            .name("progress")
                            .data("处理进度: ${i * 10}%")
                    )
                    Thread.sleep(1000) // 每秒发送一次
                }
                emitter.complete() 
            } catch (e: Exception) {
                emitter.completeWithError(e) 
            }
        }
        
        return emitter
    }
    
    @GetMapping("/stock-prices/{symbol}")
    fun streamStockPrices(@PathVariable symbol: String): SseEmitter {
        val emitter = SseEmitter()
        
        // 模拟股价推送
        val scheduler = Executors.newScheduledThreadPool(1)
        scheduler.scheduleAtFixedRate({
            try {
                val price = generateRandomPrice() 
                emitter.send(
                    SseEmitter.event()
                        .name("price-update")
                        .data(mapOf(
                            "symbol" to symbol,
                            "price" to price,
                            "timestamp" to System.currentTimeMillis()
                        ))
                )
            } catch (e: Exception) {
                emitter.completeWithError(e)
                scheduler.shutdown()
            }
        }, 0, 1, TimeUnit.SECONDS)
        
        // 客户端断开连接时清理资源
        emitter.onCompletion { scheduler.shutdown() } 
        emitter.onTimeout { scheduler.shutdown() }
        
        return emitter
    }
    
    private fun generateRandomPrice(): Double {
        return 100.0 + Random.nextDouble(-10.0, 10.0)
    }
}

响应式类型支持 - 拥抱反应式编程 🔄

Spring MVC 还支持返回响应式类型,如 Reactor 的 MonoFlux

kotlin
@RestController
class ReactiveController(private val webClient: WebClient) {
    
    @GetMapping("/user/{id}")
    fun getUser(@PathVariable id: Long): Mono<User> {
        return webClient.get() 
            .uri("/api/users/{id}", id)
            .retrieve()
            .bodyToMono(User::class.java)
            .timeout(Duration.ofSeconds(5))
            .onErrorReturn(User(id, "默认用户", "数据获取失败"))
    }
    
    @GetMapping("/users/stream", produces = [MediaType.APPLICATION_NDJSON_VALUE])
    fun streamUsers(): Flux<User> {
        return Flux.interval(Duration.ofSeconds(1)) 
            .take(10)
            .map { index ->
                User(
                    id = index,
                    name = "用户$index",
                    email = "user$index@example.com"
                )
            }
    }
}

异常处理与拦截 🛡️

异步请求的异常处理需要特别注意:

kotlin
@RestController
class AsyncExceptionController {
    
    @GetMapping("/risky-operation")
    fun riskyOperation(): DeferredResult<String> {
        val deferredResult = DeferredResult<String>()
        
        CompletableFuture.supplyAsync {
            // 模拟可能失败的操作
            if (Random.nextBoolean()) {
                throw RuntimeException("操作失败") 
            }
            "操作成功"
        }.whenComplete { result, throwable ->
            if (throwable != null) {
                deferredResult.setErrorResult(throwable) 
            } else {
                deferredResult.setResult(result)
            }
        }
        
        return deferredResult
    }
    
    @ExceptionHandler(RuntimeException::class)
    fun handleAsyncException(e: RuntimeException): ResponseEntity<ErrorResponse> {
        return ResponseEntity.badRequest()
            .body(ErrorResponse("异步操作失败", e.message ?: "未知错误"))
    }
}

data class ErrorResponse(val error: String, val message: String)

配置与优化 ⚙️

启用异步支持

kotlin
@Configuration
@EnableWebMvc
class WebConfig : WebMvcConfigurer {
    
    override fun configureAsyncSupport(configurer: AsyncSupportConfigurer) {
        configurer.setDefaultTimeout(30000) // 30秒超时
        configurer.setTaskExecutor(asyncTaskExecutor()) 
    }
    
    @Bean
    fun asyncTaskExecutor(): AsyncTaskExecutor {
        val executor = ThreadPoolTaskExecutor()
        executor.corePoolSize = 10
        executor.maxPoolSize = 50
        executor.queueCapacity = 100
        executor.threadNamePrefix = "async-"
        executor.initialize()
        return executor
    }
}
xml
<mvc:annotation-driven>
    <mvc:async-support 
        default-timeout="30000" 
        task-executor="asyncTaskExecutor"/>
</mvc:annotation-driven>

<bean id="asyncTaskExecutor" 
      class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
    <property name="corePoolSize" value="10"/>
    <property name="maxPoolSize" value="50"/>
    <property name="queueCapacity" value="100"/>
    <property name="threadNamePrefix" value="async-"/>
</bean>

WARNING

生产环境注意事项

  • 默认的 AsyncTaskExecutor 不适合生产环境高负载使用
  • 需要根据实际业务需求调整线程池参数
  • 设置合理的超时时间,避免资源泄露

Spring MVC 异步 vs WebFlux 🤔

特性Spring MVC 异步Spring WebFlux
基础架构基于 Servlet API基于 Reactive Streams
线程模型少量线程 + 异步处理事件循环 + 非阻塞 I/O
学习曲线相对平缓较陡峭
适用场景传统应用的性能优化高并发、低延迟应用
背压支持有限支持原生支持

TIP

选择建议

  • 如果你的应用已经基于 Spring MVC,异步处理是提升性能的好选择
  • 如果是新项目且对高并发有严格要求,考虑 WebFlux
  • 可以在同一个应用中混合使用两种方式

实战总结 ✅

Spring MVC 的异步请求处理为我们提供了在不改变现有架构的前提下显著提升应用性能的能力。通过合理使用 DeferredResultCallableWebAsyncTask,我们可以:

  1. 提升并发处理能力 - 释放线程资源处理更多请求
  2. 改善用户体验 - 避免长时间等待和超时
  3. 实现实时功能 - 通过 SSE 推送实时数据
  4. 优雅处理耗时操作 - 文件上传、报表生成等场景

注意事项

  • 异步处理增加了代码复杂度,需要仔细处理异常和超时
  • 需要合理配置线程池,避免资源耗尽
  • 注意客户端断开连接的处理,避免资源泄露

异步处理不是银弹,但在合适的场景下,它能让你的应用焕发新的活力!🎉