Skip to content

Spring Boot JMS 消息队列详解 🚀

什么是 JMS?为什么需要它?

在现代分布式系统中,不同的服务组件需要相互通信。想象一下,你正在开发一个电商系统:

  • 用户下单后,需要通知库存系统减少商品数量
  • 需要通知支付系统处理付款
  • 需要通知物流系统准备发货
  • 需要发送确认邮件给用户

如果所有这些操作都同步进行,用户可能需要等待很长时间才能看到下单成功的页面。更糟糕的是,如果其中任何一个服务出现问题,整个下单流程都会失败。

NOTE

JMS (Java Message Service) 是 Java 平台上的消息中间件标准,它提供了一种异步通信机制,让应用程序可以通过消息队列进行解耦通信。

Spring Boot 中的 JMS 自动配置

Spring Boot 为 JMS 提供了开箱即用的自动配置,主要支持两种消息代理:

1. ActiveMQ "Classic" 支持

kotlin
// build.gradle.kts
dependencies {
    implementation("org.springframework.boot:spring-boot-starter-activemq")
    // 如果需要嵌入式 broker
    implementation("org.apache.activemq:activemq-broker")
}
yaml
spring:
  activemq:
    embedded:
      enabled: true  # 启用嵌入式 broker(默认)
  jms:
    cache:
      session-cache-size: 5  # 会话缓存大小
yaml
spring:
  activemq:
    broker-url: "tcp://192.168.1.210:9876"
    user: "admin"
    password: "secret"
    pool:
      enabled: true
      max-connections: 50

2. ActiveMQ Artemis 支持

kotlin
// build.gradle.kts
dependencies {
    implementation("org.springframework.boot:spring-boot-starter-artemis")
    // 如果需要嵌入式模式
    implementation("org.apache.activemq:artemis-jakarta-server")
}
yaml
spring:
  artemis:
    mode: native  # embedded 或 native
    broker-url: "tcp://192.168.1.210:9876"
    user: "admin"
    password: "secret"
    pool:
      enabled: true
      max-connections: 50

TIP

嵌入式 vs 外部 Broker 的选择

  • 嵌入式:适合开发和测试环境,简单快速启动
  • 外部 Broker:适合生产环境,提供更好的性能和可靠性

发送消息:JmsTemplate 的使用

Spring Boot 自动配置了 JmsTemplate,这是发送消息的核心工具:

kotlin
import org.springframework.jms.core.JmsTemplate
import org.springframework.stereotype.Service

@Service
class OrderService(
    private val jmsTemplate: JmsTemplate
) {
    
    /**
     * 处理订单并发送异步消息
     */
    fun processOrder(order: Order) {
        // 1. 保存订单到数据库
        saveOrder(order) 
        
        // 2. 发送消息到不同的队列进行异步处理
        sendInventoryMessage(order) 
        sendPaymentMessage(order) 
        sendEmailMessage(order) 
    }
    
    private fun sendInventoryMessage(order: Order) {
        val inventoryMessage = InventoryMessage(
            orderId = order.id,
            productId = order.productId,
            quantity = order.quantity
        )
        
        // 发送到库存队列
        jmsTemplate.convertAndSend("inventory.queue", inventoryMessage) 
    }
    
    private fun sendPaymentMessage(order: Order) {
        val paymentMessage = PaymentMessage(
            orderId = order.id,
            amount = order.totalAmount,
            userId = order.userId
        )
        
        // 发送到支付队列
        jmsTemplate.convertAndSend("payment.queue", paymentMessage) 
    }
    
    private fun sendEmailMessage(order: Order) {
        val emailMessage = EmailMessage(
            to = order.userEmail,
            subject = "订单确认",
            orderId = order.id
        )
        
        // 发送到邮件队列
        jmsTemplate.convertAndSend("email.queue", emailMessage) 
    }
    
    private fun saveOrder(order: Order) {
        // 保存订单逻辑
        println("订单 ${order.id} 已保存")
    }
}

// 消息数据类
data class Order(
    val id: String,
    val productId: String,
    val quantity: Int,
    val totalAmount: Double,
    val userId: String,
    val userEmail: String
)

data class InventoryMessage(
    val orderId: String,
    val productId: String,
    val quantity: Int
)

data class PaymentMessage(
    val orderId: String,
    val amount: Double,
    val userId: String
)

