Skip to content

Spring STOMP 客户端:实时通信的强力工具 🚀

什么是 STOMP 客户端?为什么需要它?

在现代 Web 应用中,我们经常需要实现实时通信功能,比如聊天应用、实时通知、股票价格推送等。传统的 HTTP 请求-响应模式无法满足这些需求,因为它是单向的、短连接的。

NOTE

STOMP(Simple Text Oriented Messaging Protocol)是一个简单的面向文本的消息协议,它为客户端提供了一种与消息代理进行交互的标准方式。

想象一下这样的场景:

  • 没有 STOMP 客户端:你需要不断地轮询服务器获取新消息,这会造成大量无效请求和服务器压力
  • 有了 STOMP 客户端:建立一次连接后,服务器可以主动推送消息给客户端,实现真正的实时通信

Spring STOMP 客户端的核心价值

Spring 提供的 STOMP 客户端解决了以下关键问题:

  1. 连接管理复杂性:自动处理 WebSocket 连接的建立、维护和重连
  2. 消息序列化/反序列化:自动处理 Java 对象与 STOMP 消息的转换
  3. 心跳机制:保持连接活跃,及时发现连接断开
  4. 错误处理:统一的异常处理机制
  5. 订阅管理:简化消息订阅和取消订阅的操作

快速上手:创建你的第一个 STOMP 客户端

1. 基本配置

首先,让我们创建一个基本的 STOMP 客户端:

kotlin
import org.springframework.web.socket.client.standard.StandardWebSocketClient
import org.springframework.web.socket.messaging.WebSocketStompClient
import org.springframework.messaging.converter.StringMessageConverter
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler

@Component
class StompClientConfig {
    
    fun createStompClient(): WebSocketStompClient {
        // 创建 WebSocket 客户端
        val webSocketClient = StandardWebSocketClient()
        
        // 创建 STOMP 客户端
        val stompClient = WebSocketStompClient(webSocketClient) 
        
        // 配置消息转换器
        stompClient.messageConverter = StringMessageConverter() 
        
        // 配置任务调度器(用于心跳)
        val taskScheduler = ThreadPoolTaskScheduler().apply {
            poolSize = 1
            threadNamePrefix = "stomp-"
            initialize()
        }
        stompClient.taskScheduler = taskScheduler 
        
        return stompClient
    }
}
kotlin
import org.springframework.web.socket.client.sockjs.SockJsClient
import org.springframework.web.socket.client.standard.StandardWebSocketClient

@Component  
class StompClientWithSockJS {
    
    fun createStompClientWithFallback(): WebSocketStompClient {
        // SockJS 客户端提供 WebSocket 和 HTTP 传输的回退机制
        val sockJsClient = SockJsClient(listOf(StandardWebSocketClient())) 
        
        val stompClient = WebSocketStompClient(sockJsClient)
        stompClient.messageConverter = StringMessageConverter()
        
        return stompClient
    }
}

TIP

SockJsClient 是一个很好的选择,因为它在 WebSocket 不可用时会自动回退到 HTTP 长轮询等其他传输方式,提高了兼容性。

2. 建立连接和会话处理

接下来,让我们实现连接逻辑和会话处理:

kotlin
import org.springframework.messaging.simp.stomp.*
import org.springframework.stereotype.Service
import java.lang.reflect.Type

@Service
class ChatService {
    
    private val stompClient = createStompClient() // 从上面的配置获取
    private var stompSession: StompSession? = null
    
    fun connectToServer() {
        val url = "ws://127.0.0.1:8080/websocket"
        val sessionHandler = ChatSessionHandler()
        
        try {
            // 建立连接(异步操作)
            val sessionFuture = stompClient.connect(url, sessionHandler) 
            stompSession = sessionFuture.get() // 等待连接完成
            println("✅ 成功连接到 STOMP 服务器")
        } catch (e: Exception) {
            println("❌ 连接失败: ${e.message}") 
        }
    }
    
    // 自定义会话处理器
    inner class ChatSessionHandler : StompSessionHandlerAdapter() {
        
        override fun afterConnected(session: StompSession, connectedHeaders: StompHeaders) {
            println("🎉 STOMP 会话已建立,会话ID: ${session.sessionId}")
            
            // 连接成功后自动订阅聊天频道
            subscribeToChat(session) 
        }
        
        override fun handleException(
            session: StompSession, 
            command: StompCommand?, 
            headers: StompHeaders, 
            payload: ByteArray, 
            exception: Throwable
        ) {
            println("⚠️ STOMP 异常: ${exception.message}") 
        }
        
        override fun handleTransportError(session: StompSession, exception: Throwable) {
            println("🔌 传输错误: ${exception.message}") 
            // 可以在这里实现重连逻辑
        }
    }
}

