Appearance
Spring JMS 消息接收详解 📨
概述
在现代企业级应用中,消息队列是实现系统解耦、异步处理和提高系统可靠性的重要技术。Spring Framework 为 JMS(Java Message Service)提供了强大的支持,让我们能够轻松地接收和处理消息。
NOTE
JMS 是 Java 平台上的消息中间件标准,它定义了应用程序如何创建、发送、接收和读取消息。Spring JMS 在此基础上提供了更简洁、更强大的抽象。
为什么需要消息接收机制? 🤔
想象一下这样的场景:
- 电商系统:用户下单后,需要通知库存系统、支付系统、物流系统等多个子系统
- 日志处理:应用产生大量日志,需要异步处理避免影响主业务
- 数据同步:多个系统间需要实时同步数据变更
如果没有消息队列,这些场景会面临:
- 系统间强耦合,一个系统故障影响整体
- 同步调用导致响应时间长
- 难以处理突发流量峰值
消息接收的两种方式
Spring JMS 提供了两种主要的消息接收方式:
1. 同步接收(Synchronous Receipt)
同步接收是最简单直接的方式,调用线程会阻塞等待消息到达。
kotlin
@Service
class OrderSyncReceiver(
private val jmsTemplate: JmsTemplate
) {
fun receiveOrderMessage(): String? {
return try {
// 同步接收消息,会阻塞当前线程
val message = jmsTemplate.receive("order.queue") as? TextMessage
message?.text
} catch (ex: JMSException) {
logger.error("接收消息失败", ex)
null
}
}
fun receiveWithTimeout(): String? {
return try {
// 设置超时时间,避免无限等待
jmsTemplate.receiveTimeout = 5000 // 5秒超时
val message = jmsTemplate.receive("order.queue") as? TextMessage
message?.text
} catch (ex: Exception) {
logger.warn("接收消息超时或失败", ex)
null
}
}
companion object {
private val logger = LoggerFactory.getLogger(OrderSyncReceiver::class.java)
}
}
kotlin
@RestController
@RequestMapping("/api/orders")
class OrderController(
private val orderSyncReceiver: OrderSyncReceiver
) {
@GetMapping("/receive")
fun receiveOrder(): ResponseEntity<String> {
val message = orderSyncReceiver.receiveWithTimeout()
return if (message != null) {
ResponseEntity.ok("收到订单消息: $message")
} else {
ResponseEntity.noContent().build()
}
}
}
WARNING
同步接收会阻塞调用线程,在高并发场景下可能导致线程池耗尽。建议设置合理的超时时间,并谨慎使用。
2. 异步接收(Asynchronous Receipt)
异步接收是推荐的方式,它不会阻塞调用线程,而是通过监听器模式处理消息。
异步接收的实现方式
方式一:实现 MessageListener 接口
这是最基础的异步接收方式:
kotlin
@Component
class OrderMessageListener : MessageListener {
override fun onMessage(message: Message) {
when (message) {
is TextMessage -> {
try {
val orderData = message.text
processOrder(orderData)
logger.info("成功处理订单消息: $orderData")
} catch (ex: JMSException) {
logger.error("处理订单消息失败", ex)
throw RuntimeException("消息处理异常", ex)
}
}
else -> {
logger.warn("收到不支持的消息类型: ${message.javaClass.simpleName}")
throw IllegalArgumentException("消息必须是 TextMessage 类型")
}
}
}
private fun processOrder(orderData: String) {
// 处理订单逻辑
val order = parseOrderFromJson(orderData)
// 更新库存、发送通知等
logger.info("处理订单: ${order.id}")
}
private fun parseOrderFromJson(json: String): Order {
// JSON 解析逻辑
return Order(id = "ORDER-001", amount = 100.0)
}
companion object {
private val logger = LoggerFactory.getLogger(OrderMessageListener::class.java)
}
}
data class Order(val id: String, val amount: Double)
配置消息监听容器
kotlin
@Configuration
@EnableJms
class JmsConfig {
@Bean
fun orderMessageListener() = OrderMessageListener()
@Bean
fun jmsListenerContainer(
connectionFactory: ConnectionFactory,
orderMessageListener: OrderMessageListener
): DefaultMessageListenerContainer {
return DefaultMessageListenerContainer().apply {
setConnectionFactory(connectionFactory)
setDestinationName("order.queue")
setMessageListener(orderMessageListener)
setConcurrentConsumers(2) // 并发消费者数量
setMaxConcurrentConsumers(5) // 最大并发消费者数量
}
}
}
方式二:使用 SessionAwareMessageListener
当需要在消息处理过程中发送响应消息时,可以使用 SessionAwareMessageListener
:
kotlin
@Component
class OrderSessionAwareListener : SessionAwareMessageListener<Message> {
override fun onMessage(message: Message, session: Session) {
when (message) {
is TextMessage -> {
try {
val orderData = message.text
val result = processOrder(orderData)
// 发送处理结果到响应队列
sendResponse(session, message, result)
} catch (ex: Exception) {
logger.error("处理订单失败", ex)
sendErrorResponse(session, message, ex.message ?: "未知错误")
}
}
}
}
private fun sendResponse(session: Session, originalMessage: Message, result: String) {
val replyTo = originalMessage.jmsReplyTo
if (replyTo != null) {
val responseMessage = session.createTextMessage(result)
val producer = session.createProducer(replyTo)
producer.send(responseMessage)
producer.close()
logger.info("发送响应消息: $result")
}
}
private fun sendErrorResponse(session: Session, originalMessage: Message, error: String) {
val replyTo = originalMessage.jmsReplyTo
if (replyTo != null) {
val errorMessage = session.createTextMessage("ERROR: $error")
val producer = session.createProducer(replyTo)
producer.send(errorMessage)
producer.close()
}
}
private fun processOrder(orderData: String): String {
// 处理订单逻辑
return "订单处理成功: $orderData"
}
companion object {
private val logger = LoggerFactory.getLogger(OrderSessionAwareListener::class.java)
}
}
方式三:使用 MessageListenerAdapter(推荐)
MessageListenerAdapter
是 Spring 提供的最灵活的方式,它允许普通的 POJO 作为消息监听器:
kotlin
// 普通的业务服务类,无需实现任何 JMS 接口
@Service
class OrderService {
// 处理不同类型的消息
fun handleMessage(message: String) {
logger.info("处理文本消息: $message")
processTextOrder(message)
}
fun handleMessage(message: Map<String, Any>) {
logger.info("处理Map消息: $message")
processMapOrder(message)
}
fun handleMessage(message: ByteArray) {
logger.info("处理字节数组消息,长度: ${message.size}")
processBinaryOrder(message)
}
// 带返回值的处理方法,自动发送响应
fun processOrderWithResponse(orderData: String): String {
return try {
val order = parseOrder(orderData)
// 业务处理逻辑
updateInventory(order)
sendNotification(order)
"订单 ${order.id} 处理成功"
} catch (ex: Exception) {
logger.error("订单处理失败", ex)
"订单处理失败: ${ex.message}"
}
}
private fun processTextOrder(message: String) {
// 处理文本订单
}
private fun processMapOrder(message: Map<String, Any>) {
// 处理Map格式订单
}
private fun processBinaryOrder(message: ByteArray) {
// 处理二进制订单数据
}
private fun parseOrder(data: String): Order {
return Order(id = "ORDER-${System.currentTimeMillis()}", amount = 100.0)
}
private fun updateInventory(order: Order) {
logger.info("更新库存: ${order.id}")
}
private fun sendNotification(order: Order) {
logger.info("发送通知: ${order.id}")
}
companion object {
private val logger = LoggerFactory.getLogger(OrderService::class.java)
}
}
配置 MessageListenerAdapter:
kotlin
@Configuration
class MessageAdapterConfig {
@Bean
fun orderMessageAdapter(orderService: OrderService): MessageListenerAdapter {
return MessageListenerAdapter(orderService).apply {
// 设置默认的处理方法名
setDefaultListenerMethod("handleMessage")
// 可以设置自定义的消息转换器
setMessageConverter(SimpleMessageConverter())
}
}
@Bean
fun orderResponseAdapter(orderService: OrderService): MessageListenerAdapter {
return MessageListenerAdapter(orderService).apply {
// 指定带响应的处理方法
setDefaultListenerMethod("processOrderWithResponse")
// 设置默认响应目标
setDefaultResponseDestination(ActiveMQQueue("order.response.queue"))
}
}
@Bean
fun orderListenerContainer(
connectionFactory: ConnectionFactory,
orderMessageAdapter: MessageListenerAdapter
): DefaultMessageListenerContainer {
return DefaultMessageListenerContainer().apply {
setConnectionFactory(connectionFactory)
setDestinationName("order.queue")
setMessageListener(orderMessageAdapter)
}
}
}
使用注解简化配置
Spring 还提供了 @JmsListener
注解,这是最简洁的方式:
kotlin
@Component
class AnnotationOrderListener {
@JmsListener(destination = "order.queue")
fun handleOrder(orderData: String) {
logger.info("收到订单: $orderData")
processOrder(orderData)
}
@JmsListener(destination = "order.priority.queue", concurrency = "2-5")
fun handlePriorityOrder(
@Payload orderData: String,
@Header("priority") priority: String
) {
logger.info("收到优先级订单: $orderData, 优先级: $priority")
processPriorityOrder(orderData, priority)
}
// 带响应的监听器
@JmsListener(destination = "order.request.queue")
@SendTo("order.response.queue")
fun processOrderWithResponse(orderData: String): String {
return try {
processOrder(orderData)
"订单处理成功: $orderData"
} catch (ex: Exception) {
"订单处理失败: ${ex.message}"
}
}
private fun processOrder(orderData: String) {
// 订单处理逻辑
Thread.sleep(100) // 模拟处理时间
}
private fun processPriorityOrder(orderData: String, priority: String) {
// 优先级订单处理逻辑
when (priority) {
"HIGH" -> processHighPriorityOrder(orderData)
"NORMAL" -> processOrder(orderData)
else -> logger.warn("未知优先级: $priority")
}
}
private fun processHighPriorityOrder(orderData: String) {
// 高优先级处理逻辑
logger.info("高优先级处理: $orderData")
}
companion object {
private val logger = LoggerFactory.getLogger(AnnotationOrderListener::class.java)
}
}
启用注解支持:
kotlin
@Configuration
@EnableJms
class JmsAnnotationConfig {
@Bean
fun jmsListenerContainerFactory(
connectionFactory: ConnectionFactory
): DefaultJmsListenerContainerFactory {
return DefaultJmsListenerContainerFactory().apply {
setConnectionFactory(connectionFactory)
setConcurrency("1-3") // 默认并发设置
setSessionTransacted(true) // 启用事务
}
}
}
事务处理
在企业级应用中,消息处理往往需要与数据库操作保持事务一致性:
本地事务
kotlin
@Configuration
class TransactionalJmsConfig {
@Bean
fun transactionalListenerContainer(
connectionFactory: ConnectionFactory,
orderMessageListener: OrderMessageListener
): DefaultMessageListenerContainer {
return DefaultMessageListenerContainer().apply {
setConnectionFactory(connectionFactory)
setDestinationName("order.queue")
setMessageListener(orderMessageListener)
isSessionTransacted = true
}
}
}
@Service
@Transactional
class TransactionalOrderService {
@Autowired
private lateinit var orderRepository: OrderRepository
@JmsListener(destination = "order.queue")
@Transactional
fun handleOrder(orderData: String) {
try {
val order = parseOrder(orderData)
// 数据库操作
orderRepository.save(order)
// 如果这里抛出异常,消息会回滚
if (order.amount > 10000) {
throw BusinessException("订单金额过大")
}
logger.info("订单保存成功: ${order.id}")
} catch (ex: Exception) {
logger.error("订单处理失败,事务将回滚", ex)
throw ex // 重新抛出异常,触发事务回滚
}
}
private fun parseOrder(data: String): Order {
return Order(id = "ORDER-${System.currentTimeMillis()}", amount = 100.0)
}
}
class BusinessException(message: String) : RuntimeException(message)
分布式事务(XA)
分布式事务配置示例
kotlin
@Configuration
class XATransactionConfig {
@Bean
fun jtaTransactionManager(): JtaTransactionManager {
return JtaTransactionManager()
}
@Bean
fun xaListenerContainer(
connectionFactory: ConnectionFactory,
transactionManager: JtaTransactionManager,
orderMessageListener: OrderMessageListener
): DefaultMessageListenerContainer {
return DefaultMessageListenerContainer().apply {
setConnectionFactory(connectionFactory)
setDestinationName("order.queue")
setMessageListener(orderMessageListener)
setTransactionManager(transactionManager)
}
}
}
@Service
class XAOrderService {
@Autowired
private lateinit var orderRepository: OrderRepository
@Autowired
private lateinit var inventoryService: InventoryService
@JmsListener(destination = "order.queue")
@Transactional // 使用 JTA 事务管理器
fun handleOrderWithXA(orderData: String) {
val order = parseOrder(orderData)
// 数据库操作(参与 XA 事务)
orderRepository.save(order)
// 调用其他服务(参与 XA 事务)
inventoryService.updateInventory(order.productId, order.quantity)
// 如果任何操作失败,整个分布式事务都会回滚
logger.info("XA事务处理完成: ${order.id}")
}
private fun parseOrder(data: String): Order {
return Order(
id = "ORDER-${System.currentTimeMillis()}",
amount = 100.0,
productId = "PROD-001",
quantity = 2
)
}
}
data class Order(
val id: String,
val amount: Double,
val productId: String = "",
val quantity: Int = 0
)
错误处理和重试机制
在实际应用中,消息处理可能会失败,需要合适的错误处理策略:
kotlin
@Component
class RobustOrderListener {
@JmsListener(destination = "order.queue")
fun handleOrderWithRetry(
orderData: String,
@Header(JmsHeaders.REDELIVERED, required = false) redelivered: Boolean = false
) {
try {
if (redelivered) {
logger.warn("处理重发消息: $orderData")
}
processOrder(orderData)
} catch (ex: TemporaryException) {
// 临时异常,可以重试
logger.warn("临时异常,消息将重新投递", ex)
throw ex // 重新抛出,触发重试
} catch (ex: PermanentException) {
// 永久异常,不应重试
logger.error("永久异常,消息将被丢弃", ex)
sendToDeadLetterQueue(orderData, ex)
// 不重新抛出异常,消息被确认
} catch (ex: Exception) {
// 未知异常,谨慎处理
logger.error("未知异常", ex)
if (shouldRetry(orderData)) {
throw ex
} else {
sendToDeadLetterQueue(orderData, ex)
}
}
}
private fun processOrder(orderData: String) {
// 模拟可能失败的业务逻辑
if (orderData.contains("FAIL")) {
throw TemporaryException("模拟临时失败")
}
if (orderData.contains("INVALID")) {
throw PermanentException("无效的订单数据")
}
logger.info("订单处理成功: $orderData")
}
private fun shouldRetry(orderData: String): Boolean {
// 实现重试策略逻辑
return !orderData.contains("NO_RETRY")
}
private fun sendToDeadLetterQueue(orderData: String, exception: Exception) {
// 发送到死信队列
logger.info("发送到死信队列: $orderData")
}
companion object {
private val logger = LoggerFactory.getLogger(RobustOrderListener::class.java)
}
}
class TemporaryException(message: String) : RuntimeException(message)
class PermanentException(message: String) : RuntimeException(message)
性能优化建议
1. 合理设置并发消费者
kotlin
@Bean
fun optimizedListenerContainer(
connectionFactory: ConnectionFactory
): DefaultMessageListenerContainer {
return DefaultMessageListenerContainer().apply {
setConnectionFactory(connectionFactory)
setDestinationName("order.queue")
// 并发设置
setConcurrentConsumers(2) // 初始消费者数量
setMaxConcurrentConsumers(10) // 最大消费者数量
// 性能调优
setReceiveTimeout(1000) // 接收超时
setIdleConsumerLimit(5) // 空闲消费者限制
setIdleTaskExecutionLimit(10) // 空闲任务执行限制
}
}
2. 消息预取和批处理
kotlin
@Configuration
class PerformanceConfig {
@Bean
fun highThroughputContainer(
connectionFactory: ConnectionFactory
): DefaultMessageListenerContainer {
return DefaultMessageListenerContainer().apply {
setConnectionFactory(connectionFactory)
setDestinationName("high.volume.queue")
// 批处理设置
setConcurrentConsumers(5)
setMaxConcurrentConsumers(20)
// 预取设置(如果使用 ActiveMQ)
if (connectionFactory is ActiveMQConnectionFactory) {
connectionFactory.prefetchPolicy.apply {
queuePrefetch = 100 // 队列预取数量
}
}
}
}
}
监控和诊断
添加监控指标以便运维:
kotlin
@Component
class MonitoredOrderListener {
private val processedCounter = Counter.build()
.name("jms_messages_processed_total")
.help("Total processed messages")
.labelNames("queue", "status")
.register()
private val processingTimer = Timer.build()
.name("jms_message_processing_duration_seconds")
.help("Message processing duration")
.labelNames("queue")
.register()
@JmsListener(destination = "order.queue")
fun handleOrderWithMetrics(orderData: String) {
val timer = processingTimer.labels("order.queue").startTimer()
try {
processOrder(orderData)
processedCounter.labels("order.queue", "success").inc()
} catch (ex: Exception) {
processedCounter.labels("order.queue", "error").inc()
logger.error("消息处理失败", ex)
throw ex
} finally {
timer.observeDuration()
}
}
private fun processOrder(orderData: String) {
// 业务逻辑
Thread.sleep(50) // 模拟处理时间
logger.info("处理订单: $orderData")
}
companion object {
private val logger = LoggerFactory.getLogger(MonitoredOrderListener::class.java)
}
}
总结
Spring JMS 提供了多种灵活的消息接收方式:
选择建议
- 简单场景:使用
@JmsListener
注解,简洁高效 - 需要访问 Session:使用
SessionAwareMessageListener
- 复杂业务逻辑:使用
MessageListenerAdapter
配合 POJO - 高性能场景:合理配置并发消费者和预取参数
- 事务场景:根据需要选择本地事务或分布式事务
IMPORTANT
在生产环境中,务必考虑错误处理、监控告警、性能调优等方面,确保消息处理的可靠性和高效性。
通过合理使用这些特性,我们可以构建出高可用、高性能的消息驱动应用程序! 🚀