Skip to content

Spring WebClient Context:响应式编程中的上下文传播机制 🚀

引言:为什么需要 Context?

在响应式编程的世界里,我们经常需要在复杂的异步操作链中传递一些重要信息。想象一下,你正在构建一个微服务应用,需要在多个嵌套的 HTTP 请求之间传递用户认证信息、请求追踪 ID 或其他关键数据。传统的线程本地变量(ThreadLocal)在响应式编程中失效了,因为操作可能在不同的线程中执行。

这就是 Reactor Context 发挥作用的地方!它为我们提供了一种在整个响应式链中传播数据的优雅方式。

IMPORTANT

Context 与 Attributes 的关键区别:Attributes 只影响当前请求,而 Context 能够传播到嵌套的请求中,比如通过 flatMapconcatMap 执行的后续操作。

核心概念理解

Context 的工作原理

Context 的设计哲学

NOTE

Reactor Context 遵循"不可变性"原则。每次修改 Context 都会创建一个新的实例,这确保了线程安全性和数据一致性。

实战应用场景

场景一:用户认证信息传播

让我们看一个实际的业务场景:在微服务架构中,我们需要在多个服务调用之间传递用户的认证令牌。

kotlin
// 传统方式:无法在嵌套请求中传播认证信息
class UserService {
    private val webClient = WebClient.create()
    
    fun getUserProfile(userId: String, token: String): Mono<UserProfile> {
        return webClient.get()
            .uri("https://api.example.com/users/{id}", userId)
            .header("Authorization", "Bearer $token") 
            .retrieve()
            .bodyToMono<UserProfile>()
            .flatMap { profile ->
                // 嵌套请求:获取用户权限
                webClient.get()
                    .uri("https://api.example.com/users/{id}/permissions", userId)
                    // 问题:这里无法自动获取到 token!
                    .retrieve()
                    .bodyToMono<List<Permission>>()
                    .map { permissions ->
                        profile.copy(permissions = permissions)
                    }
            }
    }
}
kotlin
// 使用 Context:认证信息自动传播
class UserService {
    private val webClient = WebClient.builder()
        .filter { request, next ->
            Mono.deferContextual { contextView ->
                val token = contextView.getOrEmpty<String>("auth-token")
                if (token.isPresent) {
                    val newRequest = ClientRequest.from(request)
                        .header("Authorization", "Bearer ${token.get()}") 
                        .build()
                    next.exchange(newRequest)
                } else {
                    next.exchange(request)
                }
            }
        }
        .build()
    
    fun getUserProfile(userId: String, token: String): Mono<UserProfile> {
        return webClient.get()
            .uri("https://api.example.com/users/{id}", userId)
            .retrieve()
            .bodyToMono<UserProfile>()
            .flatMap { profile ->
                // 嵌套请求:Context 中的 token 会自动传播
                webClient.get()
                    .uri("https://api.example.com/users/{id}/permissions", userId)
                    .retrieve()
                    .bodyToMono<List<Permission>>()
                    .map { permissions ->
                        profile.copy(permissions = permissions)
                    }
            }
            .contextWrite { context -> 
                context.put("auth-token", token) 
            }
    }
}

场景二:请求追踪与日志记录

在分布式系统中,追踪请求的完整链路是非常重要的:

kotlin
class OrderService {
    private val webClient = WebClient.builder()
        .filter { request, next ->
            Mono.deferContextual { contextView ->
                val traceId = contextView.getOrEmpty<String>("trace-id")
                val spanId = contextView.getOrEmpty<String>("span-id")
                
                val requestBuilder = ClientRequest.from(request)
                
                // 添加追踪头部
                if (traceId.isPresent) {
                    requestBuilder.header("X-Trace-ID", traceId.get()) 
                }
                if (spanId.isPresent) {
                    requestBuilder.header("X-Span-ID", spanId.get()) 
                }
                
                val newRequest = requestBuilder.build()
                
                // 记录请求日志
                logger.info("发送请求: ${request.method()} ${request.url()}, TraceID: ${traceId.orElse("N/A")}")
                
                next.exchange(newRequest)
            }
        }
        .build()
    
    fun processOrder(orderId: String): Mono<OrderResult> {
        val traceId = UUID.randomUUID().toString()
        
        return webClient.get()
            .uri("https://inventory-service/check/{orderId}", orderId)
            .retrieve()
            .bodyToMono<InventoryStatus>()
            .flatMap { inventory ->
                if (inventory.available) {
                    // 库存充足,调用支付服务
                    webClient.post()
                        .uri("https://payment-service/charge")
                        .bodyValue(PaymentRequest(orderId, inventory.price))
                        .retrieve()
                        .bodyToMono<PaymentResult>()
                } else {
                    Mono.error(RuntimeException("库存不足"))
                }
            }
            .flatMap { payment ->
                // 更新订单状态
                webClient.put()
                    .uri("https://order-service/orders/{orderId}/status", orderId)
                    .bodyValue(OrderStatusUpdate("PAID"))
                    .retrieve()
                    .bodyToMono<OrderResult>()
            }
            .contextWrite { context ->
                context.put("trace-id", traceId) 
                    .put("span-id", "order-processing-${System.currentTimeMillis()}")
            }
    }
    
    companion object {
        private val logger = LoggerFactory.getLogger(OrderService::class.java)
    }
}

Context 的高级用法

条件性上下文处理

