Appearance
STOMP 协议详解:WebSocket 上的消息传递标准 🚀
什么是 STOMP?
STOMP(Simple Text Oriented Messaging Protocol,简单文本定向消息协议)是一个专为消息传递而设计的轻量级协议。它最初是为脚本语言(如 Ruby、Python 和 Perl)连接企业级消息代理而创建的。
NOTE
STOMP 的设计哲学是"简单而实用"——它只关注最常用的消息传递模式,避免了复杂性,让开发者能够快速上手。
为什么需要 STOMP?
在 WebSocket 出现之前,实时消息传递是一个复杂的问题:
kotlin
// 客户端需要不断轮询服务器
@RestController
class TraditionalController {
@GetMapping("/messages")
fun getMessages(): List<Message> {
// 每次都要查询数据库,效率低下
return messageService.getLatestMessages()
}
}
// 客户端 JavaScript
setInterval(() => {
fetch('/messages') // 频繁请求,浪费资源
.then(response => response.json())
.then(data => updateUI(data))
}, 1000) // 每秒轮询一次
kotlin
// 服务器主动推送消息
@Controller
class StompController {
@Autowired
private lateinit var messagingTemplate: SimpMessagingTemplate
@MessageMapping("/trade")
@SendTo("/topic/trade-updates")
fun handleTrade(tradeRequest: TradeRequest): TradeResponse {
// 处理交易请求
val result = tradeService.executeTrade(tradeRequest)
// 自动广播给所有订阅者
return TradeResponse(
action = result.action,
ticker = result.ticker,
price = result.price,
timestamp = System.currentTimeMillis()
)
}
}
STOMP 协议结构
STOMP 是一个基于帧(Frame)的协议,其结构模仿了 HTTP:
COMMAND
header1:value1
header2:value2
Body^@
TIP
^@
表示 NULL 字符(\0),用于标识帧的结束。这个设计让协议解析变得简单而可靠。
核心命令类型
命令 | 用途 | 示例场景 |
---|---|---|
CONNECT | 建立连接 | 客户端连接到服务器 |
SUBSCRIBE | 订阅消息 | 订阅股票价格更新 |
SEND | 发送消息 | 提交交易请求 |
MESSAGE | 服务器推送 | 广播价格变动 |
DISCONNECT | 断开连接 | 客户端离线 |
Spring Boot 中的 STOMP 实现
1. 配置 WebSocket + STOMP
kotlin
@Configuration
@EnableWebSocketMessageBroker
class WebSocketConfig : WebSocketMessageBrokerConfigurer {
override fun configureMessageBroker(config: MessageBrokerRegistry) {
// 启用简单内存消息代理
config.enableSimpleBroker("/topic", "/queue")
// 设置应用程序目的地前缀
config.setApplicationDestinationPrefixes("/app")
}
override fun registerStompEndpoints(registry: StompEndpointRegistry) {
// 注册 STOMP 端点
registry.addEndpoint("/ws")
.setAllowedOriginPatterns("*")
.withSockJS() // 启用 SockJS 降级支持
}
}
2. 消息处理控制器
kotlin
@Controller
class ChatController {
@Autowired
private lateinit var messagingTemplate: SimpMessagingTemplate
// 处理点对点消息
@MessageMapping("/chat.sendMessage")
@SendTo("/topic/public")
fun sendMessage(chatMessage: ChatMessage): ChatMessage {
return chatMessage.copy(
timestamp = LocalDateTime.now(),
type = MessageType.CHAT
)
}
// 处理用户加入
@MessageMapping("/chat.addUser")
@SendTo("/topic/public")
fun addUser(
chatMessage: ChatMessage,
headerAccessor: SimpMessageHeaderAccessor
): ChatMessage {
// 在 WebSocket 会话中添加用户名
headerAccessor.sessionAttributes?.put("username", chatMessage.sender)
return chatMessage.copy(
type = MessageType.JOIN,
timestamp = LocalDateTime.now()
)
}
// 定时推送系统消息
@Scheduled(fixedRate = 30000) // 每30秒执行一次
fun sendSystemMessage() {
val systemMessage = ChatMessage(
type = MessageType.SYSTEM,
content = "系统消息:当前在线用户 ${getOnlineUserCount()} 人",
sender = "System",
timestamp = LocalDateTime.now()
)
// 主动推送消息
messagingTemplate.convertAndSend("/topic/public", systemMessage)
}
}
3. 数据模型
kotlin
data class ChatMessage(
val type: MessageType,
val content: String,
val sender: String,
val timestamp: LocalDateTime = LocalDateTime.now()
)
enum class MessageType {
CHAT, // 聊天消息
JOIN, // 用户加入
LEAVE, // 用户离开
SYSTEM // 系统消息
}
data class TradeRequest(
val action: String, // BUY 或 SELL
val ticker: String, // 股票代码
val shares: Int, // 股数
val userId: String // 用户ID
)
data class TradeResponse(
val action: String,
val ticker: String,
val price: Double,
val shares: Int,
val timestamp: Long,
val status: String = "SUCCESS"
)
STOMP 消息流程图
实际应用场景
1. 实时股票交易系统
完整的股票交易示例
kotlin
@Controller
class StockTradingController {
@Autowired
private lateinit var stockService: StockService
@Autowired
private lateinit var messagingTemplate: SimpMessagingTemplate
// 处理股票交易请求
@MessageMapping("/stock.trade")
fun handleStockTrade(
tradeRequest: TradeRequest,
principal: Principal
) {
try {
// 验证用户权限
val user = userService.findByUsername(principal.name)
if (!user.canTrade()) {
throw IllegalStateException("用户无交易权限")
}
// 执行交易
val tradeResult = stockService.executeTrade(tradeRequest)
// 发送个人交易确认
messagingTemplate.convertAndSendToUser(
principal.name,
"/queue/trade-confirmations",
TradeConfirmation(
tradeId = tradeResult.id,
status = "SUCCESS",
message = "交易执行成功"
)
)
// 广播市场价格更新
messagingTemplate.convertAndSend(
"/topic/stock-prices",
StockPriceUpdate(
ticker = tradeRequest.ticker,
price = tradeResult.executionPrice,
volume = tradeResult.volume,
timestamp = System.currentTimeMillis()
)
)
} catch (e: Exception) {
// 发送错误消息给特定用户
messagingTemplate.convertAndSendToUser(
principal.name,
"/queue/errors",
ErrorMessage("交易失败: ${e.message}")
)
}
}
// 订阅特定股票价格
@SubscribeMapping("/stock-prices/{ticker}")
fun subscribeToStock(@DestinationVariable ticker: String): StockPrice {
// 立即返回当前价格
return stockService.getCurrentPrice(ticker)
}
}
2. 实时聊天系统
kotlin
@Controller
class ChatRoomController {
private val activeUsers = mutableSetOf<String>()
@MessageMapping("/chat.join")
@SendTo("/topic/chat.users")
fun joinChat(
joinMessage: JoinMessage,
headerAccessor: SimpMessageHeaderAccessor
): UserListUpdate {
val username = joinMessage.username
// 添加用户到会话
headerAccessor.sessionAttributes?.put("username", username)
activeUsers.add(username)
return UserListUpdate(
type = "USER_JOINED",
username = username,
activeUsers = activeUsers.toList()
)
}
@MessageMapping("/chat.message")
@SendTo("/topic/chat.messages")
fun sendMessage(message: ChatMessage): ChatMessage {
return message.copy(
timestamp = LocalDateTime.now(),
id = UUID.randomUUID().toString()
)
}
}
目的地(Destination)模式
IMPORTANT
STOMP 中的目的地是一个关键概念,它决定了消息的路由方式。
常见的目的地模式
模式 | 含义 | 使用场景 |
---|---|---|
/topic/... | 发布-订阅(一对多) | 广播消息、实时更新 |
/queue/... | 点对点(一对一) | 私人消息、任务分发 |
/user/... | 用户特定消息 | 个人通知、私有数据 |
实际应用示例
kotlin
@Controller
class NotificationController {
@Autowired
private lateinit var messagingTemplate: SimpMessagingTemplate
// 广播系统通知(所有用户都能收到)
fun broadcastSystemNotification(message: String) {
messagingTemplate.convertAndSend(
"/topic/system-notifications", // 发布-订阅模式
SystemNotification(
message = message,
type = "SYSTEM",
timestamp = System.currentTimeMillis()
)
)
}
// 发送个人通知(只有特定用户能收到)
fun sendPersonalNotification(username: String, message: String) {
messagingTemplate.convertAndSendToUser(
username,
"/queue/personal-notifications", // 点对点模式
PersonalNotification(
message = message,
recipient = username,
timestamp = System.currentTimeMillis()
)
)
}
}
错误处理和最佳实践
1. 连接状态监听
kotlin
@Component
class WebSocketEventListener {
private val logger = LoggerFactory.getLogger(WebSocketEventListener::class.java)
@EventListener
fun handleWebSocketConnectListener(event: SessionConnectedEvent) {
logger.info("新的 WebSocket 连接建立")
}
@EventListener
fun handleWebSocketDisconnectListener(event: SessionDisconnectEvent) {
val headerAccessor = StompHeaderAccessor.wrap(event.message)
val username = headerAccessor.sessionAttributes?.get("username") as? String
if (username != null) {
logger.info("用户 $username 断开连接")
// 通知其他用户
messagingTemplate.convertAndSend(
"/topic/chat.users",
UserListUpdate(
type = "USER_LEFT",
username = username,
activeUsers = getActiveUsers()
)
)
}
}
}
2. 消息验证和安全
kotlin
@Controller
class SecureMessageController {
@MessageMapping("/secure.message")
@PreAuthorize("hasRole('USER')")
fun handleSecureMessage(
message: SecureMessage,
principal: Principal
): ResponseEntity<String> {
// 验证消息内容
if (message.content.isBlank()) {
throw IllegalArgumentException("消息内容不能为空")
}
if (message.content.length > 500) {
throw IllegalArgumentException("消息长度不能超过500字符")
}
// 过滤敏感词汇
val filteredContent = contentFilter.filter(message.content)
val processedMessage = message.copy(
content = filteredContent,
sender = principal.name,
timestamp = LocalDateTime.now()
)
messagingTemplate.convertAndSend("/topic/secure-chat", processedMessage)
return ResponseEntity.ok("消息发送成功")
}
}
性能优化建议
TIP
以下是一些提升 STOMP 应用性能的实用建议:
1. 使用外部消息代理
kotlin
@Configuration
@EnableWebSocketMessageBroker
class WebSocketConfig : WebSocketMessageBrokerConfigurer {
override fun configureMessageBroker(config: MessageBrokerRegistry) {
// 使用 RabbitMQ 作为外部消息代理
config.enableStompBrokerRelay("/topic", "/queue")
.setRelayHost("localhost")
.setRelayPort(61613)
.setClientLogin("guest")
.setClientPasscode("guest")
}
}
2. 消息批量处理
kotlin
@Service
class BatchMessageService {
private val messageBuffer = mutableListOf<StockPriceUpdate>()
@Scheduled(fixedRate = 100) // 每100ms批量发送一次
fun flushMessages() {
if (messageBuffer.isNotEmpty()) {
val batch = messageBuffer.toList()
messageBuffer.clear()
// 批量发送消息
messagingTemplate.convertAndSend(
"/topic/stock-prices-batch",
StockPriceBatch(updates = batch)
)
}
}
fun addPriceUpdate(update: StockPriceUpdate) {
messageBuffer.add(update)
}
}
总结
STOMP 协议为 WebSocket 之上的消息传递提供了一个标准化、简单易用的解决方案。它的主要优势包括:
✅ 简单性:基于文本的协议,易于理解和调试
✅ 标准化:提供了统一的消息传递模式
✅ 灵活性:支持多种消息传递模式(发布-订阅、点对点)
✅ 可扩展性:可以与外部消息代理集成
NOTE
STOMP 让实时通信变得像 HTTP 请求一样简单直观,这正是它被广泛采用的原因。无论是构建聊天应用、实时交易系统还是协作工具,STOMP 都能提供可靠的技术基础。
通过 Spring Boot 的集成支持,开发者可以快速构建出功能强大的实时应用,而无需深入了解底层的 WebSocket 实现细节。这种抽象让我们能够专注于业务逻辑,而不是协议细节。