Skip to content

Spring WebSocket STOMP 注解控制器详解 🚀

概述

在现代 Web 应用中,实时通信已成为不可或缺的功能。想象一下聊天应用、实时通知、股票价格更新等场景,这些都需要服务器能够主动向客户端推送数据。Spring WebSocket STOMP 注解控制器就是为了解决这类问题而设计的强大工具。

NOTE

STOMP(Simple Text Oriented Messaging Protocol)是一个简单的文本导向消息协议,它提供了一个可互操作的连接格式,允许 STOMP 客户端与任意 STOMP 消息代理进行交互。

核心设计理念 💡

解决的核心痛点

在传统的 HTTP 请求-响应模式中,我们面临以下挑战:

  1. 单向通信限制:客户端只能主动请求,服务器无法主动推送
  2. 实时性差:需要客户端不断轮询才能获取最新数据
  3. 资源浪费:频繁的轮询请求消耗大量带宽和服务器资源
  4. 编程复杂性:处理异步消息和状态管理变得复杂

Spring WebSocket STOMP 注解控制器通过以下方式解决这些问题:

  • 双向通信:建立持久连接,支持双向实时通信
  • 消息路由:基于目的地的消息路由机制
  • 简化编程模型:使用熟悉的注解驱动开发方式
  • 异常处理:统一的异常处理机制

技术架构图解

核心注解详解

1. @MessageMapping - 消息路由的核心 📨

@MessageMapping 是处理客户端消息的核心注解,类似于 Spring MVC 中的 @RequestMapping

基本用法

kotlin
@Controller
class ChatController {
    
    @MessageMapping("/chat") 
    @SendTo("/topic/messages") 
    fun handleMessage(@Payload message: ChatMessage): ChatMessage {
        // 处理聊天消息
        return ChatMessage(
            content = "Echo: ${message.content}",
            sender = message.sender,
            timestamp = LocalDateTime.now()
        )
    }
    
    // 支持路径变量
    @MessageMapping("/chat/{roomId}") 
    @SendTo("/topic/room/{roomId}")
    fun handleRoomMessage(
        @DestinationVariable roomId: String, 
        @Payload message: ChatMessage
    ): ChatMessage {
        return message.copy(room = roomId)
    }
}
kotlin
data class ChatMessage(
    val content: String,
    val sender: String,
    val timestamp: LocalDateTime = LocalDateTime.now(),
    val room: String? = null
)

支持的方法参数

IMPORTANT

@MessageMapping 方法支持多种参数类型,每种都有其特定的用途和场景。

参数类型描述使用场景
Message完整的消息对象需要访问消息的所有信息时
@Payload消息载荷(自动转换)最常用,获取业务数据
@Header特定消息头需要访问特定头信息时
@DestinationVariable路径变量动态路由场景
Principal当前用户信息需要用户身份验证时
完整的参数示例
kotlin
@MessageMapping("/message/{type}")
fun handleComplexMessage(
    message: Message<String>,                    // 完整消息
    @Payload content: String,                    // 消息内容
    @Header("custom-header") customValue: String, // 自定义头
    @DestinationVariable type: String,           // 路径变量
    principal: Principal,                        // 当前用户
    headers: MessageHeaders                      // 所有消息头
): String {
    return "Processed: $content by ${principal.name} for type: $type"
}

返回值处理

kotlin
@Controller
class MessageController {
    
    // 默认发送到 /topic + 原始目的地
    @MessageMapping("/simple")
    fun simpleMessage(@Payload data: String): String {
        return "Processed: $data"
        // 自动发送到 /topic/simple
    }
    
    // 自定义目的地
    @MessageMapping("/custom")
    @SendTo("/topic/notifications") 
    fun customDestination(@Payload data: String): String {
        return "Custom: $data"
    }
    
    // 发送给特定用户
    @MessageMapping("/private")
    @SendToUser("/queue/private") 
    fun privateMessage(@Payload data: String): String {
        return "Private: $data"
    }
    
    // 异步处理
    @MessageMapping("/async")
    @SendTo("/topic/async")
    fun asyncMessage(@Payload data: String): CompletableFuture<String> { 
        return CompletableFuture.supplyAsync {
            // 模拟异步处理
            Thread.sleep(1000)
            "Async result: $data"
        }
    }
}