IMPORTANT

StompSessionHandlerAdapter 是一个适配器类,你只需要重写你关心的方法。afterConnected 方法在连接建立后被调用,这是开始发送消息和订阅的好时机。

核心功能详解

1. 发送消息

发送消息非常简单,只需要指定目标地址和消息内容:

kotlin
class MessageSender {
    
    fun sendChatMessage(session: StompSession, message: String) {
        try {
            // 发送简单文本消息
            session.send("/app/chat", message) 
            println("📤 消息已发送: $message")
        } catch (e: Exception) {
            println("❌ 发送失败: ${e.message}") 
        }
    }
    
    fun sendObjectMessage(session: StompSession, chatMessage: ChatMessage) {
        try {
            // 发送对象消息(需要配置相应的消息转换器)
            session.send("/app/chat", chatMessage) 
            println("📤 对象消息已发送")
        } catch (e: Exception) {
            println("❌ 发送失败: ${e.message}") 
        }
    }
}

// 消息数据类
data class ChatMessage(
    val username: String,
    val content: String,
    val timestamp: Long = System.currentTimeMillis()
)

2. 订阅消息

订阅功能让客户端能够接收特定主题的消息:

kotlin
class MessageSubscriber {
    
    fun subscribeToChat(session: StompSession): StompSession.Subscription {
        return session.subscribe("/topic/chat", object : StompFrameHandler { 
            
            // 指定消息载荷的类型
            override fun getPayloadType(headers: StompHeaders): Type {
                return String::class.java 
            }
            
            // 处理接收到的消息
            override fun handleFrame(headers: StompHeaders, payload: Any?) {
                val message = payload as String
                println("📥 收到消息: $message") 
                
                // 在这里处理业务逻辑
                processReceivedMessage(message)
            }
        })
    }
    
    fun subscribeToUserNotifications(session: StompSession, userId: String): StompSession.Subscription {
        return session.subscribe("/user/$userId/notifications", object : StompFrameHandler {
            
            override fun getPayloadType(headers: StompHeaders): Type {
                return NotificationMessage::class.java 
            }
            
            override fun handleFrame(headers: StompHeaders, payload: Any?) {
                val notification = payload as NotificationMessage
                println("🔔 收到通知: ${notification.title}")
                
                // 显示通知给用户
                showNotificationToUser(notification)
            }
        })
    }
    
    private fun processReceivedMessage(message: String) {
        // 处理聊天消息的业务逻辑
        println("处理消息: $message")
    }
    
    private fun showNotificationToUser(notification: NotificationMessage) {
        // 显示通知的业务逻辑
        println("显示通知: ${notification.title} - ${notification.content}")
    }
}

data class NotificationMessage(
    val title: String,
    val content: String,
    val type: String
)

TIP

getPayloadType 方法很重要,它告诉 Spring 如何反序列化接收到的消息。你可以返回 String::class.java 用于简单文本,或者自定义类用于复杂对象。

3. 心跳机制配置

心跳机制确保连接的稳定性:

kotlin
@Configuration
class StompHeartbeatConfig {
    
    fun configureHeartbeat(): WebSocketStompClient {
        val webSocketClient = StandardWebSocketClient()
        val stompClient = WebSocketStompClient(webSocketClient)
        
        // 配置心跳任务调度器
        val taskScheduler = ThreadPoolTaskScheduler().apply {
            poolSize = 2
            threadNamePrefix = "stomp-heartbeat-"
            initialize()
        }
        stompClient.taskScheduler = taskScheduler 
        
        return stompClient
    }
}

NOTE

Spring STOMP 客户端默认的心跳间隔是:

  • 写入不活跃: 10秒(发送心跳)
  • 读取不活跃: 10秒(关闭连接)

性能测试注意事项

当你使用 WebSocketStompClient 进行性能测试,模拟同一台机器上的数千个客户端时,考虑关闭心跳功能。因为每个连接都会调度自己的心跳任务,这对于在同一台机器上运行的大量客户端来说并不是最优的。

4. 消息回执确认

STOMP 协议支持消息回执,确保消息被正确处理:

kotlin
class ReceiptManager {
    
    fun sendMessageWithReceipt(session: StompSession) {
        // 启用自动回执
        session.setAutoReceipt(true) 
        
        // 发送消息并获取回执句柄
        val receiptable = session.send("/app/important-message", "重要消息")
        
        // 注册回执回调
        receiptable.addReceiptTask { 
            println("✅ 消息回执确认:消息已被服务器处理")
        }
        
        receiptable.addReceiptLostTask { 
            println("❌ 消息回执丢失:消息可能未被正确处理") 
        }
    }
    
