Skip to content

Spring WebSocket STOMP 监控详解 📈

概述

在现代 Web 应用中,实时通信变得越来越重要。当我们使用 Spring WebSocket 和 STOMP 协议构建实时应用时,如何监控系统的健康状态和性能表现就成为了一个关键问题。

IMPORTANT

想象一下,如果你的聊天应用突然变得很慢,或者用户连接频繁断开,但你却不知道问题出在哪里。这就是为什么我们需要监控系统!

Spring 为我们提供了强大的监控能力,让我们能够实时了解 WebSocket 连接状态、消息处理性能、以及潜在的问题。

监控的核心价值 🎯

解决的痛点

在没有监控的情况下,我们可能会遇到以下问题:

  • 黑盒运行:不知道有多少用户在线,连接状态如何
  • 性能瓶颈:消息处理慢,但不知道瓶颈在哪里
  • 故障排查困难:连接异常断开,无法快速定位原因
  • 容量规划盲目:不知道系统负载情况,无法合理扩容

监控带来的价值

  • 实时可见性:清楚了解系统当前状态
  • 性能优化:识别瓶颈,优化系统性能
  • 故障预警:提前发现问题,避免系统崩溃
  • 容量规划:基于数据做出合理的扩容决策

启用监控功能 ⚙️

基础配置

kotlin
@Configuration
@EnableWebSocketMessageBroker
class WebSocketConfig : WebSocketMessageBrokerConfigurer {
    
    override fun configureMessageBroker(config: MessageBrokerRegistry) {
        // 启用简单消息代理
        config.enableSimpleBroker("/topic", "/queue")
        // 设置应用程序消息前缀
        config.setApplicationDestinationPrefixes("/app")
    }
    
    override fun registerStompEndpoints(registry: StompEndpointRegistry) {
        registry.addEndpoint("/ws")
            .setAllowedOriginPatterns("*")
            .withSockJS()
    }
}

NOTE

当你使用 @EnableWebSocketMessageBroker 注解时,Spring 会自动收集各种统计信息和计数器,这些信息对了解应用程序的内部状态非常重要。

监控统计信息访问

kotlin
@RestController
@RequestMapping("/api/websocket")
class WebSocketMonitoringController(
    private val webSocketStats: WebSocketMessageBrokerStats
) {
    
    @GetMapping("/stats")
    fun getWebSocketStats(): Map<String, Any> {
        return mapOf(
            "webSocketSessionsInfo" to webSocketStats.webSocketSessionsInfo,
            "stompSubProtocolStatsInfo" to webSocketStats.stompSubProtocolStatsInfo,
            "stompBrokerRelayStatsInfo" to webSocketStats.stompBrokerRelayStatsInfo,
            "clientInboundChannelStatsInfo" to webSocketStats.clientInboundChannelStatsInfo,
            "clientOutboundChannelStatsInfo" to webSocketStats.clientOutboundChannelStatsInfo,
            "sockJsTaskSchedulerStatsInfo" to webSocketStats.sockJsTaskSchedulerStatsInfo
        )
    }
}

监控指标详解 📊

1. 客户端 WebSocket 会话 (Client WebSocket Sessions)

这是监控系统中最基础也是最重要的指标,它告诉我们有多少用户正在使用我们的实时功能。

kotlin
@Component
class WebSocketSessionMonitor(
    private val webSocketStats: WebSocketMessageBrokerStats
) {
    
    @Scheduled(fixedRate = 30000) // 每30秒检查一次
    fun logSessionStats() {
        val sessionInfo = webSocketStats.webSocketSessionsInfo
        
        logger.info("""
            WebSocket 会话统计:
            - 当前活跃会话: ${sessionInfo.split("current=")[1].split(",")[0]}
            - 总会话数: ${extractTotal(sessionInfo)}
            - 异常关闭数: ${extractAbnormalClosed(sessionInfo)}
        """.trimIndent())
    }
    
    private fun extractTotal(info: String): String {
        return try {
            info.split("total=")[1].split(",")[0]
        } catch (e: Exception) {
            "N/A"
        }
    }
    
    private fun extractAbnormalClosed(info: String): String {
        return try {
            info.split("abnormallyClosed=")[1].split(",")[0]
        } catch (e: Exception) {
            "N/A"
        }
    }
    
    companion object {
        private val logger = LoggerFactory.getLogger(WebSocketSessionMonitor::class.java)
    }
}