2. @SubscribeMapping - 订阅时的即时响应 📬

@SubscribeMapping 专门处理订阅消息,与 @MessageMapping 的主要区别是响应方式。

TIP

使用 @SubscribeMapping 的典型场景是客户端订阅时需要立即获取初始数据,比如用户界面的初始化数据。

kotlin
@Controller
class SubscriptionController {
    
    @SubscribeMapping("/greeting") 
    fun handleSubscription(principal: Principal): String {
        // 直接响应给订阅者,不经过消息代理
        return "Hello, ${principal.name}! Welcome to the chat."
    }
    
    @SubscribeMapping("/dashboard/data")
    fun getDashboardData(): DashboardData {
        // 返回初始化数据
        return DashboardData(
            userCount = userService.getActiveUserCount(),
            messages = messageService.getRecentMessages(),
            timestamp = LocalDateTime.now()
        )
    }
    
    // 如果需要通过代理广播,使用 @SendTo
    @SubscribeMapping("/status")
    @SendTo("/topic/status") 
    fun handleStatusSubscription(): String {
        return "System status: Online"
        // 这将通过代理广播给所有订阅者
    }
}

3. @MessageExceptionHandler - 优雅的异常处理 ⚠️

统一处理 WebSocket 消息处理过程中的异常。

kotlin
@Controller
class ChatController {
    
    @MessageMapping("/chat")
    @SendTo("/topic/messages")
    fun handleMessage(@Payload @Valid message: ChatMessage): ChatMessage { 
        if (message.content.isBlank()) {
            throw IllegalArgumentException("Message content cannot be empty") 
        }
        return processMessage(message)
    }
    
    @MessageExceptionHandler(IllegalArgumentException::class) 
    @SendToUser("/queue/errors")
    fun handleIllegalArgument(exception: IllegalArgumentException): ErrorMessage {
        return ErrorMessage(
            error = "Invalid input",
            message = exception.message ?: "Unknown error",
            timestamp = LocalDateTime.now()
        )
    }
    
    @MessageExceptionHandler
    @SendToUser("/queue/errors")
    fun handleGenericException(exception: Exception): ErrorMessage {
        return ErrorMessage(
            error = "Processing error",
            message = "An unexpected error occurred",
            timestamp = LocalDateTime.now()
        )
    }
}

// 全局异常处理
@ControllerAdvice
class GlobalWebSocketExceptionHandler {
    
    @MessageExceptionHandler(ValidationException::class)
    @SendToUser("/queue/errors")
    fun handleValidationException(exception: ValidationException): ErrorMessage {
        return ErrorMessage(
            error = "Validation failed",
            message = exception.message ?: "Validation error",
            timestamp = LocalDateTime.now()
        )
    }
}

实际业务场景示例 🏢

场景1:实时聊天室

kotlin
@Controller
class ChatRoomController {
    
    private val logger = LoggerFactory.getLogger(ChatRoomController::class.java)
    
    @MessageMapping("/chat.send/{roomId}")
    @SendTo("/topic/room/{roomId}")
    fun sendMessage(
        @DestinationVariable roomId: String,
        @Payload message: ChatMessage,
        principal: Principal
    ): ChatMessage {
        logger.info("User ${principal.name} sent message to room $roomId")
        
        return message.copy(
            sender = principal.name,
            timestamp = LocalDateTime.now(),
            room = roomId
        )
    }
    
    @MessageMapping("/chat.join/{roomId}")
    @SendTo("/topic/room/{roomId}")
    fun joinRoom(
        @DestinationVariable roomId: String,
        principal: Principal
    ): SystemMessage {
        return SystemMessage(
            type = "USER_JOINED",
            content = "${principal.name} joined the room",
            timestamp = LocalDateTime.now()
        )
    }
    
    @SubscribeMapping("/chat.history/{roomId}")
    fun getChatHistory(
        @DestinationVariable roomId: String
    ): List<ChatMessage> {
        // 返回聊天历史记录
        return chatService.getRecentMessages(roomId, 50)
    }
}

场景2:实时通知系统

kotlin
@Controller
class NotificationController {
    
    @Autowired
    private lateinit var messagingTemplate: SimpMessagingTemplate
    
