Appearance
Spring WebSocket STOMP Simple Broker 详解 🚀
什么是 Simple Broker?
Simple Broker(简单消息代理)是 Spring WebSocket STOMP 协议中的一个内置消息代理实现。它就像是一个消息中转站,负责接收客户端的订阅请求,将这些订阅信息存储在内存中,然后将消息广播给所有匹配目标地址的已连接客户端。
NOTE
想象一下邮局的工作方式:客户端告诉邮局(Simple Broker)"我想收到发往某个地址的所有邮件",然后当有邮件到达这个地址时,邮局就会把邮件分发给所有订阅了这个地址的客户端。
为什么需要 Simple Broker?
解决的核心问题
在没有消息代理的情况下,实现实时消息推送会面临以下挑战:
kotlin
// 传统方式:需要手动管理所有连接和订阅关系
class TraditionalWebSocketHandler {
private val connections = mutableMapOf<String, WebSocketSession>()
private val subscriptions = mutableMapOf<String, MutableList<String>>()
fun handleSubscription(sessionId: String, destination: String) {
// 手动管理订阅关系 😰
subscriptions.getOrPut(destination) { mutableListOf() }.add(sessionId)
}
fun broadcastMessage(destination: String, message: String) {
// 手动查找所有订阅者并发送消息 😰
subscriptions[destination]?.forEach { sessionId ->
connections[sessionId]?.sendMessage(TextMessage(message))
}
}
}
kotlin
@Configuration
@EnableWebSocketMessageBroker
class WebSocketConfig : WebSocketMessageBrokerConfigurer {
override fun configureMessageBroker(registry: MessageBrokerRegistry) {
// 一行代码搞定消息代理 ✨
registry.enableSimpleBroker("/topic", "/queue")
registry.setApplicationDestinationPrefixes("/app")
}
}
Simple Broker 的核心特性
1. 内存存储订阅信息
Simple Broker 将所有的订阅关系存储在内存中,这意味着:
优势
- 快速响应:内存访问速度极快
- 简单配置:无需外部依赖
- 开发友好:适合开发和测试环境
局限性
- 不支持集群:多个服务实例无法共享订阅信息
- 重启丢失:服务重启后所有订阅关系丢失
- 内存限制:大量连接时可能消耗过多内存
2. 支持路径模式匹配
Simple Broker 支持类似文件路径的目标地址,并且支持 Ant 风格的模式匹配:
kotlin
@Controller
class ChatController {
@MessageMapping("/chat.send")
@SendTo("/topic/chat.{roomId}")
fun sendMessage(@DestinationVariable roomId: String, message: ChatMessage): ChatMessage {
return message.copy(timestamp = System.currentTimeMillis())
}
// 客户端可以订阅具体房间
// /topic/chat.room1
// /topic/chat.room2
// 或使用通配符模式
// /topic/chat.*
}
3. 支持点分隔符
除了传统的斜杠分隔符,还可以使用点分隔符:
kotlin
override fun configureMessageBroker(registry: MessageBrokerRegistry) {
registry.enableSimpleBroker("/topic", "/queue")
.setPathMatcher(AntPathMatcher("."))
// 现在可以使用这样的目标地址:
// topic.chat.room1
// queue.notifications.user123
}
配置 STOMP 心跳机制
为了保持连接的活跃性,Simple Broker 支持 STOMP 心跳机制:
kotlin
@Configuration
@EnableWebSocketMessageBroker
class WebSocketConfiguration : WebSocketMessageBrokerConfigurer {
private lateinit var messageBrokerTaskScheduler: TaskScheduler
@Autowired
fun setMessageBrokerTaskScheduler(@Lazy taskScheduler: TaskScheduler) {
this.messageBrokerTaskScheduler = taskScheduler
}
override fun configureMessageBroker(registry: MessageBrokerRegistry) {
registry.enableSimpleBroker("/queue/", "/topic/")
.setHeartbeatValue(longArrayOf(10000, 20000))
.setTaskScheduler(messageBrokerTaskScheduler)
registry.setApplicationDestinationPrefixes("/app")
}
}
IMPORTANT
心跳配置说明:
- 第一个值(10000ms):服务器向客户端发送心跳的间隔
- 第二个值(20000ms):期望客户端向服务器发送心跳的间隔
@Lazy
注解避免循环依赖问题
完整的实战示例
让我们通过一个聊天室应用来看看 Simple Broker 的完整使用:
点击查看完整的聊天室实现
kotlin
// 1. WebSocket 配置
@Configuration
@EnableWebSocketMessageBroker
class WebSocketConfig : WebSocketMessageBrokerConfigurer {
override fun configureMessageBroker(registry: MessageBrokerRegistry) {
// 启用简单消息代理
registry.enableSimpleBroker("/topic", "/queue")
.setHeartbeatValue(longArrayOf(10000, 20000))
// 设置应用程序目标前缀
registry.setApplicationDestinationPrefixes("/app")
}
override fun registerStompEndpoints(registry: StompEndpointRegistry) {
registry.addEndpoint("/ws")
.setAllowedOriginPatterns("*")
.withSockJS()
}
}
// 2. 消息模型
data class ChatMessage(
val id: String = UUID.randomUUID().toString(),
val content: String,
val sender: String,
val roomId: String,
val timestamp: Long = System.currentTimeMillis(),
val type: MessageType = MessageType.CHAT
)
enum class MessageType {
CHAT, JOIN, LEAVE
}
// 3. 控制器
@Controller
class ChatController {
private val logger = LoggerFactory.getLogger(ChatController::class.java)
@MessageMapping("/chat.send.{roomId}")
@SendTo("/topic/chat.{roomId}")
fun sendMessage(
@DestinationVariable roomId: String,
message: ChatMessage
): ChatMessage {
logger.info("收到房间 $roomId 的消息: ${message.content}")
return message.copy(roomId = roomId)
}
@MessageMapping("/chat.join.{roomId}")
@SendTo("/topic/chat.{roomId}")
fun joinRoom(
@DestinationVariable roomId: String,
@Payload username: String
): ChatMessage {
logger.info("用户 $username 加入房间 $roomId")
return ChatMessage(
content = "$username 加入了聊天室",
sender = "系统",
roomId = roomId,
type = MessageType.JOIN
)
}
@MessageMapping("/chat.leave.{roomId}")
@SendTo("/topic/chat.{roomId}")
fun leaveRoom(
@DestinationVariable roomId: String,
@Payload username: String
): ChatMessage {
logger.info("用户 $username 离开房间 $roomId")
return ChatMessage(
content = "$username 离开了聊天室",
sender = "系统",
roomId = roomId,
type = MessageType.LEAVE
)
}
}
// 4. 连接事件监听器
@Component
class WebSocketEventListener {
private val logger = LoggerFactory.getLogger(WebSocketEventListener::class.java)
@Autowired
private lateinit var messagingTemplate: SimpMessagingTemplate
@EventListener
fun handleWebSocketConnectListener(event: SessionConnectedEvent) {
logger.info("新的 WebSocket 连接: ${event.message}")
}
@EventListener
fun handleWebSocketDisconnectListener(event: SessionDisconnectEvent) {
logger.info("WebSocket 连接断开: ${event.sessionId}")
// 可以在这里处理用户离线逻辑
val username = event.user?.name
if (username != null) {
// 通知其他用户该用户已离线
val leaveMessage = ChatMessage(
content = "$username 已离线",
sender = "系统",
roomId = "general", // 假设有一个通用房间
type = MessageType.LEAVE
)
messagingTemplate.convertAndSend("/topic/chat.general", leaveMessage)
}
}
}
消息流转过程
让我们通过时序图来理解 Simple Broker 的工作流程:
最佳实践建议
1. 目标地址设计
kotlin
// ✅ 推荐的目标地址设计
"/topic/chat.{roomId}" // 聊天室消息
"/topic/notifications.{userId}" // 用户通知
"/queue/private.{userId}" // 私人消息
// ❌ 避免的设计
"/topic/all" // 太宽泛
"/topic/chat/room/123/messages" // 层级过深
2. 内存管理
注意事项
Simple Broker 将所有订阅信息存储在内存中,在高并发场景下需要注意:
- 监控内存使用情况
- 设置合理的连接超时
- 考虑使用外部消息代理(如 RabbitMQ、ActiveMQ)
3. 错误处理
kotlin
@MessageExceptionHandler
@SendToUser("/queue/errors")
fun handleException(exception: Exception): String {
logger.error("消息处理异常", exception)
return "消息处理失败: ${exception.message}"
}
总结
Simple Broker 是 Spring WebSocket STOMP 提供的一个轻量级消息代理解决方案:
适用场景
- 开发和测试环境:快速搭建 WebSocket 应用
- 小型应用:用户量不大的实时应用
- 原型验证:快速验证 WebSocket 功能
::: important 生产环境建议 对于生产环境,特别是需要支持集群部署的场景,建议使用外部消息代理(如 RabbitMQ、ActiveMQ)来替代 Simple Broker,以获得更好的可扩展性和可靠性。 :::
通过 Simple Broker,我们可以用最少的配置快速实现 WebSocket 实时通信功能,让开发者专注于业务逻辑的实现,而不用担心底层的连接管理和消息分发机制。 ✨