Appearance
Spring STOMP 入门指南:让实时通信变得简单 🚀
什么是 STOMP?为什么我们需要它?
想象一下,你正在开发一个聊天应用或者股票交易系统,用户需要实时接收消息和数据更新。传统的 HTTP 请求-响应模式就像是"问一句答一句"的对话,但实时通信更像是"广播电台"——服务器需要主动向客户端推送消息。
NOTE
STOMP(Simple Text Oriented Messaging Protocol)是一个简单的面向文本的消息传递协议。它就像是 WebSocket 之上的一层"翻译器",让复杂的实时通信变得像发送普通消息一样简单。
没有 STOMP 会遇到什么问题?
kotlin
// 原生 WebSocket 处理,代码复杂
@Component
class WebSocketHandler : TextWebSocketHandler() {
override fun afterConnectionEstablished(session: WebSocketSession) {
// 手动管理连接
sessions.add(session)
}
override fun handleTextMessage(session: WebSocketSession, message: TextMessage) {
// 手动解析消息格式
val payload = message.payload
// 需要自己实现路由逻辑
when {
payload.startsWith("CHAT:") -> handleChat(payload)
payload.startsWith("TRADE:") -> handleTrade(payload)
// ... 更多复杂的解析逻辑
}
}
}
kotlin
@Controller
class ChatController {
@MessageMapping("/chat.send")
@SendTo("/topic/public")
fun sendMessage(chatMessage: ChatMessage): ChatMessage {
return chatMessage // 就这么简单!
}
@MessageMapping("/trade.update")
@SendTo("/topic/stocks")
fun updateStock(stockUpdate: StockUpdate): StockUpdate {
return stockUpdate
}
}
STOMP 的核心设计哲学
STOMP 的设计哲学可以用三个词概括:简单、标准化、可路由。
启用 STOMP:一步到位的配置
核心配置类
让我们看看如何用最少的代码启用 STOMP 功能:
kotlin
@Configuration
@EnableWebSocketMessageBroker
class WebSocketConfiguration : WebSocketMessageBrokerConfigurer {
override fun registerStompEndpoints(registry: StompEndpointRegistry) {
// 注册 STOMP 端点 - 客户端连接的入口
registry.addEndpoint("/portfolio")
.setAllowedOrigins("*") // 生产环境请配置具体域名
}
override fun configureMessageBroker(config: MessageBrokerRegistry) {
// 配置应用程序目的地前缀 - 路由到 @MessageMapping
config.setApplicationDestinationPrefixes("/app")
// 启用简单消息代理 - 处理订阅和广播
config.enableSimpleBroker("/topic", "/queue")
}
}
TIP
这个配置类就像是一个"交通指挥中心",它告诉 Spring:
- 在哪里接受连接(
/portfolio
) - 如何路由消息(
/app
前缀路由到控制器) - 在哪里进行消息广播(
/topic
和/queue
)
路径前缀的含义
前缀 | 用途 | 示例 | 说明 |
---|---|---|---|
/app | 应用程序处理 | /app/chat.send | 路由到 @MessageMapping 方法 |
/topic | 发布-订阅模式 | /topic/public | 一对多广播(如聊天室) |
/queue | 点对点模式 | /queue/user123 | 一对一消息(如私聊) |
实战案例:构建股票交易系统
让我们通过一个完整的股票交易系统来理解 STOMP 的实际应用:
1. 消息模型定义
kotlin
// 股票更新消息
data class StockUpdate(
val symbol: String, // 股票代码
val price: BigDecimal, // 当前价格
val change: BigDecimal, // 价格变化
val timestamp: LocalDateTime = LocalDateTime.now()
)
// 交易订单
data class TradeOrder(
val userId: String,
val symbol: String,
val quantity: Int,
val orderType: OrderType,
val price: BigDecimal?
)
enum class OrderType { BUY, SELL }
2. STOMP 控制器
kotlin
@Controller
class StockController {
private val logger = LoggerFactory.getLogger(StockController::class.java)
@MessageMapping("/stock.subscribe")
@SendTo("/topic/stocks")
fun subscribeToStock(symbol: String): StockUpdate {
logger.info("用户订阅股票: $symbol")
// 返回当前股票信息
return getCurrentStockPrice(symbol)
}
@MessageMapping("/trade.order")
@SendToUser("/queue/orders")
fun placeOrder(order: TradeOrder, principal: Principal): TradeResult {
logger.info("用户 ${principal.name} 下单: $order")
// 处理交易逻辑
val result = processTradeOrder(order)
// 如果交易成功,广播价格更新
if (result.success) {
broadcastPriceUpdate(order.symbol)
}
return result
}
private fun getCurrentStockPrice(symbol: String): StockUpdate {
// 模拟获取股票价格
return StockUpdate(
symbol = symbol,
price = BigDecimal("150.25"),
change = BigDecimal("2.15")
)
}
private fun processTradeOrder(order: TradeOrder): TradeResult {
// 模拟交易处理逻辑
return TradeResult(
orderId = UUID.randomUUID().toString(),
success = true,
message = "交易成功"
)
}
@Autowired
private lateinit var messagingTemplate: SimpMessagingTemplate
private fun broadcastPriceUpdate(symbol: String) {
val update = getCurrentStockPrice(symbol)
messagingTemplate.convertAndSend("/topic/stocks", update)
}
}
data class TradeResult(
val orderId: String,
val success: Boolean,
val message: String
)
3. 定时价格推送服务
kotlin
@Service
class StockPriceService {
@Autowired
private lateinit var messagingTemplate: SimpMessagingTemplate
private val random = Random()
private val stockSymbols = listOf("AAPL", "GOOGL", "MSFT", "TSLA")
@Scheduled(fixedRate = 5000) // 每5秒推送一次
fun pushRandomStockUpdate() {
val symbol = stockSymbols.random()
val priceChange = (random.nextDouble() - 0.5) * 10 // -5 到 +5 的随机变化
val update = StockUpdate(
symbol = symbol,
price = BigDecimal("150.00").add(BigDecimal(priceChange.toString())),
change = BigDecimal(priceChange.toString())
)
// 广播到所有订阅者
messagingTemplate.convertAndSend("/topic/stocks", update)
logger.info("推送股票更新: $update")
}
companion object {
private val logger = LoggerFactory.getLogger(StockPriceService::class.java)
}
}
客户端连接示例
JavaScript 客户端
javascript
// 创建 STOMP 客户端
const stompClient = new StompJs.Client({
brokerURL: 'ws://localhost:8080/portfolio',
onConnect: (frame) => {
console.log('连接成功: ' + frame);
// 订阅股票更新
stompClient.subscribe('/topic/stocks', (message) => {
const stockUpdate = JSON.parse(message.body);
updateStockDisplay(stockUpdate);
});
// 订阅个人交易结果
stompClient.subscribe('/user/queue/orders', (message) => {
const tradeResult = JSON.parse(message.body);
showTradeResult(tradeResult);
});
},
onStompError: (frame) => {
console.error('STOMP 错误: ' + frame.headers['message']);
}
});
// 连接到服务器
stompClient.activate();
// 发送交易订单
function placeOrder(symbol, quantity, orderType) {
const order = {
userId: 'user123',
symbol: symbol,
quantity: quantity,
orderType: orderType,
price: null
};
stompClient.publish({
destination: '/app/trade.order',
body: JSON.stringify(order)
});
}
消息流转全景图
高级特性与最佳实践
1. SockJS 降级支持
WARNING
在某些网络环境下,WebSocket 可能被阻止。SockJS 提供了优雅的降级方案。
kotlin
override fun registerStompEndpoints(registry: StompEndpointRegistry) {
registry.addEndpoint("/portfolio")
.setAllowedOrigins("*")
.withSockJS() // [!code highlight] // 启用 SockJS 降级支持
}
2. 用户认证与授权
kotlin
@Configuration
@EnableWebSocketMessageBroker
class WebSocketSecurityConfig : WebSocketMessageBrokerConfigurer {
override fun configureClientInboundChannel(registration: ChannelRegistration) {
registration.interceptors(object : ChannelInterceptor {
override fun preSend(message: Message<*>, channel: MessageChannel): Message<*>? {
val accessor = StompHeaderAccessor.wrap(message)
if (StompCommand.CONNECT == accessor.command) {
// 验证用户身份
val user = authenticateUser(accessor)
accessor.user = user
}
return message
}
})
}
private fun authenticateUser(accessor: StompHeaderAccessor): Principal {
// 实现用户认证逻辑
val token = accessor.getFirstNativeHeader("Authorization")
return validateTokenAndGetUser(token)
}
}
3. 错误处理
kotlin
@Controller
class StockController {
@MessageExceptionHandler
@SendToUser("/queue/errors")
fun handleException(exception: Exception): ErrorMessage {
logger.error("处理消息时发生错误", exception)
return ErrorMessage(
error = "PROCESSING_ERROR",
message = exception.message ?: "未知错误"
)
}
}
data class ErrorMessage(
val error: String,
val message: String,
val timestamp: LocalDateTime = LocalDateTime.now()
)
性能优化建议
IMPORTANT
在生产环境中,以下优化措施至关重要:
1. 使用外部消息代理
kotlin
override fun configureMessageBroker(config: MessageBrokerRegistry) {
// 使用 RabbitMQ 或 ActiveMQ 替代简单代理
config.enableStompBrokerRelay("/topic", "/queue")
.setRelayHost("rabbitmq-server")
.setRelayPort(61613)
.setClientLogin("guest")
.setClientPasscode("guest")
}
2. 连接池配置
kotlin
@Configuration
class WebSocketConfig {
@Bean
fun taskScheduler(): TaskScheduler {
val scheduler = ThreadPoolTaskScheduler()
scheduler.poolSize = 10
scheduler.threadNamePrefix = "websocket-"
scheduler.initialize()
return scheduler
}
}
总结
STOMP 让实时通信变得简单而优雅。它的核心价值在于:
✅ 标准化协议:无需自定义消息格式
✅ 简单的路由机制:通过注解轻松处理消息
✅ 灵活的订阅模式:支持广播和点对点通信
✅ 完善的错误处理:内置异常处理机制
通过 Spring 的 STOMP 支持,我们可以用最少的代码构建出功能强大的实时应用,无论是聊天系统、实时监控,还是协作工具,STOMP 都能提供稳定可靠的解决方案。
TIP
开始你的第一个 STOMP 应用吧!从简单的聊天室开始,逐步探索更复杂的实时交互场景。记住,好的架构始于简单的开始。 🎉