Appearance
Spring WebClient Context:响应式编程中的上下文传播机制 🚀
引言:为什么需要 Context?
在响应式编程的世界里,我们经常需要在复杂的异步操作链中传递一些重要信息。想象一下,你正在构建一个微服务应用,需要在多个嵌套的 HTTP 请求之间传递用户认证信息、请求追踪 ID 或其他关键数据。传统的线程本地变量(ThreadLocal)在响应式编程中失效了,因为操作可能在不同的线程中执行。
这就是 Reactor Context 发挥作用的地方!它为我们提供了一种在整个响应式链中传播数据的优雅方式。
IMPORTANT
Context 与 Attributes 的关键区别:Attributes 只影响当前请求,而 Context 能够传播到嵌套的请求中,比如通过 flatMap
或 concatMap
执行的后续操作。
核心概念理解
Context 的工作原理
Context 的设计哲学
NOTE
Reactor Context 遵循"不可变性"原则。每次修改 Context 都会创建一个新的实例,这确保了线程安全性和数据一致性。
实战应用场景
场景一:用户认证信息传播
让我们看一个实际的业务场景:在微服务架构中,我们需要在多个服务调用之间传递用户的认证令牌。
kotlin
// 传统方式:无法在嵌套请求中传播认证信息
class UserService {
private val webClient = WebClient.create()
fun getUserProfile(userId: String, token: String): Mono<UserProfile> {
return webClient.get()
.uri("https://api.example.com/users/{id}", userId)
.header("Authorization", "Bearer $token")
.retrieve()
.bodyToMono<UserProfile>()
.flatMap { profile ->
// 嵌套请求:获取用户权限
webClient.get()
.uri("https://api.example.com/users/{id}/permissions", userId)
// 问题:这里无法自动获取到 token!
.retrieve()
.bodyToMono<List<Permission>>()
.map { permissions ->
profile.copy(permissions = permissions)
}
}
}
}
kotlin
// 使用 Context:认证信息自动传播
class UserService {
private val webClient = WebClient.builder()
.filter { request, next ->
Mono.deferContextual { contextView ->
val token = contextView.getOrEmpty<String>("auth-token")
if (token.isPresent) {
val newRequest = ClientRequest.from(request)
.header("Authorization", "Bearer ${token.get()}")
.build()
next.exchange(newRequest)
} else {
next.exchange(request)
}
}
}
.build()
fun getUserProfile(userId: String, token: String): Mono<UserProfile> {
return webClient.get()
.uri("https://api.example.com/users/{id}", userId)
.retrieve()
.bodyToMono<UserProfile>()
.flatMap { profile ->
// 嵌套请求:Context 中的 token 会自动传播
webClient.get()
.uri("https://api.example.com/users/{id}/permissions", userId)
.retrieve()
.bodyToMono<List<Permission>>()
.map { permissions ->
profile.copy(permissions = permissions)
}
}
.contextWrite { context ->
context.put("auth-token", token)
}
}
}
场景二:请求追踪与日志记录
在分布式系统中,追踪请求的完整链路是非常重要的:
kotlin
class OrderService {
private val webClient = WebClient.builder()
.filter { request, next ->
Mono.deferContextual { contextView ->
val traceId = contextView.getOrEmpty<String>("trace-id")
val spanId = contextView.getOrEmpty<String>("span-id")
val requestBuilder = ClientRequest.from(request)
// 添加追踪头部
if (traceId.isPresent) {
requestBuilder.header("X-Trace-ID", traceId.get())
}
if (spanId.isPresent) {
requestBuilder.header("X-Span-ID", spanId.get())
}
val newRequest = requestBuilder.build()
// 记录请求日志
logger.info("发送请求: ${request.method()} ${request.url()}, TraceID: ${traceId.orElse("N/A")}")
next.exchange(newRequest)
}
}
.build()
fun processOrder(orderId: String): Mono<OrderResult> {
val traceId = UUID.randomUUID().toString()
return webClient.get()
.uri("https://inventory-service/check/{orderId}", orderId)
.retrieve()
.bodyToMono<InventoryStatus>()
.flatMap { inventory ->
if (inventory.available) {
// 库存充足,调用支付服务
webClient.post()
.uri("https://payment-service/charge")
.bodyValue(PaymentRequest(orderId, inventory.price))
.retrieve()
.bodyToMono<PaymentResult>()
} else {
Mono.error(RuntimeException("库存不足"))
}
}
.flatMap { payment ->
// 更新订单状态
webClient.put()
.uri("https://order-service/orders/{orderId}/status", orderId)
.bodyValue(OrderStatusUpdate("PAID"))
.retrieve()
.bodyToMono<OrderResult>()
}
.contextWrite { context ->
context.put("trace-id", traceId)
.put("span-id", "order-processing-${System.currentTimeMillis()}")
}
}
companion object {
private val logger = LoggerFactory.getLogger(OrderService::class.java)
}
}
Context 的高级用法
条件性上下文处理
kotlin
class ApiClient {
private val webClient = WebClient.builder()
.filter { request, next ->
Mono.deferContextual { contextView ->
// 从 Context 中获取多种可能的认证方式
val apiKey = contextView.getOrEmpty<String>("api-key")
val bearerToken = contextView.getOrEmpty<String>("bearer-token")
val basicAuth = contextView.getOrEmpty<String>("basic-auth")
val requestBuilder = ClientRequest.from(request)
when {
bearerToken.isPresent -> {
requestBuilder.header("Authorization", "Bearer ${bearerToken.get()}")
}
apiKey.isPresent -> {
requestBuilder.header("X-API-Key", apiKey.get())
}
basicAuth.isPresent -> {
requestBuilder.header("Authorization", "Basic ${basicAuth.get()}")
}
else -> {
// 无认证信息,记录警告
logger.warn("请求 ${request.url()} 没有提供认证信息")
}
}
next.exchange(requestBuilder.build())
}
}
.build()
// 使用不同认证方式的便捷方法
fun <T> withApiKey(apiKey: String, operation: () -> Mono<T>): Mono<T> {
return operation()
.contextWrite { context -> context.put("api-key", apiKey) }
}
fun <T> withBearerToken(token: String, operation: () -> Mono<T>): Mono<T> {
return operation()
.contextWrite { context -> context.put("bearer-token", token) }
}
}
Context 数据的组合与传播
kotlin
class CompositeService {
private val webClient = WebClient.create()
fun complexOperation(userId: String, sessionId: String): Mono<ComplexResult> {
return Mono.zip(
getUserData(userId),
getSessionData(sessionId),
getSystemConfig()
)
.flatMap { tuple ->
val (userData, sessionData, config) = tuple
// 基于前面的结果执行更复杂的操作
performBusinessLogic(userData, sessionData, config)
}
.contextWrite { context ->
context.put("user-id", userId)
.put("session-id", sessionId)
.put("operation-start", System.currentTimeMillis())
}
}
private fun getUserData(userId: String): Mono<UserData> {
return webClient.get()
.uri("https://user-service/users/{id}", userId)
.retrieve()
.bodyToMono<UserData>()
.doOnNext { data ->
// Context 在这里也是可用的
logger.info("获取用户数据完成: $userId")
}
}
private fun getSessionData(sessionId: String): Mono<SessionData> {
return webClient.get()
.uri("https://session-service/sessions/{id}", sessionId)
.retrieve()
.bodyToMono<SessionData>()
}
private fun getSystemConfig(): Mono<SystemConfig> {
return webClient.get()
.uri("https://config-service/config")
.retrieve()
.bodyToMono<SystemConfig>()
}
private fun performBusinessLogic(
userData: UserData,
sessionData: SessionData,
config: SystemConfig
): Mono<ComplexResult> {
return Mono.deferContextual { contextView ->
val startTime = contextView.get<Long>("operation-start")
val userId = contextView.get<String>("user-id")
// 执行业务逻辑...
webClient.post()
.uri("https://business-service/process")
.bodyValue(BusinessRequest(userData, sessionData, config))
.retrieve()
.bodyToMono<ComplexResult>()
.doOnNext { result ->
val duration = System.currentTimeMillis() - startTime
logger.info("用户 $userId 的复杂操作完成,耗时: ${duration}ms")
}
}
}
}
最佳实践与注意事项
✅ 推荐做法
最佳实践
- 在响应式链的末尾设置 Context:使用
contextWrite()
在链的最后设置上下文数据 - 使用有意义的键名:选择清晰、一致的键名,如 "user-id"、"trace-id" 等
- 避免存储大对象:Context 应该存储轻量级的数据,避免存储大型对象
- 合理使用 deferContextual:只在需要访问 Context 时使用,避免不必要的性能开销
⚠️ 常见陷阱
注意事项
- Context 的不可变性:记住每次修改 Context 都会创建新实例
- 键的类型安全:使用泛型确保类型安全,避免类型转换错误
- Context 的作用域:Context 只在当前响应式链中有效,不会跨越不同的 Mono/Flux 实例
kotlin
// ❌ 错误示例:Context 不会传播到新的 Mono 实例
fun incorrectContextUsage(): Mono<String> {
return Mono.just("data")
.contextWrite { context -> context.put("key", "value") }
.then(
Mono.deferContextual { contextView ->
// 这里无法访问到上面设置的 Context!
val value = contextView.getOrEmpty<String>("key")
Mono.just(value.orElse("default"))
}
)
}
// ✅ 正确示例:在正确的位置设置 Context
fun correctContextUsage(): Mono<String> {
return Mono.just("data")
.flatMap { data ->
Mono.deferContextual { contextView ->
val value = contextView.getOrEmpty<String>("key")
Mono.just("$data - ${value.orElse("default")}")
}
}
.contextWrite { context -> context.put("key", "value") }
}
总结
Spring WebClient 的 Context 机制为响应式编程提供了强大的数据传播能力。它解决了在复杂的异步操作链中传递关键信息的难题,使得我们能够构建更加健壮和可维护的响应式应用。
IMPORTANT
记住:Context 需要在响应式链的末尾设置(使用 contextWrite()
),这样才能确保数据能够传播到链中的所有操作,包括嵌套的请求。
通过合理使用 Context,我们可以:
- 🔐 在微服务调用链中传播认证信息
- 📊 实现分布式追踪和日志记录
- 🎯 传递业务上下文数据
- 🔧 实现横切关注点的处理
掌握了 Context 的使用,你就拥有了构建复杂响应式应用的重要工具! 🎉