Skip to content

Apache Pulsar

Apache Pulsar是Yahoo开源的云原生分布式消息流平台,采用存储和计算分离的架构设计,提供低延迟、高吞吐量、强一致性和地理复制等特性。

核心特性

存储计算分离

  • Apache BookKeeper - 分布式存储层,负责消息持久化
  • Pulsar Broker - 计算层,处理消息路由和协议转换
  • 弹性扩容 - 存储和计算可以独立扩展
  • 资源利用率 - 更好的资源利用和成本控制

统一消息模型

  • 队列模型 - 传统的点对点消息传递
  • 流模型 - 类似Kafka的发布订阅模式
  • Reader API - 支持从任意位置读取消息
  • 多种订阅模式 - Exclusive、Shared、Failover、Key_Shared

多租户支持

  • 租户隔离 - 完整的多租户资源隔离
  • 命名空间 - 租户下的逻辑分组
  • 权限控制 - 细粒度的访问控制
  • 配额管理 - 资源使用配额限制

核心架构

集群组件

点击查看完整代码实现
Pulsar集群架构:
├── Pulsar Broker
│   ├── HTTP服务(Admin API)
│   ├── 二进制协议服务
│   └── 消息路由和负载均衡
├── Apache BookKeeper
│   ├── Bookie节点
│   ├── Journal日志
│   └── Ledger存储
├── Apache ZooKeeper
│   ├── 元数据存储
│   ├── 配置管理
│   └── 协调服务
└── Pulsar Proxy(可选)
    ├── 协议代理
    ├── 多集群路由
    └── 负载均衡

存储模型

Topic结构:
persistent://tenant/namespace/topic
├── Ledger 1 (BookKeeper)
├── Ledger 2 (BookKeeper) 
├── Ledger 3 (BookKeeper)
└── ...

每个Ledger分布在多个Bookie节点上

基础使用

生产者示例

Java客户端

点击查看完整代码实现
java
import org.apache.pulsar.client.api.*;

public class PulsarProducer {
    public static void main(String[] args) throws Exception {
        // 创建客户端
        PulsarClient client = PulsarClient.builder()
            .serviceUrl("pulsar://localhost:6650")
            .build();
        
        // 创建生产者
        Producer<String> producer = client.newProducer(Schema.STRING)
            .topic("persistent://public/default/my-topic")
            .create();
        
        // 发送消息
        for (int i = 0; i < 10; i++) {
            MessageId messageId = producer.send("Hello Pulsar " + i);
            System.out.println("Published message: " + messageId);
        }
        
        // 异步发送
        producer.sendAsync("Async message")
            .thenAccept(messageId -> {
                System.out.println("Message sent: " + messageId);
            })
            .exceptionally(ex -> {
                System.err.println("Failed to send message: " + ex);
                return null;
            });
        
        producer.close();
        client.close();
    }
}

批量发送配置

java
Producer<String> producer = client.newProducer(Schema.STRING)
    .topic("my-topic")
    .batchingMaxMessages(100)
    .batchingMaxBytes(1024 * 1024)
    .batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS)
    .create();

消费者示例

Exclusive订阅模式

点击查看完整代码实现
java
// 独占订阅 - 只有一个消费者可以订阅
Consumer<String> consumer = client.newConsumer(Schema.STRING)
    .topic("my-topic")
    .subscriptionName("my-subscription")
    .subscriptionType(SubscriptionType.Exclusive)
    .subscribe();

while (true) {
    Message<String> message = consumer.receive();
    try {
        System.out.println("Received: " + message.getValue());
        consumer.acknowledge(message);
    } catch (Exception e) {
        consumer.negativeAcknowledge(message);
    }
}

Shared订阅模式

java
// 共享订阅 - 多个消费者共享消息
Consumer<String> consumer = client.newConsumer(Schema.STRING)
    .topic("my-topic")
    .subscriptionName("shared-subscription")
    .subscriptionType(SubscriptionType.Shared)
    .subscribe();

Key_Shared订阅模式

