Skip to content

STOMP 拦截器(Interception):消息流的守门员 🛡️

概述

在 WebSocket + STOMP 的世界里,如果说事件(Events)是"通知员",那么拦截器(Interceptor)就是"守门员"。它们能够拦截每一条消息,在消息处理的任何环节进行干预,为我们提供了强大的消息控制能力。

NOTE

拦截器与事件的区别:事件只提供 STOMP 连接生命周期的通知,而拦截器可以拦截每一条客户端消息,并在处理链的任何部分进行操作。

为什么需要拦截器? 🤔

想象一下这些场景:

  • 📊 监控统计:记录每条消息的发送时间、用户信息
  • 🔐 权限控制:检查用户是否有权限发送特定消息
  • 🛡️ 安全过滤:过滤恶意内容或敏感信息
  • 📝 日志记录:记录所有消息流转的详细信息
  • 性能优化:统计消息处理时间,发现性能瓶颈

如果没有拦截器,我们就需要在每个消息处理方法中重复编写这些逻辑,代码会变得冗余且难以维护。

拦截器的工作原理

基础拦截器配置

1. 配置拦截器

kotlin
@Configuration
@EnableWebSocketMessageBroker
class WebSocketConfiguration : WebSocketMessageBrokerConfigurer {

    override fun configureClientInboundChannel(registration: ChannelRegistration) {
        // 注册入站消息拦截器
        registration.interceptors(MyChannelInterceptor()) 
    }
    
    override fun configureClientOutboundChannel(registration: ChannelRegistration) {
        // 注册出站消息拦截器(可选)
        registration.interceptors(MyOutboundInterceptor()) 
    }
}
java
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfiguration implements WebSocketMessageBrokerConfigurer {

    @Override
    public void configureClientInboundChannel(ChannelRegistration registration) {
        // 注册入站消息拦截器
        registration.interceptors(new MyChannelInterceptor()); 
    }
    
    @Override
    public void configureClientOutboundChannel(ChannelRegistration registration) {
        // 注册出站消息拦截器(可选)
        registration.interceptors(new MyOutboundInterceptor()); 
    }
}

2. 实现基础拦截器

kotlin
class MyChannelInterceptor : ChannelInterceptor {

    override fun preSend(message: Message<*>, channel: MessageChannel): Message<*>? {
        // 获取 STOMP 头部访问器
        val accessor = StompHeaderAccessor.wrap(message) 
        val command = accessor.command 
        
        // 根据不同的 STOMP 命令进行处理
        when (command) {
            StompCommand.CONNECT -> handleConnect(accessor)
            StompCommand.SUBSCRIBE -> handleSubscribe(accessor)
            StompCommand.SEND -> handleSend(accessor)
            StompCommand.DISCONNECT -> handleDisconnect(accessor)
            else -> { /* 其他命令处理 */ }
        }
        
        return message // 返回消息继续处理,返回 null 则阻止消息
    }
    
    override fun postSend(message: Message<*>, channel: MessageChannel, sent: Boolean) {
        // 消息发送后的处理
        if (sent) {
            println("消息发送成功: ${message.headers}")
        } else {
            println("消息发送失败: ${message.headers}") 
        }
    }
    
    private fun handleConnect(accessor: StompHeaderAccessor) {
        println("用户连接: ${accessor.sessionId}")
    }
    
    private fun handleSubscribe(accessor: StompHeaderAccessor) {
        println("用户订阅: ${accessor.destination}")
    }
    
    private fun handleSend(accessor: StompHeaderAccessor) {
        println("用户发送消息到: ${accessor.destination}")
    }
    
    private fun handleDisconnect(accessor: StompHeaderAccessor) {
        println("用户断开连接: ${accessor.sessionId}")
    }
}

实战案例:构建消息监控系统 📊

让我们构建一个完整的消息监控和权限控制系统:

1. 消息统计拦截器

kotlin
@Component
class MessageStatisticsInterceptor(
    private val messageStatisticsService: MessageStatisticsService
) : ChannelInterceptor {

    private val logger = LoggerFactory.getLogger(MessageStatisticsInterceptor::class.java)

    override fun preSend(message: Message<*>, channel: MessageChannel): Message<*>? {
        val accessor = StompHeaderAccessor.wrap(message)
        val sessionId = accessor.sessionId
        val command = accessor.command
        
        // 记录消息统计
        messageStatisticsService.recordMessage(
            sessionId = sessionId,
            command = command?.name,
            destination = accessor.destination,
            timestamp = System.currentTimeMillis()
        )
        
        // 添加处理时间戳到消息头
        accessor.setHeader("processStartTime", System.currentTimeMillis()) 
        
        logger.info("处理消息: 会话={}, 命令={}, 目标={}", sessionId, command, accessor.destination)
        
        return message
    }

    override fun postSend(message: Message<*>, channel: MessageChannel, sent: Boolean) {
        val accessor = StompHeaderAccessor.wrap(message)
        val startTime = accessor.getFirstNativeHeader("processStartTime")?.toLongOrNull()
        
        if (startTime != null) {
            val processingTime = System.currentTimeMillis() - startTime
            messageStatisticsService.recordProcessingTime(
                sessionId = accessor.sessionId,
                processingTime = processingTime
            )
            
            if (processingTime > 1000) { // 处理时间超过1秒
                logger.warn("消息处理耗时过长: {}ms, 会话: {}", processingTime, accessor.sessionId) 
            }
        }
    }
}

