Appearance
Spring WebSocket STOMP 监控详解 📈
概述
在现代 Web 应用中,实时通信变得越来越重要。当我们使用 Spring WebSocket 和 STOMP 协议构建实时应用时,如何监控系统的健康状态和性能表现就成为了一个关键问题。
IMPORTANT
想象一下,如果你的聊天应用突然变得很慢,或者用户连接频繁断开,但你却不知道问题出在哪里。这就是为什么我们需要监控系统!
Spring 为我们提供了强大的监控能力,让我们能够实时了解 WebSocket 连接状态、消息处理性能、以及潜在的问题。
监控的核心价值 🎯
解决的痛点
在没有监控的情况下,我们可能会遇到以下问题:
- 黑盒运行:不知道有多少用户在线,连接状态如何
- 性能瓶颈:消息处理慢,但不知道瓶颈在哪里
- 故障排查困难:连接异常断开,无法快速定位原因
- 容量规划盲目:不知道系统负载情况,无法合理扩容
监控带来的价值
- 实时可见性:清楚了解系统当前状态
- 性能优化:识别瓶颈,优化系统性能
- 故障预警:提前发现问题,避免系统崩溃
- 容量规划:基于数据做出合理的扩容决策
启用监控功能 ⚙️
基础配置
kotlin
@Configuration
@EnableWebSocketMessageBroker
class WebSocketConfig : WebSocketMessageBrokerConfigurer {
override fun configureMessageBroker(config: MessageBrokerRegistry) {
// 启用简单消息代理
config.enableSimpleBroker("/topic", "/queue")
// 设置应用程序消息前缀
config.setApplicationDestinationPrefixes("/app")
}
override fun registerStompEndpoints(registry: StompEndpointRegistry) {
registry.addEndpoint("/ws")
.setAllowedOriginPatterns("*")
.withSockJS()
}
}
NOTE
当你使用 @EnableWebSocketMessageBroker
注解时,Spring 会自动收集各种统计信息和计数器,这些信息对了解应用程序的内部状态非常重要。
监控统计信息访问
kotlin
@RestController
@RequestMapping("/api/websocket")
class WebSocketMonitoringController(
private val webSocketStats: WebSocketMessageBrokerStats
) {
@GetMapping("/stats")
fun getWebSocketStats(): Map<String, Any> {
return mapOf(
"webSocketSessionsInfo" to webSocketStats.webSocketSessionsInfo,
"stompSubProtocolStatsInfo" to webSocketStats.stompSubProtocolStatsInfo,
"stompBrokerRelayStatsInfo" to webSocketStats.stompBrokerRelayStatsInfo,
"clientInboundChannelStatsInfo" to webSocketStats.clientInboundChannelStatsInfo,
"clientOutboundChannelStatsInfo" to webSocketStats.clientOutboundChannelStatsInfo,
"sockJsTaskSchedulerStatsInfo" to webSocketStats.sockJsTaskSchedulerStatsInfo
)
}
}
监控指标详解 📊
1. 客户端 WebSocket 会话 (Client WebSocket Sessions)
这是监控系统中最基础也是最重要的指标,它告诉我们有多少用户正在使用我们的实时功能。
kotlin
@Component
class WebSocketSessionMonitor(
private val webSocketStats: WebSocketMessageBrokerStats
) {
@Scheduled(fixedRate = 30000) // 每30秒检查一次
fun logSessionStats() {
val sessionInfo = webSocketStats.webSocketSessionsInfo
logger.info("""
WebSocket 会话统计:
- 当前活跃会话: ${sessionInfo.split("current=")[1].split(",")[0]}
- 总会话数: ${extractTotal(sessionInfo)}
- 异常关闭数: ${extractAbnormalClosed(sessionInfo)}
""".trimIndent())
}
private fun extractTotal(info: String): String {
return try {
info.split("total=")[1].split(",")[0]
} catch (e: Exception) {
"N/A"
}
}
private fun extractAbnormalClosed(info: String): String {
return try {
info.split("abnormallyClosed=")[1].split(",")[0]
} catch (e: Exception) {
"N/A"
}
}
companion object {
private val logger = LoggerFactory.getLogger(WebSocketSessionMonitor::class.java)
}
}
关键指标说明
当前会话数 (Current)
表示当前有多少客户端会话处于活跃状态,进一步细分为:
- WebSocket 连接:原生 WebSocket 连接数
- HTTP 流式传输:使用 HTTP 流的 SockJS 会话
- HTTP 轮询:使用 HTTP 轮询的 SockJS 会话
总会话数 (Total)
表示从应用启动以来建立的所有会话总数,这个指标帮助我们了解系统的总体使用情况。
异常关闭 (Abnormally Closed)
这是一个重要的健康指标,包括:
- 连接失败:建立连接后 60 秒内未收到任何消息就关闭的会话
- 发送限制超出:因为发送超时或缓冲区限制而关闭的会话
- 传输错误:因为网络传输错误而关闭的会话
2. STOMP 帧统计
kotlin
@Component
class StompFrameMonitor(
private val webSocketStats: WebSocketMessageBrokerStats
) {
fun getStompFrameStats(): StompFrameStats {
val stompInfo = webSocketStats.stompSubProtocolStatsInfo
return StompFrameStats(
connectFrames = extractFrameCount(stompInfo, "CONNECT"),
connectedFrames = extractFrameCount(stompInfo, "CONNECTED"),
disconnectFrames = extractFrameCount(stompInfo, "DISCONNECT")
)
}
private fun extractFrameCount(info: String, frameType: String): Int {
return try {
val pattern = "$frameType=(\\d+)".toRegex()
pattern.find(info)?.groupValues?.get(1)?.toInt() ?: 0
} catch (e: Exception) {
0
}
}
}
data class StompFrameStats(
val connectFrames: Int,
val connectedFrames: Int,
val disconnectFrames: Int
) {
val connectionSuccessRate: Double
get() = if (connectFrames > 0) connectedFrames.toDouble() / connectFrames else 0.0
}
NOTE
STOMP 帧统计帮助我们了解在 STOMP 协议层面有多少客户端成功连接。注意 DISCONNECT 计数可能会比较低,因为客户端可能会异常关闭而不发送 DISCONNECT 帧。
3. 消息通道性能监控
这是性能调优的关键指标,帮助我们识别消息处理的瓶颈。
kotlin
@Component
class MessageChannelMonitor(
private val webSocketStats: WebSocketMessageBrokerStats
) {
@Scheduled(fixedRate = 60000) // 每分钟检查一次
fun checkChannelHealth() {
checkInboundChannel()
checkOutboundChannel()
}
private fun checkInboundChannel() {
val inboundInfo = webSocketStats.clientInboundChannelStatsInfo
val queueSize = extractQueueSize(inboundInfo)
if (queueSize > 100) {
logger.warn("入站通道队列积压严重: $queueSize 个任务等待处理")
logger.warn("建议检查消息处理逻辑是否存在性能问题")
}
}
private fun checkOutboundChannel() {
val outboundInfo = webSocketStats.clientOutboundChannelStatsInfo
val queueSize = extractQueueSize(outboundInfo)
if (queueSize > 100) {
logger.warn("出站通道队列积压严重: $queueSize 个任务等待处理")
logger.warn("可能存在慢客户端,建议调整发送超时或缓冲区大小")
}
}
private fun extractQueueSize(channelInfo: String): Int {
return try {
val pattern = "queueSize=(\\d+)".toRegex()
pattern.find(channelInfo)?.groupValues?.get(1)?.toInt() ?: 0
} catch (e: Exception) {
0
}
}
companion object {
private val logger = LoggerFactory.getLogger(MessageChannelMonitor::class.java)
}
}
监控数据流转过程 🔄
实际应用场景 🚀
场景1:聊天应用监控
kotlin
@Service
class ChatApplicationMonitor(
private val webSocketStats: WebSocketMessageBrokerStats
) {
@EventListener
fun handleSessionConnected(event: SessionConnectedEvent) {
val currentSessions = getCurrentSessionCount()
if (currentSessions > 1000) {
logger.info("高并发警告: 当前在线用户数 $currentSessions")
// 可以触发自动扩容逻辑
triggerAutoScaling()
}
}
@EventListener
fun handleSessionDisconnected(event: SessionDisconnectEvent) {
val disconnectReason = event.closeStatus?.reason
if (disconnectReason?.contains("timeout") == true) {
logger.warn("用户因超时断开连接: ${event.sessionId}")
}
}
private fun getCurrentSessionCount(): Int {
val sessionInfo = webSocketStats.webSocketSessionsInfo
return extractCurrentSessions(sessionInfo)
}
private fun extractCurrentSessions(info: String): Int {
return try {
info.split("current=")[1].split(",")[0].toInt()
} catch (e: Exception) {
0
}
}
private fun triggerAutoScaling() {
// 实现自动扩容逻辑
logger.info("触发自动扩容...")
}
companion object {
private val logger = LoggerFactory.getLogger(ChatApplicationMonitor::class.java)
}
}
场景2:性能优化决策
kotlin
@Configuration
class WebSocketConfigBefore : WebSocketMessageBrokerConfigurer {
override fun configureWebSocketTransport(registry: WebSocketTransportRegistration) {
registry.setMessageSizeLimit(64 * 1024) // 64KB
registry.setSendBufferSizeLimit(512 * 1024) // 512KB
registry.setSendTimeLimit(20 * 1000) // 20秒
}
}
kotlin
@Configuration
class WebSocketConfigAfter : WebSocketMessageBrokerConfigurer {
override fun configureWebSocketTransport(registry: WebSocketTransportRegistration) {
// 基于监控发现的慢客户端问题,调整配置
registry.setMessageSizeLimit(32 * 1024) // [!code highlight] // 减少到32KB
registry.setSendBufferSizeLimit(256 * 1024) // [!code highlight] // 减少到256KB
registry.setSendTimeLimit(10 * 1000) // [!code highlight] // 减少到10秒
}
override fun configureClientInboundChannel(registration: ChannelRegistration) {
// 基于入站通道积压情况,增加线程池大小
registration.taskExecutor().corePoolSize(8)
registration.taskExecutor().maxPoolSize(16)
registration.taskExecutor().queueCapacity(100)
}
}
JMX 监控集成 💻
启用 JMX 导出
kotlin
@Configuration
class JmxConfig {
@Bean
fun mbeanExporter(): MBeanExporter {
val exporter = MBeanExporter()
exporter.setDefaultDomain("com.example.websocket")
return exporter
}
@Bean
@ExportedBean(objectName = "com.example.websocket:name=WebSocketStats")
fun webSocketStatsJmx(webSocketStats: WebSocketMessageBrokerStats): WebSocketStatsJmx {
return WebSocketStatsJmx(webSocketStats)
}
}
class WebSocketStatsJmx(
private val webSocketStats: WebSocketMessageBrokerStats
) {
@ManagedAttribute(description = "当前WebSocket会话数")
fun getCurrentSessions(): Int {
val sessionInfo = webSocketStats.webSocketSessionsInfo
return extractCurrentSessions(sessionInfo)
}
@ManagedAttribute(description = "异常关闭的会话数")
fun getAbnormallyClosedSessions(): Int {
val sessionInfo = webSocketStats.webSocketSessionsInfo
return extractAbnormallyClosedSessions(sessionInfo)
}
@ManagedOperation(description = "获取完整的统计信息")
fun getFullStats(): String {
return webSocketStats.toString()
}
private fun extractCurrentSessions(info: String): Int {
return try {
info.split("current=")[1].split(",")[0].toInt()
} catch (e: Exception) {
0
}
}
private fun extractAbnormallyClosedSessions(info: String): Int {
return try {
info.split("abnormallyClosed=")[1].split(",")[0].toInt()
} catch (e: Exception) {
0
}
}
}
监控最佳实践 🏆
1. 设置合理的监控阈值
kotlin
@ConfigurationProperties(prefix = "websocket.monitoring")
@Component
data class MonitoringConfig(
var maxConcurrentSessions: Int = 1000,
var maxQueueSize: Int = 100,
var abnormalCloseRateThreshold: Double = 0.1, // 10%
var monitoringInterval: Long = 30000 // 30秒
)
2. 实现智能告警
kotlin
@Component
class WebSocketAlertManager(
private val webSocketStats: WebSocketMessageBrokerStats,
private val monitoringConfig: MonitoringConfig
) {
@Scheduled(fixedRateString = "#{@monitoringConfig.monitoringInterval}")
fun checkAndAlert() {
checkSessionHealth()
checkChannelHealth()
checkAbnormalCloseRate()
}
private fun checkSessionHealth() {
val currentSessions = getCurrentSessionCount()
if (currentSessions > monitoringConfig.maxConcurrentSessions) {
sendAlert("高并发警告", "当前会话数: $currentSessions")
}
}
private fun checkChannelHealth() {
val inboundQueueSize = getInboundQueueSize()
val outboundQueueSize = getOutboundQueueSize()
if (inboundQueueSize > monitoringConfig.maxQueueSize) {
sendAlert("入站通道积压", "队列大小: $inboundQueueSize")
}
if (outboundQueueSize > monitoringConfig.maxQueueSize) {
sendAlert("出站通道积压", "队列大小: $outboundQueueSize")
}
}
private fun checkAbnormalCloseRate() {
val totalSessions = getTotalSessionCount()
val abnormalClosed = getAbnormallyClosedCount()
if (totalSessions > 0) {
val abnormalRate = abnormalClosed.toDouble() / totalSessions
if (abnormalRate > monitoringConfig.abnormalCloseRateThreshold) {
sendAlert("异常关闭率过高", "异常关闭率: ${(abnormalRate * 100).toInt()}%")
}
}
}
private fun sendAlert(title: String, message: String) {
logger.error("WebSocket 告警 - $title: $message")
// 这里可以集成邮件、短信、钉钉等告警方式
}
// 省略其他辅助方法...
companion object {
private val logger = LoggerFactory.getLogger(WebSocketAlertManager::class.java)
}
}
总结 📝
Spring WebSocket STOMP 监控为我们提供了全面的系统可观测性:
监控的核心价值
- 预防胜于治疗:通过监控及时发现问题,避免系统故障
- 数据驱动优化:基于真实数据进行性能调优和容量规划
- 用户体验保障:确保实时通信功能的稳定性和响应性
关键监控指标记忆口诀 💡
- 会话监控:知道有多少用户在线
- 帧统计:了解协议层面的连接质量
- 通道性能:识别消息处理瓶颈
- 异常分析:快速定位问题根因
通过合理配置和使用这些监控功能,我们可以构建出健壮、高性能的实时通信应用,为用户提供优质的实时交互体验! 🎉