Skip to content

Spring WebSocket STOMP 消息顺序控制 📩

引言:为什么消息顺序很重要? 🤔

想象一下这样的场景:你正在开发一个实时聊天应用,用户 A 依次发送了三条消息:"你好"、"我是小明"、"请问有人在吗?"。但由于并发处理的原因,用户 B 收到的消息顺序却是:"请问有人在吗?"、"你好"、"我是小明"。这样的乱序会让对话变得莫名其妙!

IMPORTANT

在 Spring WebSocket STOMP 中,由于使用了线程池来处理消息,默认情况下无法保证消息的顺序性。这在某些业务场景下可能会造成严重问题。

问题的根源:线程池带来的乱序 ⚙️

让我们先理解问题是如何产生的:

WARNING

默认情况下,Spring WebSocket 使用 ThreadPoolExecutor 来处理消息,这意味着消息可能在不同线程中并发处理,导致接收顺序与发送顺序不一致。

解决方案:消息顺序保证机制 🔒

Spring 提供了两个关键配置来解决这个问题:

1. 保证发布顺序 (setPreservePublishOrder)

这个配置确保从消息代理发送到客户端的消息保持顺序:

kotlin
@Configuration
@EnableWebSocketMessageBroker
class PublishOrderWebSocketConfiguration : WebSocketMessageBrokerConfigurer {

    override fun configureMessageBroker(registry: MessageBrokerRegistry) {
        // 启用简单消息代理
        registry.enableSimpleBroker("/topic", "/queue") 
        
        // 设置应用消息前缀
        registry.setApplicationDestinationPrefixes("/app")
        
        // 🔑 关键配置:保证发布顺序
        registry.setPreservePublishOrder(true) 
    }
}
java
@Configuration
@EnableWebSocketMessageBroker
public class PublishOrderWebSocketConfiguration implements WebSocketMessageBrokerConfigurer {

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        // 启用简单消息代理
        registry.enableSimpleBroker("/topic", "/queue"); 
        
        // 设置应用消息前缀
        registry.setApplicationDestinationPrefixes("/app");
        
        // 🔑 关键配置:保证发布顺序
        registry.setPreservePublishOrder(true); 
    }
}

2. 保证接收顺序 (setPreserveReceiveOrder)

这个配置确保从客户端接收的消息按顺序处理:

kotlin
@Configuration
@EnableWebSocketMessageBroker
class ReceiveOrderWebSocketConfiguration : WebSocketMessageBrokerConfigurer {

    override fun registerStompEndpoints(registry: StompEndpointRegistry) {
        // 注册 STOMP 端点
        registry.addEndpoint("/ws")
            .setAllowedOriginPatterns("*")
            .withSockJS()
        
        // 🔑 关键配置:保证接收顺序
        registry.setPreserveReceiveOrder(true) 
    }
}
java
@Configuration
@EnableWebSocketMessageBroker
public class ReceiveOrderWebSocketConfiguration implements WebSocketMessageBrokerConfigurer {

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        // 注册 STOMP 端点
        registry.addEndpoint("/ws")
            .setAllowedOriginPatterns("*")
            .withSockJS();
        
        // 🔑 关键配置:保证接收顺序
        registry.setPreserveReceiveOrder(true); 
    }
}

完整的实战示例:聊天室应用 💬

让我们通过一个完整的聊天室示例来看看如何应用这些配置:

完整的 WebSocket 配置类
kotlin
@Configuration
@EnableWebSocketMessageBroker
class ChatWebSocketConfig : WebSocketMessageBrokerConfigurer {

    override fun configureMessageBroker(registry: MessageBrokerRegistry) {
        // 启用简单消息代理,用于广播消息
        registry.enableSimpleBroker("/topic", "/queue")
        
        // 设置应用消息的目标前缀
        registry.setApplicationDestinationPrefixes("/app")
        
        // 🔑 保证消息发布顺序 - 确保发送给客户端的消息有序
        registry.setPreservePublishOrder(true) 
    }

    override fun registerStompEndpoints(registry: StompEndpointRegistry) {
        // 注册 WebSocket 端点
        registry.addEndpoint("/chat-websocket")
            .setAllowedOriginPatterns("*") // 允许跨域
            .withSockJS() // 启用 SockJS 支持
        
        // 🔑 保证消息接收顺序 - 确保处理客户端消息有序
        registry.setPreserveReceiveOrder(true) 
    }

    override fun configureClientInboundChannel(registration: ChannelRegistration) {
        // 配置入站通道的线程池
        registration.taskExecutor().apply {
            corePoolSize = 4
            maxPoolSize = 8
            queueCapacity = 100
        }
    }

    override fun configureClientOutboundChannel(registration: ChannelRegistration) {
        // 配置出站通道的线程池
        registration.taskExecutor().apply {
            corePoolSize = 4
            maxPoolSize = 8
            queueCapacity = 100
        }
    }
}

