Appearance
Spring WebSocket STOMP 事件机制详解 🚀
概述
在 Spring WebSocket STOMP 应用中,框架会自动发布多种 ApplicationContext
事件,这些事件为我们提供了监控和管理 WebSocket 连接生命周期的强大能力。通过实现 Spring 的 ApplicationListener
接口,我们可以轻松地监听和响应这些事件。
IMPORTANT
STOMP 事件机制是构建健壮 WebSocket 应用的关键组件,它让我们能够优雅地处理连接状态变化、会话管理和错误恢复。
为什么需要 STOMP 事件机制? 🤔
想象一下,如果没有事件机制,我们会遇到以下问题:
- 连接状态盲区:无法知道客户端何时连接或断开
- 消息发送时机不当:在 broker 不可用时仍然发送消息
- 资源泄漏:无法及时清理断开连接的会话资源
- 用户体验差:无法向用户反馈连接状态
STOMP 事件机制正是为了解决这些痛点而设计的!
核心事件类型详解 📋
1. BrokerAvailabilityEvent - Broker 可用性事件
这个事件指示 broker 何时变为可用或不可用状态。
kotlin
@Component
class BrokerAvailabilityListener : ApplicationListener<BrokerAvailabilityEvent> {
private val logger = LoggerFactory.getLogger(BrokerAvailabilityListener::class.java)
override fun onApplicationEvent(event: BrokerAvailabilityEvent) {
when {
event.isBrokerAvailable -> {
logger.info("🟢 Broker 已可用,可以安全发送消息")
// 可以在这里恢复消息发送或通知相关组件
resumeMessageSending()
}
else -> {
logger.warn("🔴 Broker 不可用,暂停消息发送")
// 暂停消息发送,避免 MessageDeliveryException
pauseMessageSending()
}
}
}
private fun resumeMessageSending() {
// 恢复消息发送逻辑
}
private fun pauseMessageSending() {
// 暂停消息发送逻辑
}
}
WARNING
使用 SimpMessagingTemplate
的组件应该订阅此事件,避免在 broker 不可用时发送消息,并准备好处理 MessageDeliveryException
。
2. SessionConnectEvent - 会话连接事件
当收到新的 STOMP CONNECT 时发布,标志着新客户端会话的开始。
kotlin
@Component
class SessionConnectListener : ApplicationListener<SessionConnectEvent> {
private val logger = LoggerFactory.getLogger(SessionConnectListener::class.java)
private val activeUsers = mutableSetOf<String>()
override fun onApplicationEvent(event: SessionConnectEvent) {
val accessor = StompMessageHeaderAccessor.wrap(event.message)
val sessionId = accessor.sessionId
val username = accessor.user?.name ?: "匿名用户"
val customHeaders = accessor.toNativeHeaderMap()
logger.info("👋 用户连接: $username, 会话ID: $sessionId")
// 记录活跃用户
activeUsers.add(username)
// 处理自定义头部信息
customHeaders["client-type"]?.let { clientType ->
logger.info("客户端类型: ${clientType.first()}")
}
// 可以在这里进行用户认证、权限检查等
validateUserPermissions(username)
}
private fun validateUserPermissions(username: String) {
// 用户权限验证逻辑
}
}
3. SessionConnectedEvent - 会话已连接事件
在 SessionConnectEvent
之后不久发布,当 broker 发送 STOMP CONNECTED 帧作为对 CONNECT 的响应时触发。
kotlin
@Component
class SessionConnectedListener : ApplicationListener<SessionConnectedEvent> {
private val logger = LoggerFactory.getLogger(SessionConnectedListener::class.java)
@Autowired
private lateinit var messagingTemplate: SimpMessagingTemplate
override fun onApplicationEvent(event: SessionConnectedEvent) {
val accessor = StompMessageHeaderAccessor.wrap(event.message)
val sessionId = accessor.sessionId
val username = accessor.user?.name ?: "匿名用户"
logger.info("✅ STOMP 会话完全建立: $username, 会话ID: $sessionId")
// 发送欢迎消息
sendWelcomeMessage(username, sessionId)
// 通知其他用户有新用户加入
notifyUserJoined(username)
}
private fun sendWelcomeMessage(username: String, sessionId: String?) {
sessionId?.let {
messagingTemplate.convertAndSendToUser(
username,
"/queue/welcome",
"欢迎 $username!连接已建立。"
)
}
}
private fun notifyUserJoined(username: String) {
messagingTemplate.convertAndSend(
"/topic/user-status",
mapOf("type" to "joined", "username" to username)
)
}
}
4. SessionSubscribeEvent - 订阅事件
当收到新的 STOMP SUBSCRIBE 时发布。
kotlin
@Component
class SessionSubscribeListener : ApplicationListener<SessionSubscribeEvent> {
private val logger = LoggerFactory.getLogger(SessionSubscribeListener::class.java)
private val subscriptionTracker = mutableMapOf<String, MutableSet<String>>()
override fun onApplicationEvent(event: SessionSubscribeEvent) {
val accessor = StompMessageHeaderAccessor.wrap(event.message)
val sessionId = accessor.sessionId ?: return
val destination = accessor.destination ?: return
val subscriptionId = accessor.subscriptionId ?: return
logger.info("📡 用户订阅: 会话=$sessionId, 目标=$destination, 订阅ID=$subscriptionId")
// 跟踪订阅
subscriptionTracker.computeIfAbsent(sessionId) { mutableSetOf() }
.add(destination)
// 根据订阅的目标进行特殊处理
handleSpecialDestinations(destination, sessionId)
}
private fun handleSpecialDestinations(destination: String, sessionId: String) {
when {
destination.startsWith("/topic/chat") -> {
logger.info("用户加入聊天室: $destination")
// 发送聊天室历史消息等
}
destination.startsWith("/queue/notifications") -> {
logger.info("用户订阅通知: $sessionId")
// 发送未读通知等
}
}
}
}
5. SessionUnsubscribeEvent - 取消订阅事件
当收到新的 STOMP UNSUBSCRIBE 时发布。
kotlin
@Component
class SessionUnsubscribeListener : ApplicationListener<SessionUnsubscribeEvent> {
private val logger = LoggerFactory.getLogger(SessionUnsubscribeListener::class.java)
override fun onApplicationEvent(event: SessionUnsubscribeEvent) {
val accessor = StompMessageHeaderAccessor.wrap(event.message)
val sessionId = accessor.sessionId
val subscriptionId = accessor.subscriptionId
logger.info("📴 用户取消订阅: 会话=$sessionId, 订阅ID=$subscriptionId")
// 清理相关资源
cleanupSubscriptionResources(sessionId, subscriptionId)
}
private fun cleanupSubscriptionResources(sessionId: String?, subscriptionId: String?) {
// 清理订阅相关的资源
sessionId?.let {
// 从订阅跟踪器中移除
logger.debug("清理会话 $it 的订阅资源")
}
}
}
6. SessionDisconnectEvent - 会话断开事件
当 STOMP 会话结束时发布,这是最重要的清理事件。
kotlin
@Component
class SessionDisconnectListener : ApplicationListener<SessionDisconnectEvent> {
private val logger = LoggerFactory.getLogger(SessionDisconnectListener::class.java)
private val processedDisconnects = mutableSetOf<String>()
@Autowired
private lateinit var messagingTemplate: SimpMessagingTemplate
override fun onApplicationEvent(event: SessionDisconnectEvent) {
val accessor = StompMessageHeaderAccessor.wrap(event.message)
val sessionId = accessor.sessionId ?: return
val username = accessor.user?.name ?: "匿名用户"
// 防止重复处理同一会话的断开事件
if (!processedDisconnects.add(sessionId)) {
logger.debug("会话 $sessionId 的断开事件已处理,跳过重复处理")
return
}
logger.info("👋 用户断开连接: $username, 会话ID: $sessionId")
// 执行清理操作
performCleanup(username, sessionId)
// 通知其他用户
notifyUserLeft(username)
}
private fun performCleanup(username: String, sessionId: String) {
try {
// 清理用户会话数据
cleanupUserSession(sessionId)
// 清理订阅信息
cleanupSubscriptions(sessionId)
// 清理临时资源
cleanupTempResources(sessionId)
logger.info("✅ 会话 $sessionId 清理完成")
} catch (e: Exception) {
logger.error("❌ 清理会话 $sessionId 时发生错误", e)
}
}
private fun notifyUserLeft(username: String) {
messagingTemplate.convertAndSend(
"/topic/user-status",
mapOf("type" to "left", "username" to username)
)
}
private fun cleanupUserSession(sessionId: String) {
// 清理用户会话相关数据
}
private fun cleanupSubscriptions(sessionId: String) {
// 清理订阅相关数据
}
private fun cleanupTempResources(sessionId: String) {
// 清理临时资源
}
}
CAUTION
SessionDisconnectEvent
可能会为同一个会话发布多次,组件应该对多个断开事件保持幂等性。
事件处理的最佳实践 🎯
1. 统一的事件处理器
kotlin
// 为每个事件创建单独的监听器
@Component
class SessionConnectListener : ApplicationListener<SessionConnectEvent> {
// 处理连接事件
}
@Component
class SessionDisconnectListener : ApplicationListener<SessionDisconnectEvent> {
// 处理断开事件
}
kotlin
@Component
class WebSocketEventHandler {
private val logger = LoggerFactory.getLogger(WebSocketEventHandler::class.java)
@EventListener
fun handleSessionConnect(event: SessionConnectEvent) {
val accessor = StompMessageHeaderAccessor.wrap(event.message)
logger.info("🔗 会话连接: ${accessor.sessionId}")
// 处理连接逻辑
}
@EventListener
fun handleSessionConnected(event: SessionConnectedEvent) {
val accessor = StompMessageHeaderAccessor.wrap(event.message)
logger.info("✅ 会话已建立: ${accessor.sessionId}")
// 处理已连接逻辑
}
@EventListener
fun handleSessionDisconnect(event: SessionDisconnectEvent) {
val accessor = StompMessageHeaderAccessor.wrap(event.message)
logger.info("❌ 会话断开: ${accessor.sessionId}")
// 处理断开逻辑
}
@EventListener
fun handleBrokerAvailability(event: BrokerAvailabilityEvent) {
if (event.isBrokerAvailable) {
logger.info("🟢 Broker 可用")
} else {
logger.warn("🔴 Broker 不可用")
}
}
}
2. 异步事件处理
kotlin
@Component
class AsyncWebSocketEventHandler {
@Async
@EventListener
fun handleSessionConnectAsync(event: SessionConnectEvent) {
// 异步处理连接事件,避免阻塞主线程
performHeavyConnectionProcessing(event)
}
@Async
@EventListener
fun handleSessionDisconnectAsync(event: SessionDisconnectEvent) {
// 异步处理断开事件,避免影响其他连接
performHeavyCleanupProcessing(event)
}
private fun performHeavyConnectionProcessing(event: SessionConnectEvent) {
// 执行耗时的连接处理逻辑
Thread.sleep(1000) // 模拟耗时操作
}
private fun performHeavyCleanupProcessing(event: SessionDisconnectEvent) {
// 执行耗时的清理逻辑
Thread.sleep(1000) // 模拟耗时操作
}
}
实际应用场景示例 💼
在线聊天应用的完整事件处理
完整的聊天应用事件处理器实现
kotlin
@Component
class ChatApplicationEventHandler {
private val logger = LoggerFactory.getLogger(ChatApplicationEventHandler::class.java)
private val onlineUsers = ConcurrentHashMap<String, UserSession>()
private val chatRooms = ConcurrentHashMap<String, MutableSet<String>>()
@Autowired
private lateinit var messagingTemplate: SimpMessagingTemplate
@EventListener
fun handleConnect(event: SessionConnectEvent) {
val accessor = StompMessageHeaderAccessor.wrap(event.message)
val sessionId = accessor.sessionId ?: return
val username = accessor.user?.name ?: return
val userSession = UserSession(
sessionId = sessionId,
username = username,
connectTime = System.currentTimeMillis()
)
onlineUsers[sessionId] = userSession
logger.info("👤 用户上线: $username ($sessionId)")
}
@EventListener
fun handleConnected(event: SessionConnectedEvent) {
val accessor = StompMessageHeaderAccessor.wrap(event.message)
val sessionId = accessor.sessionId ?: return
val userSession = onlineUsers[sessionId] ?: return
// 发送欢迎消息
messagingTemplate.convertAndSendToUser(
userSession.username,
"/queue/welcome",
WelcomeMessage(
message = "欢迎来到聊天室!",
onlineCount = onlineUsers.size
)
)
// 通知其他用户
messagingTemplate.convertAndSend(
"/topic/user-status",
UserStatusMessage(
type = "online",
username = userSession.username,
timestamp = System.currentTimeMillis()
)
)
}
@EventListener
fun handleSubscribe(event: SessionSubscribeEvent) {
val accessor = StompMessageHeaderAccessor.wrap(event.message)
val sessionId = accessor.sessionId ?: return
val destination = accessor.destination ?: return
val userSession = onlineUsers[sessionId] ?: return
when {
destination.startsWith("/topic/chat/") -> {
val roomId = destination.substringAfterLast("/")
joinChatRoom(userSession.username, roomId)
sendChatHistory(userSession.username, roomId)
}
}
}
@EventListener
fun handleDisconnect(event: SessionDisconnectEvent) {
val accessor = StompMessageHeaderAccessor.wrap(event.message)
val sessionId = accessor.sessionId ?: return
val userSession = onlineUsers.remove(sessionId) ?: return
logger.info("👋 用户下线: ${userSession.username} ($sessionId)")
// 从所有聊天室中移除用户
chatRooms.values.forEach { it.remove(userSession.username) }
// 通知其他用户
messagingTemplate.convertAndSend(
"/topic/user-status",
UserStatusMessage(
type = "offline",
username = userSession.username,
timestamp = System.currentTimeMillis()
)
)
}
@EventListener
fun handleBrokerAvailability(event: BrokerAvailabilityEvent) {
if (event.isBrokerAvailable) {
logger.info("🟢 消息代理可用,恢复消息服务")
// 可以在这里重新发送缓存的消息
} else {
logger.warn("🔴 消息代理不可用,暂停消息服务")
// 可以在这里缓存要发送的消息
}
}
private fun joinChatRoom(username: String, roomId: String) {
chatRooms.computeIfAbsent(roomId) { ConcurrentHashMap.newKeySet() }
.add(username)
messagingTemplate.convertAndSend(
"/topic/chat/$roomId",
ChatMessage(
type = "system",
content = "$username 加入了聊天室",
timestamp = System.currentTimeMillis()
)
)
}
private fun sendChatHistory(username: String, roomId: String) {
// 发送聊天历史记录
val history = getChatHistory(roomId)
messagingTemplate.convertAndSendToUser(
username,
"/queue/chat-history",
history
)
}
private fun getChatHistory(roomId: String): List<ChatMessage> {
// 从数据库或缓存中获取聊天历史
return emptyList()
}
}
data class UserSession(
val sessionId: String,
val username: String,
val connectTime: Long
)
data class WelcomeMessage(
val message: String,
val onlineCount: Int
)
data class UserStatusMessage(
val type: String,
val username: String,
val timestamp: Long
)
data class ChatMessage(
val type: String,
val content: String,
val timestamp: Long
)
注意事项与最佳实践 ⚠️
NOTE
当使用功能完整的 broker 时,STOMP "broker relay" 会在 broker 临时不可用时自动重连"系统"连接。但是,客户端连接不会自动重连。
TIP
假设启用了心跳,客户端通常会在 10 秒内注意到 broker 没有响应。客户端需要实现自己的重连逻辑。
关键要点:
- 幂等性处理:
SessionDisconnectEvent
可能多次触发,确保处理逻辑是幂等的 - 异常处理:事件处理器中的异常不应影响其他组件
- 性能考虑:避免在事件处理器中执行耗时操作,考虑使用异步处理
- 资源清理:及时清理断开连接的会话资源,防止内存泄漏
总结 📝
Spring WebSocket STOMP 事件机制为我们提供了强大的会话生命周期管理能力。通过合理使用这些事件,我们可以:
- ✅ 实时跟踪用户连接状态
- ✅ 优雅处理连接异常和恢复
- ✅ 及时清理资源,避免内存泄漏
- ✅ 提供更好的用户体验
- ✅ 构建健壮的实时通信应用
掌握这些事件的使用,是构建高质量 WebSocket 应用的关键技能!🎉