Skip to content

Spring Boot Apache Pulsar 支持详解 🚀

概述

Apache Pulsar 是一个分布式的发布-订阅消息系统,具有高性能、可扩展性和持久性的特点。Spring Boot 通过自动配置 Spring for Apache Pulsar 项目,为开发者提供了简单易用的 Pulsar 集成方案。

NOTE

Spring Boot 会在类路径中存在 org.springframework.pulsar:spring-pulsar 时自动配置传统(命令式)组件,存在 org.springframework.pulsar:spring-pulsar-reactive 时自动配置响应式组件。

为什么需要 Apache Pulsar? 🤔

在现代分布式系统中,服务间通信是一个核心挑战:

  • 异步处理:避免同步调用造成的性能瓶颈
  • 系统解耦:减少服务间的直接依赖
  • 流量削峰:处理突发的高并发请求
  • 数据一致性:确保分布式事务的可靠性

快速开始

依赖配置

kotlin
dependencies {
    // 传统方式
    implementation("org.springframework.boot:spring-boot-starter-pulsar")
    
    // 响应式方式
    implementation("org.springframework.boot:spring-boot-starter-pulsar-reactive")
}
xml
<dependencies>
    <!-- 传统方式 -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-pulsar</artifactId>
    </dependency>
    
    <!-- 响应式方式 -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-pulsar-reactive</artifactId>
    </dependency>
</dependencies>

连接到 Pulsar

基础连接配置

Spring Boot 会自动配置 PulsarClient Bean,默认连接到本地 Pulsar 实例。

yaml
spring:
  pulsar:
    client:
      service-url: pulsar://localhost:6650  # Pulsar 服务地址
properties
spring.pulsar.client.service-url=pulsar://localhost:6650

IMPORTANT

service-url 必须是有效的 Pulsar 协议 URL,格式为 pulsar://host:port

认证配置

对于需要认证的 Pulsar 集群,可以配置认证插件:

yaml
spring:
  pulsar:
    client:
      authentication:
        plugin-class-name: org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2
        param:
          issuerUrl: https://auth.server.cloud/
          privateKey: file:///Users/some-key.json
          audience: urn:sn:acme:dev:my-instance
yaml
spring:
  pulsar:
    client:
      authentication:
        plugin-class-name: org.apache.pulsar.client.impl.auth.AuthenticationToken
        param:
          token: eyJhbGciOiJIUzI1NiJ9...

WARNING

认证参数名称必须与插件期望的完全匹配(通常是驼峰命名),Spring Boot 不会进行宽松绑定。

自定义客户端配置

kotlin
@Configuration
class PulsarConfig {
    
    @Bean
    fun pulsarClientCustomizer(): PulsarClientBuilderCustomizer {
        return PulsarClientBuilderCustomizer { builder ->
            builder
                .connectionTimeout(10, TimeUnit.SECONDS) 
                .operationTimeout(15, TimeUnit.SECONDS)  
                .ioThreads(4)                           
        }
    }
}

发送消息

传统方式发送消息

kotlin
@Service
class OrderService(
    private val pulsarTemplate: PulsarTemplate<String>
) {
    
    fun processOrder(order: Order) {
        try {
            // 发送订单创建事件
            pulsarTemplate.send("order-events", order.toJson()) 
            
            // 发送带有自定义属性的消息
            pulsarTemplate.send("order-events", order.toJson()) { messageBuilder ->
                messageBuilder
                    .property("orderId", order.id.toString())     
                    .property("customerId", order.customerId)     
                    .eventTime(order.createdAt.toEpochMilli())   
            }
            
            logger.info("订单事件已发送: ${order.id}")
        } catch (e: PulsarClientException) {
            logger.error("发送订单事件失败", e) 
            throw OrderProcessingException("消息发送失败", e)
        }
    }
}

响应式方式发送消息

kotlin
@Service
class ReactiveOrderService(
    private val reactivePulsarTemplate: ReactivePulsarTemplate<String>
) {
    
    fun processOrderAsync(order: Order): Mono<Void> {
        return reactivePulsarTemplate
            .send("order-events", order.toJson()) 
            .doOnSuccess { messageId ->
                logger.info("订单事件已发送,消息ID: $messageId")
            }
            .doOnError { error ->
                logger.error("发送订单事件失败", error) 
            }
            .then()
    }
}

