Skip to content

Spring JMS 消息接收详解 📨

概述

在现代企业级应用中,消息队列是实现系统解耦、异步处理和提高系统可靠性的重要技术。Spring Framework 为 JMS(Java Message Service)提供了强大的支持,让我们能够轻松地接收和处理消息。

NOTE

JMS 是 Java 平台上的消息中间件标准,它定义了应用程序如何创建、发送、接收和读取消息。Spring JMS 在此基础上提供了更简洁、更强大的抽象。

为什么需要消息接收机制? 🤔

想象一下这样的场景:

  • 电商系统:用户下单后,需要通知库存系统、支付系统、物流系统等多个子系统
  • 日志处理:应用产生大量日志,需要异步处理避免影响主业务
  • 数据同步:多个系统间需要实时同步数据变更

如果没有消息队列,这些场景会面临:

  • 系统间强耦合,一个系统故障影响整体
  • 同步调用导致响应时间长
  • 难以处理突发流量峰值

消息接收的两种方式

Spring JMS 提供了两种主要的消息接收方式:

1. 同步接收(Synchronous Receipt)

同步接收是最简单直接的方式,调用线程会阻塞等待消息到达。

kotlin
@Service
class OrderSyncReceiver(
    private val jmsTemplate: JmsTemplate
) {
    
    fun receiveOrderMessage(): String? {
        return try {
            // 同步接收消息,会阻塞当前线程
            val message = jmsTemplate.receive("order.queue") as? TextMessage 
            message?.text
        } catch (ex: JMSException) {
            logger.error("接收消息失败", ex)
            null
        }
    }
    
    fun receiveWithTimeout(): String? {
        return try {
            // 设置超时时间,避免无限等待
            jmsTemplate.receiveTimeout = 5000 // 5秒超时
            val message = jmsTemplate.receive("order.queue") as? TextMessage
            message?.text
        } catch (ex: Exception) {
            logger.warn("接收消息超时或失败", ex)
            null
        }
    }
    
    companion object {
        private val logger = LoggerFactory.getLogger(OrderSyncReceiver::class.java)
    }
}
kotlin
@RestController
@RequestMapping("/api/orders")
class OrderController(
    private val orderSyncReceiver: OrderSyncReceiver
) {
    
    @GetMapping("/receive")
    fun receiveOrder(): ResponseEntity<String> {
        val message = orderSyncReceiver.receiveWithTimeout()
        return if (message != null) {
            ResponseEntity.ok("收到订单消息: $message")
        } else {
            ResponseEntity.noContent().build()
        }
    }
}

WARNING

同步接收会阻塞调用线程,在高并发场景下可能导致线程池耗尽。建议设置合理的超时时间,并谨慎使用。

2. 异步接收(Asynchronous Receipt)

异步接收是推荐的方式,它不会阻塞调用线程,而是通过监听器模式处理消息。

异步接收的实现方式

方式一:实现 MessageListener 接口

这是最基础的异步接收方式:

kotlin
@Component
class OrderMessageListener : MessageListener {
    
    override fun onMessage(message: Message) {
        when (message) {
            is TextMessage -> {
                try {
                    val orderData = message.text
                    processOrder(orderData) 
                    logger.info("成功处理订单消息: $orderData")
                } catch (ex: JMSException) {
                    logger.error("处理订单消息失败", ex) 
                    throw RuntimeException("消息处理异常", ex)
                }
            }
            else -> {
                logger.warn("收到不支持的消息类型: ${message.javaClass.simpleName}") 
                throw IllegalArgumentException("消息必须是 TextMessage 类型")
            }
        }
    }
    
    private fun processOrder(orderData: String) {
        // 处理订单逻辑
        val order = parseOrderFromJson(orderData)
        // 更新库存、发送通知等
        logger.info("处理订单: ${order.id}")
    }
    
    private fun parseOrderFromJson(json: String): Order {
        // JSON 解析逻辑
        return Order(id = "ORDER-001", amount = 100.0)
    }
    
    companion object {
        private val logger = LoggerFactory.getLogger(OrderMessageListener::class.java)
    }
}

data class Order(val id: String, val amount: Double)