    @MessageMapping("/notification.subscribe")
    fun subscribeToNotifications(principal: Principal): String {
        // 用户订阅通知
        notificationService.subscribe(principal.name)
        return "Subscribed to notifications"
    }
    
    // 系统内部调用,发送通知
    @EventListener
    fun handleNotificationEvent(event: NotificationEvent) {
        val notification = Notification(
            title = event.title,
            message = event.message,
            type = event.type,
            timestamp = LocalDateTime.now()
        )
        
        // 发送给特定用户
        messagingTemplate.convertAndSendToUser(
            event.userId,
            "/queue/notifications",
            notification
        )
    }
    
    @MessageExceptionHandler
    @SendToUser("/queue/errors")
    fun handleNotificationError(exception: Exception): ErrorMessage {
        return ErrorMessage(
            error = "Notification error",
            message = exception.message ?: "Failed to process notification",
            timestamp = LocalDateTime.now()
        )
    }
}

配置与最佳实践 ⚙️

WebSocket 配置

kotlin
@Configuration
@EnableWebSocketMessageBroker
class WebSocketConfig : WebSocketMessageBrokerConfigurer {
    
    override fun configureMessageBroker(config: MessageBrokerRegistry) {
        // 启用简单代理
        config.enableSimpleBroker("/topic", "/queue") 
        // 设置应用程序目的地前缀
        config.setApplicationDestinationPrefixes("/app") 
        // 设置用户目的地前缀
        config.setUserDestinationPrefix("/user") 
    }
    
    override fun registerStompEndpoints(registry: StompEndpointRegistry) {
        // 注册 STOMP 端点
        registry.addEndpoint("/ws") 
            .setAllowedOriginPatterns("*")
            .withSockJS() // 启用 SockJS 后备选项
    }
}

安全配置

kotlin
@Configuration
@EnableWebSocketSecurity
class WebSocketSecurityConfig {
    
    @Bean
    fun authorizationManager(): AuthorizationManager<Message<*>> {
        val messages = AuthorizationManagerMessageMatcherRegistry()
        
        messages
            .simpDestMatchers("/app/public/**").permitAll() 
            .simpDestMatchers("/app/admin/**").hasRole("ADMIN") 
            .anyMessage().authenticated() 
            
        return messages.build()
    }
}

性能优化建议 🚀

WARNING

在生产环境中,需要特别注意以下性能和安全问题:

1. 消息大小控制

kotlin
@MessageMapping("/upload")
fun handleFileUpload(@Payload data: ByteArray): String {
    if (data.size > 1024 * 1024) { // 1MB 限制
        throw IllegalArgumentException("File too large")
    }
    return "File uploaded successfully"
}

2. 连接数管理

kotlin
@Component
class ConnectionManager {
    private val activeConnections = AtomicInteger(0)
    private val maxConnections = 1000
    
    @EventListener
    fun handleSessionConnected(event: SessionConnectedEvent) {
        if (activeConnections.incrementAndGet() > maxConnections) { 
            // 拒绝新连接
            throw IllegalStateException("Too many connections")
        }
    }
    
    @EventListener
    fun handleSessionDisconnected(event: SessionDisconnectEvent) {
        activeConnections.decrementAndGet()
    }
}

3. 异步处理

kotlin
@Controller
class AsyncController {
    
    @Async
    @MessageMapping("/heavy-task")
    @SendTo("/topic/results")
    fun handleHeavyTask(@Payload data: String): CompletableFuture<String> {
        return CompletableFuture.supplyAsync {
            // 执行耗时操作
            heavyProcessingService.process(data)
        }
    }
}

总结 📝

Spring WebSocket STOMP 注解控制器为我们提供了一套完整的实时通信解决方案:

简化开发:使用熟悉的注解驱动模式 ✅ 灵活路由:支持复杂的消息路由规则
异常处理:统一的异常处理机制 ✅ 性能优化:支持异步处理和连接管理 ✅ 安全保障:完整的安全配置支持

TIP

在实际项目中,建议结合具体业务场景选择合适的注解和配置策略。对于高并发场景,还需要考虑使用外部消息代理(如 RabbitMQ、Apache Kafka)来提升性能和可靠性。

通过掌握这些核心概念和最佳实践,你就能够构建出高效、可靠的实时通信应用了!🎉