Apache Pulsar
Apache Pulsar是Yahoo开源的云原生分布式消息流平台,采用存储和计算分离的架构设计,提供低延迟、高吞吐量、强一致性和地理复制等特性。
核心特性
存储计算分离
- Apache BookKeeper - 分布式存储层,负责消息持久化
- Pulsar Broker - 计算层,处理消息路由和协议转换
- 弹性扩容 - 存储和计算可以独立扩展
- 资源利用率 - 更好的资源利用和成本控制
统一消息模型
- 队列模型 - 传统的点对点消息传递
- 流模型 - 类似Kafka的发布订阅模式
- Reader API - 支持从任意位置读取消息
- 多种订阅模式 - Exclusive、Shared、Failover、Key_Shared
多租户支持
- 租户隔离 - 完整的多租户资源隔离
- 命名空间 - 租户下的逻辑分组
- 权限控制 - 细粒度的访问控制
- 配额管理 - 资源使用配额限制
核心架构
集群组件
点击查看完整代码实现
Pulsar集群架构:
├── Pulsar Broker
│ ├── HTTP服务(Admin API)
│ ├── 二进制协议服务
│ └── 消息路由和负载均衡
├── Apache BookKeeper
│ ├── Bookie节点
│ ├── Journal日志
│ └── Ledger存储
├── Apache ZooKeeper
│ ├── 元数据存储
│ ├── 配置管理
│ └── 协调服务
└── Pulsar Proxy(可选)
├── 协议代理
├── 多集群路由
└── 负载均衡1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
存储模型
Topic结构:
persistent://tenant/namespace/topic
├── Ledger 1 (BookKeeper)
├── Ledger 2 (BookKeeper)
├── Ledger 3 (BookKeeper)
└── ...
每个Ledger分布在多个Bookie节点上1
2
3
4
5
6
7
8
2
3
4
5
6
7
8
基础使用
生产者示例
Java客户端
点击查看完整代码实现
java
import org.apache.pulsar.client.api.*;
public class PulsarProducer {
public static void main(String[] args) throws Exception {
// 创建客户端
PulsarClient client = PulsarClient.builder()
.serviceUrl("pulsar://localhost:6650")
.build();
// 创建生产者
Producer<String> producer = client.newProducer(Schema.STRING)
.topic("persistent://public/default/my-topic")
.create();
// 发送消息
for (int i = 0; i < 10; i++) {
MessageId messageId = producer.send("Hello Pulsar " + i);
System.out.println("Published message: " + messageId);
}
// 异步发送
producer.sendAsync("Async message")
.thenAccept(messageId -> {
System.out.println("Message sent: " + messageId);
})
.exceptionally(ex -> {
System.err.println("Failed to send message: " + ex);
return null;
});
producer.close();
client.close();
}
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
批量发送配置
java
Producer<String> producer = client.newProducer(Schema.STRING)
.topic("my-topic")
.batchingMaxMessages(100)
.batchingMaxBytes(1024 * 1024)
.batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS)
.create();1
2
3
4
5
6
2
3
4
5
6
消费者示例
Exclusive订阅模式
点击查看完整代码实现
java
// 独占订阅 - 只有一个消费者可以订阅
Consumer<String> consumer = client.newConsumer(Schema.STRING)
.topic("my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Exclusive)
.subscribe();
while (true) {
Message<String> message = consumer.receive();
try {
System.out.println("Received: " + message.getValue());
consumer.acknowledge(message);
} catch (Exception e) {
consumer.negativeAcknowledge(message);
}
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
Shared订阅模式
java
// 共享订阅 - 多个消费者共享消息
Consumer<String> consumer = client.newConsumer(Schema.STRING)
.topic("my-topic")
.subscriptionName("shared-subscription")
.subscriptionType(SubscriptionType.Shared)
.subscribe();1
2
3
4
5
6
2
3
4
5
6
Key_Shared订阅模式
java
// 按Key共享 - 相同Key的消息发送到同一消费者
Consumer<String> consumer = client.newConsumer(Schema.STRING)
.topic("my-topic")
.subscriptionName("key-shared-subscription")
.subscriptionType(SubscriptionType.Key_Shared)
.subscribe();1
2
3
4
5
6
2
3
4
5
6
Reader API
java
// Reader可以从任意位置读取消息
Reader<String> reader = client.newReader(Schema.STRING)
.topic("my-topic")
.startMessageId(MessageId.earliest)
.create();
while (reader.hasNext()) {
Message<String> message = reader.readNext();
System.out.println("Read: " + message.getValue());
}1
2
3
4
5
6
7
8
9
10
2
3
4
5
6
7
8
9
10
Schema管理
内置Schema
java
// 基础类型Schema
Producer<String> stringProducer = client.newProducer(Schema.STRING)
.topic("string-topic").create();
Producer<byte[]> bytesProducer = client.newProducer(Schema.BYTES)
.topic("bytes-topic").create();
Producer<Integer> intProducer = client.newProducer(Schema.INT32)
.topic("int-topic").create();1
2
3
4
5
6
7
8
9
2
3
4
5
6
7
8
9
自定义Schema
点击查看完整代码实现
java
// Avro Schema
import org.apache.pulsar.client.api.schema.GenericRecord;
// 用户定义的POJO
public class User {
public String name;
public int age;
// 构造函数、getter、setter...
}
// 使用JSON Schema
Producer<User> producer = client.newProducer(Schema.JSON(User.class))
.topic("user-topic")
.create();
Consumer<User> consumer = client.newConsumer(Schema.JSON(User.class))
.topic("user-topic")
.subscriptionName("user-sub")
.subscribe();1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
Schema演进
java
// Schema版本管理
SchemaInfo schemaInfo = SchemaInfo.builder()
.name("user-schema")
.type(SchemaType.AVRO)
.schema(avroSchemaDefinition)
.properties(Collections.singletonMap("version", "v1"))
.build();
Producer<GenericRecord> producer = client.newProducer(Schema.generic(schemaInfo))
.topic("user-topic")
.create();1
2
3
4
5
6
7
8
9
10
11
2
3
4
5
6
7
8
9
10
11
集群部署
ZooKeeper集群
bash
# zookeeper.conf
tickTime=2000
dataDir=/data/zookeeper
clientPort=2181
initLimit=5
syncLimit=2
server.1=zk1:2888:3888
server.2=zk2:2888:3888
server.3=zk3:2888:38881
2
3
4
5
6
7
8
9
2
3
4
5
6
7
8
9
BookKeeper集群
点击查看完整代码实现
conf
# bookkeeper.conf
bookiePort=3181
journalDirectory=/data/bookkeeper/journal
ledgerDirectories=/data/bookkeeper/ledgers
zkServers=zk1:2181,zk2:2181,zk3:2181
# 存储配置
diskCheckIntervalMs=10000
diskUsageThreshold=0.95
diskUsageWarnThreshold=0.90
# 复制配置
ensembleSize=3
writeQuorumSize=2
ackQuorumSize=21
2
3
4
5
6
7
8
9
10
11
12
13
14
15
2
3
4
5
6
7
8
9
10
11
12
13
14
15
Pulsar Broker配置
conf
# broker.conf
brokerServicePort=6650
webServicePort=8080
zookeeperServers=zk1:2181,zk2:2181,zk3:2181
# 集群配置
clusterName=pulsar-cluster
brokerDeduplicationEnabled=true
defaultRetentionTimeInMinutes=10080
defaultRetentionSizeInMB=1024
# 负载均衡
loadBalancerEnabled=true
loadBalancerAutoBundleSplitEnabled=true1
2
3
4
5
6
7
8
9
10
11
12
13
14
2
3
4
5
6
7
8
9
10
11
12
13
14
高级功能
地理复制
bash
# 创建跨地域集群
pulsar-admin clusters create --url http://broker1:8080 --broker-url pulsar://broker1:6650 cluster-1
pulsar-admin clusters create --url http://broker2:8080 --broker-url pulsar://broker2:6650 cluster-2
# 配置租户和命名空间复制
pulsar-admin tenants create my-tenant --admin-roles admin --allowed-clusters cluster-1,cluster-2
pulsar-admin namespaces create my-tenant/my-namespace
pulsar-admin namespaces set-clusters my-tenant/my-namespace --clusters cluster-1,cluster-21
2
3
4
5
6
7
8
2
3
4
5
6
7
8
延时消息
java
// 发送延时消息
producer.newMessage()
.value("Delayed message")
.deliverAfter(30, TimeUnit.SECONDS)
.send();
// 发送定时消息
producer.newMessage()
.value("Scheduled message")
.deliverAt(System.currentTimeMillis() + 60000)
.send();1
2
3
4
5
6
7
8
9
10
11
2
3
4
5
6
7
8
9
10
11
事务支持
点击查看完整代码实现
java
// 开启事务
Transaction transaction = client.newTransaction()
.withTransactionTimeout(30, TimeUnit.SECONDS)
.build().get();
try {
// 在事务中发送消息
producer.newMessage(transaction)
.value("Transactional message 1")
.send();
producer.newMessage(transaction)
.value("Transactional message 2")
.send();
// 提交事务
transaction.commit().get();
} catch (Exception e) {
// 回滚事务
transaction.abort();
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
消息去重
java
// 启用生产者去重
Producer<String> producer = client.newProducer(Schema.STRING)
.topic("my-topic")
.producerName("unique-producer")
.enableDeduplication(true)
.create();
// 发送带序列号的消息
producer.newMessage()
.sequenceId(12345L)
.value("Deduplicated message")
.send();1
2
3
4
5
6
7
8
9
10
11
12
2
3
4
5
6
7
8
9
10
11
12
监控管理
Pulsar Manager
bash
# Docker部署Pulsar Manager
docker run -it -d \
-p 9527:9527 -p 7750:7750 \
-e SPRING_CONFIGURATION_FILE=/pulsar-manager/pulsar-manager/application.properties \
apachepulsar/pulsar-manager:latest1
2
3
4
5
2
3
4
5
监控指标
java
// 生产者指标
ProducerStats stats = producer.getStats();
System.out.println("发送消息数: " + stats.getNumMsgsSent());
System.out.println("发送字节数: " + stats.getNumBytesSent());
System.out.println("发送失败数: " + stats.getNumSendFailed());
// 消费者指标
ConsumerStats consumerStats = consumer.getStats();
System.out.println("接收消息数: " + consumerStats.getNumMsgsReceived());
System.out.println("接收字节数: " + consumerStats.getNumBytesReceived());1
2
3
4
5
6
7
8
9
10
2
3
4
5
6
7
8
9
10
管理API
bash
# Topic管理
pulsar-admin topics create persistent://public/default/my-topic
pulsar-admin topics list public/default
pulsar-admin topics stats persistent://public/default/my-topic
# 订阅管理
pulsar-admin topics subscriptions persistent://public/default/my-topic
pulsar-admin topics skip-messages -n 100 persistent://public/default/my-topic -s my-subscription1
2
3
4
5
6
7
8
2
3
4
5
6
7
8
性能优化
生产者优化
java
Producer<String> producer = client.newProducer(Schema.STRING)
.topic("my-topic")
.enableBatching(true)
.batchingMaxMessages(1000)
.batchingMaxBytes(1024 * 1024)
.batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS)
.compressionType(CompressionType.ZSTD)
.create();1
2
3
4
5
6
7
8
2
3
4
5
6
7
8
消费者优化
java
Consumer<String> consumer = client.newConsumer(Schema.STRING)
.topic("my-topic")
.subscriptionName("my-subscription")
.receiverQueueSize(1000)
.maxTotalReceiverQueueSizeAcrossPartitions(50000)
.subscribe();1
2
3
4
5
6
2
3
4
5
6
集群优化
- 合理配置Ensemble Size和Quorum Size
- 使用SSD提升BookKeeper性能
- 配置适当的缓存大小
- 监控网络延迟和磁盘I/O
应用场景
实时数据处理
- 日志收集和分析
- 实时指标监控
- 事件驱动架构
- 流式计算
微服务通信
- 服务间异步通信
- 事件溯源
- CQRS架构
- 分布式事务
多租户SaaS
- 租户数据隔离
- 资源配额管理
- 多地域部署
- 企业级安全
IoT和边缘计算
- 设备数据收集
- 边缘消息处理
- 地理分布式架构
- 大规模设备连接
最佳实践
Topic设计
- 合理规划Topic命名规范
- 根据业务需求选择分区数
- 考虑消息顺序性要求
- 设置适当的消息保留策略
性能调优
- 启用消息批处理和压缩
- 合理配置接收队列大小
- 监控消息积压情况
- 优化序列化方式
可靠性保证
- 配置适当的复制因子
- 实现消费者故障重试
- 使用事务保证一致性
- 建立监控告警机制
Apache Pulsar凭借其创新的存储计算分离架构、强大的多租户支持和云原生特性,为现代分布式应用提供了强大而灵活的消息流平台解决方案。
