Skip to content

WebClient Filters:让 HTTP 请求更智能的拦截器 🚀

什么是 WebClient Filters?

想象一下,你正在开发一个微服务应用,需要调用多个外部 API。每次调用都需要添加认证头、记录日志、处理错误重试等操作。如果在每个请求中都重复这些逻辑,代码会变得冗余且难以维护。

这时候,WebClient Filters 就像是一个智能的"中间人",它可以在请求发出前和响应返回后自动执行一些通用操作,让你的代码更加简洁和优雅。

NOTE

WebClient Filter 本质上是一个 ExchangeFilterFunction,它基于责任链模式,允许你在 HTTP 请求的生命周期中插入自定义逻辑。

核心概念与设计哲学 💡

为什么需要 Filters?

在没有 Filters 的世界里,我们可能会这样写代码:

kotlin
// 每个服务调用都需要重复添加认证和日志
class UserService {
    fun getUser(id: String): Mono<User> {
        return webClient.get()
            .uri("/users/{id}", id)
            .header("Authorization", "Bearer $token") 
            .header("X-Request-ID", UUID.randomUUID().toString()) 
            .retrieve()
            .doOnNext { logger.info("获取用户成功: $id") } 
            .bodyToMono<User>()
    }
}

class OrderService {
    fun getOrder(id: String): Mono<Order> {
        return webClient.get()
            .uri("/orders/{id}", id)
            .header("Authorization", "Bearer $token") 
            .header("X-Request-ID", UUID.randomUUID().toString()) 
            .retrieve()
            .doOnNext { logger.info("获取订单成功: $id") } 
            .bodyToMono<Order>()
    }
}
kotlin
// 配置一次,处处受益
val webClient = WebClient.builder()
    .filter { request, next ->
        val enhancedRequest = ClientRequest.from(request)
            .header("Authorization", "Bearer $token") 
            .header("X-Request-ID", UUID.randomUUID().toString()) 
            .build()
        
        next.exchange(enhancedRequest)
            .doOnNext { logger.info("请求成功: ${request.url()}") } 
    }
    .build()

// 业务代码变得简洁
class UserService {
    fun getUser(id: String): Mono<User> {
        return webClient.get()
            .uri("/users/{id}", id)
            .retrieve()
            .bodyToMono<User>()
    }
}

设计哲学:责任链模式

WebClient Filters 采用了经典的责任链模式,每个 Filter 都有机会处理请求,然后将控制权传递给链中的下一个 Filter。

基础用法:从简单开始 🌱

1. 添加请求头

最常见的使用场景是为所有请求添加通用的请求头:

kotlin
val client = WebClient.builder()
    .filter { request, next ->
        // 创建增强的请求,添加自定义头部
        val filtered = ClientRequest.from(request)
            .header("User-Agent", "MyApp/1.0") 
            .header("X-API-Version", "v1") 
            .build()

        // 继续执行请求链
        next.exchange(filtered)
    }
    .build()

TIP

ClientRequest.from(request) 创建了原请求的副本,这样可以安全地修改请求而不影响原始对象。

2. 使用内置的认证 Filter

Spring 提供了一些开箱即用的 Filter,比如基础认证:

kotlin
import org.springframework.web.reactive.function.client.ExchangeFilterFunctions.basicAuthentication

val client = WebClient.builder()
    .filter(basicAuthentication("username", "password")) 
    .build()

高级应用场景 🚀

1. 智能 Token 刷新 Filter

在微服务架构中,Token 过期是常见问题。让我们创建一个智能的 Token 刷新 Filter:

kotlin
@Component
class TokenRefreshFilter(
    private val tokenService: TokenService
) : ExchangeFilterFunction {

    override fun filter(request: ClientRequest, next: ExchangeFunction): Mono<ClientResponse> {
        return next.exchange(request)
            .flatMap { response ->
                when {
                    // 检测到 401 未授权错误
                    response.statusCode() == HttpStatus.UNAUTHORIZED -> {
                        // 释放响应体资源,避免内存泄漏
                        response.releaseBody() 
                            .then(refreshTokenAndRetry(request, next))
                    }
                    else -> Mono.just(response)
                }
            }
    }

    private fun refreshTokenAndRetry(
        originalRequest: ClientRequest, 
        next: ExchangeFunction
    ): Mono<ClientResponse> {
        return tokenService.refreshToken()
            .flatMap { newToken ->
                // 使用新 Token 重新构建请求
                val newRequest = ClientRequest.from(originalRequest)
                    .header("Authorization", "Bearer $newToken") 
                    .build()
                
                next.exchange(newRequest)
            }
    }
}