    fun sendMessageWithManualReceipt(session: StompSession) {
        val headers = StompHeaders()
        headers.receipt = "receipt-${System.currentTimeMillis()}"
        
        val receiptable = session.send(headers, "手动回执消息")
        
        receiptable.addReceiptTask {
            println("✅ 手动回执确认成功")
        }
    }
}

IMPORTANT

要使用回执功能,必须为客户端配置 TaskScheduler,并且默认的回执超时时间是 15 秒。

高级配置和优化

1. 消息大小限制

为了防止过大的消息影响性能,可以配置消息大小限制:

kotlin
@Configuration
class StompMessageSizeConfig {
    
    fun configureMessageSizeLimit(): WebSocketStompClient {
        val webSocketClient = StandardWebSocketClient()
        val stompClient = WebSocketStompClient(webSocketClient)
        
        // 配置入站消息大小限制(默认 64KB)
        stompClient.inboundMessageSizeLimit = 64 * 1024
        
        // 配置出站消息大小限制(默认无限制)
        stompClient.outboundMessageSizeLimit = 64 * 1024
        
        return stompClient
    }
}

WARNING

当出站 STOMP 消息超过限制时,它会被分割成部分帧,接收方需要重新组装。当入站消息超过限制时,会抛出 StompConversionException 异常。

2. 完整的聊天应用示例

让我们把所有概念整合到一个完整的聊天应用中:

完整的聊天客户端实现
kotlin
import org.springframework.messaging.converter.MappingJackson2MessageConverter
import org.springframework.messaging.simp.stomp.*
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler
import org.springframework.stereotype.Service
import org.springframework.web.socket.client.standard.StandardWebSocketClient
import org.springframework.web.socket.messaging.WebSocketStompClient
import java.lang.reflect.Type
import java.util.concurrent.CompletableFuture

@Service
class ChatClient {
    
    private val stompClient: WebSocketStompClient
    private var stompSession: StompSession? = null
    private var chatSubscription: StompSession.Subscription? = null
    
    init {
        stompClient = createStompClient()
    }
    
    private fun createStompClient(): WebSocketStompClient {
        val webSocketClient = StandardWebSocketClient()
        val client = WebSocketStompClient(webSocketClient)
        
        // 使用 JSON 消息转换器
        client.messageConverter = MappingJackson2MessageConverter() 
        
        // 配置任务调度器
        val taskScheduler = ThreadPoolTaskScheduler().apply {
            poolSize = 4
            threadNamePrefix = "chat-stomp-"
            initialize()
        }
        client.taskScheduler = taskScheduler
        
        // 配置消息大小限制
        client.inboundMessageSizeLimit = 128 * 1024  // 128KB
        client.outboundMessageSizeLimit = 128 * 1024 // 128KB
        
        return client
    }
    
    fun connect(username: String): CompletableFuture<Void> {
        val url = "ws://localhost:8080/chat-websocket"
        val sessionHandler = ChatSessionHandler(username)
        
        return stompClient.connect(url, sessionHandler).thenAccept { session ->
            this.stompSession = session
            println("🎉 用户 $username 已连接到聊天服务器")
        }
    }
    
    fun sendMessage(content: String) {
        stompSession?.let { session ->
            val message = ChatMessage(
                username = getCurrentUsername(),
                content = content,
                timestamp = System.currentTimeMillis()
            )
            
            try {
                session.send("/app/chat.send", message) 
                println("📤 消息已发送: $content")
            } catch (e: Exception) {
                println("❌ 发送消息失败: ${e.message}") 
            }
        } ?: println("⚠️ 未连接到服务器") 
    }
    
    fun disconnect() {
        chatSubscription?.unsubscribe()
        stompSession?.disconnect()
        println("👋 已断开连接")
    }
    
    private fun getCurrentUsername(): String {
        // 这里应该从用户会话或上下文中获取用户名
        return "CurrentUser"
    }
    
    inner class ChatSessionHandler(private val username: String) : StompSessionHandlerAdapter() {
        
        override fun afterConnected(session: StompSession, connectedHeaders: StompHeaders) {
            println("✅ STOMP 会话已建立")
            
            // 订阅公共聊天频道
            chatSubscription = session.subscribe("/topic/chat", ChatMessageHandler()) 
            
            // 订阅个人通知频道
            session.subscribe("/user/queue/notifications", NotificationHandler()) 
            
            // 发送用户加入通知
            val joinMessage = ChatMessage(
                username = username,
                content = "加入了聊天室",
                timestamp = System.currentTimeMillis(),
                type = "JOIN"
            )
            session.send("/app/chat.join", joinMessage)
        }
        
        override fun handleException(
            session: StompSession,
            command: StompCommand?,
            headers: StompHeaders,
            payload: ByteArray,
            exception: Throwable
        ) {
            println("⚠️ STOMP 异常: ${exception.message}") 
        }
        
        override fun handleTransportError(session: StompSession, exception: Throwable) {
            println("🔌 传输错误: ${exception.message}") 
            // 实现重连逻辑
            reconnect()
        }
        
        private fun reconnect() {
            println("🔄 尝试重新连接...")
            // 延迟重连逻辑
            Thread.sleep(5000)
            connect(username)
        }
    }
    