聊天控制器实现

kotlin
@Controller
class ChatController {

    private val logger = LoggerFactory.getLogger(ChatController::class.java)

    @MessageMapping("/chat.send")
    @SendTo("/topic/public")
    fun sendMessage(chatMessage: ChatMessage): ChatMessage {
        logger.info("收到消息: ${chatMessage.content} from ${chatMessage.sender}")
        
        // 添加时间戳确保消息顺序的可追踪性
        return chatMessage.copy(
            timestamp = System.currentTimeMillis(),
            id = UUID.randomUUID().toString()
        ) 
    }

    @MessageMapping("/chat.join")
    @SendTo("/topic/public")
    fun joinChat(chatMessage: ChatMessage): ChatMessage {
        logger.info("用户加入聊天: ${chatMessage.sender}")
        
        return chatMessage.copy(
            content = "${chatMessage.sender} 加入了聊天室!",
            type = MessageType.JOIN,
            timestamp = System.currentTimeMillis()
        )
    }
}

data class ChatMessage(
    val id: String? = null,
    val content: String,
    val sender: String,
    val type: MessageType = MessageType.CHAT,
    val timestamp: Long? = null
)

enum class MessageType {
    CHAT, JOIN, LEAVE
}

顺序保证的工作原理 ⚙️

启用顺序保证后,消息处理流程发生了变化:

TIP

启用顺序保证后,Spring 会为每个客户端会话维护一个独立的消息队列,确保该会话的消息按顺序处理。

性能考量与最佳实践 📈

性能影响

性能开销

启用消息顺序保证会带来一定的性能开销:

  • 吞吐量下降:消息需要排队等待,无法完全并行处理
  • 延迟增加:前一条消息处理完成后才能处理下一条
  • 内存占用:需要维护消息队列

何时启用顺序保证

IMPORTANT

只在真正需要消息顺序的场景下启用此功能:

适合启用的场景:

  • 聊天应用(消息顺序很重要)
  • 游戏状态同步(操作顺序影响结果)
  • 金融交易系统(交易顺序至关重要)
  • 协作编辑工具(编辑操作需要有序)

不建议启用的场景:

  • 简单的通知推送
  • 独立的状态更新
  • 高并发的数据同步
  • 对性能要求极高的系统

配置建议

kotlin
@Configuration
@EnableWebSocketMessageBroker
class OptimizedWebSocketConfig : WebSocketMessageBrokerConfigurer {

    override fun configureMessageBroker(registry: MessageBrokerRegistry) {
        registry.enableSimpleBroker("/topic", "/queue")
        registry.setApplicationDestinationPrefixes("/app")
        
        // 🎯 根据业务需求决定是否启用
        if (isOrderCriticalApplication()) { 
            registry.setPreservePublishOrder(true) 
        } 
    }

    override fun registerStompEndpoints(registry: StompEndpointRegistry) {
        registry.addEndpoint("/ws")
            .setAllowedOriginPatterns("*")
            .withSockJS()
        
        // 🎯 同样根据业务需求决定
        if (isOrderCriticalApplication()) { 
            registry.setPreserveReceiveOrder(true) 
        } 
    }

    private fun isOrderCriticalApplication(): Boolean {
        // 根据应用类型或配置属性决定
        return System.getProperty("websocket.preserve.order", "false").toBoolean()
    }
}

测试消息顺序 🧪

让我们编写一个测试来验证消息顺序:

消息顺序测试代码
kotlin
@SpringBootTest
@TestMethodOrder(OrderAnnotation::class)
class MessageOrderTest {

    @Test
    fun testMessageOrder() {
        val receivedMessages = mutableListOf<String>()
        val expectedOrder = listOf("消息1", "消息2", "消息3", "消息4", "消息5")
        
        // 模拟快速发送多条消息
        expectedOrder.forEach { message ->
            // 发送消息的逻辑
            sendMessage(message)
        }
        
        // 等待所有消息接收完成
        Thread.sleep(1000)
        
        // 验证消息顺序
        assertEquals(expectedOrder, receivedMessages) 
    }

    private fun sendMessage(content: String) {
        // 实际的消息发送逻辑
        // 这里简化处理
    }
}

总结 🎉

Spring WebSocket STOMP 的消息顺序控制为我们提供了在实时通信中保证消息有序性的能力:

核心要点

  • 问题根源:线程池并发处理导致消息乱序
  • 解决方案setPreservePublishOrdersetPreserveReceiveOrder
  • 适用场景:对消息顺序有严格要求的应用
  • 性能权衡:顺序保证会带来一定的性能开销

通过合理使用这些配置,我们可以在保证消息顺序的同时,构建出稳定可靠的实时通信应用。记住,技术选择总是需要在功能需求和性能表现之间找到最佳平衡点! ⚖️