data class EmailMessage(
    val to: String,
    val subject: String,
    val orderId: String
)

IMPORTANT

convertAndSend 方法会自动将 Kotlin 对象序列化为 JSON 格式发送,并在接收端自动反序列化。

接收消息:@JmsListener 注解

使用 @JmsListener 注解可以轻松创建消息监听器:

kotlin
import org.springframework.jms.annotation.JmsListener
import org.springframework.stereotype.Component
import org.springframework.transaction.annotation.Transactional

@Component
class MessageHandlers {
    
    /**
     * 处理库存消息
     */
    @JmsListener(destination = "inventory.queue") 
    @Transactional // 确保事务一致性
    fun handleInventoryMessage(message: InventoryMessage) {
        try {
            println("处理库存消息: $message")
            
            // 模拟库存处理逻辑
            updateInventory(message.productId, message.quantity)
            
            println("库存更新成功 - 订单: ${message.orderId}")
            
        } catch (e: Exception) {
            println("库存处理失败: ${e.message}") 
            throw e // 重新抛出异常,触发消息重试
        }
    }
    
    /**
     * 处理支付消息
     */
    @JmsListener(destination = "payment.queue") 
    @Transactional
    fun handlePaymentMessage(message: PaymentMessage) {
        try {
            println("处理支付消息: $message")
            
            // 模拟支付处理逻辑
            processPayment(message.userId, message.amount)
            
            println("支付处理成功 - 订单: ${message.orderId}")
            
        } catch (e: Exception) {
            println("支付处理失败: ${e.message}") 
            throw e
        }
    }
    
    /**
     * 处理邮件消息
     */
    @JmsListener(destination = "email.queue") 
    fun handleEmailMessage(message: EmailMessage) {
        try {
            println("发送邮件: $message")
            
            // 模拟邮件发送逻辑
            sendEmail(message.to, message.subject, "您的订单 ${message.orderId} 已确认")
            
            println("邮件发送成功")
            
        } catch (e: Exception) {
            println("邮件发送失败: ${e.message}") 
            // 邮件发送失败通常不需要重试,记录日志即可
        }
    }
    
    private fun updateInventory(productId: String, quantity: Int) {
        // 实际的库存更新逻辑
        Thread.sleep(100) // 模拟数据库操作
    }
    
    private fun processPayment(userId: String, amount: Double) {
        // 实际的支付处理逻辑
        Thread.sleep(200) // 模拟支付接口调用
    }
    
    private fun sendEmail(to: String, subject: String, content: String) {
        // 实际的邮件发送逻辑
        Thread.sleep(50) // 模拟邮件发送
    }
}

自定义消息监听器工厂

当需要更精细的控制时,可以自定义 JmsListenerContainerFactory

自定义监听器工厂配置
kotlin
import jakarta.jms.ConnectionFactory
import org.springframework.boot.autoconfigure.jms.DefaultJmsListenerContainerFactoryConfigurer
import org.springframework.boot.jms.ConnectionFactoryUnwrapper
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.jms.config.DefaultJmsListenerContainerFactory
import org.springframework.jms.support.converter.MappingJackson2MessageConverter
import org.springframework.jms.support.converter.MessageConverter
import org.springframework.jms.support.converter.MessageType

@Configuration
class JmsConfig {
    
    /**
     * 自定义消息转换器 - 使用 JSON 格式
     */
    @Bean
    fun messageConverter(): MessageConverter {
        val converter = MappingJackson2MessageConverter()
        converter.setTargetType(MessageType.TEXT) 
        converter.setTypeIdPropertyName("_type") 
        return converter
    }
    
    /**
     * 自定义监听器容器工厂 - 用于高优先级消息
     */
    @Bean
    fun highPriorityFactory(
        configurer: DefaultJmsListenerContainerFactoryConfigurer,
        connectionFactory: ConnectionFactory,
        messageConverter: MessageConverter
    ): DefaultJmsListenerContainerFactory {
        
        val factory = DefaultJmsListenerContainerFactory()
        
        // 使用配置器应用默认设置
        configurer.configure(factory, ConnectionFactoryUnwrapper.unwrapCaching(connectionFactory)) 
        
        // 自定义设置
        factory.setMessageConverter(messageConverter) 
        factory.setConcurrency("5-10") // 并发消费者数量
        factory.setReceiveTimeout(5000) // 接收超时时间
        
        return factory
    }
    
