Skip to content

Spring WebFlux 响应式 Web 应用开发指南 🚀

什么是 Spring WebFlux?

Spring WebFlux 是 Spring Framework 5.0 引入的响应式 Web 框架,它代表了 Web 开发的一次重大革新。与传统的 Spring MVC 不同,WebFlux 完全基于响应式编程模型,提供了非阻塞、异步的 Web 应用开发体验。

NOTE

WebFlux 不依赖 Servlet API,完全异步和非阻塞,通过 Reactor 项目实现了 Reactive Streams 规范。

为什么需要响应式编程?

想象一下传统的同步 Web 应用:

而响应式应用的处理方式:

IMPORTANT

响应式编程的核心优势:用更少的线程处理更多的并发请求,显著提升系统的吞吐量和资源利用率。

WebFlux 的两种编程模型

Spring WebFlux 提供了两种截然不同的编程风格,让开发者可以根据项目需求和团队偏好选择最适合的方式。

1. 注解式编程模型(类似 Spring MVC)

如果你熟悉 Spring MVC,这种方式会让你感到亲切:

kotlin
@RestController
@RequestMapping("/users")
class UserController(
    private val userRepository: UserRepository,
    private val customerRepository: CustomerRepository
) {
    
    // 返回单个用户 - 使用 Mono
    @GetMapping("/{userId}")
    fun getUser(@PathVariable userId: Long): Mono<User?> {
        return userRepository.findById(userId) 
    }
    
    // 返回用户的所有客户 - 使用 Flux
    @GetMapping("/{userId}/customers")
    fun getUserCustomers(@PathVariable userId: Long): Flux<Customer> {
        return userRepository.findById(userId)
            .flatMapMany { user ->
                customerRepository.findByUser(user)
            }
    }
    
    // 删除用户 - 返回空结果
    @DeleteMapping("/{userId}")
    fun deleteUser(@PathVariable userId: Long): Mono<Void> {
        return userRepository.deleteById(userId) 
    }
}

TIP

Mono vs Flux 的选择原则

  • Mono<T>:表示 0 或 1 个元素的异步序列
  • Flux<T>:表示 0 到 N 个元素的异步序列

2. 函数式编程模型(WebFlux.fn)

函数式模型将路由配置与请求处理逻辑分离,提供了更灵活的组织方式:

kotlin
@Configuration(proxyBeanMethods = false)
class UserRoutingConfiguration {
    
    @Bean
    fun userRoutes(userHandler: UserHandler): RouterFunction<ServerResponse> {
        return RouterFunctions.route()
            .GET("/users/{id}", ACCEPT_JSON, userHandler::getUser) 
            .GET("/users/{id}/customers", ACCEPT_JSON, userHandler::getUserCustomers) 
            .DELETE("/users/{id}", ACCEPT_JSON, userHandler::deleteUser) 
            .build()
    }
    
    companion object {
        private val ACCEPT_JSON = accept(MediaType.APPLICATION_JSON)
    }
}
kotlin
@Component
class UserHandler(
    private val userRepository: UserRepository,
    private val customerRepository: CustomerRepository
) {
    
    fun getUser(request: ServerRequest): Mono<ServerResponse> {
        val userId = request.pathVariable("id").toLong()
        
        return userRepository.findById(userId)
            .flatMap { user ->
                ServerResponse.ok()
                    .contentType(MediaType.APPLICATION_JSON)
                    .bodyValue(user)
            }
            .switchIfEmpty( 
                ServerResponse.notFound().build()
            )
    }
    
    fun getUserCustomers(request: ServerRequest): Mono<ServerResponse> {
        val userId = request.pathVariable("id").toLong()
        
        return userRepository.findById(userId)
            .flatMapMany { user -> customerRepository.findByUser(user) }
            .collectList()
            .flatMap { customers ->
                ServerResponse.ok()
                    .contentType(MediaType.APPLICATION_JSON)
                    .bodyValue(customers)
            }
    }
    
    fun deleteUser(request: ServerRequest): Mono<ServerResponse> {
        val userId = request.pathVariable("id").toLong()
        
        return userRepository.deleteById(userId)
            .then(ServerResponse.noContent().build()) 
    }
}