生产者配置

yaml
spring:
  pulsar:
    producer:
      name: order-producer
      send-timeout: 30s
      batch-enabled: true
      batching-max-messages: 100
      cache:
        expire-after-access: 10m
        maximum-size: 100

接收消息

传统监听器

kotlin
@Component
class OrderEventListener {
    
    @PulsarListener(topics = ["order-events"])
    fun handleOrderEvent(
        @Payload orderJson: String,                    
        @Header("orderId") orderId: String?,           
        message: Message<String>                       
    ) {
        try {
            val order = Json.decodeFromString<Order>(orderJson)
            logger.info("处理订单事件: ${order.id}")
            
            // 业务逻辑处理
            processOrderEvent(order)
            
        } catch (e: Exception) {
            logger.error("处理订单事件失败: $orderJson", e) 
            throw e // 重新抛出异常以触发重试机制
        }
    }
    
    // 多主题监听
    @PulsarListener(
        topics = ["order-events", "payment-events"],
        subscriptionName = "business-processor"
    )
    fun handleBusinessEvents(event: String) {
        // 处理多种业务事件
    }
}

响应式监听器

kotlin
@Component
class ReactiveOrderEventListener {
    
    @ReactivePulsarListener(topics = ["order-events"])
    fun handleOrderEventReactive(orderJson: String): Mono<Void> {
        return Mono.fromCallable {
            Json.decodeFromString<Order>(orderJson)
        }
        .flatMap { order ->
            processOrderEventAsync(order) 
        }
        .doOnSuccess {
            logger.info("响应式处理订单事件完成")
        }
        .doOnError { error ->
            logger.error("响应式处理订单事件失败", error) 
        }
        .then()
    }
    
    private fun processOrderEventAsync(order: Order): Mono<Void> {
        // 异步业务处理逻辑
        return Mono.empty()
    }
}

消费者配置

yaml
spring:
  pulsar:
    consumer:
      subscription-name: order-service
      subscription-type: shared
      ack-timeout: 30s
      retry-enable: true
    listener:
      schema-type: string
      observation-enabled: true

消息读取器 (Reader)

Reader 提供了更灵活的消息消费方式,允许手动管理游标位置。

传统 Reader

kotlin
@Component
class OrderHistoryReader {
    
    @PulsarReader(
        topics = ["order-events"],
        startMessageId = "earliest"  // 从最早的消息开始读取
    )
    fun readOrderHistory(orderJson: String) {
        val order = Json.decodeFromString<Order>(orderJson)
        logger.info("读取历史订单: ${order.id}")
        
        // 处理历史数据
        processHistoricalOrder(order)
    }
}

响应式 Reader

kotlin
@Service
class ReactiveOrderHistoryService(
    private val readerFactory: ReactivePulsarReaderFactory<String>
) {
    
    fun readOrdersFromTime(startTime: Instant): Flux<Order> {
        val customizer = ReactiveMessageReaderBuilderCustomizer<String> { builder ->
            builder
                .topic("order-events")                              
                .startAtSpec(StartAtSpec.ofInstant(startTime))     
        }
        
        return readerFactory
            .createReader(Schema.STRING, listOf(customizer))
            .readMany()                                            
            .map { message ->
                Json.decodeFromString<Order>(message.value)
            }
            .doOnNext { order ->
                logger.info("读取到订单: ${order.id}")
            }
    }
}

事务支持

Pulsar 支持事务,确保消息的原子性操作。

启用事务

yaml
spring:
  pulsar:
    transaction:
      enabled: true  # 启用事务支持

事务使用示例

kotlin
@Service
@Transactional
class TransactionalOrderService(
    private val pulsarTemplate: PulsarTemplate<String>,
    private val orderRepository: OrderRepository
) {
    
    @PulsarListener(
        topics = ["payment-events"],
        transactional = "required"
    )
    fun handlePaymentEvent(paymentJson: String) {
        val payment = Json.decodeFromString<Payment>(paymentJson)
        
        // 数据库操作
        val order = orderRepository.findById(payment.orderId)
            ?: throw OrderNotFoundException("订单不存在: ${payment.orderId}")
        
        order.status = OrderStatus.PAID
        orderRepository.save(order)
        
        // 发送后续事件(在同一事务中)
        pulsarTemplate.send("order-status-events", order.toJson()) 
        
        logger.info("订单支付处理完成: ${order.id}")
    }
}

