Appearance
Spring WebSocket STOMP 注解控制器详解 🚀
概述
在现代 Web 应用中,实时通信已成为不可或缺的功能。想象一下聊天应用、实时通知、股票价格更新等场景,这些都需要服务器能够主动向客户端推送数据。Spring WebSocket STOMP 注解控制器就是为了解决这类问题而设计的强大工具。
NOTE
STOMP(Simple Text Oriented Messaging Protocol)是一个简单的文本导向消息协议,它提供了一个可互操作的连接格式,允许 STOMP 客户端与任意 STOMP 消息代理进行交互。
核心设计理念 💡
解决的核心痛点
在传统的 HTTP 请求-响应模式中,我们面临以下挑战:
- 单向通信限制:客户端只能主动请求,服务器无法主动推送
- 实时性差:需要客户端不断轮询才能获取最新数据
- 资源浪费:频繁的轮询请求消耗大量带宽和服务器资源
- 编程复杂性:处理异步消息和状态管理变得复杂
Spring WebSocket STOMP 注解控制器通过以下方式解决这些问题:
- 双向通信:建立持久连接,支持双向实时通信
- 消息路由:基于目的地的消息路由机制
- 简化编程模型:使用熟悉的注解驱动开发方式
- 异常处理:统一的异常处理机制
技术架构图解
核心注解详解
1. @MessageMapping - 消息路由的核心 📨
@MessageMapping
是处理客户端消息的核心注解,类似于 Spring MVC 中的 @RequestMapping
。
基本用法
kotlin
@Controller
class ChatController {
@MessageMapping("/chat")
@SendTo("/topic/messages")
fun handleMessage(@Payload message: ChatMessage): ChatMessage {
// 处理聊天消息
return ChatMessage(
content = "Echo: ${message.content}",
sender = message.sender,
timestamp = LocalDateTime.now()
)
}
// 支持路径变量
@MessageMapping("/chat/{roomId}")
@SendTo("/topic/room/{roomId}")
fun handleRoomMessage(
@DestinationVariable roomId: String,
@Payload message: ChatMessage
): ChatMessage {
return message.copy(room = roomId)
}
}
kotlin
data class ChatMessage(
val content: String,
val sender: String,
val timestamp: LocalDateTime = LocalDateTime.now(),
val room: String? = null
)
支持的方法参数
IMPORTANT
@MessageMapping
方法支持多种参数类型,每种都有其特定的用途和场景。
参数类型 | 描述 | 使用场景 |
---|---|---|
Message | 完整的消息对象 | 需要访问消息的所有信息时 |
@Payload | 消息载荷(自动转换) | 最常用,获取业务数据 |
@Header | 特定消息头 | 需要访问特定头信息时 |
@DestinationVariable | 路径变量 | 动态路由场景 |
Principal | 当前用户信息 | 需要用户身份验证时 |
完整的参数示例
kotlin
@MessageMapping("/message/{type}")
fun handleComplexMessage(
message: Message<String>, // 完整消息
@Payload content: String, // 消息内容
@Header("custom-header") customValue: String, // 自定义头
@DestinationVariable type: String, // 路径变量
principal: Principal, // 当前用户
headers: MessageHeaders // 所有消息头
): String {
return "Processed: $content by ${principal.name} for type: $type"
}
返回值处理
kotlin
@Controller
class MessageController {
// 默认发送到 /topic + 原始目的地
@MessageMapping("/simple")
fun simpleMessage(@Payload data: String): String {
return "Processed: $data"
// 自动发送到 /topic/simple
}
// 自定义目的地
@MessageMapping("/custom")
@SendTo("/topic/notifications")
fun customDestination(@Payload data: String): String {
return "Custom: $data"
}
// 发送给特定用户
@MessageMapping("/private")
@SendToUser("/queue/private")
fun privateMessage(@Payload data: String): String {
return "Private: $data"
}
// 异步处理
@MessageMapping("/async")
@SendTo("/topic/async")
fun asyncMessage(@Payload data: String): CompletableFuture<String> {
return CompletableFuture.supplyAsync {
// 模拟异步处理
Thread.sleep(1000)
"Async result: $data"
}
}
}
2. @SubscribeMapping - 订阅时的即时响应 📬
@SubscribeMapping
专门处理订阅消息,与 @MessageMapping
的主要区别是响应方式。
TIP
使用 @SubscribeMapping
的典型场景是客户端订阅时需要立即获取初始数据,比如用户界面的初始化数据。
kotlin
@Controller
class SubscriptionController {
@SubscribeMapping("/greeting")
fun handleSubscription(principal: Principal): String {
// 直接响应给订阅者,不经过消息代理
return "Hello, ${principal.name}! Welcome to the chat."
}
@SubscribeMapping("/dashboard/data")
fun getDashboardData(): DashboardData {
// 返回初始化数据
return DashboardData(
userCount = userService.getActiveUserCount(),
messages = messageService.getRecentMessages(),
timestamp = LocalDateTime.now()
)
}
// 如果需要通过代理广播,使用 @SendTo
@SubscribeMapping("/status")
@SendTo("/topic/status")
fun handleStatusSubscription(): String {
return "System status: Online"
// 这将通过代理广播给所有订阅者
}
}
3. @MessageExceptionHandler - 优雅的异常处理 ⚠️
统一处理 WebSocket 消息处理过程中的异常。
kotlin
@Controller
class ChatController {
@MessageMapping("/chat")
@SendTo("/topic/messages")
fun handleMessage(@Payload @Valid message: ChatMessage): ChatMessage {
if (message.content.isBlank()) {
throw IllegalArgumentException("Message content cannot be empty")
}
return processMessage(message)
}
@MessageExceptionHandler(IllegalArgumentException::class)
@SendToUser("/queue/errors")
fun handleIllegalArgument(exception: IllegalArgumentException): ErrorMessage {
return ErrorMessage(
error = "Invalid input",
message = exception.message ?: "Unknown error",
timestamp = LocalDateTime.now()
)
}
@MessageExceptionHandler
@SendToUser("/queue/errors")
fun handleGenericException(exception: Exception): ErrorMessage {
return ErrorMessage(
error = "Processing error",
message = "An unexpected error occurred",
timestamp = LocalDateTime.now()
)
}
}
// 全局异常处理
@ControllerAdvice
class GlobalWebSocketExceptionHandler {
@MessageExceptionHandler(ValidationException::class)
@SendToUser("/queue/errors")
fun handleValidationException(exception: ValidationException): ErrorMessage {
return ErrorMessage(
error = "Validation failed",
message = exception.message ?: "Validation error",
timestamp = LocalDateTime.now()
)
}
}
实际业务场景示例 🏢
场景1:实时聊天室
kotlin
@Controller
class ChatRoomController {
private val logger = LoggerFactory.getLogger(ChatRoomController::class.java)
@MessageMapping("/chat.send/{roomId}")
@SendTo("/topic/room/{roomId}")
fun sendMessage(
@DestinationVariable roomId: String,
@Payload message: ChatMessage,
principal: Principal
): ChatMessage {
logger.info("User ${principal.name} sent message to room $roomId")
return message.copy(
sender = principal.name,
timestamp = LocalDateTime.now(),
room = roomId
)
}
@MessageMapping("/chat.join/{roomId}")
@SendTo("/topic/room/{roomId}")
fun joinRoom(
@DestinationVariable roomId: String,
principal: Principal
): SystemMessage {
return SystemMessage(
type = "USER_JOINED",
content = "${principal.name} joined the room",
timestamp = LocalDateTime.now()
)
}
@SubscribeMapping("/chat.history/{roomId}")
fun getChatHistory(
@DestinationVariable roomId: String
): List<ChatMessage> {
// 返回聊天历史记录
return chatService.getRecentMessages(roomId, 50)
}
}
场景2:实时通知系统
kotlin
@Controller
class NotificationController {
@Autowired
private lateinit var messagingTemplate: SimpMessagingTemplate
@MessageMapping("/notification.subscribe")
fun subscribeToNotifications(principal: Principal): String {
// 用户订阅通知
notificationService.subscribe(principal.name)
return "Subscribed to notifications"
}
// 系统内部调用,发送通知
@EventListener
fun handleNotificationEvent(event: NotificationEvent) {
val notification = Notification(
title = event.title,
message = event.message,
type = event.type,
timestamp = LocalDateTime.now()
)
// 发送给特定用户
messagingTemplate.convertAndSendToUser(
event.userId,
"/queue/notifications",
notification
)
}
@MessageExceptionHandler
@SendToUser("/queue/errors")
fun handleNotificationError(exception: Exception): ErrorMessage {
return ErrorMessage(
error = "Notification error",
message = exception.message ?: "Failed to process notification",
timestamp = LocalDateTime.now()
)
}
}
配置与最佳实践 ⚙️
WebSocket 配置
kotlin
@Configuration
@EnableWebSocketMessageBroker
class WebSocketConfig : WebSocketMessageBrokerConfigurer {
override fun configureMessageBroker(config: MessageBrokerRegistry) {
// 启用简单代理
config.enableSimpleBroker("/topic", "/queue")
// 设置应用程序目的地前缀
config.setApplicationDestinationPrefixes("/app")
// 设置用户目的地前缀
config.setUserDestinationPrefix("/user")
}
override fun registerStompEndpoints(registry: StompEndpointRegistry) {
// 注册 STOMP 端点
registry.addEndpoint("/ws")
.setAllowedOriginPatterns("*")
.withSockJS() // 启用 SockJS 后备选项
}
}
安全配置
kotlin
@Configuration
@EnableWebSocketSecurity
class WebSocketSecurityConfig {
@Bean
fun authorizationManager(): AuthorizationManager<Message<*>> {
val messages = AuthorizationManagerMessageMatcherRegistry()
messages
.simpDestMatchers("/app/public/**").permitAll()
.simpDestMatchers("/app/admin/**").hasRole("ADMIN")
.anyMessage().authenticated()
return messages.build()
}
}
性能优化建议 🚀
WARNING
在生产环境中,需要特别注意以下性能和安全问题:
1. 消息大小控制
kotlin
@MessageMapping("/upload")
fun handleFileUpload(@Payload data: ByteArray): String {
if (data.size > 1024 * 1024) { // 1MB 限制
throw IllegalArgumentException("File too large")
}
return "File uploaded successfully"
}
2. 连接数管理
kotlin
@Component
class ConnectionManager {
private val activeConnections = AtomicInteger(0)
private val maxConnections = 1000
@EventListener
fun handleSessionConnected(event: SessionConnectedEvent) {
if (activeConnections.incrementAndGet() > maxConnections) {
// 拒绝新连接
throw IllegalStateException("Too many connections")
}
}
@EventListener
fun handleSessionDisconnected(event: SessionDisconnectEvent) {
activeConnections.decrementAndGet()
}
}
3. 异步处理
kotlin
@Controller
class AsyncController {
@Async
@MessageMapping("/heavy-task")
@SendTo("/topic/results")
fun handleHeavyTask(@Payload data: String): CompletableFuture<String> {
return CompletableFuture.supplyAsync {
// 执行耗时操作
heavyProcessingService.process(data)
}
}
}
总结 📝
Spring WebSocket STOMP 注解控制器为我们提供了一套完整的实时通信解决方案:
✅ 简化开发:使用熟悉的注解驱动模式 ✅ 灵活路由:支持复杂的消息路由规则
✅ 异常处理:统一的异常处理机制 ✅ 性能优化:支持异步处理和连接管理 ✅ 安全保障:完整的安全配置支持
TIP
在实际项目中,建议结合具体业务场景选择合适的注解和配置策略。对于高并发场景,还需要考虑使用外部消息代理(如 RabbitMQ、Apache Kafka)来提升性能和可靠性。
通过掌握这些核心概念和最佳实践,你就能够构建出高效、可靠的实时通信应用了!🎉