Apache RocketMQ
Apache RocketMQ是阿里巴巴开源的分布式消息中间件,专为高并发、低延迟和高可靠性场景设计,广泛应用于电商、金融等对消息可靠性要求极高的业务场景。
核心架构
架构组件
- Name Server - 轻量级注册中心,管理Broker路由信息
- Broker - 消息存储和转发服务器,负责消息的持久化
- Producer - 消息生产者,支持同步、异步和单向发送
- Consumer - 消息消费者,支持推拉两种消费模式
存储模型
Broker存储结构:
├── CommitLog - 统一消息存储文件
├── ConsumeQueue - 消息消费队列索引
├── IndexFile - 消息索引文件
└── Config - 配置文件1
2
3
4
5
2
3
4
5
核心特性
高可用设计
- 主从复制 - Master-Slave架构保证数据安全
- 多Master模式 - 避免单点故障
- 故障自动切换 - 支持自动主从切换
- 数据零丢失 - 同步双写确保消息不丢失
消息模型
- 普通消息 - 标准的发布订阅模型
- 顺序消息 - 保证消息严格顺序消费
- 延时消息 - 支持定时和延时消息投递
- 事务消息 - 分布式事务消息支持
消息类型
普通消息
java
// 同步发送
DefaultMQProducer producer = new DefaultMQProducer("producer_group");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
Message msg = new Message(
"TopicTest", // topic
"TagA", // tag
"Hello RocketMQ".getBytes(RemotingHelper.DEFAULT_CHARSET)
);
SendResult sendResult = producer.send(msg);
System.out.println(sendResult);1
2
3
4
5
6
7
8
9
10
11
12
13
2
3
4
5
6
7
8
9
10
11
12
13
顺序消息
java
// 顺序消息发送
Message msg = new Message("TopicTest", tags, "KEY" + i, body);
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Long id = (Long) arg; // 根据订单ID选择队列
long index = id % mqs.size();
return mqs.get((int) index);
}
}, orderId);1
2
3
4
5
6
7
8
9
10
2
3
4
5
6
7
8
9
10
事务消息
java
// 事务消息生产者
TransactionMQProducer producer = new TransactionMQProducer("transaction_group");
producer.setTransactionListener(new TransactionListenerImpl());
producer.start();
// 发送事务消息
SendResult sendResult = producer.sendMessageInTransaction(msg, null);1
2
3
4
5
6
7
2
3
4
5
6
7
消费模式
Push消费模式
点击查看完整代码实现
java
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.setSubscription("TopicTest", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(
List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.println("Received: " + new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
Pull消费模式
java
DefaultMQPullConsumer pullConsumer = new DefaultMQPullConsumer("pull_consumer_group");
pullConsumer.setNamesrvAddr("127.0.0.1:9876");
pullConsumer.start();
// 手动拉取消息
MessageQueue mq = new MessageQueue("TopicTest", "broker-a", 0);
PullResult pullResult = pullConsumer.pullBlockIfNotFound(mq, null, 0, 32);1
2
3
4
5
6
7
2
3
4
5
6
7
集群部署
Name Server集群
bash
# Name Server启动脚本
nohup sh mqnamesrv > /dev/null 2>&1 &
# 集群模式(多个Name Server)
# 192.168.1.100:9876
# 192.168.1.101:9876
# 192.168.1.102:98761
2
3
4
5
6
7
2
3
4
5
6
7
Broker配置
Master配置
点击查看完整代码实现
properties
# Master Broker配置
brokerClusterName=DefaultCluster
brokerName=broker-a
brokerId=0
namesrvAddr=192.168.1.100:9876;192.168.1.101:9876;192.168.1.102:9876
defaultTopicQueueNums=4
autoCreateTopicEnable=true
autoCreateSubscriptionGroup=true
listenPort=10911
deleteWhen=04
fileReservedTime=48
mapedFileSizeCommitLog=1073741824
mapedFileSizeConsumeQueue=300000
diskMaxUsedSpaceRatio=88
maxMessageSize=65536
brokerRole=SYNC_MASTER
flushDiskType=ASYNC_FLUSH1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
Slave配置
properties
# Slave Broker配置
brokerClusterName=DefaultCluster
brokerName=broker-a
brokerId=1
namesrvAddr=192.168.1.100:9876;192.168.1.101:9876;192.168.1.102:9876
defaultTopicQueueNums=4
autoCreateTopicEnable=true
autoCreateSubscriptionGroup=true
listenPort=10911
deleteWhen=04
fileReservedTime=48
brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH1
2
3
4
5
6
7
8
9
10
11
12
13
2
3
4
5
6
7
8
9
10
11
12
13
消息存储
CommitLog设计
- 统一存储 - 所有消息统一写入CommitLog
- 顺序写入 - 磁盘顺序I/O提升性能
- 零拷贝 - mmap内存映射技术
- 刷盘策略 - 支持同步/异步刷盘
ConsumeQueue索引
ConsumeQueue文件格式:
├── Offset (8 bytes) - CommitLog中的偏移量
├── Size (4 bytes) - 消息大小
└── Tag HashCode (8 bytes) - 消息Tag哈希码1
2
3
4
2
3
4
存储优化
- 文件分片 - 按大小分割文件,提升查找效率
- 索引机制 - 多级索引快速定位消息
- 内存缓存 - 热点数据内存缓存
- 数据压缩 - 可选的消息内容压缩
高级功能
消息过滤
java
// Tag过滤
consumer.setSubscription("TopicTest", "TagA || TagB");
// SQL92过滤
consumer.setSubscription("TopicTest",
MessageSelector.bySql("a BETWEEN 0 AND 3"));1
2
3
4
5
6
2
3
4
5
6
消息轨迹
java
// 开启消息轨迹
DefaultMQProducer producer = new DefaultMQProducer("producer_group", true);
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group", true);1
2
3
2
3
批量消息
java
// 批量发送消息
List<Message> messages = new ArrayList<>();
messages.add(new Message("TopicTest", "TagA", "OrderID001", "Hello world 0".getBytes()));
messages.add(new Message("TopicTest", "TagA", "OrderID002", "Hello world 1".getBytes()));
messages.add(new Message("TopicTest", "TagA", "OrderID003", "Hello world 2".getBytes()));
SendResult sendResult = producer.send(messages);1
2
3
4
5
6
7
2
3
4
5
6
7
监控管理
管控台
- 集群管理 - Broker、Name Server状态监控
- 主题管理 - Topic创建、删除和配置修改
- 消费管理 - 消费进度、消费者状态查看
- 消息查询 - 按Key、MessageId查询消息
关键指标
bash
# 生产TPS
sh mqadmin updateSubGroup -n localhost:9876 -c DefaultCluster -g producer_group
# 消费TPS和延迟
sh mqadmin consumerProgress -n localhost:9876 -g consumer_group
# Broker运行状态
sh mqadmin brokerStatus -n localhost:9876 -b 192.168.1.100:109111
2
3
4
5
6
7
8
2
3
4
5
6
7
8
性能调优
JVM参数
bash
# Broker JVM配置
JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g -Xmn4g"
JAVA_OPT="${JAVA_OPT} -XX:+UseG1GC -XX:G1HeapRegionSize=16m"
JAVA_OPT="${JAVA_OPT} -XX:G1ReservePercent=25 -XX:InitiatingHeapOccupancyPercent=30"
JAVA_OPT="${JAVA_OPT} -XX:SoftRefLRUPolicyMSPerMB=0"1
2
3
4
5
2
3
4
5
系统参数优化
bash
# 内核参数优化
echo 'vm.extra_free_kbytes = 2000000' >> /etc/sysctl.conf
echo 'vm.min_free_kbytes = 1000000' >> /etc/sysctl.conf
echo 'vm.max_map_count = 655360' >> /etc/sysctl.conf
# 文件句柄限制
echo '* soft nofile 655350' >> /etc/security/limits.conf
echo '* hard nofile 655350' >> /etc/security/limits.conf1
2
3
4
5
6
7
8
2
3
4
5
6
7
8
磁盘优化
- 使用SSD固态硬盘
- 配置RAID0提升I/O性能
- 设置合适的刷盘策略
- 定期清理过期文件
应用场景
电商系统
- 订单系统 - 订单状态变更通知
- 库存系统 - 库存扣减消息
- 支付系统 - 支付结果异步通知
- 物流系统 - 物流状态更新推送
金融系统
- 交易系统 - 交易消息可靠传递
- 风控系统 - 实时风险事件通知
- 清算系统 - 批量清算作业调度
- 监管报送 - 合规数据上报
日志处理
- 业务日志 - 应用日志统一收集
- 操作审计 - 用户操作记录
- 监控告警 - 系统异常事件推送
- 数据同步 - 数据库变更同步
最佳实践
主题设计
- Topic命名规范化,便于管理
- 合理设置队列数量,一般为消费者数量的2-3倍
- 根据业务特性选择消息类型
- 设置合适的消息保留时间
生产者优化
- 启用失败重试机制
- 合理设置发送超时时间
- 批量发送提升吞吐量
- 监控发送成功率和延迟
消费者优化
- 选择合适的消费模式(Push/Pull)
- 合理设置消费线程数
- 实现幂等消费逻辑
- 监控消费延迟和积压
运维管理
- 建立完善的监控体系
- 制定故障应急预案
- 定期检查磁盘使用情况
- 备份重要配置和数据
Apache RocketMQ以其高可靠性、低延迟和丰富的企业级特性,成为构建大规模分布式系统消息基础设施的理想选择,特别适合对消息可靠性和一致性要求极高的金融、电商等关键业务场景。