完整的业务场景示例

让我们通过一个电商订单处理的完整示例来展示 Pulsar 的实际应用:

完整的订单处理服务示例
kotlin
// 订单数据模型
@Serializable
data class Order(
    val id: String,
    val customerId: String,
    val items: List<OrderItem>,
    val totalAmount: BigDecimal,
    var status: OrderStatus,
    val createdAt: Instant
)

@Serializable
data class OrderItem(
    val productId: String,
    val quantity: Int,
    val price: BigDecimal
)

enum class OrderStatus {
    CREATED, PAID, SHIPPED, DELIVERED, CANCELLED
}

// 订单服务
@Service
class OrderService(
    private val pulsarTemplate: PulsarTemplate<String>,
    private val orderRepository: OrderRepository
) {
    private val logger = LoggerFactory.getLogger(OrderService::class.java)
    
    fun createOrder(orderRequest: CreateOrderRequest): Order {
        // 创建订单
        val order = Order(
            id = UUID.randomUUID().toString(),
            customerId = orderRequest.customerId,
            items = orderRequest.items,
            totalAmount = calculateTotal(orderRequest.items),
            status = OrderStatus.CREATED,
            createdAt = Instant.now()
        )
        
        // 保存到数据库
        orderRepository.save(order)
        
        // 发送订单创建事件
        pulsarTemplate.send("order-events", order.toJson()) { builder ->
            builder
                .property("eventType", "ORDER_CREATED")
                .property("orderId", order.id)
                .property("customerId", order.customerId)
        }
        
        logger.info("订单创建成功: ${order.id}")
        return order
    }
}

// 库存服务监听器
@Component
class InventoryEventListener(
    private val inventoryService: InventoryService,
    private val pulsarTemplate: PulsarTemplate<String>
) {
    private val logger = LoggerFactory.getLogger(InventoryEventListener::class.java)
    
    @PulsarListener(
        topics = ["order-events"],
        subscriptionName = "inventory-service"
    )
    fun handleOrderEvent(
        @Payload orderJson: String,
        @Header("eventType") eventType: String?
    ) {
        if (eventType == "ORDER_CREATED") {
            val order = Json.decodeFromString<Order>(orderJson)
            
            try {
                // 检查库存
                val inventoryResult = inventoryService.checkAndReserveInventory(order.items)
                
                if (inventoryResult.success) {
                    // 库存充足,发送库存预留成功事件
                    pulsarTemplate.send("inventory-events", inventoryResult.toJson()) { builder ->
                        builder
                            .property("eventType", "INVENTORY_RESERVED")
                            .property("orderId", order.id)
                    }
                    logger.info("库存预留成功: ${order.id}")
                } else {
                    // 库存不足,发送库存不足事件
                    pulsarTemplate.send("inventory-events", inventoryResult.toJson()) { builder ->
                        builder
                            .property("eventType", "INVENTORY_INSUFFICIENT")
                            .property("orderId", order.id)
                    }
                    logger.warn("库存不足: ${order.id}")
                }
            } catch (e: Exception) {
                logger.error("处理订单库存检查失败: ${order.id}", e)
                throw e
            }
        }
    }
}

// 支付服务监听器
@Component
class PaymentEventListener(
    private val paymentService: PaymentService,
    private val pulsarTemplate: PulsarTemplate<String>
) {
    
    @PulsarListener(
        topics = ["inventory-events"],
        subscriptionName = "payment-service"
    )
    fun handleInventoryEvent(
        @Payload eventJson: String,
        @Header("eventType") eventType: String?,
        @Header("orderId") orderId: String?
    ) {
        if (eventType == "INVENTORY_RESERVED" && orderId != null) {
            try {
                // 处理支付
                val paymentResult = paymentService.processPayment(orderId)
                
                // 发送支付结果事件
                pulsarTemplate.send("payment-events", paymentResult.toJson()) { builder ->
                    builder
                        .property("eventType", if (paymentResult.success) "PAYMENT_SUCCESS" else "PAYMENT_FAILED")
                        .property("orderId", orderId)
                }
                
                logger.info("支付处理完成: $orderId, 结果: ${paymentResult.success}")
            } catch (e: Exception) {
                logger.error("处理支付失败: $orderId", e)
                throw e
            }
        }
    }
}

