Skip to content

Spring WebSocket STOMP 用户目标详解 🚀

什么是用户目标?为什么需要它? 🤔

在传统的消息传递系统中,如果我们想给特定用户发送消息,通常会遇到以下问题:

kotlin
// 问题1:如何确保消息只发给特定用户?
@MessageMapping("/broadcast")
fun broadcastMessage(message: String) {
    // 这会发给所有订阅者,无法精确控制
    messagingTemplate.convertAndSend("/topic/updates", message)
}

// 问题2:用户可能有多个会话,如何管理?
// 问题3:如何避免用户之间的消息冲突?
kotlin
// 解决方案:使用 /user/ 前缀
@MessageMapping("/trade")
@SendToUser("/queue/position-updates") 
fun executeTrade(trade: Trade, principal: Principal): TradeResult {
    // Spring 自动将消息发送给当前用户
    return processTradeResult(trade)
}

IMPORTANT

用户目标(User Destinations) 是 Spring STOMP 提供的一种机制,它允许应用程序向特定用户发送消息,而不需要知道用户的具体会话信息。通过 /user/ 前缀,Spring 会自动处理用户会话的映射和消息路由。

核心工作原理 ⚙️

让我们通过时序图来理解用户目标的工作机制:

实战应用场景 💻

1. 基础用户消息推送

kotlin
@Controller
class PortfolioController {

    @MessageMapping("/trade")
    @SendToUser("/queue/position-updates") 
    fun executeTrade(trade: Trade, principal: Principal): TradeResult {
        // 处理交易逻辑
        val result = tradeService.execute(trade, principal.name)
        
        // 返回结果会自动发送给当前用户的 /user/queue/position-updates
        return TradeResult(
            tradeId = result.id,
            status = "COMPLETED",
            message = "交易执行成功"
        )
    }
}
kotlin
data class Trade(
    val symbol: String,
    val quantity: Int,
    val price: Double,
    val type: TradeType
)

data class TradeResult(
    val tradeId: String,
    val status: String,
    val message: String,
    val timestamp: LocalDateTime = LocalDateTime.now()
)

enum class TradeType { BUY, SELL }

2. 异常处理与错误消息

kotlin
@Controller
class TradingController {

    @MessageMapping("/execute-order")
    fun handleOrder(order: Order): OrderResult {
        // 可能抛出业务异常
        if (order.amount <= 0) {
            throw InvalidOrderException("订单金额必须大于0")
        }
        return processOrder(order)
    }

    @MessageExceptionHandler
    @SendToUser(destinations = ["/queue/errors"], broadcast = false) 
    fun handleException(exception: InvalidOrderException): ErrorMessage {
        return ErrorMessage(
            code = "INVALID_ORDER",
            message = exception.message ?: "订单处理失败",
            timestamp = LocalDateTime.now()
        )
    }
}

TIP

注意 broadcast = false 参数:这确保错误消息只发送给触发异常的那个会话,而不是用户的所有会话。

3. 服务层主动推送消息

kotlin
@Service
class NotificationService(
    @Qualifier("brokerMessagingTemplate") 
    private val messagingTemplate: SimpMessagingTemplate
) {

    fun sendPersonalNotification(username: String, notification: Notification) {
        // 直接向特定用户发送消息
        messagingTemplate.convertAndSendToUser( 
            username,
            "/queue/notifications",
            notification
        )
    }

    fun sendTradeConfirmation(trade: CompletedTrade) {
        val confirmation = TradeConfirmation(
            tradeId = trade.id,
            symbol = trade.symbol,
            executedPrice = trade.executedPrice,
            message = "交易已成功执行"
        )
        
        messagingTemplate.convertAndSendToUser(
            trade.userId,
            "/queue/trade-confirmations", 
            confirmation
        )
    }
}

配置要点 🔧

WebSocket 配置

kotlin
@Configuration
@EnableWebSocketMessageBroker
class WebSocketConfig : WebSocketMessageBrokerConfigurer {

    override fun configureMessageBroker(config: MessageBrokerRegistry) {
        // 配置代理前缀
        config.enableSimpleBroker("/topic", "/queue") 
        
        // 配置应用程序目标前缀
        config.setApplicationDestinationPrefixes("/app") 
        
        // 配置用户目标前缀(可选,默认就是 "/user")
        config.setUserDestinationPrefix("/user") 
    }

    override fun registerStompEndpoints(registry: StompEndpointRegistry) {
        registry.addEndpoint("/ws")
            .setAllowedOriginPatterns("*")
            .withSockJS()
    }
}

WARNING

确保正确配置代理和应用程序目标前缀,否则以 /user 为前缀的消息可能会被消息代理错误处理,而不是由 UserDestinationMessageHandler 处理。

多会话场景处理 👥

默认行为:广播到所有会话

kotlin
@Controller
class MultiSessionController {

