Skip to content

Spring WebSocket STOMP Simple Broker 详解 🚀

什么是 Simple Broker?

Simple Broker(简单消息代理)是 Spring WebSocket STOMP 协议中的一个内置消息代理实现。它就像是一个消息中转站,负责接收客户端的订阅请求,将这些订阅信息存储在内存中,然后将消息广播给所有匹配目标地址的已连接客户端。

NOTE

想象一下邮局的工作方式:客户端告诉邮局(Simple Broker)"我想收到发往某个地址的所有邮件",然后当有邮件到达这个地址时,邮局就会把邮件分发给所有订阅了这个地址的客户端。

为什么需要 Simple Broker?

解决的核心问题

在没有消息代理的情况下,实现实时消息推送会面临以下挑战:

kotlin
// 传统方式:需要手动管理所有连接和订阅关系
class TraditionalWebSocketHandler {
    private val connections = mutableMapOf<String, WebSocketSession>()
    private val subscriptions = mutableMapOf<String, MutableList<String>>()
    
    fun handleSubscription(sessionId: String, destination: String) {
        // 手动管理订阅关系 😰
        subscriptions.getOrPut(destination) { mutableListOf() }.add(sessionId)
    }
    
    fun broadcastMessage(destination: String, message: String) {
        // 手动查找所有订阅者并发送消息 😰
        subscriptions[destination]?.forEach { sessionId ->
            connections[sessionId]?.sendMessage(TextMessage(message))
        }
    }
}
kotlin
@Configuration
@EnableWebSocketMessageBroker
class WebSocketConfig : WebSocketMessageBrokerConfigurer {
    
    override fun configureMessageBroker(registry: MessageBrokerRegistry) {
        // 一行代码搞定消息代理 ✨
        registry.enableSimpleBroker("/topic", "/queue") 
        registry.setApplicationDestinationPrefixes("/app")
    }
}

Simple Broker 的核心特性

1. 内存存储订阅信息

Simple Broker 将所有的订阅关系存储在内存中,这意味着:

优势

  • 快速响应:内存访问速度极快
  • 简单配置:无需外部依赖
  • 开发友好:适合开发和测试环境

局限性

  • 不支持集群:多个服务实例无法共享订阅信息
  • 重启丢失:服务重启后所有订阅关系丢失
  • 内存限制:大量连接时可能消耗过多内存

2. 支持路径模式匹配

Simple Broker 支持类似文件路径的目标地址,并且支持 Ant 风格的模式匹配:

kotlin
@Controller
class ChatController {
    
    @MessageMapping("/chat.send")
    @SendTo("/topic/chat.{roomId}") 
    fun sendMessage(@DestinationVariable roomId: String, message: ChatMessage): ChatMessage {
        return message.copy(timestamp = System.currentTimeMillis())
    }
    
    // 客户端可以订阅具体房间
    // /topic/chat.room1
    // /topic/chat.room2
    // 或使用通配符模式
    // /topic/chat.*
}

3. 支持点分隔符

除了传统的斜杠分隔符,还可以使用点分隔符:

kotlin
override fun configureMessageBroker(registry: MessageBrokerRegistry) {
    registry.enableSimpleBroker("/topic", "/queue")
        .setPathMatcher(AntPathMatcher(".")) 
    
    // 现在可以使用这样的目标地址:
    // topic.chat.room1
    // queue.notifications.user123
}

配置 STOMP 心跳机制

为了保持连接的活跃性,Simple Broker 支持 STOMP 心跳机制:

kotlin
@Configuration
@EnableWebSocketMessageBroker
class WebSocketConfiguration : WebSocketMessageBrokerConfigurer {

    private lateinit var messageBrokerTaskScheduler: TaskScheduler

    @Autowired
    fun setMessageBrokerTaskScheduler(@Lazy taskScheduler: TaskScheduler) { 
        this.messageBrokerTaskScheduler = taskScheduler
    }

    override fun configureMessageBroker(registry: MessageBrokerRegistry) {
        registry.enableSimpleBroker("/queue/", "/topic/")
            .setHeartbeatValue(longArrayOf(10000, 20000)) 
            .setTaskScheduler(messageBrokerTaskScheduler) 
        
        registry.setApplicationDestinationPrefixes("/app")
    }
}

IMPORTANT

心跳配置说明:

  • 第一个值(10000ms):服务器向客户端发送心跳的间隔
  • 第二个值(20000ms):期望客户端向服务器发送心跳的间隔
  • @Lazy 注解避免循环依赖问题

完整的实战示例

让我们通过一个聊天室应用来看看 Simple Broker 的完整使用:

点击查看完整的聊天室实现
kotlin
// 1. WebSocket 配置
@Configuration
@EnableWebSocketMessageBroker
class WebSocketConfig : WebSocketMessageBrokerConfigurer {
    
