Skip to content

STOMP 协议详解:WebSocket 上的消息传递标准 🚀

什么是 STOMP?

STOMP(Simple Text Oriented Messaging Protocol,简单文本定向消息协议)是一个专为消息传递而设计的轻量级协议。它最初是为脚本语言(如 Ruby、Python 和 Perl)连接企业级消息代理而创建的。

NOTE

STOMP 的设计哲学是"简单而实用"——它只关注最常用的消息传递模式,避免了复杂性,让开发者能够快速上手。

为什么需要 STOMP?

在 WebSocket 出现之前,实时消息传递是一个复杂的问题:

kotlin
// 客户端需要不断轮询服务器
@RestController
class TraditionalController {
    
    @GetMapping("/messages")
    fun getMessages(): List<Message> {
        // 每次都要查询数据库,效率低下
        return messageService.getLatestMessages()
    }
}

// 客户端 JavaScript
setInterval(() => {
    fetch('/messages') // 频繁请求,浪费资源
        .then(response => response.json())
        .then(data => updateUI(data))
}, 1000) // 每秒轮询一次
kotlin
// 服务器主动推送消息
@Controller
class StompController {
    
    @Autowired
    private lateinit var messagingTemplate: SimpMessagingTemplate
    
    @MessageMapping("/trade") 
    @SendTo("/topic/trade-updates") 
    fun handleTrade(tradeRequest: TradeRequest): TradeResponse {
        // 处理交易请求
        val result = tradeService.executeTrade(tradeRequest)
        
        // 自动广播给所有订阅者
        return TradeResponse(
            action = result.action,
            ticker = result.ticker,
            price = result.price,
            timestamp = System.currentTimeMillis()
        )
    }
}

STOMP 协议结构

STOMP 是一个基于帧(Frame)的协议,其结构模仿了 HTTP:

COMMAND
header1:value1
header2:value2

Body^@

TIP

^@ 表示 NULL 字符(\0),用于标识帧的结束。这个设计让协议解析变得简单而可靠。

核心命令类型

命令用途示例场景
CONNECT建立连接客户端连接到服务器
SUBSCRIBE订阅消息订阅股票价格更新
SEND发送消息提交交易请求
MESSAGE服务器推送广播价格变动
DISCONNECT断开连接客户端离线

Spring Boot 中的 STOMP 实现

1. 配置 WebSocket + STOMP

kotlin
@Configuration
@EnableWebSocketMessageBroker
class WebSocketConfig : WebSocketMessageBrokerConfigurer {
    
    override fun configureMessageBroker(config: MessageBrokerRegistry) {
        // 启用简单内存消息代理
        config.enableSimpleBroker("/topic", "/queue")
        
        // 设置应用程序目的地前缀
        config.setApplicationDestinationPrefixes("/app")
    }
    
    override fun registerStompEndpoints(registry: StompEndpointRegistry) {
        // 注册 STOMP 端点
        registry.addEndpoint("/ws")
            .setAllowedOriginPatterns("*")
            .withSockJS() // 启用 SockJS 降级支持
    }
}

2. 消息处理控制器

kotlin
@Controller
class ChatController {
    
    @Autowired
    private lateinit var messagingTemplate: SimpMessagingTemplate
    
    // 处理点对点消息
    @MessageMapping("/chat.sendMessage") 
    @SendTo("/topic/public") 
    fun sendMessage(chatMessage: ChatMessage): ChatMessage {
        return chatMessage.copy(
            timestamp = LocalDateTime.now(),
            type = MessageType.CHAT
        )
    }
    
    // 处理用户加入
    @MessageMapping("/chat.addUser") 
    @SendTo("/topic/public") 
    fun addUser(
        chatMessage: ChatMessage,
        headerAccessor: SimpMessageHeaderAccessor
    ): ChatMessage {
        // 在 WebSocket 会话中添加用户名
        headerAccessor.sessionAttributes?.put("username", chatMessage.sender)
        
        return chatMessage.copy(
            type = MessageType.JOIN,
            timestamp = LocalDateTime.now()
        )
    }
    
    // 定时推送系统消息
    @Scheduled(fixedRate = 30000) // 每30秒执行一次
    fun sendSystemMessage() {
        val systemMessage = ChatMessage(
            type = MessageType.SYSTEM,
            content = "系统消息:当前在线用户 ${getOnlineUserCount()} 人",
            sender = "System",
            timestamp = LocalDateTime.now()
        )
        
        // 主动推送消息
        messagingTemplate.convertAndSend("/topic/public", systemMessage)
    }
}

