Skip to content

Spring WebSocket STOMP 事件机制详解 🚀

概述

在 Spring WebSocket STOMP 应用中,框架会自动发布多种 ApplicationContext 事件,这些事件为我们提供了监控和管理 WebSocket 连接生命周期的强大能力。通过实现 Spring 的 ApplicationListener 接口,我们可以轻松地监听和响应这些事件。

IMPORTANT

STOMP 事件机制是构建健壮 WebSocket 应用的关键组件,它让我们能够优雅地处理连接状态变化、会话管理和错误恢复。

为什么需要 STOMP 事件机制? 🤔

想象一下,如果没有事件机制,我们会遇到以下问题:

  • 连接状态盲区:无法知道客户端何时连接或断开
  • 消息发送时机不当:在 broker 不可用时仍然发送消息
  • 资源泄漏:无法及时清理断开连接的会话资源
  • 用户体验差:无法向用户反馈连接状态

STOMP 事件机制正是为了解决这些痛点而设计的!

核心事件类型详解 📋

1. BrokerAvailabilityEvent - Broker 可用性事件

这个事件指示 broker 何时变为可用或不可用状态。

kotlin
@Component
class BrokerAvailabilityListener : ApplicationListener<BrokerAvailabilityEvent> {
    
    private val logger = LoggerFactory.getLogger(BrokerAvailabilityListener::class.java)
    
    override fun onApplicationEvent(event: BrokerAvailabilityEvent) {
        when {
            event.isBrokerAvailable -> {
                logger.info("🟢 Broker 已可用,可以安全发送消息")
                // 可以在这里恢复消息发送或通知相关组件
                resumeMessageSending() 
            }
            else -> {
                logger.warn("🔴 Broker 不可用,暂停消息发送")
                // 暂停消息发送,避免 MessageDeliveryException
                pauseMessageSending() 
            }
        }
    }
    
    private fun resumeMessageSending() {
        // 恢复消息发送逻辑
    }
    
    private fun pauseMessageSending() {
        // 暂停消息发送逻辑
    }
}

WARNING

使用 SimpMessagingTemplate 的组件应该订阅此事件,避免在 broker 不可用时发送消息,并准备好处理 MessageDeliveryException

2. SessionConnectEvent - 会话连接事件

当收到新的 STOMP CONNECT 时发布,标志着新客户端会话的开始。

kotlin
@Component
class SessionConnectListener : ApplicationListener<SessionConnectEvent> {
    
    private val logger = LoggerFactory.getLogger(SessionConnectListener::class.java)
    private val activeUsers = mutableSetOf<String>()
    
    override fun onApplicationEvent(event: SessionConnectEvent) {
        val accessor = StompMessageHeaderAccessor.wrap(event.message) 
        
        val sessionId = accessor.sessionId
        val username = accessor.user?.name ?: "匿名用户"
        val customHeaders = accessor.toNativeHeaderMap()
        
        logger.info("👋 用户连接: $username, 会话ID: $sessionId")
        
        // 记录活跃用户
        activeUsers.add(username) 
        
        // 处理自定义头部信息
        customHeaders["client-type"]?.let { clientType ->
            logger.info("客户端类型: ${clientType.first()}")
        }
        
        // 可以在这里进行用户认证、权限检查等
        validateUserPermissions(username) 
    }
    
    private fun validateUserPermissions(username: String) {
        // 用户权限验证逻辑
    }
}

3. SessionConnectedEvent - 会话已连接事件

SessionConnectEvent 之后不久发布,当 broker 发送 STOMP CONNECTED 帧作为对 CONNECT 的响应时触发。

kotlin
@Component
class SessionConnectedListener : ApplicationListener<SessionConnectedEvent> {
    
    private val logger = LoggerFactory.getLogger(SessionConnectedListener::class.java)
    
    @Autowired
    private lateinit var messagingTemplate: SimpMessagingTemplate
    
    override fun onApplicationEvent(event: SessionConnectedEvent) {
        val accessor = StompMessageHeaderAccessor.wrap(event.message)
        val sessionId = accessor.sessionId
        val username = accessor.user?.name ?: "匿名用户"
        
        logger.info("✅ STOMP 会话完全建立: $username, 会话ID: $sessionId")
        
        // 发送欢迎消息
        sendWelcomeMessage(username, sessionId) 
        
        // 通知其他用户有新用户加入
        notifyUserJoined(username) 
    }
    