配置消息监听容器

kotlin
@Configuration
@EnableJms
class JmsConfig {
    
    @Bean
    fun orderMessageListener() = OrderMessageListener()
    
    @Bean
    fun jmsListenerContainer(
        connectionFactory: ConnectionFactory,
        orderMessageListener: OrderMessageListener
    ): DefaultMessageListenerContainer {
        return DefaultMessageListenerContainer().apply {
            setConnectionFactory(connectionFactory) 
            setDestinationName("order.queue") 
            setMessageListener(orderMessageListener) 
            setConcurrentConsumers(2) // 并发消费者数量
            setMaxConcurrentConsumers(5) // 最大并发消费者数量
        }
    }
}

方式二:使用 SessionAwareMessageListener

当需要在消息处理过程中发送响应消息时,可以使用 SessionAwareMessageListener

kotlin
@Component
class OrderSessionAwareListener : SessionAwareMessageListener<Message> {
    
    override fun onMessage(message: Message, session: Session) {
        when (message) {
            is TextMessage -> {
                try {
                    val orderData = message.text
                    val result = processOrder(orderData)
                    
                    // 发送处理结果到响应队列
                    sendResponse(session, message, result) 
                    
                } catch (ex: Exception) {
                    logger.error("处理订单失败", ex)
                    sendErrorResponse(session, message, ex.message ?: "未知错误")
                }
            }
        }
    }
    
    private fun sendResponse(session: Session, originalMessage: Message, result: String) {
        val replyTo = originalMessage.jmsReplyTo
        if (replyTo != null) {
            val responseMessage = session.createTextMessage(result) 
            val producer = session.createProducer(replyTo)
            producer.send(responseMessage)
            producer.close()
            logger.info("发送响应消息: $result")
        }
    }
    
    private fun sendErrorResponse(session: Session, originalMessage: Message, error: String) {
        val replyTo = originalMessage.jmsReplyTo
        if (replyTo != null) {
            val errorMessage = session.createTextMessage("ERROR: $error")
            val producer = session.createProducer(replyTo)
            producer.send(errorMessage)
            producer.close()
        }
    }
    
    private fun processOrder(orderData: String): String {
        // 处理订单逻辑
        return "订单处理成功: $orderData"
    }
    
    companion object {
        private val logger = LoggerFactory.getLogger(OrderSessionAwareListener::class.java)
    }
}

方式三:使用 MessageListenerAdapter(推荐)

MessageListenerAdapter 是 Spring 提供的最灵活的方式,它允许普通的 POJO 作为消息监听器:

kotlin
// 普通的业务服务类,无需实现任何 JMS 接口
@Service
class OrderService {
    
    // 处理不同类型的消息
    fun handleMessage(message: String) { 
        logger.info("处理文本消息: $message")
        processTextOrder(message)
    }
    
    fun handleMessage(message: Map<String, Any>) { 
        logger.info("处理Map消息: $message")
        processMapOrder(message)
    }
    
    fun handleMessage(message: ByteArray) { 
        logger.info("处理字节数组消息,长度: ${message.size}")
        processBinaryOrder(message)
    }
    
    // 带返回值的处理方法,自动发送响应
    fun processOrderWithResponse(orderData: String): String { 
        return try {
            val order = parseOrder(orderData)
            // 业务处理逻辑
            updateInventory(order)
            sendNotification(order)
            "订单 ${order.id} 处理成功"
        } catch (ex: Exception) {
            logger.error("订单处理失败", ex)
            "订单处理失败: ${ex.message}"
        }
    }
    
    private fun processTextOrder(message: String) {
        // 处理文本订单
    }
    
    private fun processMapOrder(message: Map<String, Any>) {
        // 处理Map格式订单
    }
    
    private fun processBinaryOrder(message: ByteArray) {
        // 处理二进制订单数据
    }
    
    private fun parseOrder(data: String): Order {
        return Order(id = "ORDER-${System.currentTimeMillis()}", amount = 100.0)
    }
    
    private fun updateInventory(order: Order) {
        logger.info("更新库存: ${order.id}")
    }
    