配置最佳实践

生产环境配置

yaml
spring:
  pulsar:
    client:
      service-url: pulsar://pulsar-cluster:6650
      connection-timeout: 10s
      operation-timeout: 30s
      io-threads: 8
      listener-threads: 8
    
    producer:
      send-timeout: 30s
      batch-enabled: true
      batching-max-messages: 1000
      batching-max-publish-delay: 10ms
      cache:
        expire-after-access: 30m
        maximum-size: 1000
    
    consumer:
      subscription-type: shared
      ack-timeout: 30s
      negative-ack-redelivery-delay: 1m
      retry-enable: true
      dead-letter-policy:
        max-redeliver-count: 3
        dead-letter-topic: order-events-dlq
    
    transaction:
      enabled: true
      timeout: 60s

监控和可观测性

kotlin
@Configuration
class PulsarObservabilityConfig {
    
    @Bean
    fun pulsarProducerInterceptor(): ProducerInterceptor {
        return object : ProducerInterceptor {
            override fun eligible(message: Message<*>): Boolean = true
            
            override fun beforeSend(producer: Producer<*>, message: Message<*>): Message<*> {
                // 添加追踪信息
                return message
            }
            
            override fun onSendAcknowledgement(
                producer: Producer<*>,
                message: Message<*>,
                msgId: MessageId,
                exception: Throwable?
            ) {
                if (exception != null) {
                    // 记录发送失败指标
                    Metrics.counter("pulsar.producer.send.failed").increment()
                } else {
                    // 记录发送成功指标
                    Metrics.counter("pulsar.producer.send.success").increment()
                }
            }
        }
    }
}

常见问题和解决方案

1. 消息重复处理

TIP

实现幂等性处理,使用消息ID或业务唯一标识来避免重复处理。

kotlin
@Component
class IdempotentOrderProcessor(
    private val processedMessageRepository: ProcessedMessageRepository
) {
    
    @PulsarListener(topics = ["order-events"])
    fun handleOrderEvent(
        orderJson: String,
        message: Message<String>
    ) {
        val messageId = message.messageId.toString()
        
        // 检查消息是否已处理
        if (processedMessageRepository.existsByMessageId(messageId)) {
            logger.info("消息已处理,跳过: $messageId") 
            return
        }
        
        try {
            // 处理业务逻辑
            processOrder(orderJson)
            
            // 记录已处理的消息
            processedMessageRepository.save(ProcessedMessage(messageId, Instant.now()))
        } catch (e: Exception) {
            logger.error("处理订单事件失败", e)
            throw e
        }
    }
}

2. 死信队列处理

kotlin
@PulsarListener(
    topics = ["order-events-dlq"],
    subscriptionName = "dlq-processor"
)
fun handleDeadLetterMessage(
    @Payload failedMessage: String,
    message: Message<String>
) {
    logger.error("处理死信消息: ${message.messageId}")
    
    // 记录失败原因
    val failureRecord = MessageFailureRecord(
        messageId = message.messageId.toString(),
        topic = message.topicName,
        payload = failedMessage,
        failureTime = Instant.now(),
        properties = message.properties
    )
    
    failureRepository.save(failureRecord)
    
    // 可选:发送告警通知
    alertService.sendAlert("死信消息需要人工处理", failureRecord)
}

总结

Apache Pulsar 为 Spring Boot 应用提供了强大的消息传递能力:

简单易用:Spring Boot 自动配置大大简化了集成复杂度
功能丰富:支持传统和响应式编程模型
高可靠性:提供事务支持和死信队列机制
高性能:支持批处理和异步处理
可扩展:丰富的自定义配置选项

通过合理使用 Pulsar,我们可以构建出高性能、高可用的分布式消息系统,有效解决服务间通信和数据一致性问题。

IMPORTANT

在生产环境中,务必关注消息的幂等性处理、错误重试机制和监控告警,确保系统的稳定性和可靠性。