3. 数据模型

kotlin
data class ChatMessage(
    val type: MessageType,
    val content: String,
    val sender: String,
    val timestamp: LocalDateTime = LocalDateTime.now()
)

enum class MessageType {
    CHAT,    // 聊天消息
    JOIN,    // 用户加入
    LEAVE,   // 用户离开
    SYSTEM   // 系统消息
}

data class TradeRequest(
    val action: String,      // BUY 或 SELL
    val ticker: String,      // 股票代码
    val shares: Int,         // 股数
    val userId: String       // 用户ID
)

data class TradeResponse(
    val action: String,
    val ticker: String,
    val price: Double,
    val shares: Int,
    val timestamp: Long,
    val status: String = "SUCCESS"
)

STOMP 消息流程图

实际应用场景

1. 实时股票交易系统

完整的股票交易示例
kotlin
@Controller
class StockTradingController {
    
    @Autowired
    private lateinit var stockService: StockService
    
    @Autowired
    private lateinit var messagingTemplate: SimpMessagingTemplate
    
    // 处理股票交易请求
    @MessageMapping("/stock.trade") 
    fun handleStockTrade(
        tradeRequest: TradeRequest,
        principal: Principal
    ) {
        try {
            // 验证用户权限
            val user = userService.findByUsername(principal.name)
            if (!user.canTrade()) {
                throw IllegalStateException("用户无交易权限")
            }
            
            // 执行交易
            val tradeResult = stockService.executeTrade(tradeRequest)
            
            // 发送个人交易确认
            messagingTemplate.convertAndSendToUser(
                principal.name,
                "/queue/trade-confirmations",
                TradeConfirmation(
                    tradeId = tradeResult.id,
                    status = "SUCCESS",
                    message = "交易执行成功"
                )
            )
            
            // 广播市场价格更新
            messagingTemplate.convertAndSend(
                "/topic/stock-prices",
                StockPriceUpdate(
                    ticker = tradeRequest.ticker,
                    price = tradeResult.executionPrice,
                    volume = tradeResult.volume,
                    timestamp = System.currentTimeMillis()
                )
            )
            
        } catch (e: Exception) {
            // 发送错误消息给特定用户
            messagingTemplate.convertAndSendToUser(
                principal.name,
                "/queue/errors",
                ErrorMessage("交易失败: ${e.message}")
            )
        }
    }
    
    // 订阅特定股票价格
    @SubscribeMapping("/stock-prices/{ticker}") 
    fun subscribeToStock(@DestinationVariable ticker: String): StockPrice {
        // 立即返回当前价格
        return stockService.getCurrentPrice(ticker)
    }
}

2. 实时聊天系统

kotlin
@Controller
class ChatRoomController {
    
    private val activeUsers = mutableSetOf<String>()
    
    @MessageMapping("/chat.join") 
    @SendTo("/topic/chat.users") 
    fun joinChat(
        joinMessage: JoinMessage,
        headerAccessor: SimpMessageHeaderAccessor
    ): UserListUpdate {
        val username = joinMessage.username
        
        // 添加用户到会话
        headerAccessor.sessionAttributes?.put("username", username)
        activeUsers.add(username)
        
        return UserListUpdate(
            type = "USER_JOINED",
            username = username,
            activeUsers = activeUsers.toList()
        )
    }
    
    @MessageMapping("/chat.message") 
    @SendTo("/topic/chat.messages") 
    fun sendMessage(message: ChatMessage): ChatMessage {
        return message.copy(
            timestamp = LocalDateTime.now(),
            id = UUID.randomUUID().toString()
        )
    }
}

目的地(Destination)模式

IMPORTANT

STOMP 中的目的地是一个关键概念,它决定了消息的路由方式。

常见的目的地模式

模式含义使用场景
/topic/...发布-订阅(一对多)广播消息、实时更新
/queue/...点对点(一对一)私人消息、任务分发
/user/...用户特定消息个人通知、私有数据

实际应用示例

kotlin
@Controller
class NotificationController {
    
    @Autowired
    private lateinit var messagingTemplate: SimpMessagingTemplate
    
    // 广播系统通知(所有用户都能收到)
    fun broadcastSystemNotification(message: String) {
        messagingTemplate.convertAndSend( 
            "/topic/system-notifications", // 发布-订阅模式
            SystemNotification(
                message = message,
                type = "SYSTEM",
                timestamp = System.currentTimeMillis()
            )
        )
    }
    