    private fun sendNotification(order: Order) {
        logger.info("发送通知: ${order.id}")
    }
    
    companion object {
        private val logger = LoggerFactory.getLogger(OrderService::class.java)
    }
}

配置 MessageListenerAdapter:

kotlin
@Configuration
class MessageAdapterConfig {
    
    @Bean
    fun orderMessageAdapter(orderService: OrderService): MessageListenerAdapter {
        return MessageListenerAdapter(orderService).apply {
            // 设置默认的处理方法名
            setDefaultListenerMethod("handleMessage") 
            
            // 可以设置自定义的消息转换器
            setMessageConverter(SimpleMessageConverter())
        }
    }
    
    @Bean
    fun orderResponseAdapter(orderService: OrderService): MessageListenerAdapter {
        return MessageListenerAdapter(orderService).apply {
            // 指定带响应的处理方法
            setDefaultListenerMethod("processOrderWithResponse") 
            
            // 设置默认响应目标
            setDefaultResponseDestination(ActiveMQQueue("order.response.queue"))
        }
    }
    
    @Bean
    fun orderListenerContainer(
        connectionFactory: ConnectionFactory,
        orderMessageAdapter: MessageListenerAdapter
    ): DefaultMessageListenerContainer {
        return DefaultMessageListenerContainer().apply {
            setConnectionFactory(connectionFactory)
            setDestinationName("order.queue")
            setMessageListener(orderMessageAdapter)
        }
    }
}

使用注解简化配置

Spring 还提供了 @JmsListener 注解,这是最简洁的方式:

kotlin
@Component
class AnnotationOrderListener {
    
    @JmsListener(destination = "order.queue")
    fun handleOrder(orderData: String) { 
        logger.info("收到订单: $orderData")
        processOrder(orderData)
    }
    
    @JmsListener(destination = "order.priority.queue", concurrency = "2-5")
    fun handlePriorityOrder(
        @Payload orderData: String, 
        @Header("priority") priority: String
    ) {
        logger.info("收到优先级订单: $orderData, 优先级: $priority")
        processPriorityOrder(orderData, priority)
    }
    
    // 带响应的监听器
    @JmsListener(destination = "order.request.queue")
    @SendTo("order.response.queue")
    fun processOrderWithResponse(orderData: String): String { 
        return try {
            processOrder(orderData)
            "订单处理成功: $orderData"
        } catch (ex: Exception) {
            "订单处理失败: ${ex.message}"
        }
    }
    
    private fun processOrder(orderData: String) {
        // 订单处理逻辑
        Thread.sleep(100) // 模拟处理时间
    }
    
    private fun processPriorityOrder(orderData: String, priority: String) {
        // 优先级订单处理逻辑
        when (priority) {
            "HIGH" -> processHighPriorityOrder(orderData)
            "NORMAL" -> processOrder(orderData)
            else -> logger.warn("未知优先级: $priority")
        }
    }
    
    private fun processHighPriorityOrder(orderData: String) {
        // 高优先级处理逻辑
        logger.info("高优先级处理: $orderData")
    }
    
    companion object {
        private val logger = LoggerFactory.getLogger(AnnotationOrderListener::class.java)
    }
}

启用注解支持:

kotlin
@Configuration
@EnableJms
class JmsAnnotationConfig {
    
    @Bean
    fun jmsListenerContainerFactory(
        connectionFactory: ConnectionFactory
    ): DefaultJmsListenerContainerFactory {
        return DefaultJmsListenerContainerFactory().apply {
            setConnectionFactory(connectionFactory)
            setConcurrency("1-3") // 默认并发设置
            setSessionTransacted(true) // 启用事务
        }
    }
}

事务处理

在企业级应用中,消息处理往往需要与数据库操作保持事务一致性:

本地事务

kotlin
@Configuration
class TransactionalJmsConfig {
    
    @Bean
    fun transactionalListenerContainer(
        connectionFactory: ConnectionFactory,
        orderMessageListener: OrderMessageListener
    ): DefaultMessageListenerContainer {
        return DefaultMessageListenerContainer().apply {
            setConnectionFactory(connectionFactory)
            setDestinationName("order.queue")
            setMessageListener(orderMessageListener)
            isSessionTransacted = true
        }
    }
}

