Skip to content

Spring STOMP 入门指南:让实时通信变得简单 🚀

什么是 STOMP?为什么我们需要它?

想象一下,你正在开发一个聊天应用或者股票交易系统,用户需要实时接收消息和数据更新。传统的 HTTP 请求-响应模式就像是"问一句答一句"的对话,但实时通信更像是"广播电台"——服务器需要主动向客户端推送消息。

NOTE

STOMP(Simple Text Oriented Messaging Protocol)是一个简单的面向文本的消息传递协议。它就像是 WebSocket 之上的一层"翻译器",让复杂的实时通信变得像发送普通消息一样简单。

没有 STOMP 会遇到什么问题?

kotlin
// 原生 WebSocket 处理,代码复杂
@Component
class WebSocketHandler : TextWebSocketHandler() {
    override fun afterConnectionEstablished(session: WebSocketSession) {
        // 手动管理连接
        sessions.add(session) 
    }
    
    override fun handleTextMessage(session: WebSocketSession, message: TextMessage) {
        // 手动解析消息格式
        val payload = message.payload
        // 需要自己实现路由逻辑
        when {
            payload.startsWith("CHAT:") -> handleChat(payload)
            payload.startsWith("TRADE:") -> handleTrade(payload)
            // ... 更多复杂的解析逻辑
        }
    }
}
kotlin
@Controller
class ChatController {
    
    @MessageMapping("/chat.send") 
    @SendTo("/topic/public") 
    fun sendMessage(chatMessage: ChatMessage): ChatMessage {
        return chatMessage // 就这么简单!
    }
    
    @MessageMapping("/trade.update") 
    @SendTo("/topic/stocks") 
    fun updateStock(stockUpdate: StockUpdate): StockUpdate {
        return stockUpdate
    }
}

STOMP 的核心设计哲学

STOMP 的设计哲学可以用三个词概括:简单标准化可路由

启用 STOMP:一步到位的配置

核心配置类

让我们看看如何用最少的代码启用 STOMP 功能:

kotlin
@Configuration
@EnableWebSocketMessageBroker
class WebSocketConfiguration : WebSocketMessageBrokerConfigurer {

    override fun registerStompEndpoints(registry: StompEndpointRegistry) {
        // 注册 STOMP 端点 - 客户端连接的入口
        registry.addEndpoint("/portfolio") 
            .setAllowedOrigins("*") // 生产环境请配置具体域名
    }

    override fun configureMessageBroker(config: MessageBrokerRegistry) {
        // 配置应用程序目的地前缀 - 路由到 @MessageMapping
        config.setApplicationDestinationPrefixes("/app") 
        
        // 启用简单消息代理 - 处理订阅和广播
        config.enableSimpleBroker("/topic", "/queue") 
    }
}

TIP