    /**
     * 自定义监听器容器工厂 - 用于低优先级消息
     */
    @Bean
    fun lowPriorityFactory(
        configurer: DefaultJmsListenerContainerFactoryConfigurer,
        connectionFactory: ConnectionFactory,
        messageConverter: MessageConverter
    ): DefaultJmsListenerContainerFactory {
        
        val factory = DefaultJmsListenerContainerFactory()
        configurer.configure(factory, ConnectionFactoryUnwrapper.unwrapCaching(connectionFactory))
        
        factory.setMessageConverter(messageConverter)
        factory.setConcurrency("1-3") // 较少的并发消费者
        factory.setReceiveTimeout(10000)
        
        return factory
    }
}

使用自定义工厂:

kotlin
@Component
class PriorityMessageHandlers {
    
    /**
     * 高优先级消息处理 - 使用专门的工厂
     */
    @JmsListener(
        destination = "high-priority.queue", 
        containerFactory = "highPriorityFactory"
    )
    fun handleHighPriorityMessage(message: String) {
        println("处理高优先级消息: $message")
        // 重要业务逻辑处理
    }
    
    /**
     * 低优先级消息处理
     */
    @JmsListener(
        destination = "low-priority.queue", 
        containerFactory = "lowPriorityFactory"
    )
    fun handleLowPriorityMessage(message: String) {
        println("处理低优先级消息: $message")
        // 非关键业务逻辑处理
    }
}

实际业务场景示例

让我们看一个完整的电商订单处理示例:

kotlin
import org.springframework.web.bind.annotation.*

@RestController
@RequestMapping("/api/orders")
class OrderController(
    private val orderService: OrderService
) {
    
    @PostMapping
    fun createOrder(@RequestBody orderRequest: CreateOrderRequest): OrderResponse {
        val order = orderService.createOrder(orderRequest)
        
        return OrderResponse(
            orderId = order.id,
            status = "PROCESSING",
            message = "订单已提交,正在处理中..."
        )
    }
}

data class CreateOrderRequest(
    val productId: String,
    val quantity: Int,
    val userId: String,
    val userEmail: String
)

data class OrderResponse(
    val orderId: String,
    val status: String,
    val message: String
)
kotlin
import org.springframework.jms.core.JmsTemplate
import org.springframework.stereotype.Service
import org.springframework.transaction.annotation.Transactional
import java.util.*