@Service
@Transactional
class TransactionalOrderService {
    
    @Autowired
    private lateinit var orderRepository: OrderRepository
    
    @JmsListener(destination = "order.queue")
    @Transactional
    fun handleOrder(orderData: String) {
        try {
            val order = parseOrder(orderData)
            
            // 数据库操作
            orderRepository.save(order) 
            
            // 如果这里抛出异常,消息会回滚
            if (order.amount > 10000) {
                throw BusinessException("订单金额过大")
            }
            
            logger.info("订单保存成功: ${order.id}")
        } catch (ex: Exception) {
            logger.error("订单处理失败,事务将回滚", ex)
            throw ex // 重新抛出异常,触发事务回滚
        }
    }
    
    private fun parseOrder(data: String): Order {
        return Order(id = "ORDER-${System.currentTimeMillis()}", amount = 100.0)
    }
}

class BusinessException(message: String) : RuntimeException(message)

分布式事务(XA)

分布式事务配置示例
kotlin
@Configuration
class XATransactionConfig {
    
    @Bean
    fun jtaTransactionManager(): JtaTransactionManager {
        return JtaTransactionManager() 
    }
    
    @Bean
    fun xaListenerContainer(
        connectionFactory: ConnectionFactory,
        transactionManager: JtaTransactionManager,
        orderMessageListener: OrderMessageListener
    ): DefaultMessageListenerContainer {
        return DefaultMessageListenerContainer().apply {
            setConnectionFactory(connectionFactory)
            setDestinationName("order.queue")
            setMessageListener(orderMessageListener)
            setTransactionManager(transactionManager) 
        }
    }
}

@Service
class XAOrderService {
    
    @Autowired
    private lateinit var orderRepository: OrderRepository
    
    @Autowired
    private lateinit var inventoryService: InventoryService
    
    @JmsListener(destination = "order.queue")
    @Transactional // 使用 JTA 事务管理器
    fun handleOrderWithXA(orderData: String) {
        val order = parseOrder(orderData)
        
        // 数据库操作(参与 XA 事务)
        orderRepository.save(order)
        
        // 调用其他服务(参与 XA 事务)
        inventoryService.updateInventory(order.productId, order.quantity)
        
        // 如果任何操作失败,整个分布式事务都会回滚
        logger.info("XA事务处理完成: ${order.id}")
    }
    
    private fun parseOrder(data: String): Order {
        return Order(
            id = "ORDER-${System.currentTimeMillis()}", 
            amount = 100.0,
            productId = "PROD-001",
            quantity = 2
        )
    }
}

data class Order(
    val id: String, 
    val amount: Double,
    val productId: String = "",
    val quantity: Int = 0
)

错误处理和重试机制

在实际应用中,消息处理可能会失败,需要合适的错误处理策略:

kotlin
@Component
class RobustOrderListener {
    
    @JmsListener(destination = "order.queue")
    fun handleOrderWithRetry(
        orderData: String,
        @Header(JmsHeaders.REDELIVERED, required = false) redelivered: Boolean = false
    ) {
        try {
            if (redelivered) {
                logger.warn("处理重发消息: $orderData")
            }
            
            processOrder(orderData)
            
        } catch (ex: TemporaryException) {
            // 临时异常,可以重试
            logger.warn("临时异常,消息将重新投递", ex) 
            throw ex // 重新抛出,触发重试
            
        } catch (ex: PermanentException) {
            // 永久异常,不应重试
            logger.error("永久异常,消息将被丢弃", ex) 
            sendToDeadLetterQueue(orderData, ex)
            // 不重新抛出异常,消息被确认
            
        } catch (ex: Exception) {
            // 未知异常,谨慎处理
            logger.error("未知异常", ex)
            if (shouldRetry(orderData)) {
                throw ex
            } else {
                sendToDeadLetterQueue(orderData, ex)
            }
        }
    }
    