关键指标说明

当前会话数 (Current)

表示当前有多少客户端会话处于活跃状态,进一步细分为:

  • WebSocket 连接:原生 WebSocket 连接数
  • HTTP 流式传输:使用 HTTP 流的 SockJS 会话
  • HTTP 轮询:使用 HTTP 轮询的 SockJS 会话

总会话数 (Total)

表示从应用启动以来建立的所有会话总数,这个指标帮助我们了解系统的总体使用情况。

异常关闭 (Abnormally Closed)

这是一个重要的健康指标,包括:

  • 连接失败:建立连接后 60 秒内未收到任何消息就关闭的会话
  • 发送限制超出:因为发送超时或缓冲区限制而关闭的会话
  • 传输错误:因为网络传输错误而关闭的会话

2. STOMP 帧统计

kotlin
@Component
class StompFrameMonitor(
    private val webSocketStats: WebSocketMessageBrokerStats
) {
    
    fun getStompFrameStats(): StompFrameStats {
        val stompInfo = webSocketStats.stompSubProtocolStatsInfo
        
        return StompFrameStats(
            connectFrames = extractFrameCount(stompInfo, "CONNECT"),
            connectedFrames = extractFrameCount(stompInfo, "CONNECTED"),
            disconnectFrames = extractFrameCount(stompInfo, "DISCONNECT")
        )
    }
    
    private fun extractFrameCount(info: String, frameType: String): Int {
        return try {
            val pattern = "$frameType=(\\d+)".toRegex()
            pattern.find(info)?.groupValues?.get(1)?.toInt() ?: 0
        } catch (e: Exception) {
            0
        }
    }
}

data class StompFrameStats(
    val connectFrames: Int,
    val connectedFrames: Int,
    val disconnectFrames: Int
) {
    val connectionSuccessRate: Double
        get() = if (connectFrames > 0) connectedFrames.toDouble() / connectFrames else 0.0
}

NOTE

STOMP 帧统计帮助我们了解在 STOMP 协议层面有多少客户端成功连接。注意 DISCONNECT 计数可能会比较低,因为客户端可能会异常关闭而不发送 DISCONNECT 帧。

3. 消息通道性能监控

这是性能调优的关键指标,帮助我们识别消息处理的瓶颈。

kotlin
@Component
class MessageChannelMonitor(
    private val webSocketStats: WebSocketMessageBrokerStats
) {
    
    @Scheduled(fixedRate = 60000) // 每分钟检查一次
    fun checkChannelHealth() {
        checkInboundChannel()
        checkOutboundChannel()
    }
    
    private fun checkInboundChannel() {
        val inboundInfo = webSocketStats.clientInboundChannelStatsInfo
        val queueSize = extractQueueSize(inboundInfo)
        
        if (queueSize > 100) { 
            logger.warn("入站通道队列积压严重: $queueSize 个任务等待处理")
            logger.warn("建议检查消息处理逻辑是否存在性能问题")
        }
    }
    
    private fun checkOutboundChannel() {
        val outboundInfo = webSocketStats.clientOutboundChannelStatsInfo
        val queueSize = extractQueueSize(outboundInfo)
        
        if (queueSize > 100) { 
            logger.warn("出站通道队列积压严重: $queueSize 个任务等待处理")
            logger.warn("可能存在慢客户端,建议调整发送超时或缓冲区大小")
        }
    }
    
    private fun extractQueueSize(channelInfo: String): Int {
        return try {
            val pattern = "queueSize=(\\d+)".toRegex()
            pattern.find(channelInfo)?.groupValues?.get(1)?.toInt() ?: 0
        } catch (e: Exception) {
            0
        }
    }
    
    companion object {
        private val logger = LoggerFactory.getLogger(MessageChannelMonitor::class.java)
    }
}

