Skip to content

Spring WebClient Exchange 方法详解 🚀

什么是 Exchange 方法?

在 Spring WebClient 中,exchangeToMono()exchangeToFlux() 方法(Kotlin 中为 awaitExchange{}exchangeToFlow{})是比 retrieve() 方法更高级的 HTTP 响应处理方式。它们提供了对 HTTP 响应的完全控制权,让开发者可以根据响应状态码、头部信息等进行自定义处理。

NOTE

Exchange 方法的核心价值在于提供了对 HTTP 响应的细粒度控制,特别适用于需要根据不同响应状态进行差异化处理的场景。

为什么需要 Exchange 方法?🤔

传统 retrieve() 方法的局限性

使用 retrieve() 方法时,我们只能获取成功响应的内容,对于错误状态码的处理相对有限:

kotlin
// 使用 retrieve() 的局限性
val person = webClient.get()
    .uri("/persons/1")
    .accept(MediaType.APPLICATION_JSON)
    .retrieve()
    .bodyToMono<Person>() // [!code warning] // 只能处理成功响应,错误处理有限
    .awaitSingle()
kotlin
// 使用 exchange() 的灵活性
val person = webClient.get()
    .uri("/persons/1")
    .accept(MediaType.APPLICATION_JSON)
    .awaitExchange { response ->
        when (response.statusCode()) { 
            HttpStatus.OK -> response.awaitBody<Person>() 
            HttpStatus.NOT_FOUND -> throw PersonNotFoundException("Person not found") 
            HttpStatus.UNAUTHORIZED -> throw UnauthorizedException("Access denied") 
            else -> throw response.createExceptionAndAwait() 
        } 
    }

Exchange 方法的核心优势 ✨

1. 精确的状态码控制

2. 响应头信息访问

kotlin
val result = webClient.get()
    .uri("/api/data")
    .awaitExchange { response ->
        // 访问响应头信息
        val contentType = response.headers().contentType() 
        val customHeader = response.headers().getFirst("X-Custom-Header") 
        
        when (response.statusCode()) {
            HttpStatus.OK -> {
                println("Content-Type: $contentType")
                println("Custom Header: $customHeader")
                response.awaitBody<String>()
            }
            else -> throw response.createExceptionAndAwait()
        }
    }

实际业务场景应用 💼

场景1:用户认证服务调用

kotlin
@Service
class UserAuthService(private val webClient: WebClient) {
    
    suspend fun authenticateUser(token: String): AuthResult {
        return webClient.post()
            .uri("/auth/validate")
            .header("Authorization", "Bearer $token")
            .awaitExchange { response ->
                when (response.statusCode()) {
                    HttpStatus.OK -> {
                        // 认证成功,解析用户信息
                        val userInfo = response.awaitBody<UserInfo>() 
                        AuthResult.Success(userInfo)
                    }
                    HttpStatus.UNAUTHORIZED -> {
                        // Token 无效或过期
                        AuthResult.InvalidToken("Token is invalid or expired") 
                    }
                    HttpStatus.FORBIDDEN -> {
                        // 权限不足
                        AuthResult.InsufficientPermissions("Access denied") 
                    }
                    HttpStatus.TOO_MANY_REQUESTS -> {
                        // 请求过于频繁
                        val retryAfter = response.headers().getFirst("Retry-After") 
                        AuthResult.RateLimited("Too many requests, retry after $retryAfter seconds")
                    }
                    else -> {
                        // 其他错误
                        AuthResult.Error("Authentication service error: ${response.statusCode()}")
                    }
                }
            }
    }
}

sealed class AuthResult {
    data class Success(val userInfo: UserInfo) : AuthResult()
    data class InvalidToken(val message: String) : AuthResult()
    data class InsufficientPermissions(val message: String) : AuthResult()
    data class RateLimited(val message: String) : AuthResult()
    data class Error(val message: String) : AuthResult()
}

场景2:文件下载服务

kotlin
@Service
class FileDownloadService(private val webClient: WebClient) {
    