java
// 按Key共享 - 相同Key的消息发送到同一消费者
Consumer<String> consumer = client.newConsumer(Schema.STRING)
    .topic("my-topic")
    .subscriptionName("key-shared-subscription")
    .subscriptionType(SubscriptionType.Key_Shared)
    .subscribe();

Reader API

java
// Reader可以从任意位置读取消息
Reader<String> reader = client.newReader(Schema.STRING)
    .topic("my-topic")
    .startMessageId(MessageId.earliest)
    .create();

while (reader.hasNext()) {
    Message<String> message = reader.readNext();
    System.out.println("Read: " + message.getValue());
}

Schema管理

内置Schema

java
// 基础类型Schema
Producer<String> stringProducer = client.newProducer(Schema.STRING)
    .topic("string-topic").create();

Producer<byte[]> bytesProducer = client.newProducer(Schema.BYTES)
    .topic("bytes-topic").create();

Producer<Integer> intProducer = client.newProducer(Schema.INT32)
    .topic("int-topic").create();

自定义Schema

点击查看完整代码实现
java
// Avro Schema
import org.apache.pulsar.client.api.schema.GenericRecord;

// 用户定义的POJO
public class User {
    public String name;
    public int age;
    
    // 构造函数、getter、setter...
}

// 使用JSON Schema
Producer<User> producer = client.newProducer(Schema.JSON(User.class))
    .topic("user-topic")
    .create();

Consumer<User> consumer = client.newConsumer(Schema.JSON(User.class))
    .topic("user-topic")
    .subscriptionName("user-sub")
    .subscribe();

Schema演进

java
// Schema版本管理
SchemaInfo schemaInfo = SchemaInfo.builder()
    .name("user-schema")
    .type(SchemaType.AVRO)
    .schema(avroSchemaDefinition)
    .properties(Collections.singletonMap("version", "v1"))
    .build();

Producer<GenericRecord> producer = client.newProducer(Schema.generic(schemaInfo))
    .topic("user-topic")
    .create();

集群部署

ZooKeeper集群

bash
# zookeeper.conf
tickTime=2000
dataDir=/data/zookeeper
clientPort=2181
initLimit=5
syncLimit=2
server.1=zk1:2888:3888
server.2=zk2:2888:3888  
server.3=zk3:2888:3888

BookKeeper集群

点击查看完整代码实现
conf
# bookkeeper.conf
bookiePort=3181
journalDirectory=/data/bookkeeper/journal
ledgerDirectories=/data/bookkeeper/ledgers
zkServers=zk1:2181,zk2:2181,zk3:2181

# 存储配置
diskCheckIntervalMs=10000
diskUsageThreshold=0.95
diskUsageWarnThreshold=0.90

# 复制配置
ensembleSize=3
writeQuorumSize=2
ackQuorumSize=2

Pulsar Broker配置

conf
# broker.conf
brokerServicePort=6650
webServicePort=8080
zookeeperServers=zk1:2181,zk2:2181,zk3:2181

# 集群配置
clusterName=pulsar-cluster
brokerDeduplicationEnabled=true
defaultRetentionTimeInMinutes=10080
defaultRetentionSizeInMB=1024

# 负载均衡
loadBalancerEnabled=true
loadBalancerAutoBundleSplitEnabled=true

高级功能

地理复制

bash
# 创建跨地域集群
pulsar-admin clusters create --url http://broker1:8080 --broker-url pulsar://broker1:6650 cluster-1
pulsar-admin clusters create --url http://broker2:8080 --broker-url pulsar://broker2:6650 cluster-2

# 配置租户和命名空间复制
pulsar-admin tenants create my-tenant --admin-roles admin --allowed-clusters cluster-1,cluster-2
pulsar-admin namespaces create my-tenant/my-namespace
pulsar-admin namespaces set-clusters my-tenant/my-namespace --clusters cluster-1,cluster-2

延时消息

java
// 发送延时消息
producer.newMessage()
    .value("Delayed message")
    .deliverAfter(30, TimeUnit.SECONDS)
    .send();

// 发送定时消息
producer.newMessage()
    .value("Scheduled message")
    .deliverAt(System.currentTimeMillis() + 60000)
    .send();

