Appearance
Spring STOMP 客户端:实时通信的强力工具 🚀
什么是 STOMP 客户端?为什么需要它?
在现代 Web 应用中,我们经常需要实现实时通信功能,比如聊天应用、实时通知、股票价格推送等。传统的 HTTP 请求-响应模式无法满足这些需求,因为它是单向的、短连接的。
NOTE
STOMP(Simple Text Oriented Messaging Protocol)是一个简单的面向文本的消息协议,它为客户端提供了一种与消息代理进行交互的标准方式。
想象一下这样的场景:
- 没有 STOMP 客户端:你需要不断地轮询服务器获取新消息,这会造成大量无效请求和服务器压力
- 有了 STOMP 客户端:建立一次连接后,服务器可以主动推送消息给客户端,实现真正的实时通信
Spring STOMP 客户端的核心价值
Spring 提供的 STOMP 客户端解决了以下关键问题:
- 连接管理复杂性:自动处理 WebSocket 连接的建立、维护和重连
- 消息序列化/反序列化:自动处理 Java 对象与 STOMP 消息的转换
- 心跳机制:保持连接活跃,及时发现连接断开
- 错误处理:统一的异常处理机制
- 订阅管理:简化消息订阅和取消订阅的操作
快速上手:创建你的第一个 STOMP 客户端
1. 基本配置
首先,让我们创建一个基本的 STOMP 客户端:
kotlin
import org.springframework.web.socket.client.standard.StandardWebSocketClient
import org.springframework.web.socket.messaging.WebSocketStompClient
import org.springframework.messaging.converter.StringMessageConverter
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler
@Component
class StompClientConfig {
fun createStompClient(): WebSocketStompClient {
// 创建 WebSocket 客户端
val webSocketClient = StandardWebSocketClient()
// 创建 STOMP 客户端
val stompClient = WebSocketStompClient(webSocketClient)
// 配置消息转换器
stompClient.messageConverter = StringMessageConverter()
// 配置任务调度器(用于心跳)
val taskScheduler = ThreadPoolTaskScheduler().apply {
poolSize = 1
threadNamePrefix = "stomp-"
initialize()
}
stompClient.taskScheduler = taskScheduler
return stompClient
}
}
kotlin
import org.springframework.web.socket.client.sockjs.SockJsClient
import org.springframework.web.socket.client.standard.StandardWebSocketClient
@Component
class StompClientWithSockJS {
fun createStompClientWithFallback(): WebSocketStompClient {
// SockJS 客户端提供 WebSocket 和 HTTP 传输的回退机制
val sockJsClient = SockJsClient(listOf(StandardWebSocketClient()))
val stompClient = WebSocketStompClient(sockJsClient)
stompClient.messageConverter = StringMessageConverter()
return stompClient
}
}
TIP
SockJsClient
是一个很好的选择,因为它在 WebSocket 不可用时会自动回退到 HTTP 长轮询等其他传输方式,提高了兼容性。
2. 建立连接和会话处理
接下来,让我们实现连接逻辑和会话处理:
kotlin
import org.springframework.messaging.simp.stomp.*
import org.springframework.stereotype.Service
import java.lang.reflect.Type
@Service
class ChatService {
private val stompClient = createStompClient() // 从上面的配置获取
private var stompSession: StompSession? = null
fun connectToServer() {
val url = "ws://127.0.0.1:8080/websocket"
val sessionHandler = ChatSessionHandler()
try {
// 建立连接(异步操作)
val sessionFuture = stompClient.connect(url, sessionHandler)
stompSession = sessionFuture.get() // 等待连接完成
println("✅ 成功连接到 STOMP 服务器")
} catch (e: Exception) {
println("❌ 连接失败: ${e.message}")
}
}
// 自定义会话处理器
inner class ChatSessionHandler : StompSessionHandlerAdapter() {
override fun afterConnected(session: StompSession, connectedHeaders: StompHeaders) {
println("🎉 STOMP 会话已建立,会话ID: ${session.sessionId}")
// 连接成功后自动订阅聊天频道
subscribeToChat(session)
}
override fun handleException(
session: StompSession,
command: StompCommand?,
headers: StompHeaders,
payload: ByteArray,
exception: Throwable
) {
println("⚠️ STOMP 异常: ${exception.message}")
}
override fun handleTransportError(session: StompSession, exception: Throwable) {
println("🔌 传输错误: ${exception.message}")
// 可以在这里实现重连逻辑
}
}
}
IMPORTANT
StompSessionHandlerAdapter
是一个适配器类,你只需要重写你关心的方法。afterConnected
方法在连接建立后被调用,这是开始发送消息和订阅的好时机。
核心功能详解
1. 发送消息
发送消息非常简单,只需要指定目标地址和消息内容:
kotlin
class MessageSender {
fun sendChatMessage(session: StompSession, message: String) {
try {
// 发送简单文本消息
session.send("/app/chat", message)
println("📤 消息已发送: $message")
} catch (e: Exception) {
println("❌ 发送失败: ${e.message}")
}
}
fun sendObjectMessage(session: StompSession, chatMessage: ChatMessage) {
try {
// 发送对象消息(需要配置相应的消息转换器)
session.send("/app/chat", chatMessage)
println("📤 对象消息已发送")
} catch (e: Exception) {
println("❌ 发送失败: ${e.message}")
}
}
}
// 消息数据类
data class ChatMessage(
val username: String,
val content: String,
val timestamp: Long = System.currentTimeMillis()
)
2. 订阅消息
订阅功能让客户端能够接收特定主题的消息:
kotlin
class MessageSubscriber {
fun subscribeToChat(session: StompSession): StompSession.Subscription {
return session.subscribe("/topic/chat", object : StompFrameHandler {
// 指定消息载荷的类型
override fun getPayloadType(headers: StompHeaders): Type {
return String::class.java
}
// 处理接收到的消息
override fun handleFrame(headers: StompHeaders, payload: Any?) {
val message = payload as String
println("📥 收到消息: $message")
// 在这里处理业务逻辑
processReceivedMessage(message)
}
})
}
fun subscribeToUserNotifications(session: StompSession, userId: String): StompSession.Subscription {
return session.subscribe("/user/$userId/notifications", object : StompFrameHandler {
override fun getPayloadType(headers: StompHeaders): Type {
return NotificationMessage::class.java
}
override fun handleFrame(headers: StompHeaders, payload: Any?) {
val notification = payload as NotificationMessage
println("🔔 收到通知: ${notification.title}")
// 显示通知给用户
showNotificationToUser(notification)
}
})
}
private fun processReceivedMessage(message: String) {
// 处理聊天消息的业务逻辑
println("处理消息: $message")
}
private fun showNotificationToUser(notification: NotificationMessage) {
// 显示通知的业务逻辑
println("显示通知: ${notification.title} - ${notification.content}")
}
}
data class NotificationMessage(
val title: String,
val content: String,
val type: String
)
TIP
getPayloadType
方法很重要,它告诉 Spring 如何反序列化接收到的消息。你可以返回 String::class.java
用于简单文本,或者自定义类用于复杂对象。
3. 心跳机制配置
心跳机制确保连接的稳定性:
kotlin
@Configuration
class StompHeartbeatConfig {
fun configureHeartbeat(): WebSocketStompClient {
val webSocketClient = StandardWebSocketClient()
val stompClient = WebSocketStompClient(webSocketClient)
// 配置心跳任务调度器
val taskScheduler = ThreadPoolTaskScheduler().apply {
poolSize = 2
threadNamePrefix = "stomp-heartbeat-"
initialize()
}
stompClient.taskScheduler = taskScheduler
return stompClient
}
}
NOTE
Spring STOMP 客户端默认的心跳间隔是:
- 写入不活跃: 10秒(发送心跳)
- 读取不活跃: 10秒(关闭连接)
性能测试注意事项
当你使用 WebSocketStompClient
进行性能测试,模拟同一台机器上的数千个客户端时,考虑关闭心跳功能。因为每个连接都会调度自己的心跳任务,这对于在同一台机器上运行的大量客户端来说并不是最优的。
4. 消息回执确认
STOMP 协议支持消息回执,确保消息被正确处理:
kotlin
class ReceiptManager {
fun sendMessageWithReceipt(session: StompSession) {
// 启用自动回执
session.setAutoReceipt(true)
// 发送消息并获取回执句柄
val receiptable = session.send("/app/important-message", "重要消息")
// 注册回执回调
receiptable.addReceiptTask {
println("✅ 消息回执确认:消息已被服务器处理")
}
receiptable.addReceiptLostTask {
println("❌ 消息回执丢失:消息可能未被正确处理")
}
}
fun sendMessageWithManualReceipt(session: StompSession) {
val headers = StompHeaders()
headers.receipt = "receipt-${System.currentTimeMillis()}"
val receiptable = session.send(headers, "手动回执消息")
receiptable.addReceiptTask {
println("✅ 手动回执确认成功")
}
}
}
IMPORTANT
要使用回执功能,必须为客户端配置 TaskScheduler
,并且默认的回执超时时间是 15 秒。
高级配置和优化
1. 消息大小限制
为了防止过大的消息影响性能,可以配置消息大小限制:
kotlin
@Configuration
class StompMessageSizeConfig {
fun configureMessageSizeLimit(): WebSocketStompClient {
val webSocketClient = StandardWebSocketClient()
val stompClient = WebSocketStompClient(webSocketClient)
// 配置入站消息大小限制(默认 64KB)
stompClient.inboundMessageSizeLimit = 64 * 1024
// 配置出站消息大小限制(默认无限制)
stompClient.outboundMessageSizeLimit = 64 * 1024
return stompClient
}
}
WARNING
当出站 STOMP 消息超过限制时,它会被分割成部分帧,接收方需要重新组装。当入站消息超过限制时,会抛出 StompConversionException
异常。
2. 完整的聊天应用示例
让我们把所有概念整合到一个完整的聊天应用中:
完整的聊天客户端实现
kotlin
import org.springframework.messaging.converter.MappingJackson2MessageConverter
import org.springframework.messaging.simp.stomp.*
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler
import org.springframework.stereotype.Service
import org.springframework.web.socket.client.standard.StandardWebSocketClient
import org.springframework.web.socket.messaging.WebSocketStompClient
import java.lang.reflect.Type
import java.util.concurrent.CompletableFuture
@Service
class ChatClient {
private val stompClient: WebSocketStompClient
private var stompSession: StompSession? = null
private var chatSubscription: StompSession.Subscription? = null
init {
stompClient = createStompClient()
}
private fun createStompClient(): WebSocketStompClient {
val webSocketClient = StandardWebSocketClient()
val client = WebSocketStompClient(webSocketClient)
// 使用 JSON 消息转换器
client.messageConverter = MappingJackson2MessageConverter()
// 配置任务调度器
val taskScheduler = ThreadPoolTaskScheduler().apply {
poolSize = 4
threadNamePrefix = "chat-stomp-"
initialize()
}
client.taskScheduler = taskScheduler
// 配置消息大小限制
client.inboundMessageSizeLimit = 128 * 1024 // 128KB
client.outboundMessageSizeLimit = 128 * 1024 // 128KB
return client
}
fun connect(username: String): CompletableFuture<Void> {
val url = "ws://localhost:8080/chat-websocket"
val sessionHandler = ChatSessionHandler(username)
return stompClient.connect(url, sessionHandler).thenAccept { session ->
this.stompSession = session
println("🎉 用户 $username 已连接到聊天服务器")
}
}
fun sendMessage(content: String) {
stompSession?.let { session ->
val message = ChatMessage(
username = getCurrentUsername(),
content = content,
timestamp = System.currentTimeMillis()
)
try {
session.send("/app/chat.send", message)
println("📤 消息已发送: $content")
} catch (e: Exception) {
println("❌ 发送消息失败: ${e.message}")
}
} ?: println("⚠️ 未连接到服务器")
}
fun disconnect() {
chatSubscription?.unsubscribe()
stompSession?.disconnect()
println("👋 已断开连接")
}
private fun getCurrentUsername(): String {
// 这里应该从用户会话或上下文中获取用户名
return "CurrentUser"
}
inner class ChatSessionHandler(private val username: String) : StompSessionHandlerAdapter() {
override fun afterConnected(session: StompSession, connectedHeaders: StompHeaders) {
println("✅ STOMP 会话已建立")
// 订阅公共聊天频道
chatSubscription = session.subscribe("/topic/chat", ChatMessageHandler())
// 订阅个人通知频道
session.subscribe("/user/queue/notifications", NotificationHandler())
// 发送用户加入通知
val joinMessage = ChatMessage(
username = username,
content = "加入了聊天室",
timestamp = System.currentTimeMillis(),
type = "JOIN"
)
session.send("/app/chat.join", joinMessage)
}
override fun handleException(
session: StompSession,
command: StompCommand?,
headers: StompHeaders,
payload: ByteArray,
exception: Throwable
) {
println("⚠️ STOMP 异常: ${exception.message}")
}
override fun handleTransportError(session: StompSession, exception: Throwable) {
println("🔌 传输错误: ${exception.message}")
// 实现重连逻辑
reconnect()
}
private fun reconnect() {
println("🔄 尝试重新连接...")
// 延迟重连逻辑
Thread.sleep(5000)
connect(username)
}
}
inner class ChatMessageHandler : StompFrameHandler {
override fun getPayloadType(headers: StompHeaders): Type = ChatMessage::class.java
override fun handleFrame(headers: StompHeaders, payload: Any?) {
val message = payload as ChatMessage
when (message.type) {
"JOIN" -> println("👤 ${message.username} ${message.content}")
"LEAVE" -> println("👋 ${message.username} ${message.content}")
else -> println("💬 ${message.username}: ${message.content}")
}
}
}
inner class NotificationHandler : StompFrameHandler {
override fun getPayloadType(headers: StompHeaders): Type = NotificationMessage::class.java
override fun handleFrame(headers: StompHeaders, payload: Any?) {
val notification = payload as NotificationMessage
println("🔔 通知: ${notification.title} - ${notification.content}")
}
}
}
// 数据类定义
data class ChatMessage(
val username: String,
val content: String,
val timestamp: Long,
val type: String = "MESSAGE"
)
data class NotificationMessage(
val title: String,
val content: String,
val type: String = "INFO"
)
3. 使用示例
kotlin
@Component
class ChatClientUsage {
@Autowired
private lateinit var chatClient: ChatClient
fun startChatSession() {
// 连接到聊天服务器
chatClient.connect("Alice").thenRun {
println("🚀 聊天会话已启动")
// 发送一些测试消息
chatClient.sendMessage("大家好!")
chatClient.sendMessage("今天天气真不错")
// 模拟用户交互
simulateUserInteraction()
}.exceptionally { throwable ->
println("❌ 连接失败: ${throwable.message}")
null
}
}
private fun simulateUserInteraction() {
// 模拟用户在聊天中的行为
Thread {
Thread.sleep(2000)
chatClient.sendMessage("有人在吗?")
Thread.sleep(3000)
chatClient.sendMessage("我要下线了,再见!")
Thread.sleep(1000)
chatClient.disconnect()
}.start()
}
}
最佳实践和注意事项
1. 错误处理策略
kotlin
class RobustStompClient {
private var reconnectAttempts = 0
private val maxReconnectAttempts = 5
fun handleConnectionError(exception: Throwable) {
when (exception) {
is ConnectException -> {
println("🔌 连接被拒绝,检查服务器是否运行")
attemptReconnect()
}
is ConnectionLostException -> {
println("📡 连接丢失,尝试重连")
attemptReconnect()
}
else -> {
println("❌ 未知错误: ${exception.message}")
}
}
}
private fun attemptReconnect() {
if (reconnectAttempts < maxReconnectAttempts) {
reconnectAttempts++
val delay = reconnectAttempts * 2000L // 指数退避
println("🔄 第 $reconnectAttempts 次重连尝试,等待 ${delay}ms")
Thread.sleep(delay)
// 重新连接逻辑
connect()
} else {
println("❌ 重连次数已达上限,停止尝试")
}
}
private fun connect() {
// 连接逻辑
}
}
2. 资源管理
CAUTION
记得在应用关闭时正确清理资源,包括断开 STOMP 连接和关闭任务调度器。
kotlin
@PreDestroy
fun cleanup() {
chatSubscription?.unsubscribe()
stompSession?.disconnect()
taskScheduler.shutdown()
println("🧹 资源已清理")
}
总结
Spring STOMP 客户端为我们提供了一个强大而灵活的实时通信解决方案。它的主要优势包括:
✅ 简化的 API:隐藏了 WebSocket 和 STOMP 协议的复杂性
✅ 自动消息转换:支持 JSON、XML 等多种消息格式
✅ 可靠的连接管理:内置心跳机制和错误处理
✅ 灵活的订阅模式:支持主题订阅和点对点消息
✅ 企业级特性:消息回执、大小限制、性能优化等
通过合理使用这些特性,你可以构建出高性能、可靠的实时通信应用。记住,好的实时通信不仅仅是技术实现,更重要的是为用户提供流畅、及时的交互体验! 🎯