    @MessageMapping("/system-announcement")
    @SendToUser("/queue/announcements") 
    fun sendAnnouncement(announcement: String, principal: Principal): SystemMessage {
        // 默认情况下,如果用户有多个会话(多个浏览器标签页)
        // 消息会发送到用户的所有会话
        return SystemMessage(
            type = "ANNOUNCEMENT",
            content = announcement,
            timestamp = LocalDateTime.now()
        )
    }
}

精确控制:只发送给当前会话

kotlin
@Controller
class SessionSpecificController {

    @MessageMapping("/personal-action")
    @SendToUser(destinations = ["/queue/action-results"], broadcast = false) 
    fun handlePersonalAction(action: UserAction, principal: Principal): ActionResult {
        // broadcast = false 确保消息只发送给发起请求的那个会话
        return ActionResult(
            actionId = action.id,
            result = "操作完成",
            sessionSpecific = true
        )
    }
}

客户端订阅示例 💻

javascript
// 客户端 JavaScript 代码
const stompClient = new StompJs.Client({
    brokerURL: 'ws://localhost:8080/ws'
});

stompClient.onConnect = function (frame) {
    console.log('Connected: ' + frame);
    
    // 订阅用户特定的队列
    stompClient.subscribe('/user/queue/position-updates', function (message) { 
        const tradeResult = JSON.parse(message.body);
        console.log('收到交易结果:', tradeResult);
        updateUI(tradeResult);
    });
    
    // 订阅错误消息
    stompClient.subscribe('/user/queue/errors', function (message) { 
        const error = JSON.parse(message.body);
        console.log('收到错误消息:', error);
        showErrorNotification(error);
    });
};

stompClient.activate();

高级特性:集群环境支持 ☁️

在多服务器环境中,用户可能连接到不同的服务器。Spring 提供了广播机制来处理这种情况:

kotlin
@Configuration
@EnableWebSocketMessageBroker
class ClusterWebSocketConfig : WebSocketMessageBrokerConfigurer {

    override fun configureMessageBroker(config: MessageBrokerRegistry) {
        config.enableSimpleBroker("/topic", "/queue")
        config.setApplicationDestinationPrefixes("/app")
        
        // 配置用户目标广播,用于集群环境
        config.setUserDestinationBroadcast("/topic/unresolved-user-destination") 
    }
}

最佳实践建议 ⭐

1. 目标命名规范

kotlin
// 推荐的目标命名方式
@SendToUser("/queue/trade-updates")        // ✅ 清晰的业务含义
@SendToUser("/queue/notifications")        // ✅ 通用通知
@SendToUser("/queue/errors")              // ✅ 错误消息

// 避免的命名方式
@SendToUser("/queue/data")                // ❌ 含义不明确
@SendToUser("/topic/user-stuff")          // ❌ 使用了错误的前缀

2. 异常处理策略

kotlin
@Controller
class RobustController {

    @MessageMapping("/risky-operation")
    fun handleRiskyOperation(request: RiskyRequest, principal: Principal): OperationResult {
        try {
            return performOperation(request)
        } catch (exception: BusinessException) {
            // 让异常处理器处理
            throw exception
        }
    }

    @MessageExceptionHandler(BusinessException::class)
    @SendToUser("/queue/errors", broadcast = false) 
    fun handleBusinessException(exception: BusinessException): ErrorResponse {
        return ErrorResponse(
            errorCode = exception.errorCode,
            message = exception.message,
            suggestions = exception.suggestions
        )
    }
}

3. 消息模板的正确使用

kotlin
@Service
class MessagingService(
    @Qualifier("brokerMessagingTemplate")
    private val messagingTemplate: SimpMessagingTemplate
) {

    fun sendUserSpecificUpdate(userId: String, update: Any) {
        // 使用 convertAndSendToUser 而不是 convertAndSend
        messagingTemplate.convertAndSendToUser( 
            userId,
            "/queue/updates",
            update
        )
    }

    fun sendWithHeaders(userId: String, payload: Any, headers: Map<String, Any>) {
        messagingTemplate.convertAndSendToUser(
            userId,
            "/queue/data",
            payload,
            headers 
        )
    }
}

总结 📝

Spring WebSocket STOMP 的用户目标机制为我们提供了一个优雅的解决方案来处理用户特定的消息传递:

核心价值

  • 简化开发:无需手动管理用户会话映射
  • 自动路由:Spring 自动处理消息到用户会话的路由
  • 多会话支持:灵活控制消息是广播到所有会话还是特定会话
  • 集群友好:支持多服务器环境下的消息传递

通过合理使用用户目标,我们可以构建出既简洁又强大的实时消息系统,为用户提供个性化的实时体验。记住关键是理解 /user/ 前缀的魔法以及 UserDestinationMessageHandler 在幕后所做的转换工作! ✨