kotlin
class ApiClient {
    private val webClient = WebClient.builder()
        .filter { request, next ->
            Mono.deferContextual { contextView ->
                // 从 Context 中获取多种可能的认证方式
                val apiKey = contextView.getOrEmpty<String>("api-key")
                val bearerToken = contextView.getOrEmpty<String>("bearer-token")
                val basicAuth = contextView.getOrEmpty<String>("basic-auth")
                
                val requestBuilder = ClientRequest.from(request)
                
                when {
                    bearerToken.isPresent -> {
                        requestBuilder.header("Authorization", "Bearer ${bearerToken.get()}")
                    }
                    apiKey.isPresent -> {
                        requestBuilder.header("X-API-Key", apiKey.get())
                    }
                    basicAuth.isPresent -> {
                        requestBuilder.header("Authorization", "Basic ${basicAuth.get()}")
                    }
                    else -> {
                        // 无认证信息,记录警告
                        logger.warn("请求 ${request.url()} 没有提供认证信息")
                    }
                }
                
                next.exchange(requestBuilder.build())
            }
        }
        .build()
    
    // 使用不同认证方式的便捷方法
    fun <T> withApiKey(apiKey: String, operation: () -> Mono<T>): Mono<T> {
        return operation()
            .contextWrite { context -> context.put("api-key", apiKey) }
    }
    
    fun <T> withBearerToken(token: String, operation: () -> Mono<T>): Mono<T> {
        return operation()
            .contextWrite { context -> context.put("bearer-token", token) }
    }
}

Context 数据的组合与传播

kotlin
class CompositeService {
    private val webClient = WebClient.create()
    
    fun complexOperation(userId: String, sessionId: String): Mono<ComplexResult> {
        return Mono.zip(
            getUserData(userId),
            getSessionData(sessionId),
            getSystemConfig()
        )
        .flatMap { tuple ->
            val (userData, sessionData, config) = tuple
            
            // 基于前面的结果执行更复杂的操作
            performBusinessLogic(userData, sessionData, config)
        }
        .contextWrite { context ->
            context.put("user-id", userId) 
                .put("session-id", sessionId) 
                .put("operation-start", System.currentTimeMillis()) 
        }
    }
    
    private fun getUserData(userId: String): Mono<UserData> {
        return webClient.get()
            .uri("https://user-service/users/{id}", userId)
            .retrieve()
            .bodyToMono<UserData>()
            .doOnNext { data ->
                // Context 在这里也是可用的
                logger.info("获取用户数据完成: $userId")
            }
    }
    
    private fun getSessionData(sessionId: String): Mono<SessionData> {
        return webClient.get()
            .uri("https://session-service/sessions/{id}", sessionId)
            .retrieve()
            .bodyToMono<SessionData>()
    }
    
    private fun getSystemConfig(): Mono<SystemConfig> {
        return webClient.get()
            .uri("https://config-service/config")
            .retrieve()
            .bodyToMono<SystemConfig>()
    }
    
    private fun performBusinessLogic(
        userData: UserData, 
        sessionData: SessionData, 
        config: SystemConfig
    ): Mono<ComplexResult> {
        return Mono.deferContextual { contextView ->
            val startTime = contextView.get<Long>("operation-start")
            val userId = contextView.get<String>("user-id")
            
            // 执行业务逻辑...
            webClient.post()
                .uri("https://business-service/process")
                .bodyValue(BusinessRequest(userData, sessionData, config))
                .retrieve()
                .bodyToMono<ComplexResult>()
                .doOnNext { result ->
                    val duration = System.currentTimeMillis() - startTime
                    logger.info("用户 $userId 的复杂操作完成,耗时: ${duration}ms")
                }
        }
    }
}

最佳实践与注意事项

✅ 推荐做法

最佳实践

  1. 在响应式链的末尾设置 Context:使用 contextWrite() 在链的最后设置上下文数据
  2. 使用有意义的键名:选择清晰、一致的键名,如 "user-id"、"trace-id" 等
  3. 避免存储大对象:Context 应该存储轻量级的数据,避免存储大型对象
  4. 合理使用 deferContextual:只在需要访问 Context 时使用,避免不必要的性能开销

⚠️ 常见陷阱

注意事项

  1. Context 的不可变性:记住每次修改 Context 都会创建新实例
  2. 键的类型安全:使用泛型确保类型安全,避免类型转换错误
  3. Context 的作用域:Context 只在当前响应式链中有效,不会跨越不同的 Mono/Flux 实例
kotlin
// ❌ 错误示例:Context 不会传播到新的 Mono 实例
fun incorrectContextUsage(): Mono<String> {
    return Mono.just("data")
        .contextWrite { context -> context.put("key", "value") }
        .then(
            Mono.deferContextual { contextView ->
                // 这里无法访问到上面设置的 Context!
                val value = contextView.getOrEmpty<String>("key")
                Mono.just(value.orElse("default"))
            }
        )
}

// ✅ 正确示例:在正确的位置设置 Context
fun correctContextUsage(): Mono<String> {
    return Mono.just("data")
        .flatMap { data ->
            Mono.deferContextual { contextView ->
                val value = contextView.getOrEmpty<String>("key")
                Mono.just("$data - ${value.orElse("default")}")
            }
        }
        .contextWrite { context -> context.put("key", "value") } 
}

总结

Spring WebClient 的 Context 机制为响应式编程提供了强大的数据传播能力。它解决了在复杂的异步操作链中传递关键信息的难题,使得我们能够构建更加健壮和可维护的响应式应用。

IMPORTANT

记住:Context 需要在响应式链的末尾设置(使用 contextWrite()),这样才能确保数据能够传播到链中的所有操作,包括嵌套的请求。

通过合理使用 Context,我们可以:

  • 🔐 在微服务调用链中传播认证信息
  • 📊 实现分布式追踪和日志记录
  • 🎯 传递业务上下文数据
  • 🔧 实现横切关注点的处理

掌握了 Context 的使用,你就拥有了构建复杂响应式应用的重要工具! 🎉