IMPORTANT

当 Filter 处理响应时,必须确保响应内容被正确消费或传播,否则可能导致内存泄漏。使用 response.releaseBody() 来释放未使用的响应体。

2. 请求日志与监控 Filter

创建一个综合的日志和监控 Filter:

kotlin
@Component
class RequestLoggingFilter : ExchangeFilterFunction {
    
    private val logger = LoggerFactory.getLogger(RequestLoggingFilter::class.java)
    private val meterRegistry = Metrics.globalRegistry

    override fun filter(request: ClientRequest, next: ExchangeFunction): Mono<ClientResponse> {
        val startTime = System.currentTimeMillis()
        val requestId = UUID.randomUUID().toString()
        
        // 记录请求开始
        logger.info("🚀 [{}] {} {}", requestId, request.method(), request.url()) 
        
        return next.exchange(request)
            .doOnNext { response ->
                val duration = System.currentTimeMillis() - startTime
                
                // 记录响应信息
                logger.info(
                    "✅ [{}] {} {} - {} ({}ms)", 
                    requestId, 
                    request.method(), 
                    request.url(),
                    response.statusCode(),
                    duration
                )
                
                // 记录监控指标
                Timer.Sample.start(meterRegistry)
                    .stop(Timer.builder("http.client.requests")
                        .tag("method", request.method().name())
                        .tag("status", response.statusCode().value().toString())
                        .register(meterRegistry))
            }
            .doOnError { error ->
                val duration = System.currentTimeMillis() - startTime
                logger.error("❌ [{}] {} {} - Error ({}ms): {}", 
                    requestId, request.method(), request.url(), duration, error.message)
            }
    }
}

3. 动态 Filter 管理

有时我们需要根据不同的场景动态添加或移除 Filter:

kotlin
@Service
class DynamicWebClientService {
    
    private val baseClient = WebClient.builder()
        .baseUrl("https://api.example.com")
        .build()
    
    // 为特定用户添加认证 Filter
    fun createAuthenticatedClient(token: String): WebClient {
        return baseClient.mutate()
            .filters { filterList ->
                // 在过滤器链的开头添加认证过滤器
                filterList.add(0, createAuthFilter(token)) 
            }
            .build()
    }
    
    // 为调试环境添加详细日志 Filter
    fun createDebugClient(): WebClient {
        return baseClient.mutate()
            .filters { filterList ->
                filterList.add(createDebugLoggingFilter()) 
            }
            .build()
    }
    
    private fun createAuthFilter(token: String): ExchangeFilterFunction {
        return ExchangeFilterFunction { request, next ->
            val authenticatedRequest = ClientRequest.from(request)
                .header("Authorization", "Bearer $token")
                .build()
            next.exchange(authenticatedRequest)
        }
    }
}

实战案例:文件上传优化 Filter 📁

让我们看一个复杂的实际案例 - 为 multipart/form-data 请求自动计算 Content-Length

完整的文件上传优化 Filter 实现
kotlin
/**
 * 多部分表单数据交换过滤器
 * 
 * 解决问题:
 * - 某些服务器要求 multipart 请求必须包含 Content-Length 头
 * - WebClient 默认使用 chunked 传输,不会自动计算 Content-Length
 * - 通过缓冲数据来计算准确的内容长度
 */
class MultipartExchangeFilterFunction : ExchangeFilterFunction {

    override fun filter(request: ClientRequest, next: ExchangeFunction): Mono<ClientResponse> {
        // 检查是否为需要处理的请求类型
        val contentType = request.headers().contentType
        val method = request.method()
        
        return if (MediaType.MULTIPART_FORM_DATA.includes(contentType) && 
                   (method == HttpMethod.PUT || method == HttpMethod.POST)) {
            
            // 创建带缓冲装饰器的新请求
            val bufferedRequest = ClientRequest.from(request)
                .body { outputMessage, context ->
                    request.body().insert(BufferingDecorator(outputMessage), context)
                }
                .build()
            
            next.exchange(bufferedRequest)
        } else {
            // 不需要处理的请求直接传递
            next.exchange(request)
        }
    }