    private fun sendWelcomeMessage(username: String, sessionId: String?) {
        sessionId?.let {
            messagingTemplate.convertAndSendToUser(
                username, 
                "/queue/welcome", 
                "欢迎 $username!连接已建立。"
            )
        }
    }
    
    private fun notifyUserJoined(username: String) {
        messagingTemplate.convertAndSend(
            "/topic/user-status", 
            mapOf("type" to "joined", "username" to username)
        )
    }
}

4. SessionSubscribeEvent - 订阅事件

当收到新的 STOMP SUBSCRIBE 时发布。

kotlin
@Component
class SessionSubscribeListener : ApplicationListener<SessionSubscribeEvent> {
    
    private val logger = LoggerFactory.getLogger(SessionSubscribeListener::class.java)
    private val subscriptionTracker = mutableMapOf<String, MutableSet<String>>()
    
    override fun onApplicationEvent(event: SessionSubscribeEvent) {
        val accessor = StompMessageHeaderAccessor.wrap(event.message)
        
        val sessionId = accessor.sessionId ?: return
        val destination = accessor.destination ?: return
        val subscriptionId = accessor.subscriptionId ?: return
        
        logger.info("📡 用户订阅: 会话=$sessionId, 目标=$destination, 订阅ID=$subscriptionId")
        
        // 跟踪订阅
        subscriptionTracker.computeIfAbsent(sessionId) { mutableSetOf() }
            .add(destination) 
        
        // 根据订阅的目标进行特殊处理
        handleSpecialDestinations(destination, sessionId) 
    }
    
    private fun handleSpecialDestinations(destination: String, sessionId: String) {
        when {
            destination.startsWith("/topic/chat") -> {
                logger.info("用户加入聊天室: $destination")
                // 发送聊天室历史消息等
            }
            destination.startsWith("/queue/notifications") -> {
                logger.info("用户订阅通知: $sessionId")
                // 发送未读通知等
            }
        }
    }
}

5. SessionUnsubscribeEvent - 取消订阅事件

当收到新的 STOMP UNSUBSCRIBE 时发布。

kotlin
@Component
class SessionUnsubscribeListener : ApplicationListener<SessionUnsubscribeEvent> {
    
    private val logger = LoggerFactory.getLogger(SessionUnsubscribeListener::class.java)
    
    override fun onApplicationEvent(event: SessionUnsubscribeEvent) {
        val accessor = StompMessageHeaderAccessor.wrap(event.message)
        
        val sessionId = accessor.sessionId
        val subscriptionId = accessor.subscriptionId
        
        logger.info("📴 用户取消订阅: 会话=$sessionId, 订阅ID=$subscriptionId")
        
        // 清理相关资源
        cleanupSubscriptionResources(sessionId, subscriptionId) 
    }
    
    private fun cleanupSubscriptionResources(sessionId: String?, subscriptionId: String?) {
        // 清理订阅相关的资源
        sessionId?.let { 
            // 从订阅跟踪器中移除
            logger.debug("清理会话 $it 的订阅资源")
        }
    }
}

6. SessionDisconnectEvent - 会话断开事件

当 STOMP 会话结束时发布,这是最重要的清理事件。

kotlin
@Component
class SessionDisconnectListener : ApplicationListener<SessionDisconnectEvent> {
    
    private val logger = LoggerFactory.getLogger(SessionDisconnectListener::class.java)
    private val processedDisconnects = mutableSetOf<String>() 
    
    @Autowired
    private lateinit var messagingTemplate: SimpMessagingTemplate
    
    override fun onApplicationEvent(event: SessionDisconnectEvent) {
        val accessor = StompMessageHeaderAccessor.wrap(event.message)
        
        val sessionId = accessor.sessionId ?: return
        val username = accessor.user?.name ?: "匿名用户"
        
        // 防止重复处理同一会话的断开事件
        if (!processedDisconnects.add(sessionId)) { 
            logger.debug("会话 $sessionId 的断开事件已处理,跳过重复处理")
            return
        }
        
        logger.info("👋 用户断开连接: $username, 会话ID: $sessionId")
        
        // 执行清理操作
        performCleanup(username, sessionId) 
        
        // 通知其他用户
        notifyUserLeft(username) 
    }
    
    private fun performCleanup(username: String, sessionId: String) {
        try {
            // 清理用户会话数据
            cleanupUserSession(sessionId)
            
            // 清理订阅信息
            cleanupSubscriptions(sessionId)
            
            // 清理临时资源
            cleanupTempResources(sessionId)
            
            logger.info("✅ 会话 $sessionId 清理完成")
        } catch (e: Exception) {
            logger.error("❌ 清理会话 $sessionId 时发生错误", e)
        }
    }
    