@Service
@Transactional
class OrderService(
    private val jmsTemplate: JmsTemplate,
    private val orderRepository: OrderRepository
) {
    
    fun createOrder(request: CreateOrderRequest): Order {
        // 1. 创建订单对象
        val order = Order(
            id = UUID.randomUUID().toString(),
            productId = request.productId,
            quantity = request.quantity,
            totalAmount = calculateAmount(request.productId, request.quantity),
            userId = request.userId,
            userEmail = request.userEmail,
            status = OrderStatus.CREATED
        )
        
        // 2. 保存订单到数据库
        orderRepository.save(order) 
        
        // 3. 发送异步消息进行后续处理
        sendOrderProcessingMessages(order) 
        
        return order
    }
    
    private fun sendOrderProcessingMessages(order: Order) {
        // 发送库存检查消息
        jmsTemplate.convertAndSend("inventory.check", InventoryCheckMessage(
            orderId = order.id,
            productId = order.productId,
            quantity = order.quantity
        ))
        
        // 发送支付处理消息
        jmsTemplate.convertAndSend("payment.process", PaymentProcessMessage(
            orderId = order.id,
            amount = order.totalAmount,
            userId = order.userId
        ))
        
        // 发送邮件通知消息
        jmsTemplate.convertAndSend("email.notification", EmailNotificationMessage(
            orderId = order.id,
            userEmail = order.userEmail,
            type = "ORDER_CREATED"
        ))
    }
    
    private fun calculateAmount(productId: String, quantity: Int): Double {
        // 模拟价格计算
        return quantity * 99.99
    }
}
kotlin
@Component
class OrderMessageHandlers(
    private val orderRepository: OrderRepository,
    private val inventoryService: InventoryService,
    private val paymentService: PaymentService,
    private val emailService: EmailService
) {
    
    @JmsListener(destination = "inventory.check")
    @Transactional
    fun handleInventoryCheck(message: InventoryCheckMessage) {
        try {
            val available = inventoryService.checkAvailability(
                message.productId, 
                message.quantity
            )
            
            if (available) {
                // 库存充足,扣减库存
                inventoryService.reserve(message.productId, message.quantity)
                updateOrderStatus(message.orderId, OrderStatus.INVENTORY_RESERVED)
                
                // 发送库存预留成功消息
                jmsTemplate.convertAndSend("inventory.reserved", InventoryReservedMessage(
                    orderId = message.orderId,
                    productId = message.productId,
                    quantity = message.quantity
                ))
            } else {
                // 库存不足,取消订单
                updateOrderStatus(message.orderId, OrderStatus.CANCELLED)
                
                // 发送库存不足通知
                jmsTemplate.convertAndSend("order.cancelled", OrderCancelledMessage(
                    orderId = message.orderId,
                    reason = "库存不足"
                ))
            }
            
        } catch (e: Exception) {
            println("库存检查失败: ${e.message}") 
            throw e // 触发消息重试
        }
    }
    
    @JmsListener(destination = "payment.process")
    @Transactional
    fun handlePaymentProcess(message: PaymentProcessMessage) {
        try {
            val paymentResult = paymentService.processPayment(
                message.userId,
                message.amount
            )
            
            if (paymentResult.success) {
                updateOrderStatus(message.orderId, OrderStatus.PAID)
                
                // 发送支付成功消息
                jmsTemplate.convertAndSend("payment.completed", PaymentCompletedMessage(
                    orderId = message.orderId,
                    transactionId = paymentResult.transactionId
                ))
            } else {
                updateOrderStatus(message.orderId, OrderStatus.PAYMENT_FAILED)
                
                // 发送支付失败消息
                jmsTemplate.convertAndSend("payment.failed", PaymentFailedMessage(
                    orderId = message.orderId,
                    reason = paymentResult.errorMessage
                ))
            }
            
        } catch (e: Exception) {
            println("支付处理失败: ${e.message}") 
            throw e
        }
    }
    
    @JmsListener(destination = "email.notification")
    fun handleEmailNotification(message: EmailNotificationMessage) {
        try {
            when (message.type) {
                "ORDER_CREATED" -> {
                    emailService.sendOrderCreatedEmail(
                        message.userEmail,
                        message.orderId
                    )
                }
                "ORDER_COMPLETED" -> {
                    emailService.sendOrderCompletedEmail(
                        message.userEmail,
                        message.orderId
                    )
                }
                "ORDER_CANCELLED" -> {
                    emailService.sendOrderCancelledEmail(
                        message.userEmail,
                        message.orderId
                    )
                }
            }
            
        } catch (e: Exception) {
            println("邮件发送失败: ${e.message}") 
            // 邮件发送失败通常不需要重试整个流程
        }
    }
    
    private fun updateOrderStatus(orderId: String, status: OrderStatus) {
        val order = orderRepository.findById(orderId)
        order?.let {
            it.status = status
            orderRepository.save(it)
        }
    }
}

JMS 的核心优势

1. 异步处理 ⚡

  • 问题:同步处理导致用户等待时间长
  • 解决:用户提交订单后立即返回,后续处理异步进行

2. 系统解耦 🔗

  • 问题:服务间紧耦合,一个服务故障影响整个流程
  • 解决:通过消息队列解耦,服务独立部署和扩展

3. 可靠性保证 🛡️

  • 问题:网络故障或服务不可用导致消息丢失
  • 解决:消息持久化,确保消息不丢失

4. 负载均衡 ⚖️

  • 问题:单个服务处理能力有限
  • 解决:多个消费者实例处理同一队列的消息

最佳实践建议

消息设计原则

  1. 消息要小而精:避免在消息中传递大量数据
  2. 消息要自包含:消息应包含处理所需的所有信息
  3. 消息要幂等:重复处理同一消息不应产生副作用

常见陷阱

  1. 忘记事务管理:可能导致数据不一致
  2. 无限重试:失败消息可能无限重试,消耗资源
  3. 消息顺序依赖:JMS 不保证消息顺序,需要特殊处理

生产环境考虑

  1. 监控和告警:监控队列深度、消费速率等指标
  2. 死信队列:处理无法正常消费的消息
  3. 消息持久化:确保重要消息不会因为 broker 重启而丢失

通过 Spring Boot 的 JMS 支持,我们可以轻松构建高可用、可扩展的异步消息处理系统,大大提升应用的性能和用户体验! 🎉