NOTE

函数式模型的优势在于更清晰的关注点分离和更灵活的路由组合能力。你可以定义多个 RouterFunction Bean 来模块化路由定义。

Spring Boot 的 WebFlux 自动配置

Spring Boot 为 WebFlux 提供了开箱即用的自动配置,大大简化了开发工作。

快速开始

只需添加 WebFlux starter 依赖:

kotlin
dependencies {
    implementation("org.springframework.boot:spring-boot-starter-webflux")
}

WARNING

重要提醒:不要同时添加 spring-boot-starter-webspring-boot-starter-webflux

如果同时存在,Spring Boot 会选择 Spring MVC 而不是 WebFlux。如果你确实需要在 MVC 应用中使用响应式 WebClient,可以通过 SpringApplication.setWebApplicationType(WebApplicationType.REACTIVE) 强制指定。

自动配置提供的功能

Spring Boot 的 WebFlux 自动配置包含:

  • HTTP 消息转换器:自动配置 JSON、XML 等格式的序列化/反序列化
  • 静态资源服务:支持静态文件和 WebJars
  • 错误处理:提供统一的异常处理机制
  • Web 过滤器:自动注册和排序各种过滤器

实战案例:构建响应式 API

让我们通过一个完整的例子来展示 WebFlux 的强大能力:

场景描述

构建一个图书管理系统的 API,需要处理以下业务场景:

  • 获取图书详情
  • 搜索图书(可能返回大量结果)
  • 获取图书的评论流(实时数据流)
完整的响应式图书服务实现
kotlin
// 数据模型
data class Book(
    val id: Long,
    val title: String,
    val author: String,
    val isbn: String,
    val publishedYear: Int
)

data class Review(
    val id: Long,
    val bookId: Long,
    val reviewer: String,
    val rating: Int,
    val comment: String,
    val createdAt: LocalDateTime = LocalDateTime.now()
)

// 响应式 Repository(模拟)
@Repository
class BookRepository {
    
    private val books = listOf(
        Book(1, "Spring Boot 实战", "张三", "978-1234567890", 2023),
        Book(2, "Kotlin 编程指南", "李四", "978-0987654321", 2022),
        Book(3, "响应式编程", "王五", "978-1122334455", 2024)
    )
    
    fun findById(id: Long): Mono<Book> {
        return Mono.fromCallable {
            books.find { it.id == id }
        }.subscribeOn(Schedulers.boundedElastic()) 
    }
    
    fun findByTitleContaining(keyword: String): Flux<Book> {
        return Flux.fromIterable(
            books.filter { it.title.contains(keyword, ignoreCase = true) }
        ).delayElements(Duration.ofMillis(100)) 
    }
    
    fun findAll(): Flux<Book> {
        return Flux.fromIterable(books)
            .delayElements(Duration.ofMillis(50))
    }
}

@Repository  
class ReviewRepository {
    
    fun findReviewStreamByBookId(bookId: Long): Flux<Review> {
        return Flux.interval(Duration.ofSeconds(2)) 
            .map { index ->
                Review(
                    id = index,
                    bookId = bookId,
                    reviewer = "用户${index + 1}",
                    rating = (1..5).random(),
                    comment = "这是第${index + 1}条评论"
                )
            }
            .take(10) // 限制为10条评论
    }
}