    suspend fun downloadFile(fileId: String): DownloadResult {
        return webClient.get()
            .uri("/files/{fileId}/download", fileId)
            .awaitExchange { response ->
                when (response.statusCode()) {
                    HttpStatus.OK -> {
                        // 获取文件信息
                        val fileName = response.headers().getFirst("Content-Disposition") 
                            ?.substringAfter("filename=")
                            ?.trim('"')
                        val contentLength = response.headers().contentLength() 
                        val contentType = response.headers().contentType() 
                        
                        val fileData = response.awaitBody<ByteArray>()
                        
                        DownloadResult.Success(
                            fileName = fileName ?: "unknown",
                            contentType = contentType?.toString() ?: "application/octet-stream",
                            size = contentLength,
                            data = fileData
                        )
                    }
                    HttpStatus.NOT_FOUND -> {
                        DownloadResult.FileNotFound("File with ID $fileId not found") 
                    }
                    HttpStatus.FORBIDDEN -> {
                        DownloadResult.AccessDenied("No permission to download this file") 
                    }
                    HttpStatus.GONE -> {
                        DownloadResult.FileExpired("File has expired and is no longer available") 
                    }
                    else -> {
                        DownloadResult.Error("Download failed: ${response.statusCode()}")
                    }
                }
            }
    }
}

场景3:支付状态查询

kotlin
@Service
class PaymentService(private val webClient: WebClient) {
    
    suspend fun checkPaymentStatus(paymentId: String): PaymentStatus {
        return webClient.get()
            .uri("/payments/{paymentId}/status", paymentId)
            .awaitExchange { response ->
                when (response.statusCode()) {
                    HttpStatus.OK -> {
                        // 支付信息存在,解析状态
                        response.awaitBody<PaymentStatus>() 
                    }
                    HttpStatus.NOT_FOUND -> {
                        // 支付记录不存在
                        PaymentStatus.NotFound 
                    }
                    HttpStatus.ACCEPTED -> {
                        // 支付处理中
                        val estimatedTime = response.headers().getFirst("X-Processing-Time") 
                        PaymentStatus.Processing(estimatedTime?.toIntOrNull() ?: 0)
                    }
                    HttpStatus.CONFLICT -> {
                        // 支付状态冲突(可能重复查询)
                        PaymentStatus.Conflict("Payment status conflict") 
                    }
                    else -> {
                        PaymentStatus.Error("Failed to check payment status: ${response.statusCode()}")
                    }
                }
            }
    }
}

重要注意事项 ⚠️

WARNING

使用 Exchange 方法时,必须确保响应体被正确消费,否则会导致内存泄漏和连接泄漏。

IMPORTANT

一旦使用了 Exchange 方法处理响应,就不能在下游再次解码响应体。所有的响应处理逻辑都必须在 Exchange 回调函数中完成。

资源管理最佳实践

kotlin
// ✅ 正确的做法 - 确保响应被消费
val result = webClient.get()
    .uri("/api/data")
    .awaitExchange { response ->
        when (response.statusCode()) {
            HttpStatus.OK -> response.awaitBody<String>() // [!code highlight] // 响应被正确消费
            else -> {
                // 即使是错误情况,也要确保响应被处理
                response.createExceptionAndAwait() // [!code highlight] // 创建异常并消费响应
            }
        }
    }

// ❌ 错误的做法 - 可能导致资源泄漏
val result = webClient.get()
    .uri("/api/data")
    .awaitExchange { response ->
        if (response.statusCode() == HttpStatus.OK) {
            response.awaitBody<String>()
        } else {
            null // [!code error] // 响应未被消费,可能导致资源泄漏
        }
    }

Exchange vs Retrieve 对比 📊

特性retrieve()exchangeToMono/Flux()
使用复杂度简单中等
状态码控制有限完全控制
响应头访问不支持完全支持
错误处理基础高度自定义
资源管理自动手动(需注意)
适用场景简单请求复杂业务逻辑

总结 🎯

Exchange 方法是 Spring WebClient 提供的高级功能,它让我们能够:

  1. 精确控制响应处理:根据不同的 HTTP 状态码执行不同的业务逻辑
  2. 访问完整响应信息:包括状态码、响应头、响应体等所有信息
  3. 实现复杂业务场景:如认证、文件下载、支付状态查询等需要细粒度控制的场景

TIP

在选择使用 Exchange 方法时,请考虑业务复杂度。如果只是简单的成功响应处理,retrieve() 方法已经足够。只有在需要根据不同状态码进行差异化处理时,才考虑使用 Exchange 方法。

通过掌握 Exchange 方法,你将能够构建更加健壮和灵活的微服务间通信机制,为用户提供更好的服务体验! 🌟