Appearance
STOMP 拦截器(Interception):消息流的守门员 🛡️
概述
在 WebSocket + STOMP 的世界里,如果说事件(Events)是"通知员",那么拦截器(Interceptor)就是"守门员"。它们能够拦截每一条消息,在消息处理的任何环节进行干预,为我们提供了强大的消息控制能力。
NOTE
拦截器与事件的区别:事件只提供 STOMP 连接生命周期的通知,而拦截器可以拦截每一条客户端消息,并在处理链的任何部分进行操作。
为什么需要拦截器? 🤔
想象一下这些场景:
- 📊 监控统计:记录每条消息的发送时间、用户信息
- 🔐 权限控制:检查用户是否有权限发送特定消息
- 🛡️ 安全过滤:过滤恶意内容或敏感信息
- 📝 日志记录:记录所有消息流转的详细信息
- ⚡ 性能优化:统计消息处理时间,发现性能瓶颈
如果没有拦截器,我们就需要在每个消息处理方法中重复编写这些逻辑,代码会变得冗余且难以维护。
拦截器的工作原理
基础拦截器配置
1. 配置拦截器
kotlin
@Configuration
@EnableWebSocketMessageBroker
class WebSocketConfiguration : WebSocketMessageBrokerConfigurer {
override fun configureClientInboundChannel(registration: ChannelRegistration) {
// 注册入站消息拦截器
registration.interceptors(MyChannelInterceptor())
}
override fun configureClientOutboundChannel(registration: ChannelRegistration) {
// 注册出站消息拦截器(可选)
registration.interceptors(MyOutboundInterceptor())
}
}
java
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfiguration implements WebSocketMessageBrokerConfigurer {
@Override
public void configureClientInboundChannel(ChannelRegistration registration) {
// 注册入站消息拦截器
registration.interceptors(new MyChannelInterceptor());
}
@Override
public void configureClientOutboundChannel(ChannelRegistration registration) {
// 注册出站消息拦截器(可选)
registration.interceptors(new MyOutboundInterceptor());
}
}
2. 实现基础拦截器
kotlin
class MyChannelInterceptor : ChannelInterceptor {
override fun preSend(message: Message<*>, channel: MessageChannel): Message<*>? {
// 获取 STOMP 头部访问器
val accessor = StompHeaderAccessor.wrap(message)
val command = accessor.command
// 根据不同的 STOMP 命令进行处理
when (command) {
StompCommand.CONNECT -> handleConnect(accessor)
StompCommand.SUBSCRIBE -> handleSubscribe(accessor)
StompCommand.SEND -> handleSend(accessor)
StompCommand.DISCONNECT -> handleDisconnect(accessor)
else -> { /* 其他命令处理 */ }
}
return message // 返回消息继续处理,返回 null 则阻止消息
}
override fun postSend(message: Message<*>, channel: MessageChannel, sent: Boolean) {
// 消息发送后的处理
if (sent) {
println("消息发送成功: ${message.headers}")
} else {
println("消息发送失败: ${message.headers}")
}
}
private fun handleConnect(accessor: StompHeaderAccessor) {
println("用户连接: ${accessor.sessionId}")
}
private fun handleSubscribe(accessor: StompHeaderAccessor) {
println("用户订阅: ${accessor.destination}")
}
private fun handleSend(accessor: StompHeaderAccessor) {
println("用户发送消息到: ${accessor.destination}")
}
private fun handleDisconnect(accessor: StompHeaderAccessor) {
println("用户断开连接: ${accessor.sessionId}")
}
}
实战案例:构建消息监控系统 📊
让我们构建一个完整的消息监控和权限控制系统:
1. 消息统计拦截器
kotlin
@Component
class MessageStatisticsInterceptor(
private val messageStatisticsService: MessageStatisticsService
) : ChannelInterceptor {
private val logger = LoggerFactory.getLogger(MessageStatisticsInterceptor::class.java)
override fun preSend(message: Message<*>, channel: MessageChannel): Message<*>? {
val accessor = StompHeaderAccessor.wrap(message)
val sessionId = accessor.sessionId
val command = accessor.command
// 记录消息统计
messageStatisticsService.recordMessage(
sessionId = sessionId,
command = command?.name,
destination = accessor.destination,
timestamp = System.currentTimeMillis()
)
// 添加处理时间戳到消息头
accessor.setHeader("processStartTime", System.currentTimeMillis())
logger.info("处理消息: 会话={}, 命令={}, 目标={}", sessionId, command, accessor.destination)
return message
}
override fun postSend(message: Message<*>, channel: MessageChannel, sent: Boolean) {
val accessor = StompHeaderAccessor.wrap(message)
val startTime = accessor.getFirstNativeHeader("processStartTime")?.toLongOrNull()
if (startTime != null) {
val processingTime = System.currentTimeMillis() - startTime
messageStatisticsService.recordProcessingTime(
sessionId = accessor.sessionId,
processingTime = processingTime
)
if (processingTime > 1000) { // 处理时间超过1秒
logger.warn("消息处理耗时过长: {}ms, 会话: {}", processingTime, accessor.sessionId)
}
}
}
}
2. 权限控制拦截器
kotlin
@Component
class SecurityInterceptor(
private val userService: UserService,
private val permissionService: PermissionService
) : ChannelInterceptor {
private val logger = LoggerFactory.getLogger(SecurityInterceptor::class.java)
override fun preSend(message: Message<*>, channel: MessageChannel): Message<*>? {
val accessor = StompHeaderAccessor.wrap(message)
val command = accessor.command
// 只对需要权限检查的命令进行处理
if (command in listOf(StompCommand.SUBSCRIBE, StompCommand.SEND)) {
return checkPermission(accessor, message)
}
return message
}
private fun checkPermission(accessor: StompHeaderAccessor, message: Message<*>): Message<*>? {
val sessionId = accessor.sessionId ?: return null
val destination = accessor.destination ?: return null
// 获取用户信息
val user = userService.getUserBySessionId(sessionId)
if (user == null) {
logger.warn("未找到用户信息: sessionId={}", sessionId)
return null // 阻止消息处理
}
// 检查权限
val hasPermission = when (accessor.command) {
StompCommand.SUBSCRIBE -> permissionService.canSubscribe(user, destination)
StompCommand.SEND -> permissionService.canSend(user, destination)
else -> true
}
if (!hasPermission) {
logger.warn("用户权限不足: user={}, destination={}, command={}",
user.username, destination, accessor.command)
// 发送错误消息给客户端
sendErrorToClient(accessor, "权限不足")
return null // 阻止消息处理
}
// 在消息头中添加用户信息,供后续处理使用
accessor.setUser(user)
accessor.setHeader("userId", user.id)
return message
}
private fun sendErrorToClient(accessor: StompHeaderAccessor, errorMessage: String) {
// 这里可以通过 SimpMessagingTemplate 发送错误消息
// 具体实现取决于你的错误处理策略
}
}
3. 消息内容过滤拦截器
kotlin
@Component
class ContentFilterInterceptor(
private val contentFilterService: ContentFilterService
) : ChannelInterceptor {
override fun preSend(message: Message<*>, channel: MessageChannel): Message<*>? {
val accessor = StompHeaderAccessor.wrap(message)
// 只对发送消息进行内容过滤
if (accessor.command == StompCommand.SEND) {
return filterMessageContent(message, accessor)
}
return message
}
private fun filterMessageContent(message: Message<*>, accessor: StompHeaderAccessor): Message<*>? {
val payload = message.payload
// 如果是文本消息,进行内容过滤
if (payload is String) {
val filteredContent = contentFilterService.filterContent(payload)
if (filteredContent == null) {
// 内容被完全过滤,阻止消息
return null
}
if (filteredContent != payload) {
// 内容被修改,创建新的消息
return MessageBuilder
.withPayload(filteredContent)
.copyHeaders(message.headers)
.build()
}
}
return message
}
}
ExecutorChannelInterceptor:线程级拦截 🧵
对于需要在消息处理线程中进行操作的场景,可以使用 ExecutorChannelInterceptor
:
kotlin
@Component
class ThreadAwareInterceptor : ExecutorChannelInterceptor {
private val logger = LoggerFactory.getLogger(ThreadAwareInterceptor::class.java)
override fun beforeHandle(
message: Message<*>,
channel: MessageChannel,
handler: MessageHandler
): Message<*>? {
val threadName = Thread.currentThread().name
val accessor = StompHeaderAccessor.wrap(message)
logger.info("开始处理消息: 线程={}, 会话={}, 处理器={}",
threadName, accessor.sessionId, handler.javaClass.simpleName)
// 设置线程上下文
MDC.put("sessionId", accessor.sessionId)
MDC.put("threadName", threadName)
return message
}
override fun afterMessageHandled(
message: Message<*>,
channel: MessageChannel,
handler: MessageHandler,
ex: Exception?
) {
if (ex != null) {
logger.error("消息处理异常: {}", ex.message, ex)
} else {
logger.info("消息处理完成")
}
// 清理线程上下文
MDC.clear()
}
}
拦截器链的配置和顺序 🔗
当有多个拦截器时,可以通过配置控制它们的执行顺序:
kotlin
@Configuration
@EnableWebSocketMessageBroker
class WebSocketConfiguration : WebSocketMessageBrokerConfigurer {
@Autowired
private lateinit var securityInterceptor: SecurityInterceptor
@Autowired
private lateinit var statisticsInterceptor: MessageStatisticsInterceptor
@Autowired
private lateinit var contentFilterInterceptor: ContentFilterInterceptor
override fun configureClientInboundChannel(registration: ChannelRegistration) {
// 拦截器按顺序执行:安全检查 -> 内容过滤 -> 统计记录
registration.interceptors(
securityInterceptor, // 1. 首先进行安全检查
contentFilterInterceptor, // 2. 然后过滤内容
statisticsInterceptor // 3. 最后记录统计
)
// 配置线程池
registration.taskExecutor(ThreadPoolTaskExecutor().apply {
corePoolSize = 8
maxPoolSize = 16
queueCapacity = 100
setThreadNamePrefix("stomp-")
initialize()
})
}
}
注意事项和最佳实践 ⚠️
WARNING
DISCONNECT 消息的特殊性:DISCONNECT 消息可能来自客户端,也可能在 WebSocket 会话关闭时自动生成。在某些情况下,拦截器可能会为同一个会话多次拦截此消息。
IMPORTANT
幂等性要求:组件应该对多个断开连接事件保持幂等性,即多次执行相同操作应该产生相同结果。
处理重复 DISCONNECT 消息
kotlin
@Component
class DisconnectHandlingInterceptor : ChannelInterceptor {
private val processedDisconnects = ConcurrentHashMap<String, Boolean>()
override fun preSend(message: Message<*>, channel: MessageChannel): Message<*>? {
val accessor = StompHeaderAccessor.wrap(message)
if (accessor.command == StompCommand.DISCONNECT) {
val sessionId = accessor.sessionId ?: return message
// 检查是否已经处理过这个会话的断开连接
if (processedDisconnects.putIfAbsent(sessionId, true) != null) {
// 已经处理过,跳过
return message
}
// 处理断开连接逻辑
handleDisconnect(sessionId)
// 设置定时清理,避免内存泄漏
scheduleCleanup(sessionId)
}
return message
}
private fun handleDisconnect(sessionId: String) {
// 执行断开连接的清理工作
println("处理用户断开连接: $sessionId")
}
private fun scheduleCleanup(sessionId: String) {
// 5分钟后清理记录,避免内存泄漏
Timer().schedule(object : TimerTask() {
override fun run() {
processedDisconnects.remove(sessionId)
}
}, 5 * 60 * 1000) // 5分钟
}
}
性能监控面板示例 📈
结合拦截器收集的数据,我们可以构建一个简单的监控面板:
kotlin
@RestController
@RequestMapping("/api/websocket")
class WebSocketMonitorController(
private val messageStatisticsService: MessageStatisticsService
) {
@GetMapping("/stats")
fun getStatistics(): WebSocketStats {
return messageStatisticsService.getOverallStats()
}
@GetMapping("/active-sessions")
fun getActiveSessions(): List<SessionInfo> {
return messageStatisticsService.getActiveSessions()
}
}
data class WebSocketStats(
val totalConnections: Long,
val activeConnections: Int,
val totalMessages: Long,
val averageProcessingTime: Double,
val errorRate: Double
)
data class SessionInfo(
val sessionId: String,
val userId: String?,
val connectTime: Long,
val messageCount: Int,
val lastActivity: Long
)
总结 🎯
STOMP 拦截器为我们提供了强大的消息控制能力:
- 🛡️ 全面监控:可以拦截所有消息,无遗漏
- 🔧 灵活处理:支持预处理和后处理
- 🧵 线程感知:ExecutorChannelInterceptor 提供线程级控制
- 🔗 链式处理:支持多个拦截器协同工作
通过合理使用拦截器,我们可以构建出健壮、安全、高性能的 WebSocket 应用程序。记住,拦截器是消息流的守门员,用好它们,你的应用将更加可靠和强大! 🚀