    private fun notifyUserLeft(username: String) {
        messagingTemplate.convertAndSend(
            "/topic/user-status",
            mapOf("type" to "left", "username" to username)
        )
    }
    
    private fun cleanupUserSession(sessionId: String) {
        // 清理用户会话相关数据
    }
    
    private fun cleanupSubscriptions(sessionId: String) {
        // 清理订阅相关数据
    }
    
    private fun cleanupTempResources(sessionId: String) {
        // 清理临时资源
    }
}

CAUTION

SessionDisconnectEvent 可能会为同一个会话发布多次,组件应该对多个断开事件保持幂等性。

事件处理的最佳实践 🎯

1. 统一的事件处理器

kotlin
// 为每个事件创建单独的监听器
@Component
class SessionConnectListener : ApplicationListener<SessionConnectEvent> {
    // 处理连接事件
}

@Component  
class SessionDisconnectListener : ApplicationListener<SessionDisconnectEvent> {
    // 处理断开事件
}
kotlin
@Component
class WebSocketEventHandler {
    
    private val logger = LoggerFactory.getLogger(WebSocketEventHandler::class.java)
    
    @EventListener
    fun handleSessionConnect(event: SessionConnectEvent) {
        val accessor = StompMessageHeaderAccessor.wrap(event.message)
        logger.info("🔗 会话连接: ${accessor.sessionId}")
        // 处理连接逻辑
    }
    
    @EventListener
    fun handleSessionConnected(event: SessionConnectedEvent) {
        val accessor = StompMessageHeaderAccessor.wrap(event.message)
        logger.info("✅ 会话已建立: ${accessor.sessionId}")
        // 处理已连接逻辑
    }
    
    @EventListener
    fun handleSessionDisconnect(event: SessionDisconnectEvent) {
        val accessor = StompMessageHeaderAccessor.wrap(event.message)
        logger.info("❌ 会话断开: ${accessor.sessionId}")
        // 处理断开逻辑
    }
    
    @EventListener
    fun handleBrokerAvailability(event: BrokerAvailabilityEvent) {
        if (event.isBrokerAvailable) {
            logger.info("🟢 Broker 可用")
        } else {
            logger.warn("🔴 Broker 不可用")
        }
    }
}

2. 异步事件处理

kotlin
@Component
class AsyncWebSocketEventHandler {
    
    @Async
    @EventListener
    fun handleSessionConnectAsync(event: SessionConnectEvent) {
        // 异步处理连接事件,避免阻塞主线程
        performHeavyConnectionProcessing(event)
    }
    
    @Async
    @EventListener
    fun handleSessionDisconnectAsync(event: SessionDisconnectEvent) {
        // 异步处理断开事件,避免影响其他连接
        performHeavyCleanupProcessing(event)
    }
    
    private fun performHeavyConnectionProcessing(event: SessionConnectEvent) {
        // 执行耗时的连接处理逻辑
        Thread.sleep(1000) // 模拟耗时操作
    }
    
    private fun performHeavyCleanupProcessing(event: SessionDisconnectEvent) {
        // 执行耗时的清理逻辑
        Thread.sleep(1000) // 模拟耗时操作
    }
}

实际应用场景示例 💼

在线聊天应用的完整事件处理

完整的聊天应用事件处理器实现
kotlin
@Component
class ChatApplicationEventHandler {
    
    private val logger = LoggerFactory.getLogger(ChatApplicationEventHandler::class.java)
    private val onlineUsers = ConcurrentHashMap<String, UserSession>()
    private val chatRooms = ConcurrentHashMap<String, MutableSet<String>>()
    
    @Autowired
    private lateinit var messagingTemplate: SimpMessagingTemplate
    
    @EventListener
    fun handleConnect(event: SessionConnectEvent) {
        val accessor = StompMessageHeaderAccessor.wrap(event.message)
        val sessionId = accessor.sessionId ?: return
        val username = accessor.user?.name ?: return
        
        val userSession = UserSession(
            sessionId = sessionId,
            username = username,
            connectTime = System.currentTimeMillis()
        )
        
        onlineUsers[sessionId] = userSession
        logger.info("👤 用户上线: $username ($sessionId)")
    }
    