// 响应式控制器
@RestController
@RequestMapping("/api/books")
class BookController(
    private val bookRepository: BookRepository,
    private val reviewRepository: ReviewRepository
) {
    
    // 获取单本图书
    @GetMapping("/{id}")
    fun getBook(@PathVariable id: Long): Mono<ResponseEntity<Book>> {
        return bookRepository.findById(id)
            .map { book -> ResponseEntity.ok(book) } 
            .defaultIfEmpty(ResponseEntity.notFound().build()) 
    }
    
    // 搜索图书
    @GetMapping("/search")
    fun searchBooks(@RequestParam keyword: String): Flux<Book> {
        return bookRepository.findByTitleContaining(keyword)
            .doOnNext { book ->
                println("找到图书: ${book.title}")
            }
    }
    
    // 获取所有图书(分页)
    @GetMapping
    fun getAllBooks(
        @RequestParam(defaultValue = "0") page: Int,
        @RequestParam(defaultValue = "10") size: Int
    ): Flux<Book> {
        return bookRepository.findAll()
            .skip((page * size).toLong()) 
            .take(size.toLong()) 
    }
    
    // 实时评论流 - 服务器发送事件(SSE)
    @GetMapping("/{id}/reviews/stream", produces = [MediaType.TEXT_EVENT_STREAM_VALUE])
    fun getReviewStream(@PathVariable id: Long): Flux<Review> {
        return reviewRepository.findReviewStreamByBookId(id)
            .doOnSubscribe { 
                println("客户端开始订阅图书 $id 的评论流")
            }
            .doOnCancel { 
                println("客户端取消订阅图书 $id 的评论流")
            }
    }
}

响应式数据流的处理

上面的例子展示了 WebFlux 处理不同类型数据流的能力:

错误处理与异常管理

WebFlux 提供了强大的错误处理机制,支持 RFC 9457 Problem Details 标准:

启用 Problem Details

yaml
spring:
  webflux:
    problemdetails:
      enabled: true

自定义异常处理器

kotlin
@Component
class GlobalErrorHandler : AbstractErrorWebExceptionHandler(
    ErrorAttributes(), 
    WebProperties().resources, 
    ApplicationContext()
) {
    
    override fun getRoutingFunction(errorAttributes: ErrorAttributes): RouterFunction<ServerResponse> {
        return RouterFunctions.route(
            RequestPredicates.all(),
            this::renderErrorResponse
        )
    }
    
    private fun renderErrorResponse(request: ServerRequest): Mono<ServerResponse> {
        val error = getError(request)
        
        return when (error) {
            is BookNotFoundException ->
                ServerResponse.status(HttpStatus.NOT_FOUND)
                    .contentType(MediaType.APPLICATION_JSON)
                    .bodyValue(mapOf(
                        "type" to "https://api.example.com/problems/book-not-found",
                        "title" to "图书未找到",
                        "status" to 404,
                        "detail" to error.message
                    ))
            
            is ValidationException ->
                ServerResponse.status(HttpStatus.BAD_REQUEST)
                    .contentType(MediaType.APPLICATION_JSON)
                    .bodyValue(mapOf(
                        "type" to "https://api.example.com/problems/validation-error",
                        "title" to "参数验证失败",
                        "status" to 400,
                        "detail" to error.message
                    ))
            
            else ->
                ServerResponse.status(HttpStatus.INTERNAL_SERVER_ERROR)
                    .contentType(MediaType.APPLICATION_JSON)
                    .bodyValue(mapOf(
                        "type" to "https://api.example.com/problems/internal-error",
                        "title" to "服务器内部错误",
                        "status" to 500,
                        "detail" to "请联系系统管理员"
                    ))
        }
    }
}

// 自定义异常
class BookNotFoundException(message: String) : RuntimeException(message)
class ValidationException(message: String) : RuntimeException(message)

Web 过滤器

WebFlux 的过滤器机制允许你在请求处理管道中插入自定义逻辑:

kotlin
@Component
class RequestLoggingFilter : WebFilter {
    
    private val logger = LoggerFactory.getLogger(RequestLoggingFilter::class.java)
    
    override fun filter(exchange: ServerWebExchange, chain: WebFilterChain): Mono<Void> {
        val request = exchange.request
        val startTime = System.currentTimeMillis()
        
        logger.info("请求开始: ${request.method} ${request.uri}") 
        
        return chain.filter(exchange)
            .doFinally { 
                val duration = System.currentTimeMillis() - startTime
                val response = exchange.response
                logger.info(
                    "请求完成: ${request.method} ${request.uri} " +
                    "状态码: ${response.statusCode} 耗时: ${duration}ms"
                )
            }
    }
}