    private fun processOrder(orderData: String) {
        // 模拟可能失败的业务逻辑
        if (orderData.contains("FAIL")) {
            throw TemporaryException("模拟临时失败")
        }
        if (orderData.contains("INVALID")) {
            throw PermanentException("无效的订单数据")
        }
        
        logger.info("订单处理成功: $orderData")
    }
    
    private fun shouldRetry(orderData: String): Boolean {
        // 实现重试策略逻辑
        return !orderData.contains("NO_RETRY")
    }
    
    private fun sendToDeadLetterQueue(orderData: String, exception: Exception) {
        // 发送到死信队列
        logger.info("发送到死信队列: $orderData")
    }
    
    companion object {
        private val logger = LoggerFactory.getLogger(RobustOrderListener::class.java)
    }
}

class TemporaryException(message: String) : RuntimeException(message)
class PermanentException(message: String) : RuntimeException(message)

性能优化建议

1. 合理设置并发消费者

kotlin
@Bean
fun optimizedListenerContainer(
    connectionFactory: ConnectionFactory
): DefaultMessageListenerContainer {
    return DefaultMessageListenerContainer().apply {
        setConnectionFactory(connectionFactory)
        setDestinationName("order.queue")
        
        // 并发设置
        setConcurrentConsumers(2) // 初始消费者数量
        setMaxConcurrentConsumers(10) // 最大消费者数量
        
        // 性能调优
        setReceiveTimeout(1000) // 接收超时
        setIdleConsumerLimit(5) // 空闲消费者限制
        setIdleTaskExecutionLimit(10) // 空闲任务执行限制
    }
}

2. 消息预取和批处理

kotlin
@Configuration
class PerformanceConfig {
    
    @Bean
    fun highThroughputContainer(
        connectionFactory: ConnectionFactory
    ): DefaultMessageListenerContainer {
        return DefaultMessageListenerContainer().apply {
            setConnectionFactory(connectionFactory)
            setDestinationName("high.volume.queue")
            
            // 批处理设置
            setConcurrentConsumers(5)
            setMaxConcurrentConsumers(20)
            
            // 预取设置(如果使用 ActiveMQ)
            if (connectionFactory is ActiveMQConnectionFactory) {
                connectionFactory.prefetchPolicy.apply {
                    queuePrefetch = 100 // 队列预取数量
                }
            }
        }
    }
}

监控和诊断

添加监控指标以便运维:

kotlin
@Component
class MonitoredOrderListener {
    
    private val processedCounter = Counter.build()
        .name("jms_messages_processed_total")
        .help("Total processed messages")
        .labelNames("queue", "status")
        .register()
    
    private val processingTimer = Timer.build()
        .name("jms_message_processing_duration_seconds")
        .help("Message processing duration")
        .labelNames("queue")
        .register()
    
    @JmsListener(destination = "order.queue")
    fun handleOrderWithMetrics(orderData: String) {
        val timer = processingTimer.labels("order.queue").startTimer()
        
        try {
            processOrder(orderData)
            processedCounter.labels("order.queue", "success").inc() 
            
        } catch (ex: Exception) {
            processedCounter.labels("order.queue", "error").inc() 
            logger.error("消息处理失败", ex)
            throw ex
        } finally {
            timer.observeDuration()
        }
    }
    
    private fun processOrder(orderData: String) {
        // 业务逻辑
        Thread.sleep(50) // 模拟处理时间
        logger.info("处理订单: $orderData")
    }
    
    companion object {
        private val logger = LoggerFactory.getLogger(MonitoredOrderListener::class.java)
    }
}

总结

Spring JMS 提供了多种灵活的消息接收方式:

选择建议

  1. 简单场景:使用 @JmsListener 注解,简洁高效
  2. 需要访问 Session:使用 SessionAwareMessageListener
  3. 复杂业务逻辑:使用 MessageListenerAdapter 配合 POJO
  4. 高性能场景:合理配置并发消费者和预取参数
  5. 事务场景:根据需要选择本地事务或分布式事务

IMPORTANT

在生产环境中,务必考虑错误处理、监控告警、性能调优等方面,确保消息处理的可靠性和高效性。

通过合理使用这些特性,我们可以构建出高可用、高性能的消息驱动应用程序! 🚀