    @EventListener
    fun handleConnected(event: SessionConnectedEvent) {
        val accessor = StompMessageHeaderAccessor.wrap(event.message)
        val sessionId = accessor.sessionId ?: return
        val userSession = onlineUsers[sessionId] ?: return
        
        // 发送欢迎消息
        messagingTemplate.convertAndSendToUser(
            userSession.username,
            "/queue/welcome",
            WelcomeMessage(
                message = "欢迎来到聊天室!",
                onlineCount = onlineUsers.size
            )
        )
        
        // 通知其他用户
        messagingTemplate.convertAndSend(
            "/topic/user-status",
            UserStatusMessage(
                type = "online",
                username = userSession.username,
                timestamp = System.currentTimeMillis()
            )
        )
    }
    
    @EventListener
    fun handleSubscribe(event: SessionSubscribeEvent) {
        val accessor = StompMessageHeaderAccessor.wrap(event.message)
        val sessionId = accessor.sessionId ?: return
        val destination = accessor.destination ?: return
        val userSession = onlineUsers[sessionId] ?: return
        
        when {
            destination.startsWith("/topic/chat/") -> {
                val roomId = destination.substringAfterLast("/")
                joinChatRoom(userSession.username, roomId)
                sendChatHistory(userSession.username, roomId)
            }
        }
    }
    
    @EventListener
    fun handleDisconnect(event: SessionDisconnectEvent) {
        val accessor = StompMessageHeaderAccessor.wrap(event.message)
        val sessionId = accessor.sessionId ?: return
        val userSession = onlineUsers.remove(sessionId) ?: return
        
        logger.info("👋 用户下线: ${userSession.username} ($sessionId)")
        
        // 从所有聊天室中移除用户
        chatRooms.values.forEach { it.remove(userSession.username) }
        
        // 通知其他用户
        messagingTemplate.convertAndSend(
            "/topic/user-status",
            UserStatusMessage(
                type = "offline",
                username = userSession.username,
                timestamp = System.currentTimeMillis()
            )
        )
    }
    
    @EventListener
    fun handleBrokerAvailability(event: BrokerAvailabilityEvent) {
        if (event.isBrokerAvailable) {
            logger.info("🟢 消息代理可用,恢复消息服务")
            // 可以在这里重新发送缓存的消息
        } else {
            logger.warn("🔴 消息代理不可用,暂停消息服务")
            // 可以在这里缓存要发送的消息
        }
    }
    
    private fun joinChatRoom(username: String, roomId: String) {
        chatRooms.computeIfAbsent(roomId) { ConcurrentHashMap.newKeySet() }
            .add(username)
        
        messagingTemplate.convertAndSend(
            "/topic/chat/$roomId",
            ChatMessage(
                type = "system",
                content = "$username 加入了聊天室",
                timestamp = System.currentTimeMillis()
            )
        )
    }
    
    private fun sendChatHistory(username: String, roomId: String) {
        // 发送聊天历史记录
        val history = getChatHistory(roomId)
        messagingTemplate.convertAndSendToUser(
            username,
            "/queue/chat-history",
            history
        )
    }
    
    private fun getChatHistory(roomId: String): List<ChatMessage> {
        // 从数据库或缓存中获取聊天历史
        return emptyList()
    }
}

data class UserSession(
    val sessionId: String,
    val username: String,
    val connectTime: Long
)

data class WelcomeMessage(
    val message: String,
    val onlineCount: Int
)

data class UserStatusMessage(
    val type: String,
    val username: String,
    val timestamp: Long
)

data class ChatMessage(
    val type: String,
    val content: String,
    val timestamp: Long
)

注意事项与最佳实践 ⚠️

NOTE

当使用功能完整的 broker 时,STOMP "broker relay" 会在 broker 临时不可用时自动重连"系统"连接。但是,客户端连接不会自动重连。

TIP

假设启用了心跳,客户端通常会在 10 秒内注意到 broker 没有响应。客户端需要实现自己的重连逻辑。

关键要点:

  1. 幂等性处理SessionDisconnectEvent 可能多次触发,确保处理逻辑是幂等的
  2. 异常处理:事件处理器中的异常不应影响其他组件
  3. 性能考虑:避免在事件处理器中执行耗时操作,考虑使用异步处理
  4. 资源清理:及时清理断开连接的会话资源,防止内存泄漏

总结 📝

Spring WebSocket STOMP 事件机制为我们提供了强大的会话生命周期管理能力。通过合理使用这些事件,我们可以:

  • ✅ 实时跟踪用户连接状态
  • ✅ 优雅处理连接异常和恢复
  • ✅ 及时清理资源,避免内存泄漏
  • ✅ 提供更好的用户体验
  • ✅ 构建健壮的实时通信应用

掌握这些事件的使用,是构建高质量 WebSocket 应用的关键技能!🎉