    override fun configureMessageBroker(registry: MessageBrokerRegistry) {
        // 启用简单消息代理
        registry.enableSimpleBroker("/topic", "/queue") 
            .setHeartbeatValue(longArrayOf(10000, 20000))
        
        // 设置应用程序目标前缀
        registry.setApplicationDestinationPrefixes("/app")
    }
    
    override fun registerStompEndpoints(registry: StompEndpointRegistry) {
        registry.addEndpoint("/ws")
            .setAllowedOriginPatterns("*")
            .withSockJS()
    }
}

// 2. 消息模型
data class ChatMessage(
    val id: String = UUID.randomUUID().toString(),
    val content: String,
    val sender: String,
    val roomId: String,
    val timestamp: Long = System.currentTimeMillis(),
    val type: MessageType = MessageType.CHAT
)

enum class MessageType {
    CHAT, JOIN, LEAVE
}

// 3. 控制器
@Controller
class ChatController {
    
    private val logger = LoggerFactory.getLogger(ChatController::class.java)
    
    @MessageMapping("/chat.send.{roomId}")
    @SendTo("/topic/chat.{roomId}") 
    fun sendMessage(
        @DestinationVariable roomId: String,
        message: ChatMessage
    ): ChatMessage {
        logger.info("收到房间 $roomId 的消息: ${message.content}")
        return message.copy(roomId = roomId)
    }
    
    @MessageMapping("/chat.join.{roomId}")
    @SendTo("/topic/chat.{roomId}") 
    fun joinRoom(
        @DestinationVariable roomId: String,
        @Payload username: String
    ): ChatMessage {
        logger.info("用户 $username 加入房间 $roomId")
        return ChatMessage(
            content = "$username 加入了聊天室",
            sender = "系统",
            roomId = roomId,
            type = MessageType.JOIN
        )
    }
    
    @MessageMapping("/chat.leave.{roomId}")
    @SendTo("/topic/chat.{roomId}") 
    fun leaveRoom(
        @DestinationVariable roomId: String,
        @Payload username: String
    ): ChatMessage {
        logger.info("用户 $username 离开房间 $roomId")
        return ChatMessage(
            content = "$username 离开了聊天室",
            sender = "系统",
            roomId = roomId,
            type = MessageType.LEAVE
        )
    }
}

// 4. 连接事件监听器
@Component
class WebSocketEventListener {
    
    private val logger = LoggerFactory.getLogger(WebSocketEventListener::class.java)
    
    @Autowired
    private lateinit var messagingTemplate: SimpMessagingTemplate
    
    @EventListener
    fun handleWebSocketConnectListener(event: SessionConnectedEvent) {
        logger.info("新的 WebSocket 连接: ${event.message}")
    }
    
    @EventListener
    fun handleWebSocketDisconnectListener(event: SessionDisconnectEvent) {
        logger.info("WebSocket 连接断开: ${event.sessionId}")
        
        // 可以在这里处理用户离线逻辑
        val username = event.user?.name
        if (username != null) {
            // 通知其他用户该用户已离线
            val leaveMessage = ChatMessage(
                content = "$username 已离线",
                sender = "系统",
                roomId = "general", // 假设有一个通用房间
                type = MessageType.LEAVE
            )
            messagingTemplate.convertAndSend("/topic/chat.general", leaveMessage) 
        }
    }
}

消息流转过程

让我们通过时序图来理解 Simple Broker 的工作流程:

最佳实践建议

1. 目标地址设计

kotlin
// ✅ 推荐的目标地址设计
"/topic/chat.{roomId}"        // 聊天室消息
"/topic/notifications.{userId}" // 用户通知
"/queue/private.{userId}"     // 私人消息

// ❌ 避免的设计
"/topic/all"                  // 太宽泛
"/topic/chat/room/123/messages" // 层级过深

2. 内存管理

注意事项

Simple Broker 将所有订阅信息存储在内存中,在高并发场景下需要注意:

  • 监控内存使用情况
  • 设置合理的连接超时
  • 考虑使用外部消息代理(如 RabbitMQ、ActiveMQ)

3. 错误处理

kotlin
@MessageExceptionHandler
@SendToUser("/queue/errors")
fun handleException(exception: Exception): String {
    logger.error("消息处理异常", exception)
    return "消息处理失败: ${exception.message}"
}

总结

Simple Broker 是 Spring WebSocket STOMP 提供的一个轻量级消息代理解决方案:

适用场景

  • 开发和测试环境:快速搭建 WebSocket 应用
  • 小型应用:用户量不大的实时应用
  • 原型验证:快速验证 WebSocket 功能

::: important 生产环境建议 对于生产环境,特别是需要支持集群部署的场景,建议使用外部消息代理(如 RabbitMQ、ActiveMQ)来替代 Simple Broker,以获得更好的可扩展性和可靠性。 :::

通过 Simple Broker,我们可以用最少的配置快速实现 WebSocket 实时通信功能,让开发者专注于业务逻辑的实现,而不用担心底层的连接管理和消息分发机制。 ✨