这个配置类就像是一个"交通指挥中心",它告诉 Spring:

  • 在哪里接受连接(/portfolio
  • 如何路由消息(/app 前缀路由到控制器)
  • 在哪里进行消息广播(/topic/queue

路径前缀的含义

前缀用途示例说明
/app应用程序处理/app/chat.send路由到 @MessageMapping 方法
/topic发布-订阅模式/topic/public一对多广播(如聊天室)
/queue点对点模式/queue/user123一对一消息(如私聊)

实战案例:构建股票交易系统

让我们通过一个完整的股票交易系统来理解 STOMP 的实际应用:

1. 消息模型定义

kotlin
// 股票更新消息
data class StockUpdate(
    val symbol: String,        // 股票代码
    val price: BigDecimal,     // 当前价格
    val change: BigDecimal,    // 价格变化
    val timestamp: LocalDateTime = LocalDateTime.now()
)

// 交易订单
data class TradeOrder(
    val userId: String,
    val symbol: String,
    val quantity: Int,
    val orderType: OrderType,
    val price: BigDecimal?
)

enum class OrderType { BUY, SELL }

2. STOMP 控制器

kotlin
@Controller
class StockController {
    
    private val logger = LoggerFactory.getLogger(StockController::class.java)
    
    @MessageMapping("/stock.subscribe") 
    @SendTo("/topic/stocks") 
    fun subscribeToStock(symbol: String): StockUpdate {
        logger.info("用户订阅股票: $symbol")
        // 返回当前股票信息
        return getCurrentStockPrice(symbol)
    }
    
    @MessageMapping("/trade.order") 
    @SendToUser("/queue/orders") 
    fun placeOrder(order: TradeOrder, principal: Principal): TradeResult {
        logger.info("用户 ${principal.name} 下单: $order")
        
        // 处理交易逻辑
        val result = processTradeOrder(order)
        
        // 如果交易成功,广播价格更新
        if (result.success) {
            broadcastPriceUpdate(order.symbol)
        }
        
        return result
    }
    
    private fun getCurrentStockPrice(symbol: String): StockUpdate {
        // 模拟获取股票价格
        return StockUpdate(
            symbol = symbol,
            price = BigDecimal("150.25"),
            change = BigDecimal("2.15")
        )
    }
    
    private fun processTradeOrder(order: TradeOrder): TradeResult {
        // 模拟交易处理逻辑
        return TradeResult(
            orderId = UUID.randomUUID().toString(),
            success = true,
            message = "交易成功"
        )
    }
    
    @Autowired
    private lateinit var messagingTemplate: SimpMessagingTemplate
    
    private fun broadcastPriceUpdate(symbol: String) {
        val update = getCurrentStockPrice(symbol)
        messagingTemplate.convertAndSend("/topic/stocks", update) 
    }
}

data class TradeResult(
    val orderId: String,
    val success: Boolean,
    val message: String
)

3. 定时价格推送服务

kotlin
@Service
class StockPriceService {
    
    @Autowired
    private lateinit var messagingTemplate: SimpMessagingTemplate
    
    private val random = Random()
    private val stockSymbols = listOf("AAPL", "GOOGL", "MSFT", "TSLA")
    
    @Scheduled(fixedRate = 5000) // 每5秒推送一次
    fun pushRandomStockUpdate() {
        val symbol = stockSymbols.random()
        val priceChange = (random.nextDouble() - 0.5) * 10 // -5 到 +5 的随机变化
        
        val update = StockUpdate(
            symbol = symbol,
            price = BigDecimal("150.00").add(BigDecimal(priceChange.toString())),
            change = BigDecimal(priceChange.toString())
        )
        
        // 广播到所有订阅者
        messagingTemplate.convertAndSend("/topic/stocks", update) 
        
        logger.info("推送股票更新: $update")
    }
    
    companion object {
        private val logger = LoggerFactory.getLogger(StockPriceService::class.java)
    }
}

客户端连接示例

JavaScript 客户端

javascript
// 创建 STOMP 客户端
const stompClient = new StompJs.Client({
    brokerURL: 'ws://localhost:8080/portfolio', 
    onConnect: (frame) => {
        console.log('连接成功: ' + frame);
        
        // 订阅股票更新
        stompClient.subscribe('/topic/stocks', (message) => { 
            const stockUpdate = JSON.parse(message.body);
            updateStockDisplay(stockUpdate);
        });
        
        // 订阅个人交易结果
        stompClient.subscribe('/user/queue/orders', (message) => { 
            const tradeResult = JSON.parse(message.body);
            showTradeResult(tradeResult);
        });
    },
    onStompError: (frame) => {
        console.error('STOMP 错误: ' + frame.headers['message']);
    }
});

// 连接到服务器
stompClient.activate();

// 发送交易订单
function placeOrder(symbol, quantity, orderType) {
    const order = {
        userId: 'user123',
        symbol: symbol,
        quantity: quantity,
        orderType: orderType,
        price: null
    };
    
    stompClient.publish({ 
        destination: '/app/trade.order',
        body: JSON.stringify(order)
    });
}

消息流转全景图

高级特性与最佳实践

1. SockJS 降级支持

WARNING

在某些网络环境下,WebSocket 可能被阻止。SockJS 提供了优雅的降级方案。

kotlin
override fun registerStompEndpoints(registry: StompEndpointRegistry) {
    registry.addEndpoint("/portfolio")
        .setAllowedOrigins("*")
        .withSockJS() // [!code highlight] // 启用 SockJS 降级支持
}

2. 用户认证与授权

kotlin
@Configuration
@EnableWebSocketMessageBroker
class WebSocketSecurityConfig : WebSocketMessageBrokerConfigurer {
    
    override fun configureClientInboundChannel(registration: ChannelRegistration) {
        registration.interceptors(object : ChannelInterceptor {
            override fun preSend(message: Message<*>, channel: MessageChannel): Message<*>? {
                val accessor = StompHeaderAccessor.wrap(message)
                
                if (StompCommand.CONNECT == accessor.command) {
                    // 验证用户身份
                    val user = authenticateUser(accessor) 
                    accessor.user = user
                }
                
                return message
            }
        })
    }
    
    private fun authenticateUser(accessor: StompHeaderAccessor): Principal {
        // 实现用户认证逻辑
        val token = accessor.getFirstNativeHeader("Authorization")
        return validateTokenAndGetUser(token)
    }
}

3. 错误处理

kotlin
@Controller
class StockController {
    
    @MessageExceptionHandler
    @SendToUser("/queue/errors")
    fun handleException(exception: Exception): ErrorMessage {
        logger.error("处理消息时发生错误", exception)
        return ErrorMessage(
            error = "PROCESSING_ERROR",
            message = exception.message ?: "未知错误"
        )
    }
}

data class ErrorMessage(
    val error: String,
    val message: String,
    val timestamp: LocalDateTime = LocalDateTime.now()
)

性能优化建议

IMPORTANT

在生产环境中,以下优化措施至关重要:

1. 使用外部消息代理

kotlin
override fun configureMessageBroker(config: MessageBrokerRegistry) {
    // 使用 RabbitMQ 或 ActiveMQ 替代简单代理
    config.enableStompBrokerRelay("/topic", "/queue") 
        .setRelayHost("rabbitmq-server")
        .setRelayPort(61613)
        .setClientLogin("guest")
        .setClientPasscode("guest")
}

2. 连接池配置

kotlin
@Configuration
class WebSocketConfig {
    
    @Bean
    fun taskScheduler(): TaskScheduler {
        val scheduler = ThreadPoolTaskScheduler()
        scheduler.poolSize = 10
        scheduler.threadNamePrefix = "websocket-"
        scheduler.initialize()
        return scheduler
    }
}

总结

STOMP 让实时通信变得简单而优雅。它的核心价值在于:

标准化协议:无需自定义消息格式
简单的路由机制:通过注解轻松处理消息
灵活的订阅模式:支持广播和点对点通信
完善的错误处理:内置异常处理机制

通过 Spring 的 STOMP 支持,我们可以用最少的代码构建出功能强大的实时应用,无论是聊天系统、实时监控,还是协作工具,STOMP 都能提供稳定可靠的解决方案。

TIP

开始你的第一个 STOMP 应用吧!从简单的聊天室开始,逐步探索更复杂的实时交互场景。记住,好的架构始于简单的开始。 🎉