2. 权限控制拦截器

kotlin
@Component
class SecurityInterceptor(
    private val userService: UserService,
    private val permissionService: PermissionService
) : ChannelInterceptor {

    private val logger = LoggerFactory.getLogger(SecurityInterceptor::class.java)

    override fun preSend(message: Message<*>, channel: MessageChannel): Message<*>? {
        val accessor = StompHeaderAccessor.wrap(message)
        val command = accessor.command
        
        // 只对需要权限检查的命令进行处理
        if (command in listOf(StompCommand.SUBSCRIBE, StompCommand.SEND)) {
            return checkPermission(accessor, message)
        }
        
        return message
    }
    
    private fun checkPermission(accessor: StompHeaderAccessor, message: Message<*>): Message<*>? {
        val sessionId = accessor.sessionId ?: return null
        val destination = accessor.destination ?: return null
        
        // 获取用户信息
        val user = userService.getUserBySessionId(sessionId)
        if (user == null) {
            logger.warn("未找到用户信息: sessionId={}", sessionId) 
            return null // 阻止消息处理
        }
        
        // 检查权限
        val hasPermission = when (accessor.command) {
            StompCommand.SUBSCRIBE -> permissionService.canSubscribe(user, destination)
            StompCommand.SEND -> permissionService.canSend(user, destination)
            else -> true
        }
        
        if (!hasPermission) {
            logger.warn("用户权限不足: user={}, destination={}, command={}", 
                user.username, destination, accessor.command) 
            
            // 发送错误消息给客户端
            sendErrorToClient(accessor, "权限不足")
            return null // 阻止消息处理
        }
        
        // 在消息头中添加用户信息,供后续处理使用
        accessor.setUser(user) 
        accessor.setHeader("userId", user.id) 
        
        return message
    }
    
    private fun sendErrorToClient(accessor: StompHeaderAccessor, errorMessage: String) {
        // 这里可以通过 SimpMessagingTemplate 发送错误消息
        // 具体实现取决于你的错误处理策略
    }
}

3. 消息内容过滤拦截器

kotlin
@Component
class ContentFilterInterceptor(
    private val contentFilterService: ContentFilterService
) : ChannelInterceptor {

    override fun preSend(message: Message<*>, channel: MessageChannel): Message<*>? {
        val accessor = StompHeaderAccessor.wrap(message)
        
        // 只对发送消息进行内容过滤
        if (accessor.command == StompCommand.SEND) {
            return filterMessageContent(message, accessor)
        }
        
        return message
    }
    
    private fun filterMessageContent(message: Message<*>, accessor: StompHeaderAccessor): Message<*>? {
        val payload = message.payload
        
        // 如果是文本消息,进行内容过滤
        if (payload is String) {
            val filteredContent = contentFilterService.filterContent(payload)
            
            if (filteredContent == null) {
                // 内容被完全过滤,阻止消息
                return null
            }
            
            if (filteredContent != payload) {
                // 内容被修改,创建新的消息
                return MessageBuilder
                    .withPayload(filteredContent) 
                    .copyHeaders(message.headers)
                    .build()
            }
        }
        
        return message
    }
}

ExecutorChannelInterceptor:线程级拦截 🧵

对于需要在消息处理线程中进行操作的场景,可以使用 ExecutorChannelInterceptor

kotlin
@Component
class ThreadAwareInterceptor : ExecutorChannelInterceptor {

    private val logger = LoggerFactory.getLogger(ThreadAwareInterceptor::class.java)

    override fun beforeHandle(
        message: Message<*>, 
        channel: MessageChannel, 
        handler: MessageHandler
    ): Message<*>? {
        val threadName = Thread.currentThread().name
        val accessor = StompHeaderAccessor.wrap(message)
        
        logger.info("开始处理消息: 线程={}, 会话={}, 处理器={}", 
            threadName, accessor.sessionId, handler.javaClass.simpleName)
        
        // 设置线程上下文
        MDC.put("sessionId", accessor.sessionId) 
        MDC.put("threadName", threadName) 
        
        return message
    }

