Appearance
Spring Boot JMS 消息队列详解 🚀
什么是 JMS?为什么需要它?
在现代分布式系统中,不同的服务组件需要相互通信。想象一下,你正在开发一个电商系统:
- 用户下单后,需要通知库存系统减少商品数量
- 需要通知支付系统处理付款
- 需要通知物流系统准备发货
- 需要发送确认邮件给用户
如果所有这些操作都同步进行,用户可能需要等待很长时间才能看到下单成功的页面。更糟糕的是,如果其中任何一个服务出现问题,整个下单流程都会失败。
NOTE
JMS (Java Message Service) 是 Java 平台上的消息中间件标准,它提供了一种异步通信机制,让应用程序可以通过消息队列进行解耦通信。
Spring Boot 中的 JMS 自动配置
Spring Boot 为 JMS 提供了开箱即用的自动配置,主要支持两种消息代理:
1. ActiveMQ "Classic" 支持
kotlin
// build.gradle.kts
dependencies {
implementation("org.springframework.boot:spring-boot-starter-activemq")
// 如果需要嵌入式 broker
implementation("org.apache.activemq:activemq-broker")
}
yaml
spring:
activemq:
embedded:
enabled: true # 启用嵌入式 broker(默认)
jms:
cache:
session-cache-size: 5 # 会话缓存大小
yaml
spring:
activemq:
broker-url: "tcp://192.168.1.210:9876"
user: "admin"
password: "secret"
pool:
enabled: true
max-connections: 50
2. ActiveMQ Artemis 支持
kotlin
// build.gradle.kts
dependencies {
implementation("org.springframework.boot:spring-boot-starter-artemis")
// 如果需要嵌入式模式
implementation("org.apache.activemq:artemis-jakarta-server")
}
yaml
spring:
artemis:
mode: native # embedded 或 native
broker-url: "tcp://192.168.1.210:9876"
user: "admin"
password: "secret"
pool:
enabled: true
max-connections: 50
TIP
嵌入式 vs 外部 Broker 的选择
- 嵌入式:适合开发和测试环境,简单快速启动
- 外部 Broker:适合生产环境,提供更好的性能和可靠性
发送消息:JmsTemplate 的使用
Spring Boot 自动配置了 JmsTemplate
,这是发送消息的核心工具:
kotlin
import org.springframework.jms.core.JmsTemplate
import org.springframework.stereotype.Service
@Service
class OrderService(
private val jmsTemplate: JmsTemplate
) {
/**
* 处理订单并发送异步消息
*/
fun processOrder(order: Order) {
// 1. 保存订单到数据库
saveOrder(order)
// 2. 发送消息到不同的队列进行异步处理
sendInventoryMessage(order)
sendPaymentMessage(order)
sendEmailMessage(order)
}
private fun sendInventoryMessage(order: Order) {
val inventoryMessage = InventoryMessage(
orderId = order.id,
productId = order.productId,
quantity = order.quantity
)
// 发送到库存队列
jmsTemplate.convertAndSend("inventory.queue", inventoryMessage)
}
private fun sendPaymentMessage(order: Order) {
val paymentMessage = PaymentMessage(
orderId = order.id,
amount = order.totalAmount,
userId = order.userId
)
// 发送到支付队列
jmsTemplate.convertAndSend("payment.queue", paymentMessage)
}
private fun sendEmailMessage(order: Order) {
val emailMessage = EmailMessage(
to = order.userEmail,
subject = "订单确认",
orderId = order.id
)
// 发送到邮件队列
jmsTemplate.convertAndSend("email.queue", emailMessage)
}
private fun saveOrder(order: Order) {
// 保存订单逻辑
println("订单 ${order.id} 已保存")
}
}
// 消息数据类
data class Order(
val id: String,
val productId: String,
val quantity: Int,
val totalAmount: Double,
val userId: String,
val userEmail: String
)
data class InventoryMessage(
val orderId: String,
val productId: String,
val quantity: Int
)
data class PaymentMessage(
val orderId: String,
val amount: Double,
val userId: String
)
data class EmailMessage(
val to: String,
val subject: String,
val orderId: String
)
IMPORTANT
convertAndSend
方法会自动将 Kotlin 对象序列化为 JSON 格式发送,并在接收端自动反序列化。
接收消息:@JmsListener 注解
使用 @JmsListener
注解可以轻松创建消息监听器:
kotlin
import org.springframework.jms.annotation.JmsListener
import org.springframework.stereotype.Component
import org.springframework.transaction.annotation.Transactional
@Component
class MessageHandlers {
/**
* 处理库存消息
*/
@JmsListener(destination = "inventory.queue")
@Transactional // 确保事务一致性
fun handleInventoryMessage(message: InventoryMessage) {
try {
println("处理库存消息: $message")
// 模拟库存处理逻辑
updateInventory(message.productId, message.quantity)
println("库存更新成功 - 订单: ${message.orderId}")
} catch (e: Exception) {
println("库存处理失败: ${e.message}")
throw e // 重新抛出异常,触发消息重试
}
}
/**
* 处理支付消息
*/
@JmsListener(destination = "payment.queue")
@Transactional
fun handlePaymentMessage(message: PaymentMessage) {
try {
println("处理支付消息: $message")
// 模拟支付处理逻辑
processPayment(message.userId, message.amount)
println("支付处理成功 - 订单: ${message.orderId}")
} catch (e: Exception) {
println("支付处理失败: ${e.message}")
throw e
}
}
/**
* 处理邮件消息
*/
@JmsListener(destination = "email.queue")
fun handleEmailMessage(message: EmailMessage) {
try {
println("发送邮件: $message")
// 模拟邮件发送逻辑
sendEmail(message.to, message.subject, "您的订单 ${message.orderId} 已确认")
println("邮件发送成功")
} catch (e: Exception) {
println("邮件发送失败: ${e.message}")
// 邮件发送失败通常不需要重试,记录日志即可
}
}
private fun updateInventory(productId: String, quantity: Int) {
// 实际的库存更新逻辑
Thread.sleep(100) // 模拟数据库操作
}
private fun processPayment(userId: String, amount: Double) {
// 实际的支付处理逻辑
Thread.sleep(200) // 模拟支付接口调用
}
private fun sendEmail(to: String, subject: String, content: String) {
// 实际的邮件发送逻辑
Thread.sleep(50) // 模拟邮件发送
}
}
自定义消息监听器工厂
当需要更精细的控制时,可以自定义 JmsListenerContainerFactory
:
自定义监听器工厂配置
kotlin
import jakarta.jms.ConnectionFactory
import org.springframework.boot.autoconfigure.jms.DefaultJmsListenerContainerFactoryConfigurer
import org.springframework.boot.jms.ConnectionFactoryUnwrapper
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.jms.config.DefaultJmsListenerContainerFactory
import org.springframework.jms.support.converter.MappingJackson2MessageConverter
import org.springframework.jms.support.converter.MessageConverter
import org.springframework.jms.support.converter.MessageType
@Configuration
class JmsConfig {
/**
* 自定义消息转换器 - 使用 JSON 格式
*/
@Bean
fun messageConverter(): MessageConverter {
val converter = MappingJackson2MessageConverter()
converter.setTargetType(MessageType.TEXT)
converter.setTypeIdPropertyName("_type")
return converter
}
/**
* 自定义监听器容器工厂 - 用于高优先级消息
*/
@Bean
fun highPriorityFactory(
configurer: DefaultJmsListenerContainerFactoryConfigurer,
connectionFactory: ConnectionFactory,
messageConverter: MessageConverter
): DefaultJmsListenerContainerFactory {
val factory = DefaultJmsListenerContainerFactory()
// 使用配置器应用默认设置
configurer.configure(factory, ConnectionFactoryUnwrapper.unwrapCaching(connectionFactory))
// 自定义设置
factory.setMessageConverter(messageConverter)
factory.setConcurrency("5-10") // 并发消费者数量
factory.setReceiveTimeout(5000) // 接收超时时间
return factory
}
/**
* 自定义监听器容器工厂 - 用于低优先级消息
*/
@Bean
fun lowPriorityFactory(
configurer: DefaultJmsListenerContainerFactoryConfigurer,
connectionFactory: ConnectionFactory,
messageConverter: MessageConverter
): DefaultJmsListenerContainerFactory {
val factory = DefaultJmsListenerContainerFactory()
configurer.configure(factory, ConnectionFactoryUnwrapper.unwrapCaching(connectionFactory))
factory.setMessageConverter(messageConverter)
factory.setConcurrency("1-3") // 较少的并发消费者
factory.setReceiveTimeout(10000)
return factory
}
}
使用自定义工厂:
kotlin
@Component
class PriorityMessageHandlers {
/**
* 高优先级消息处理 - 使用专门的工厂
*/
@JmsListener(
destination = "high-priority.queue",
containerFactory = "highPriorityFactory"
)
fun handleHighPriorityMessage(message: String) {
println("处理高优先级消息: $message")
// 重要业务逻辑处理
}
/**
* 低优先级消息处理
*/
@JmsListener(
destination = "low-priority.queue",
containerFactory = "lowPriorityFactory"
)
fun handleLowPriorityMessage(message: String) {
println("处理低优先级消息: $message")
// 非关键业务逻辑处理
}
}
实际业务场景示例
让我们看一个完整的电商订单处理示例:
kotlin
import org.springframework.web.bind.annotation.*
@RestController
@RequestMapping("/api/orders")
class OrderController(
private val orderService: OrderService
) {
@PostMapping
fun createOrder(@RequestBody orderRequest: CreateOrderRequest): OrderResponse {
val order = orderService.createOrder(orderRequest)
return OrderResponse(
orderId = order.id,
status = "PROCESSING",
message = "订单已提交,正在处理中..."
)
}
}
data class CreateOrderRequest(
val productId: String,
val quantity: Int,
val userId: String,
val userEmail: String
)
data class OrderResponse(
val orderId: String,
val status: String,
val message: String
)
kotlin
import org.springframework.jms.core.JmsTemplate
import org.springframework.stereotype.Service
import org.springframework.transaction.annotation.Transactional
import java.util.*
@Service
@Transactional
class OrderService(
private val jmsTemplate: JmsTemplate,
private val orderRepository: OrderRepository
) {
fun createOrder(request: CreateOrderRequest): Order {
// 1. 创建订单对象
val order = Order(
id = UUID.randomUUID().toString(),
productId = request.productId,
quantity = request.quantity,
totalAmount = calculateAmount(request.productId, request.quantity),
userId = request.userId,
userEmail = request.userEmail,
status = OrderStatus.CREATED
)
// 2. 保存订单到数据库
orderRepository.save(order)
// 3. 发送异步消息进行后续处理
sendOrderProcessingMessages(order)
return order
}
private fun sendOrderProcessingMessages(order: Order) {
// 发送库存检查消息
jmsTemplate.convertAndSend("inventory.check", InventoryCheckMessage(
orderId = order.id,
productId = order.productId,
quantity = order.quantity
))
// 发送支付处理消息
jmsTemplate.convertAndSend("payment.process", PaymentProcessMessage(
orderId = order.id,
amount = order.totalAmount,
userId = order.userId
))
// 发送邮件通知消息
jmsTemplate.convertAndSend("email.notification", EmailNotificationMessage(
orderId = order.id,
userEmail = order.userEmail,
type = "ORDER_CREATED"
))
}
private fun calculateAmount(productId: String, quantity: Int): Double {
// 模拟价格计算
return quantity * 99.99
}
}
kotlin
@Component
class OrderMessageHandlers(
private val orderRepository: OrderRepository,
private val inventoryService: InventoryService,
private val paymentService: PaymentService,
private val emailService: EmailService
) {
@JmsListener(destination = "inventory.check")
@Transactional
fun handleInventoryCheck(message: InventoryCheckMessage) {
try {
val available = inventoryService.checkAvailability(
message.productId,
message.quantity
)
if (available) {
// 库存充足,扣减库存
inventoryService.reserve(message.productId, message.quantity)
updateOrderStatus(message.orderId, OrderStatus.INVENTORY_RESERVED)
// 发送库存预留成功消息
jmsTemplate.convertAndSend("inventory.reserved", InventoryReservedMessage(
orderId = message.orderId,
productId = message.productId,
quantity = message.quantity
))
} else {
// 库存不足,取消订单
updateOrderStatus(message.orderId, OrderStatus.CANCELLED)
// 发送库存不足通知
jmsTemplate.convertAndSend("order.cancelled", OrderCancelledMessage(
orderId = message.orderId,
reason = "库存不足"
))
}
} catch (e: Exception) {
println("库存检查失败: ${e.message}")
throw e // 触发消息重试
}
}
@JmsListener(destination = "payment.process")
@Transactional
fun handlePaymentProcess(message: PaymentProcessMessage) {
try {
val paymentResult = paymentService.processPayment(
message.userId,
message.amount
)
if (paymentResult.success) {
updateOrderStatus(message.orderId, OrderStatus.PAID)
// 发送支付成功消息
jmsTemplate.convertAndSend("payment.completed", PaymentCompletedMessage(
orderId = message.orderId,
transactionId = paymentResult.transactionId
))
} else {
updateOrderStatus(message.orderId, OrderStatus.PAYMENT_FAILED)
// 发送支付失败消息
jmsTemplate.convertAndSend("payment.failed", PaymentFailedMessage(
orderId = message.orderId,
reason = paymentResult.errorMessage
))
}
} catch (e: Exception) {
println("支付处理失败: ${e.message}")
throw e
}
}
@JmsListener(destination = "email.notification")
fun handleEmailNotification(message: EmailNotificationMessage) {
try {
when (message.type) {
"ORDER_CREATED" -> {
emailService.sendOrderCreatedEmail(
message.userEmail,
message.orderId
)
}
"ORDER_COMPLETED" -> {
emailService.sendOrderCompletedEmail(
message.userEmail,
message.orderId
)
}
"ORDER_CANCELLED" -> {
emailService.sendOrderCancelledEmail(
message.userEmail,
message.orderId
)
}
}
} catch (e: Exception) {
println("邮件发送失败: ${e.message}")
// 邮件发送失败通常不需要重试整个流程
}
}
private fun updateOrderStatus(orderId: String, status: OrderStatus) {
val order = orderRepository.findById(orderId)
order?.let {
it.status = status
orderRepository.save(it)
}
}
}
JMS 的核心优势
1. 异步处理 ⚡
- 问题:同步处理导致用户等待时间长
- 解决:用户提交订单后立即返回,后续处理异步进行
2. 系统解耦 🔗
- 问题:服务间紧耦合,一个服务故障影响整个流程
- 解决:通过消息队列解耦,服务独立部署和扩展
3. 可靠性保证 🛡️
- 问题:网络故障或服务不可用导致消息丢失
- 解决:消息持久化,确保消息不丢失
4. 负载均衡 ⚖️
- 问题:单个服务处理能力有限
- 解决:多个消费者实例处理同一队列的消息
最佳实践建议
消息设计原则
- 消息要小而精:避免在消息中传递大量数据
- 消息要自包含:消息应包含处理所需的所有信息
- 消息要幂等:重复处理同一消息不应产生副作用
常见陷阱
- 忘记事务管理:可能导致数据不一致
- 无限重试:失败消息可能无限重试,消耗资源
- 消息顺序依赖:JMS 不保证消息顺序,需要特殊处理
生产环境考虑
- 监控和告警:监控队列深度、消费速率等指标
- 死信队列:处理无法正常消费的消息
- 消息持久化:确保重要消息不会因为 broker 重启而丢失
通过 Spring Boot 的 JMS 支持,我们可以轻松构建高可用、可扩展的异步消息处理系统,大大提升应用的性能和用户体验! 🎉