    inner class ChatMessageHandler : StompFrameHandler {
        override fun getPayloadType(headers: StompHeaders): Type = ChatMessage::class.java
        
        override fun handleFrame(headers: StompHeaders, payload: Any?) {
            val message = payload as ChatMessage
            when (message.type) {
                "JOIN" -> println("👤 ${message.username} ${message.content}")
                "LEAVE" -> println("👋 ${message.username} ${message.content}")
                else -> println("💬 ${message.username}: ${message.content}")
            }
        }
    }
    
    inner class NotificationHandler : StompFrameHandler {
        override fun getPayloadType(headers: StompHeaders): Type = NotificationMessage::class.java
        
        override fun handleFrame(headers: StompHeaders, payload: Any?) {
            val notification = payload as NotificationMessage
            println("🔔 通知: ${notification.title} - ${notification.content}")
        }
    }
}

// 数据类定义
data class ChatMessage(
    val username: String,
    val content: String,
    val timestamp: Long,
    val type: String = "MESSAGE"
)

data class NotificationMessage(
    val title: String,
    val content: String,
    val type: String = "INFO"
)

3. 使用示例

kotlin
@Component
class ChatClientUsage {
    
    @Autowired
    private lateinit var chatClient: ChatClient
    
    fun startChatSession() {
        // 连接到聊天服务器
        chatClient.connect("Alice").thenRun {
            println("🚀 聊天会话已启动")
            
            // 发送一些测试消息
            chatClient.sendMessage("大家好!")
            chatClient.sendMessage("今天天气真不错")
            
            // 模拟用户交互
            simulateUserInteraction()
        }.exceptionally { throwable ->
            println("❌ 连接失败: ${throwable.message}") 
            null
        }
    }
    
    private fun simulateUserInteraction() {
        // 模拟用户在聊天中的行为
        Thread {
            Thread.sleep(2000)
            chatClient.sendMessage("有人在吗?")
            
            Thread.sleep(3000)
            chatClient.sendMessage("我要下线了,再见!")
            
            Thread.sleep(1000)
            chatClient.disconnect()
        }.start()
    }
}

最佳实践和注意事项

1. 错误处理策略

kotlin
class RobustStompClient {
    
    private var reconnectAttempts = 0
    private val maxReconnectAttempts = 5
    
    fun handleConnectionError(exception: Throwable) {
        when (exception) {
            is ConnectException -> {
                println("🔌 连接被拒绝,检查服务器是否运行") 
                attemptReconnect()
            }
            is ConnectionLostException -> {
                println("📡 连接丢失,尝试重连") 
                attemptReconnect()
            }
            else -> {
                println("❌ 未知错误: ${exception.message}") 
            }
        }
    }
    
    private fun attemptReconnect() {
        if (reconnectAttempts < maxReconnectAttempts) {
            reconnectAttempts++
            val delay = reconnectAttempts * 2000L // 指数退避
            
            println("🔄 第 $reconnectAttempts 次重连尝试,等待 ${delay}ms")
            Thread.sleep(delay)
            
            // 重新连接逻辑
            connect()
        } else {
            println("❌ 重连次数已达上限,停止尝试") 
        }
    }
    
    private fun connect() {
        // 连接逻辑
    }
}

2. 资源管理

CAUTION

记得在应用关闭时正确清理资源,包括断开 STOMP 连接和关闭任务调度器。

kotlin
@PreDestroy
fun cleanup() {
    chatSubscription?.unsubscribe()
    stompSession?.disconnect()
    taskScheduler.shutdown()
    println("🧹 资源已清理")
}

总结

Spring STOMP 客户端为我们提供了一个强大而灵活的实时通信解决方案。它的主要优势包括:

简化的 API:隐藏了 WebSocket 和 STOMP 协议的复杂性
自动消息转换:支持 JSON、XML 等多种消息格式
可靠的连接管理:内置心跳机制和错误处理
灵活的订阅模式:支持主题订阅和点对点消息
企业级特性:消息回执、大小限制、性能优化等

通过合理使用这些特性,你可以构建出高性能、可靠的实时通信应用。记住,好的实时通信不仅仅是技术实现,更重要的是为用户提供流畅、及时的交互体验! 🎯