监控数据流转过程 🔄

实际应用场景 🚀

场景1:聊天应用监控

kotlin
@Service
class ChatApplicationMonitor(
    private val webSocketStats: WebSocketMessageBrokerStats
) {
    
    @EventListener
    fun handleSessionConnected(event: SessionConnectedEvent) {
        val currentSessions = getCurrentSessionCount()
        
        if (currentSessions > 1000) { 
            logger.info("高并发警告: 当前在线用户数 $currentSessions")
            // 可以触发自动扩容逻辑
            triggerAutoScaling()
        }
    }
    
    @EventListener
    fun handleSessionDisconnected(event: SessionDisconnectEvent) {
        val disconnectReason = event.closeStatus?.reason
        if (disconnectReason?.contains("timeout") == true) {
            logger.warn("用户因超时断开连接: ${event.sessionId}")
        }
    }
    
    private fun getCurrentSessionCount(): Int {
        val sessionInfo = webSocketStats.webSocketSessionsInfo
        return extractCurrentSessions(sessionInfo)
    }
    
    private fun extractCurrentSessions(info: String): Int {
        return try {
            info.split("current=")[1].split(",")[0].toInt()
        } catch (e: Exception) {
            0
        }
    }
    
    private fun triggerAutoScaling() {
        // 实现自动扩容逻辑
        logger.info("触发自动扩容...")
    }
    
    companion object {
        private val logger = LoggerFactory.getLogger(ChatApplicationMonitor::class.java)
    }
}

场景2:性能优化决策

kotlin
@Configuration
class WebSocketConfigBefore : WebSocketMessageBrokerConfigurer {
    
    override fun configureWebSocketTransport(registry: WebSocketTransportRegistration) {
        registry.setMessageSizeLimit(64 * 1024) // 64KB
        registry.setSendBufferSizeLimit(512 * 1024) // 512KB
        registry.setSendTimeLimit(20 * 1000) // 20秒
    }
}
kotlin
@Configuration
class WebSocketConfigAfter : WebSocketMessageBrokerConfigurer {
    
    override fun configureWebSocketTransport(registry: WebSocketTransportRegistration) {
        // 基于监控发现的慢客户端问题,调整配置
        registry.setMessageSizeLimit(32 * 1024) // [!code highlight] // 减少到32KB
        registry.setSendBufferSizeLimit(256 * 1024) // [!code highlight] // 减少到256KB
        registry.setSendTimeLimit(10 * 1000) // [!code highlight] // 减少到10秒
    }
    
    override fun configureClientInboundChannel(registration: ChannelRegistration) {
        // 基于入站通道积压情况,增加线程池大小
        registration.taskExecutor().corePoolSize(8) 
        registration.taskExecutor().maxPoolSize(16) 
        registration.taskExecutor().queueCapacity(100) 
    }
}

JMX 监控集成 💻

启用 JMX 导出

kotlin
@Configuration
class JmxConfig {
    
    @Bean
    fun mbeanExporter(): MBeanExporter {
        val exporter = MBeanExporter()
        exporter.setDefaultDomain("com.example.websocket")
        return exporter
    }
    
    @Bean
    @ExportedBean(objectName = "com.example.websocket:name=WebSocketStats")
    fun webSocketStatsJmx(webSocketStats: WebSocketMessageBrokerStats): WebSocketStatsJmx {
        return WebSocketStatsJmx(webSocketStats)
    }
}