    // 发送个人通知(只有特定用户能收到)
    fun sendPersonalNotification(username: String, message: String) {
        messagingTemplate.convertAndSendToUser( 
            username,
            "/queue/personal-notifications", // 点对点模式
            PersonalNotification(
                message = message,
                recipient = username,
                timestamp = System.currentTimeMillis()
            )
        )
    }
}

错误处理和最佳实践

1. 连接状态监听

kotlin
@Component
class WebSocketEventListener {
    
    private val logger = LoggerFactory.getLogger(WebSocketEventListener::class.java)
    
    @EventListener
    fun handleWebSocketConnectListener(event: SessionConnectedEvent) {
        logger.info("新的 WebSocket 连接建立")
    }
    
    @EventListener
    fun handleWebSocketDisconnectListener(event: SessionDisconnectEvent) {
        val headerAccessor = StompHeaderAccessor.wrap(event.message)
        val username = headerAccessor.sessionAttributes?.get("username") as? String
        
        if (username != null) {
            logger.info("用户 $username 断开连接")
            
            // 通知其他用户
            messagingTemplate.convertAndSend(
                "/topic/chat.users",
                UserListUpdate(
                    type = "USER_LEFT",
                    username = username,
                    activeUsers = getActiveUsers()
                )
            )
        }
    }
}

2. 消息验证和安全

kotlin
@Controller
class SecureMessageController {
    
    @MessageMapping("/secure.message")
    @PreAuthorize("hasRole('USER')") 
    fun handleSecureMessage(
        message: SecureMessage,
        principal: Principal
    ): ResponseEntity<String> {
        
        // 验证消息内容
        if (message.content.isBlank()) {
            throw IllegalArgumentException("消息内容不能为空")
        }
        
        if (message.content.length > 500) {
            throw IllegalArgumentException("消息长度不能超过500字符")
        }
        
        // 过滤敏感词汇
        val filteredContent = contentFilter.filter(message.content)
        
        val processedMessage = message.copy(
            content = filteredContent,
            sender = principal.name,
            timestamp = LocalDateTime.now()
        )
        
        messagingTemplate.convertAndSend("/topic/secure-chat", processedMessage)
        
        return ResponseEntity.ok("消息发送成功")
    }
}

性能优化建议

TIP

以下是一些提升 STOMP 应用性能的实用建议:

1. 使用外部消息代理

kotlin
@Configuration
@EnableWebSocketMessageBroker
class WebSocketConfig : WebSocketMessageBrokerConfigurer {
    
    override fun configureMessageBroker(config: MessageBrokerRegistry) {
        // 使用 RabbitMQ 作为外部消息代理
        config.enableStompBrokerRelay("/topic", "/queue")
            .setRelayHost("localhost")
            .setRelayPort(61613)
            .setClientLogin("guest")
            .setClientPasscode("guest")
    }
}

2. 消息批量处理

kotlin
@Service
class BatchMessageService {
    
    private val messageBuffer = mutableListOf<StockPriceUpdate>()
    
    @Scheduled(fixedRate = 100) // 每100ms批量发送一次
    fun flushMessages() {
        if (messageBuffer.isNotEmpty()) {
            val batch = messageBuffer.toList()
            messageBuffer.clear()
            
            // 批量发送消息
            messagingTemplate.convertAndSend(
                "/topic/stock-prices-batch",
                StockPriceBatch(updates = batch)
            )
        }
    }
    
    fun addPriceUpdate(update: StockPriceUpdate) {
        messageBuffer.add(update)
    }
}

总结

STOMP 协议为 WebSocket 之上的消息传递提供了一个标准化、简单易用的解决方案。它的主要优势包括:

简单性:基于文本的协议,易于理解和调试
标准化:提供了统一的消息传递模式
灵活性:支持多种消息传递模式(发布-订阅、点对点)
可扩展性:可以与外部消息代理集成

NOTE

STOMP 让实时通信变得像 HTTP 请求一样简单直观,这正是它被广泛采用的原因。无论是构建聊天应用、实时交易系统还是协作工具,STOMP 都能提供可靠的技术基础。

通过 Spring Boot 的集成支持,开发者可以快速构建出功能强大的实时应用,而无需深入了解底层的 WebSocket 实现细节。这种抽象让我们能够专注于业务逻辑,而不是协议细节。