// 自定义认证过滤器
@Component
@Order(-100) // 高优先级
class AuthenticationFilter : WebFilter {
    
    override fun filter(exchange: ServerWebExchange, chain: WebFilterChain): Mono<Void> {
        val request = exchange.request
        val authHeader = request.headers.getFirst("Authorization")
        
        // 跳过公开端点
        if (request.uri.path.startsWith("/api/public")) {
            return chain.filter(exchange)
        }
        
        return if (authHeader?.startsWith("Bearer ") == true) {
            val token = authHeader.substring(7)
            validateToken(token) 
                .flatMap { isValid ->
                    if (isValid) {
                        chain.filter(exchange)
                    } else {
                        handleUnauthorized(exchange)
                    }
                }
        } else {
            handleUnauthorized(exchange)
        }
    }
    
    private fun validateToken(token: String): Mono<Boolean> {
        // 模拟异步token验证
        return Mono.fromCallable { token == "valid-token" }
            .subscribeOn(Schedulers.boundedElastic())
    }
    
    private fun handleUnauthorized(exchange: ServerWebExchange): Mono<Void> {
        val response = exchange.response
        response.statusCode = HttpStatus.UNAUTHORIZED
        return response.setComplete()
    }
}

服务器定制化配置

配置文件方式

yaml
server:
  port: 8080
  netty:
    connection-timeout: 2s
    idle-timeout: 30s
  error:
    path: /error
    include-message: always
    include-binding-errors: always

spring:
  webflux:
    static-path-pattern: /static/**
    webjars-path-pattern: /webjars/**

程序化配置

kotlin
@Component
class ServerCustomizer : WebServerFactoryCustomizer<NettyReactiveWebServerFactory> {
    
    override fun customize(factory: NettyReactiveWebServerFactory) {
        factory.addServerCustomizers { httpServer ->
            httpServer
                .port(9090) 
                .idleTimeout(Duration.ofSeconds(30)) 
                .option(ChannelOption.SO_KEEPALIVE, true) 
                .childOption(ChannelOption.TCP_NODELAY, true) 
        }
    }
}

性能优化最佳实践

1. 背压处理

kotlin
@GetMapping("/data/stream")
fun getDataStream(): Flux<Data> {
    return dataRepository.findAll()
        .onBackpressureBuffer(1000) 
        .delayElements(Duration.ofMillis(10))
}

2. 错误恢复

kotlin
@GetMapping("/books/{id}")
fun getBookWithFallback(@PathVariable id: Long): Mono<Book> {
    return bookRepository.findById(id)
        .timeout(Duration.ofSeconds(5)) 
        .retry(3) 
        .onErrorReturn( 
            Book(id, "默认图书", "未知作者", "", 0)
        )
}

3. 资源共享配置

kotlin
@Configuration
class ReactiveResourceConfiguration {
    
    @Bean
    fun reactorResourceFactory(): ReactorResourceFactory {
        val factory = ReactorResourceFactory()
        factory.isUseGlobalResources = false
        factory.connectionProvider = ConnectionProvider.builder("custom")
            .maxConnections(100) 
            .pendingAcquireMaxCount(200) 
            .build()
        return factory
    }
}

总结

Spring WebFlux 为现代 Web 应用开发带来了革命性的变化:

核心优势

高并发处理能力:用少量线程处理大量并发请求
资源利用率高:非阻塞 I/O 显著减少资源消耗
实时数据流:原生支持 SSE 和 WebSocket
灵活的编程模型:注解式和函数式两种选择
完善的生态系统:与 Spring Boot 无缝集成

IMPORTANT

WebFlux 特别适合以下场景:

  • 高并发的 API 服务
  • 需要实时数据推送的应用
  • 微服务架构中的响应式服务
  • I/O 密集型应用

通过本指南的学习,你已经掌握了 WebFlux 的核心概念和实践技巧。现在就开始构建你的第一个响应式 Web 应用吧!🎉