    override fun afterMessageHandled(
        message: Message<*>, 
        channel: MessageChannel, 
        handler: MessageHandler, 
        ex: Exception?
    ) {
        if (ex != null) {
            logger.error("消息处理异常: {}", ex.message, ex) 
        } else {
            logger.info("消息处理完成")
        }
        
        // 清理线程上下文
        MDC.clear() 
    }
}

拦截器链的配置和顺序 🔗

当有多个拦截器时,可以通过配置控制它们的执行顺序:

kotlin
@Configuration
@EnableWebSocketMessageBroker
class WebSocketConfiguration : WebSocketMessageBrokerConfigurer {

    @Autowired
    private lateinit var securityInterceptor: SecurityInterceptor
    
    @Autowired
    private lateinit var statisticsInterceptor: MessageStatisticsInterceptor
    
    @Autowired
    private lateinit var contentFilterInterceptor: ContentFilterInterceptor

    override fun configureClientInboundChannel(registration: ChannelRegistration) {
        // 拦截器按顺序执行:安全检查 -> 内容过滤 -> 统计记录
        registration.interceptors(
            securityInterceptor,        // 1. 首先进行安全检查
            contentFilterInterceptor,   // 2. 然后过滤内容
            statisticsInterceptor       // 3. 最后记录统计
        )
        
        // 配置线程池
        registration.taskExecutor(ThreadPoolTaskExecutor().apply {
            corePoolSize = 8
            maxPoolSize = 16
            queueCapacity = 100
            setThreadNamePrefix("stomp-")
            initialize()
        })
    }
}

注意事项和最佳实践 ⚠️

WARNING

DISCONNECT 消息的特殊性:DISCONNECT 消息可能来自客户端,也可能在 WebSocket 会话关闭时自动生成。在某些情况下,拦截器可能会为同一个会话多次拦截此消息。

IMPORTANT

幂等性要求:组件应该对多个断开连接事件保持幂等性,即多次执行相同操作应该产生相同结果。

处理重复 DISCONNECT 消息

kotlin
@Component
class DisconnectHandlingInterceptor : ChannelInterceptor {
    
    private val processedDisconnects = ConcurrentHashMap<String, Boolean>()

    override fun preSend(message: Message<*>, channel: MessageChannel): Message<*>? {
        val accessor = StompHeaderAccessor.wrap(message)
        
        if (accessor.command == StompCommand.DISCONNECT) {
            val sessionId = accessor.sessionId ?: return message
            
            // 检查是否已经处理过这个会话的断开连接
            if (processedDisconnects.putIfAbsent(sessionId, true) != null) {
                // 已经处理过,跳过
                return message 
            }
            
            // 处理断开连接逻辑
            handleDisconnect(sessionId)
            
            // 设置定时清理,避免内存泄漏
            scheduleCleanup(sessionId) 
        }
        
        return message
    }
    
    private fun handleDisconnect(sessionId: String) {
        // 执行断开连接的清理工作
        println("处理用户断开连接: $sessionId")
    }
    
    private fun scheduleCleanup(sessionId: String) {
        // 5分钟后清理记录,避免内存泄漏
        Timer().schedule(object : TimerTask() {
            override fun run() {
                processedDisconnects.remove(sessionId)
            }
        }, 5 * 60 * 1000) // 5分钟
    }
}

性能监控面板示例 📈

结合拦截器收集的数据,我们可以构建一个简单的监控面板:

kotlin
@RestController
@RequestMapping("/api/websocket")
class WebSocketMonitorController(
    private val messageStatisticsService: MessageStatisticsService
) {

    @GetMapping("/stats")
    fun getStatistics(): WebSocketStats {
        return messageStatisticsService.getOverallStats()
    }
    
    @GetMapping("/active-sessions")
    fun getActiveSessions(): List<SessionInfo> {
        return messageStatisticsService.getActiveSessions()
    }
}

data class WebSocketStats(
    val totalConnections: Long,
    val activeConnections: Int,
    val totalMessages: Long,
    val averageProcessingTime: Double,
    val errorRate: Double
)

data class SessionInfo(
    val sessionId: String,
    val userId: String?,
    val connectTime: Long,
    val messageCount: Int,
    val lastActivity: Long
)

总结 🎯

STOMP 拦截器为我们提供了强大的消息控制能力:

  • 🛡️ 全面监控:可以拦截所有消息,无遗漏
  • 🔧 灵活处理:支持预处理和后处理
  • 🧵 线程感知:ExecutorChannelInterceptor 提供线程级控制
  • 🔗 链式处理:支持多个拦截器协同工作

通过合理使用拦截器,我们可以构建出健壮、安全、高性能的 WebSocket 应用程序。记住,拦截器是消息流的守门员,用好它们,你的应用将更加可靠和强大! 🚀