    /**
     * 缓冲装饰器 - 核心逻辑
     * 将请求体数据缓冲到内存中,计算总长度后设置 Content-Length 头
     */
    private class BufferingDecorator(
        delegate: ClientHttpRequest
    ) : ClientHttpRequestDecorator(delegate) {

        override fun writeWith(body: Publisher<out DataBuffer>): Mono<Void> {
            // 将所有数据块合并为单个缓冲区
            return DataBufferUtils.join(body) 
                .flatMap { buffer ->
                    // 设置准确的内容长度
                    headers.contentLength = buffer.readableByteCount().toLong() 
                    
                    // 写入缓冲的数据
                    super.writeWith(Mono.just(buffer))
                }
        }
    }
}

// 使用示例
@Configuration
class WebClientConfig {
    
    @Bean
    fun fileUploadWebClient(): WebClient {
        return WebClient.builder()
            .filter(MultipartExchangeFilterFunction()) 
            .build()
    }
}

// 业务服务中的使用
@Service
class FileUploadService(
    private val fileUploadWebClient: WebClient
) {
    
    fun uploadFile(file: MultipartFile): Mono<String> {
        val multipartData = LinkedMultiValueMap<String, Any>().apply {
            add("file", file.resource)
            add("description", "用户上传的文件")
        }
        
        return fileUploadWebClient.post()
            .uri("/upload")
            .contentType(MediaType.MULTIPART_FORM_DATA)
            .body(BodyInserters.fromMultipartData(multipartData))
            .retrieve()
            .bodyToMono<String>()
    }
}

WARNING

缓冲整个请求体会增加内存使用量。对于大文件上传,请考虑使用流式处理或分块上传的替代方案。

最佳实践与注意事项 ⚡

1. Filter 执行顺序

Filter 的执行顺序很重要,通常遵循以下原则:

kotlin
val client = WebClient.builder()
    .filter(authenticationFilter)    // 1. 认证(最先执行)
    .filter(loggingFilter)          // 2. 日志记录
    .filter(retryFilter)            // 3. 重试逻辑
    .filter(metricsFilter)          // 4. 监控指标(最后执行)
    .build()

2. 资源管理

内存泄漏风险

当 Filter 需要处理响应体时,务必确保资源被正确释放:

kotlin
// ❌ 错误做法 - 可能导致内存泄漏
response.bodyToMono<String>() // 没有消费响应体

// ✅ 正确做法 - 明确释放资源
response.releaseBody() // 释放未使用的响应体

3. 异常处理

kotlin
fun createRobustFilter(): ExchangeFilterFunction {
    return ExchangeFilterFunction { request, next ->
        next.exchange(request)
            .onErrorResume { error ->
                when (error) {
                    is WebClientResponseException -> {
                        logger.error("HTTP错误: ${error.statusCode} - ${error.responseBodyAsString}")
                        Mono.error(BusinessException("服务调用失败", error))
                    }
                    is ConnectTimeoutException -> {
                        logger.error("连接超时: ${request.url()}")
                        Mono.error(BusinessException("服务连接超时", error))
                    }
                    else -> {
                        logger.error("未知错误: ${error.message}")
                        Mono.error(error)
                    }
                }
            }
    }
}

总结 🎯

WebClient Filters 是构建健壮微服务应用的重要工具:

  • 🎯 关注点分离:将横切关注点(认证、日志、监控)从业务逻辑中分离
  • 🔄 可重用性:一次编写,处处使用,避免代码重复
  • 🛡️ 增强功能:提供统一的错误处理、重试、监控等能力
  • 🎛️ 灵活配置:支持动态添加、移除和排序 Filter

通过合理使用 WebClient Filters,你可以构建出更加优雅、可维护和健壮的 HTTP 客户端代码。记住,好的架构不是一蹴而就的,而是在实践中不断演进和优化的结果! ✨