事务支持

点击查看完整代码实现
java
// 开启事务
Transaction transaction = client.newTransaction()
    .withTransactionTimeout(30, TimeUnit.SECONDS)
    .build().get();

try {
    // 在事务中发送消息
    producer.newMessage(transaction)
        .value("Transactional message 1")
        .send();
    
    producer.newMessage(transaction)
        .value("Transactional message 2") 
        .send();
    
    // 提交事务
    transaction.commit().get();
} catch (Exception e) {
    // 回滚事务
    transaction.abort();
}

消息去重

java
// 启用生产者去重
Producer<String> producer = client.newProducer(Schema.STRING)
    .topic("my-topic")
    .producerName("unique-producer")
    .enableDeduplication(true)
    .create();

// 发送带序列号的消息
producer.newMessage()
    .sequenceId(12345L)
    .value("Deduplicated message")
    .send();

监控管理

Pulsar Manager

bash
# Docker部署Pulsar Manager
docker run -it -d \
  -p 9527:9527 -p 7750:7750 \
  -e SPRING_CONFIGURATION_FILE=/pulsar-manager/pulsar-manager/application.properties \
  apachepulsar/pulsar-manager:latest

监控指标

java
// 生产者指标
ProducerStats stats = producer.getStats();
System.out.println("发送消息数: " + stats.getNumMsgsSent());
System.out.println("发送字节数: " + stats.getNumBytesSent());
System.out.println("发送失败数: " + stats.getNumSendFailed());

// 消费者指标
ConsumerStats consumerStats = consumer.getStats();
System.out.println("接收消息数: " + consumerStats.getNumMsgsReceived());
System.out.println("接收字节数: " + consumerStats.getNumBytesReceived());

管理API

bash
# Topic管理
pulsar-admin topics create persistent://public/default/my-topic
pulsar-admin topics list public/default
pulsar-admin topics stats persistent://public/default/my-topic

# 订阅管理
pulsar-admin topics subscriptions persistent://public/default/my-topic
pulsar-admin topics skip-messages -n 100 persistent://public/default/my-topic -s my-subscription

性能优化

生产者优化

java
Producer<String> producer = client.newProducer(Schema.STRING)
    .topic("my-topic")
    .enableBatching(true)
    .batchingMaxMessages(1000)
    .batchingMaxBytes(1024 * 1024)
    .batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS)
    .compressionType(CompressionType.ZSTD)
    .create();

消费者优化

java
Consumer<String> consumer = client.newConsumer(Schema.STRING)
    .topic("my-topic")
    .subscriptionName("my-subscription")
    .receiverQueueSize(1000)
    .maxTotalReceiverQueueSizeAcrossPartitions(50000)
    .subscribe();

集群优化

  • 合理配置Ensemble Size和Quorum Size
  • 使用SSD提升BookKeeper性能
  • 配置适当的缓存大小
  • 监控网络延迟和磁盘I/O

应用场景

实时数据处理

  • 日志收集和分析
  • 实时指标监控
  • 事件驱动架构
  • 流式计算

微服务通信

  • 服务间异步通信
  • 事件溯源
  • CQRS架构
  • 分布式事务

多租户SaaS

  • 租户数据隔离
  • 资源配额管理
  • 多地域部署
  • 企业级安全

IoT和边缘计算

  • 设备数据收集
  • 边缘消息处理
  • 地理分布式架构
  • 大规模设备连接

最佳实践

Topic设计

  • 合理规划Topic命名规范
  • 根据业务需求选择分区数
  • 考虑消息顺序性要求
  • 设置适当的消息保留策略

性能调优

  • 启用消息批处理和压缩
  • 合理配置接收队列大小
  • 监控消息积压情况
  • 优化序列化方式

可靠性保证

  • 配置适当的复制因子
  • 实现消费者故障重试
  • 使用事务保证一致性
  • 建立监控告警机制

Apache Pulsar凭借其创新的存储计算分离架构、强大的多租户支持和云原生特性,为现代分布式应用提供了强大而灵活的消息流平台解决方案。

正在精进