Appearance
Spring Boot Apache Pulsar 支持详解 🚀
概述
Apache Pulsar 是一个分布式的发布-订阅消息系统,具有高性能、可扩展性和持久性的特点。Spring Boot 通过自动配置 Spring for Apache Pulsar 项目,为开发者提供了简单易用的 Pulsar 集成方案。
NOTE
Spring Boot 会在类路径中存在 org.springframework.pulsar:spring-pulsar
时自动配置传统(命令式)组件,存在 org.springframework.pulsar:spring-pulsar-reactive
时自动配置响应式组件。
为什么需要 Apache Pulsar? 🤔
在现代分布式系统中,服务间通信是一个核心挑战:
- 异步处理:避免同步调用造成的性能瓶颈
- 系统解耦:减少服务间的直接依赖
- 流量削峰:处理突发的高并发请求
- 数据一致性:确保分布式事务的可靠性
快速开始
依赖配置
kotlin
dependencies {
// 传统方式
implementation("org.springframework.boot:spring-boot-starter-pulsar")
// 响应式方式
implementation("org.springframework.boot:spring-boot-starter-pulsar-reactive")
}
xml
<dependencies>
<!-- 传统方式 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-pulsar</artifactId>
</dependency>
<!-- 响应式方式 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-pulsar-reactive</artifactId>
</dependency>
</dependencies>
连接到 Pulsar
基础连接配置
Spring Boot 会自动配置 PulsarClient
Bean,默认连接到本地 Pulsar 实例。
yaml
spring:
pulsar:
client:
service-url: pulsar://localhost:6650 # Pulsar 服务地址
properties
spring.pulsar.client.service-url=pulsar://localhost:6650
IMPORTANT
service-url 必须是有效的 Pulsar 协议 URL,格式为 pulsar://host:port
认证配置
对于需要认证的 Pulsar 集群,可以配置认证插件:
yaml
spring:
pulsar:
client:
authentication:
plugin-class-name: org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2
param:
issuerUrl: https://auth.server.cloud/
privateKey: file:///Users/some-key.json
audience: urn:sn:acme:dev:my-instance
yaml
spring:
pulsar:
client:
authentication:
plugin-class-name: org.apache.pulsar.client.impl.auth.AuthenticationToken
param:
token: eyJhbGciOiJIUzI1NiJ9...
WARNING
认证参数名称必须与插件期望的完全匹配(通常是驼峰命名),Spring Boot 不会进行宽松绑定。
自定义客户端配置
kotlin
@Configuration
class PulsarConfig {
@Bean
fun pulsarClientCustomizer(): PulsarClientBuilderCustomizer {
return PulsarClientBuilderCustomizer { builder ->
builder
.connectionTimeout(10, TimeUnit.SECONDS)
.operationTimeout(15, TimeUnit.SECONDS)
.ioThreads(4)
}
}
}
发送消息
传统方式发送消息
kotlin
@Service
class OrderService(
private val pulsarTemplate: PulsarTemplate<String>
) {
fun processOrder(order: Order) {
try {
// 发送订单创建事件
pulsarTemplate.send("order-events", order.toJson())
// 发送带有自定义属性的消息
pulsarTemplate.send("order-events", order.toJson()) { messageBuilder ->
messageBuilder
.property("orderId", order.id.toString())
.property("customerId", order.customerId)
.eventTime(order.createdAt.toEpochMilli())
}
logger.info("订单事件已发送: ${order.id}")
} catch (e: PulsarClientException) {
logger.error("发送订单事件失败", e)
throw OrderProcessingException("消息发送失败", e)
}
}
}
响应式方式发送消息
kotlin
@Service
class ReactiveOrderService(
private val reactivePulsarTemplate: ReactivePulsarTemplate<String>
) {
fun processOrderAsync(order: Order): Mono<Void> {
return reactivePulsarTemplate
.send("order-events", order.toJson())
.doOnSuccess { messageId ->
logger.info("订单事件已发送,消息ID: $messageId")
}
.doOnError { error ->
logger.error("发送订单事件失败", error)
}
.then()
}
}
生产者配置
yaml
spring:
pulsar:
producer:
name: order-producer
send-timeout: 30s
batch-enabled: true
batching-max-messages: 100
cache:
expire-after-access: 10m
maximum-size: 100
接收消息
传统监听器
kotlin
@Component
class OrderEventListener {
@PulsarListener(topics = ["order-events"])
fun handleOrderEvent(
@Payload orderJson: String,
@Header("orderId") orderId: String?,
message: Message<String>
) {
try {
val order = Json.decodeFromString<Order>(orderJson)
logger.info("处理订单事件: ${order.id}")
// 业务逻辑处理
processOrderEvent(order)
} catch (e: Exception) {
logger.error("处理订单事件失败: $orderJson", e)
throw e // 重新抛出异常以触发重试机制
}
}
// 多主题监听
@PulsarListener(
topics = ["order-events", "payment-events"],
subscriptionName = "business-processor"
)
fun handleBusinessEvents(event: String) {
// 处理多种业务事件
}
}
响应式监听器
kotlin
@Component
class ReactiveOrderEventListener {
@ReactivePulsarListener(topics = ["order-events"])
fun handleOrderEventReactive(orderJson: String): Mono<Void> {
return Mono.fromCallable {
Json.decodeFromString<Order>(orderJson)
}
.flatMap { order ->
processOrderEventAsync(order)
}
.doOnSuccess {
logger.info("响应式处理订单事件完成")
}
.doOnError { error ->
logger.error("响应式处理订单事件失败", error)
}
.then()
}
private fun processOrderEventAsync(order: Order): Mono<Void> {
// 异步业务处理逻辑
return Mono.empty()
}
}
消费者配置
yaml
spring:
pulsar:
consumer:
subscription-name: order-service
subscription-type: shared
ack-timeout: 30s
retry-enable: true
listener:
schema-type: string
observation-enabled: true
消息读取器 (Reader)
Reader 提供了更灵活的消息消费方式,允许手动管理游标位置。
传统 Reader
kotlin
@Component
class OrderHistoryReader {
@PulsarReader(
topics = ["order-events"],
startMessageId = "earliest" // 从最早的消息开始读取
)
fun readOrderHistory(orderJson: String) {
val order = Json.decodeFromString<Order>(orderJson)
logger.info("读取历史订单: ${order.id}")
// 处理历史数据
processHistoricalOrder(order)
}
}
响应式 Reader
kotlin
@Service
class ReactiveOrderHistoryService(
private val readerFactory: ReactivePulsarReaderFactory<String>
) {
fun readOrdersFromTime(startTime: Instant): Flux<Order> {
val customizer = ReactiveMessageReaderBuilderCustomizer<String> { builder ->
builder
.topic("order-events")
.startAtSpec(StartAtSpec.ofInstant(startTime))
}
return readerFactory
.createReader(Schema.STRING, listOf(customizer))
.readMany()
.map { message ->
Json.decodeFromString<Order>(message.value)
}
.doOnNext { order ->
logger.info("读取到订单: ${order.id}")
}
}
}
事务支持
Pulsar 支持事务,确保消息的原子性操作。
启用事务
yaml
spring:
pulsar:
transaction:
enabled: true # 启用事务支持
事务使用示例
kotlin
@Service
@Transactional
class TransactionalOrderService(
private val pulsarTemplate: PulsarTemplate<String>,
private val orderRepository: OrderRepository
) {
@PulsarListener(
topics = ["payment-events"],
transactional = "required"
)
fun handlePaymentEvent(paymentJson: String) {
val payment = Json.decodeFromString<Payment>(paymentJson)
// 数据库操作
val order = orderRepository.findById(payment.orderId)
?: throw OrderNotFoundException("订单不存在: ${payment.orderId}")
order.status = OrderStatus.PAID
orderRepository.save(order)
// 发送后续事件(在同一事务中)
pulsarTemplate.send("order-status-events", order.toJson())
logger.info("订单支付处理完成: ${order.id}")
}
}
完整的业务场景示例
让我们通过一个电商订单处理的完整示例来展示 Pulsar 的实际应用:
完整的订单处理服务示例
kotlin
// 订单数据模型
@Serializable
data class Order(
val id: String,
val customerId: String,
val items: List<OrderItem>,
val totalAmount: BigDecimal,
var status: OrderStatus,
val createdAt: Instant
)
@Serializable
data class OrderItem(
val productId: String,
val quantity: Int,
val price: BigDecimal
)
enum class OrderStatus {
CREATED, PAID, SHIPPED, DELIVERED, CANCELLED
}
// 订单服务
@Service
class OrderService(
private val pulsarTemplate: PulsarTemplate<String>,
private val orderRepository: OrderRepository
) {
private val logger = LoggerFactory.getLogger(OrderService::class.java)
fun createOrder(orderRequest: CreateOrderRequest): Order {
// 创建订单
val order = Order(
id = UUID.randomUUID().toString(),
customerId = orderRequest.customerId,
items = orderRequest.items,
totalAmount = calculateTotal(orderRequest.items),
status = OrderStatus.CREATED,
createdAt = Instant.now()
)
// 保存到数据库
orderRepository.save(order)
// 发送订单创建事件
pulsarTemplate.send("order-events", order.toJson()) { builder ->
builder
.property("eventType", "ORDER_CREATED")
.property("orderId", order.id)
.property("customerId", order.customerId)
}
logger.info("订单创建成功: ${order.id}")
return order
}
}
// 库存服务监听器
@Component
class InventoryEventListener(
private val inventoryService: InventoryService,
private val pulsarTemplate: PulsarTemplate<String>
) {
private val logger = LoggerFactory.getLogger(InventoryEventListener::class.java)
@PulsarListener(
topics = ["order-events"],
subscriptionName = "inventory-service"
)
fun handleOrderEvent(
@Payload orderJson: String,
@Header("eventType") eventType: String?
) {
if (eventType == "ORDER_CREATED") {
val order = Json.decodeFromString<Order>(orderJson)
try {
// 检查库存
val inventoryResult = inventoryService.checkAndReserveInventory(order.items)
if (inventoryResult.success) {
// 库存充足,发送库存预留成功事件
pulsarTemplate.send("inventory-events", inventoryResult.toJson()) { builder ->
builder
.property("eventType", "INVENTORY_RESERVED")
.property("orderId", order.id)
}
logger.info("库存预留成功: ${order.id}")
} else {
// 库存不足,发送库存不足事件
pulsarTemplate.send("inventory-events", inventoryResult.toJson()) { builder ->
builder
.property("eventType", "INVENTORY_INSUFFICIENT")
.property("orderId", order.id)
}
logger.warn("库存不足: ${order.id}")
}
} catch (e: Exception) {
logger.error("处理订单库存检查失败: ${order.id}", e)
throw e
}
}
}
}
// 支付服务监听器
@Component
class PaymentEventListener(
private val paymentService: PaymentService,
private val pulsarTemplate: PulsarTemplate<String>
) {
@PulsarListener(
topics = ["inventory-events"],
subscriptionName = "payment-service"
)
fun handleInventoryEvent(
@Payload eventJson: String,
@Header("eventType") eventType: String?,
@Header("orderId") orderId: String?
) {
if (eventType == "INVENTORY_RESERVED" && orderId != null) {
try {
// 处理支付
val paymentResult = paymentService.processPayment(orderId)
// 发送支付结果事件
pulsarTemplate.send("payment-events", paymentResult.toJson()) { builder ->
builder
.property("eventType", if (paymentResult.success) "PAYMENT_SUCCESS" else "PAYMENT_FAILED")
.property("orderId", orderId)
}
logger.info("支付处理完成: $orderId, 结果: ${paymentResult.success}")
} catch (e: Exception) {
logger.error("处理支付失败: $orderId", e)
throw e
}
}
}
}
配置最佳实践
生产环境配置
yaml
spring:
pulsar:
client:
service-url: pulsar://pulsar-cluster:6650
connection-timeout: 10s
operation-timeout: 30s
io-threads: 8
listener-threads: 8
producer:
send-timeout: 30s
batch-enabled: true
batching-max-messages: 1000
batching-max-publish-delay: 10ms
cache:
expire-after-access: 30m
maximum-size: 1000
consumer:
subscription-type: shared
ack-timeout: 30s
negative-ack-redelivery-delay: 1m
retry-enable: true
dead-letter-policy:
max-redeliver-count: 3
dead-letter-topic: order-events-dlq
transaction:
enabled: true
timeout: 60s
监控和可观测性
kotlin
@Configuration
class PulsarObservabilityConfig {
@Bean
fun pulsarProducerInterceptor(): ProducerInterceptor {
return object : ProducerInterceptor {
override fun eligible(message: Message<*>): Boolean = true
override fun beforeSend(producer: Producer<*>, message: Message<*>): Message<*> {
// 添加追踪信息
return message
}
override fun onSendAcknowledgement(
producer: Producer<*>,
message: Message<*>,
msgId: MessageId,
exception: Throwable?
) {
if (exception != null) {
// 记录发送失败指标
Metrics.counter("pulsar.producer.send.failed").increment()
} else {
// 记录发送成功指标
Metrics.counter("pulsar.producer.send.success").increment()
}
}
}
}
}
常见问题和解决方案
1. 消息重复处理
TIP
实现幂等性处理,使用消息ID或业务唯一标识来避免重复处理。
kotlin
@Component
class IdempotentOrderProcessor(
private val processedMessageRepository: ProcessedMessageRepository
) {
@PulsarListener(topics = ["order-events"])
fun handleOrderEvent(
orderJson: String,
message: Message<String>
) {
val messageId = message.messageId.toString()
// 检查消息是否已处理
if (processedMessageRepository.existsByMessageId(messageId)) {
logger.info("消息已处理,跳过: $messageId")
return
}
try {
// 处理业务逻辑
processOrder(orderJson)
// 记录已处理的消息
processedMessageRepository.save(ProcessedMessage(messageId, Instant.now()))
} catch (e: Exception) {
logger.error("处理订单事件失败", e)
throw e
}
}
}
2. 死信队列处理
kotlin
@PulsarListener(
topics = ["order-events-dlq"],
subscriptionName = "dlq-processor"
)
fun handleDeadLetterMessage(
@Payload failedMessage: String,
message: Message<String>
) {
logger.error("处理死信消息: ${message.messageId}")
// 记录失败原因
val failureRecord = MessageFailureRecord(
messageId = message.messageId.toString(),
topic = message.topicName,
payload = failedMessage,
failureTime = Instant.now(),
properties = message.properties
)
failureRepository.save(failureRecord)
// 可选:发送告警通知
alertService.sendAlert("死信消息需要人工处理", failureRecord)
}
总结
Apache Pulsar 为 Spring Boot 应用提供了强大的消息传递能力:
✅ 简单易用:Spring Boot 自动配置大大简化了集成复杂度
✅ 功能丰富:支持传统和响应式编程模型
✅ 高可靠性:提供事务支持和死信队列机制
✅ 高性能:支持批处理和异步处理
✅ 可扩展:丰富的自定义配置选项
通过合理使用 Pulsar,我们可以构建出高性能、高可用的分布式消息系统,有效解决服务间通信和数据一致性问题。
IMPORTANT
在生产环境中,务必关注消息的幂等性处理、错误重试机制和监控告警,确保系统的稳定性和可靠性。