class WebSocketStatsJmx(
    private val webSocketStats: WebSocketMessageBrokerStats
) {
    
    @ManagedAttribute(description = "当前WebSocket会话数")
    fun getCurrentSessions(): Int {
        val sessionInfo = webSocketStats.webSocketSessionsInfo
        return extractCurrentSessions(sessionInfo)
    }
    
    @ManagedAttribute(description = "异常关闭的会话数")
    fun getAbnormallyClosedSessions(): Int {
        val sessionInfo = webSocketStats.webSocketSessionsInfo
        return extractAbnormallyClosedSessions(sessionInfo)
    }
    
    @ManagedOperation(description = "获取完整的统计信息")
    fun getFullStats(): String {
        return webSocketStats.toString()
    }
    
    private fun extractCurrentSessions(info: String): Int {
        return try {
            info.split("current=")[1].split(",")[0].toInt()
        } catch (e: Exception) {
            0
        }
    }
    
    private fun extractAbnormallyClosedSessions(info: String): Int {
        return try {
            info.split("abnormallyClosed=")[1].split(",")[0].toInt()
        } catch (e: Exception) {
            0
        }
    }
}

监控最佳实践 🏆

1. 设置合理的监控阈值

kotlin
@ConfigurationProperties(prefix = "websocket.monitoring")
@Component
data class MonitoringConfig(
    var maxConcurrentSessions: Int = 1000,
    var maxQueueSize: Int = 100,
    var abnormalCloseRateThreshold: Double = 0.1, // 10%
    var monitoringInterval: Long = 30000 // 30秒
)

2. 实现智能告警

kotlin
@Component
class WebSocketAlertManager(
    private val webSocketStats: WebSocketMessageBrokerStats,
    private val monitoringConfig: MonitoringConfig
) {
    
    @Scheduled(fixedRateString = "#{@monitoringConfig.monitoringInterval}")
    fun checkAndAlert() {
        checkSessionHealth()
        checkChannelHealth()
        checkAbnormalCloseRate()
    }
    
    private fun checkSessionHealth() {
        val currentSessions = getCurrentSessionCount()
        if (currentSessions > monitoringConfig.maxConcurrentSessions) {
            sendAlert("高并发警告", "当前会话数: $currentSessions")
        }
    }
    
    private fun checkChannelHealth() {
        val inboundQueueSize = getInboundQueueSize()
        val outboundQueueSize = getOutboundQueueSize()
        
        if (inboundQueueSize > monitoringConfig.maxQueueSize) {
            sendAlert("入站通道积压", "队列大小: $inboundQueueSize")
        }
        
        if (outboundQueueSize > monitoringConfig.maxQueueSize) {
            sendAlert("出站通道积压", "队列大小: $outboundQueueSize")
        }
    }
    
    private fun checkAbnormalCloseRate() {
        val totalSessions = getTotalSessionCount()
        val abnormalClosed = getAbnormallyClosedCount()
        
        if (totalSessions > 0) {
            val abnormalRate = abnormalClosed.toDouble() / totalSessions
            if (abnormalRate > monitoringConfig.abnormalCloseRateThreshold) {
                sendAlert("异常关闭率过高", "异常关闭率: ${(abnormalRate * 100).toInt()}%")
            }
        }
    }
    
    private fun sendAlert(title: String, message: String) {
        logger.error("WebSocket 告警 - $title: $message")
        // 这里可以集成邮件、短信、钉钉等告警方式
    }
    
    // 省略其他辅助方法...
    
    companion object {
        private val logger = LoggerFactory.getLogger(WebSocketAlertManager::class.java)
    }
}

总结 📝

Spring WebSocket STOMP 监控为我们提供了全面的系统可观测性:

监控的核心价值

  • 预防胜于治疗:通过监控及时发现问题,避免系统故障
  • 数据驱动优化:基于真实数据进行性能调优和容量规划
  • 用户体验保障:确保实时通信功能的稳定性和响应性

关键监控指标记忆口诀 💡

  • 会话监控:知道有多少用户在线
  • 帧统计:了解协议层面的连接质量
  • 通道性能:识别消息处理瓶颈
  • 异常分析:快速定位问题根因

通过合理配置和使用这些监控功能,我们可以构建出健壮、高性能的实时通信应用,为用户提供优质的实时交互体验! 🎉