Appearance
Spring WebClient Exchange 方法详解 🚀
什么是 Exchange 方法?
在 Spring WebClient 中,exchangeToMono()
和 exchangeToFlux()
方法(Kotlin 中为 awaitExchange{}
和 exchangeToFlow{}
)是比 retrieve()
方法更高级的 HTTP 响应处理方式。它们提供了对 HTTP 响应的完全控制权,让开发者可以根据响应状态码、头部信息等进行自定义处理。
NOTE
Exchange 方法的核心价值在于提供了对 HTTP 响应的细粒度控制,特别适用于需要根据不同响应状态进行差异化处理的场景。
为什么需要 Exchange 方法?🤔
传统 retrieve() 方法的局限性
使用 retrieve()
方法时,我们只能获取成功响应的内容,对于错误状态码的处理相对有限:
kotlin
// 使用 retrieve() 的局限性
val person = webClient.get()
.uri("/persons/1")
.accept(MediaType.APPLICATION_JSON)
.retrieve()
.bodyToMono<Person>() // [!code warning] // 只能处理成功响应,错误处理有限
.awaitSingle()
kotlin
// 使用 exchange() 的灵活性
val person = webClient.get()
.uri("/persons/1")
.accept(MediaType.APPLICATION_JSON)
.awaitExchange { response ->
when (response.statusCode()) {
HttpStatus.OK -> response.awaitBody<Person>()
HttpStatus.NOT_FOUND -> throw PersonNotFoundException("Person not found")
HttpStatus.UNAUTHORIZED -> throw UnauthorizedException("Access denied")
else -> throw response.createExceptionAndAwait()
}
}
Exchange 方法的核心优势 ✨
1. 精确的状态码控制
2. 响应头信息访问
kotlin
val result = webClient.get()
.uri("/api/data")
.awaitExchange { response ->
// 访问响应头信息
val contentType = response.headers().contentType()
val customHeader = response.headers().getFirst("X-Custom-Header")
when (response.statusCode()) {
HttpStatus.OK -> {
println("Content-Type: $contentType")
println("Custom Header: $customHeader")
response.awaitBody<String>()
}
else -> throw response.createExceptionAndAwait()
}
}
实际业务场景应用 💼
场景1:用户认证服务调用
kotlin
@Service
class UserAuthService(private val webClient: WebClient) {
suspend fun authenticateUser(token: String): AuthResult {
return webClient.post()
.uri("/auth/validate")
.header("Authorization", "Bearer $token")
.awaitExchange { response ->
when (response.statusCode()) {
HttpStatus.OK -> {
// 认证成功,解析用户信息
val userInfo = response.awaitBody<UserInfo>()
AuthResult.Success(userInfo)
}
HttpStatus.UNAUTHORIZED -> {
// Token 无效或过期
AuthResult.InvalidToken("Token is invalid or expired")
}
HttpStatus.FORBIDDEN -> {
// 权限不足
AuthResult.InsufficientPermissions("Access denied")
}
HttpStatus.TOO_MANY_REQUESTS -> {
// 请求过于频繁
val retryAfter = response.headers().getFirst("Retry-After")
AuthResult.RateLimited("Too many requests, retry after $retryAfter seconds")
}
else -> {
// 其他错误
AuthResult.Error("Authentication service error: ${response.statusCode()}")
}
}
}
}
}
sealed class AuthResult {
data class Success(val userInfo: UserInfo) : AuthResult()
data class InvalidToken(val message: String) : AuthResult()
data class InsufficientPermissions(val message: String) : AuthResult()
data class RateLimited(val message: String) : AuthResult()
data class Error(val message: String) : AuthResult()
}
场景2:文件下载服务
kotlin
@Service
class FileDownloadService(private val webClient: WebClient) {
suspend fun downloadFile(fileId: String): DownloadResult {
return webClient.get()
.uri("/files/{fileId}/download", fileId)
.awaitExchange { response ->
when (response.statusCode()) {
HttpStatus.OK -> {
// 获取文件信息
val fileName = response.headers().getFirst("Content-Disposition")
?.substringAfter("filename=")
?.trim('"')
val contentLength = response.headers().contentLength()
val contentType = response.headers().contentType()
val fileData = response.awaitBody<ByteArray>()
DownloadResult.Success(
fileName = fileName ?: "unknown",
contentType = contentType?.toString() ?: "application/octet-stream",
size = contentLength,
data = fileData
)
}
HttpStatus.NOT_FOUND -> {
DownloadResult.FileNotFound("File with ID $fileId not found")
}
HttpStatus.FORBIDDEN -> {
DownloadResult.AccessDenied("No permission to download this file")
}
HttpStatus.GONE -> {
DownloadResult.FileExpired("File has expired and is no longer available")
}
else -> {
DownloadResult.Error("Download failed: ${response.statusCode()}")
}
}
}
}
}
场景3:支付状态查询
kotlin
@Service
class PaymentService(private val webClient: WebClient) {
suspend fun checkPaymentStatus(paymentId: String): PaymentStatus {
return webClient.get()
.uri("/payments/{paymentId}/status", paymentId)
.awaitExchange { response ->
when (response.statusCode()) {
HttpStatus.OK -> {
// 支付信息存在,解析状态
response.awaitBody<PaymentStatus>()
}
HttpStatus.NOT_FOUND -> {
// 支付记录不存在
PaymentStatus.NotFound
}
HttpStatus.ACCEPTED -> {
// 支付处理中
val estimatedTime = response.headers().getFirst("X-Processing-Time")
PaymentStatus.Processing(estimatedTime?.toIntOrNull() ?: 0)
}
HttpStatus.CONFLICT -> {
// 支付状态冲突(可能重复查询)
PaymentStatus.Conflict("Payment status conflict")
}
else -> {
PaymentStatus.Error("Failed to check payment status: ${response.statusCode()}")
}
}
}
}
}
重要注意事项 ⚠️
WARNING
使用 Exchange 方法时,必须确保响应体被正确消费,否则会导致内存泄漏和连接泄漏。
IMPORTANT
一旦使用了 Exchange 方法处理响应,就不能在下游再次解码响应体。所有的响应处理逻辑都必须在 Exchange 回调函数中完成。
资源管理最佳实践
kotlin
// ✅ 正确的做法 - 确保响应被消费
val result = webClient.get()
.uri("/api/data")
.awaitExchange { response ->
when (response.statusCode()) {
HttpStatus.OK -> response.awaitBody<String>() // [!code highlight] // 响应被正确消费
else -> {
// 即使是错误情况,也要确保响应被处理
response.createExceptionAndAwait() // [!code highlight] // 创建异常并消费响应
}
}
}
// ❌ 错误的做法 - 可能导致资源泄漏
val result = webClient.get()
.uri("/api/data")
.awaitExchange { response ->
if (response.statusCode() == HttpStatus.OK) {
response.awaitBody<String>()
} else {
null // [!code error] // 响应未被消费,可能导致资源泄漏
}
}
Exchange vs Retrieve 对比 📊
特性 | retrieve() | exchangeToMono/Flux() |
---|---|---|
使用复杂度 | 简单 | 中等 |
状态码控制 | 有限 | 完全控制 |
响应头访问 | 不支持 | 完全支持 |
错误处理 | 基础 | 高度自定义 |
资源管理 | 自动 | 手动(需注意) |
适用场景 | 简单请求 | 复杂业务逻辑 |
总结 🎯
Exchange 方法是 Spring WebClient 提供的高级功能,它让我们能够:
- 精确控制响应处理:根据不同的 HTTP 状态码执行不同的业务逻辑
- 访问完整响应信息:包括状态码、响应头、响应体等所有信息
- 实现复杂业务场景:如认证、文件下载、支付状态查询等需要细粒度控制的场景
TIP
在选择使用 Exchange 方法时,请考虑业务复杂度。如果只是简单的成功响应处理,retrieve()
方法已经足够。只有在需要根据不同状态码进行差异化处理时,才考虑使用 Exchange 方法。
通过掌握 Exchange 方法,你将能够构建更加健壮和灵活的微服务间通信机制,为用户提供更好的服务体验! 🌟