Skip to content

Spring Boot 流式响应测试:MockMvc 的局限性与解决方案 🌊

🎯 什么是流式响应?为什么需要它?

在现代 Web 应用中,我们经常需要处理实时数据流,比如:

  • 📊 实时股票价格更新
  • 💬 聊天消息推送
  • 📈 系统监控数据
  • 🔄 长时间运行的任务进度

传统的 HTTP 请求-响应模式需要客户端不断轮询服务器获取最新数据,这种方式效率低下且浪费资源。流式响应(如 Server-Sent Events)允许服务器主动向客户端推送数据,建立一个持久的连接。

NOTE

流式响应就像是从水龙头接水,水会持续流出,而不是像杯子装水那样一次性完成。

🤔 MockMvc 测试流式响应的挑战

核心问题:无法取消服务器流

当我们使用 MockMvcWebTestClient 测试流式响应时,会遇到一个根本性问题:

WARNING

MockMvcWebTestClient 无法从客户端取消服务器流,这意味着测试无限流时会陷入死循环!

💡 解决方案:选择合适的测试策略

方案一:绑定到运行中的服务器

kotlin
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
class StreamingControllerIntegrationTest {

    @Autowired
    lateinit var webTestClient: WebTestClient

    @Test
    fun `测试服务器发送事件流`() {
        val flux = webTestClient.get()
            .uri("/api/events")
            .accept(MediaType.TEXT_EVENT_STREAM)
            .exchange()
            .expectStatus().isOk
            .returnResult<String>()
            .responseBody
            .take(3) // [!code highlight] // 只取前3个事件,避免无限等待
            .collectList()
            .block()

        assertThat(flux).hasSize(3)
        assertThat(flux?.get(0)).contains("event-data")
    }
}
kotlin
@WebMvcTest(StreamingController::class)
class StreamingControllerMockTest {

    @Autowired
    lateinit var mockMvc: MockMvc

    @Test
    fun `MockMvc 测试有限流响应`() {
        // ✅ 这样可以工作 - 有限的流
        mockMvc.perform(get("/api/limited-events"))
            .andExpect(status().isOk)
            .andExpect(content().contentType(MediaType.TEXT_EVENT_STREAM_VALUE))
    }

    @Test
    fun `MockMvc 无法测试无限流`() {
        // ❌ 这会导致测试永远不结束
        // mockMvc.perform(get("/api/infinite-events"))
        //     .andExpect(status().isOk)
    }
}

方案二:设计可控制的流式端点

kotlin
@RestController
class StreamingController {

    // ✅ 推荐:可控制的有限流
    @GetMapping("/api/limited-events", produces = [MediaType.TEXT_EVENT_STREAM_VALUE])
    fun limitedEvents(@RequestParam(defaultValue = "5") limit: Int): Flux<String> {
        return Flux.interval(Duration.ofSeconds(1))
            .take(limit.toLong()) // [!code highlight] // 限制事件数量
            .map { "data: Event $it\n\n" }
    }

    // ⚠️  无限流 - MockMvc 无法测试
    @GetMapping("/api/infinite-events", produces = [MediaType.TEXT_EVENT_STREAM_VALUE])
    fun infiniteEvents(): Flux<String> {
        return Flux.interval(Duration.ofSeconds(1))
            .map { "data: Event $it\n\n" } // [!code warning] // 永不结束的流
    }

    // ✅ 异步响应 - MockMvc 支持
    @GetMapping("/api/async-data")
    fun asyncData(): DeferredResult<String> {
        val result = DeferredResult<String>(5000L) // [!code highlight] // 5秒超时
        
        // 模拟异步处理
        CompletableFuture.supplyAsync {
            Thread.sleep(2000)
            "Async result"
        }.thenAccept { data ->
            result.setResult(data) // [!code highlight] // 设置结果,响应结束
        }
        
        return result
    }
}

🔧 实际业务场景示例

场景:实时系统监控

完整的监控系统实现示例
kotlin
@RestController
@RequestMapping("/api/monitoring")
class MonitoringController {

    @GetMapping("/metrics", produces = [MediaType.TEXT_EVENT_STREAM_VALUE])
    fun systemMetrics(): Flux<ServerSentEvent<SystemMetric>> {
        return Flux.interval(Duration.ofSeconds(2))
            .take(10) // [!code highlight] // 测试友好:限制为10个指标
            .map { 
                ServerSentEvent.builder<SystemMetric>()
                    .id(it.toString())
                    .event("metric-update")
                    .data(SystemMetric(
                        cpuUsage = Random.nextDouble(0.0, 100.0),
                        memoryUsage = Random.nextDouble(0.0, 100.0),
                        timestamp = Instant.now()
                    ))
                    .build()
            }
    }
}

data class SystemMetric(
    val cpuUsage: Double,
    val memoryUsage: Double,
    val timestamp: Instant
)

// 测试类
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
class MonitoringControllerTest {

    @Autowired
    lateinit var webTestClient: WebTestClient

    @Test
    fun `测试系统指标流式推送`() {
        val metrics = webTestClient.get()
            .uri("/api/monitoring/metrics")
            .accept(MediaType.TEXT_EVENT_STREAM)
            .exchange()
            .expectStatus().isOk
            .returnResult<SystemMetric>()
            .responseBody
            .take(3) // [!code highlight] // 只验证前3个指标
            .collectList()
            .block(Duration.ofSeconds(10))

        assertThat(metrics).hasSize(3)
        metrics?.forEach { metric ->
            assertThat(metric.cpuUsage).isBetween(0.0, 100.0)
            assertThat(metric.memoryUsage).isBetween(0.0, 100.0)
        }
    }
}

📋 最佳实践总结

✅ 推荐做法

TIP

设计测试友好的流式 API

  • 提供可配置的数据量限制
  • 实现优雅的流终止机制
  • 为无限流提供专门的集成测试

⚠️ 注意事项

CAUTION

避免在单元测试中测试无限流

  • MockMvc 无法取消服务器流
  • 会导致测试永远不结束
  • 应该使用集成测试替代

🎯 测试策略选择

场景推荐测试方式原因
有限流响应MockMvc简单快速,流会自然结束
无限流响应集成测试可以控制客户端取消
异步响应MockMvc响应会在异步完成后结束
复杂流逻辑集成测试更接近真实使用场景

🚀 总结

流式响应是现代 Web 应用的重要特性,但在测试时需要特别注意:

  1. 理解限制:MockMvc 无法取消服务器流
  2. 选择策略:根据流的特性选择合适的测试方式
  3. 设计友好:为流式 API 提供测试友好的配置选项

IMPORTANT

记住:好的测试不仅要验证功能正确性,还要能够可靠地执行完成! 🎯

通过合理的设计和测试策略选择,我们可以有效地测试流式响应,确保应用的实时数据推送功能稳定可靠。