Skip to content

Apache RocketMQ

Apache RocketMQ是阿里巴巴开源的分布式消息中间件,专为高并发、低延迟和高可靠性场景设计,广泛应用于电商、金融等对消息可靠性要求极高的业务场景。

核心架构

架构组件

  • Name Server - 轻量级注册中心,管理Broker路由信息
  • Broker - 消息存储和转发服务器,负责消息的持久化
  • Producer - 消息生产者,支持同步、异步和单向发送
  • Consumer - 消息消费者,支持推拉两种消费模式

存储模型

Broker存储结构:
├── CommitLog - 统一消息存储文件
├── ConsumeQueue - 消息消费队列索引
├── IndexFile - 消息索引文件
└── Config - 配置文件

核心特性

高可用设计

  • 主从复制 - Master-Slave架构保证数据安全
  • 多Master模式 - 避免单点故障
  • 故障自动切换 - 支持自动主从切换
  • 数据零丢失 - 同步双写确保消息不丢失

消息模型

  • 普通消息 - 标准的发布订阅模型
  • 顺序消息 - 保证消息严格顺序消费
  • 延时消息 - 支持定时和延时消息投递
  • 事务消息 - 分布式事务消息支持

消息类型

普通消息

java
// 同步发送
DefaultMQProducer producer = new DefaultMQProducer("producer_group");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();

Message msg = new Message(
    "TopicTest",        // topic
    "TagA",             // tag
    "Hello RocketMQ".getBytes(RemotingHelper.DEFAULT_CHARSET)
);

SendResult sendResult = producer.send(msg);
System.out.println(sendResult);

顺序消息

java
// 顺序消息发送
Message msg = new Message("TopicTest", tags, "KEY" + i, body);
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        Long id = (Long) arg;  // 根据订单ID选择队列
        long index = id % mqs.size();
        return mqs.get((int) index);
    }
}, orderId);

事务消息

java
// 事务消息生产者
TransactionMQProducer producer = new TransactionMQProducer("transaction_group");
producer.setTransactionListener(new TransactionListenerImpl());
producer.start();

// 发送事务消息
SendResult sendResult = producer.sendMessageInTransaction(msg, null);

消费模式

Push消费模式

点击查看完整代码实现
java
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.setSubscription("TopicTest", "*");

consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(
            List<MessageExt> msgs,
            ConsumeConcurrentlyContext context) {
        
        for (MessageExt msg : msgs) {
            System.out.println("Received: " + new String(msg.getBody()));
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});

consumer.start();

Pull消费模式

java
DefaultMQPullConsumer pullConsumer = new DefaultMQPullConsumer("pull_consumer_group");
pullConsumer.setNamesrvAddr("127.0.0.1:9876");
pullConsumer.start();

// 手动拉取消息
MessageQueue mq = new MessageQueue("TopicTest", "broker-a", 0);
PullResult pullResult = pullConsumer.pullBlockIfNotFound(mq, null, 0, 32);

集群部署

Name Server集群

bash
# Name Server启动脚本
nohup sh mqnamesrv > /dev/null 2>&1 &

# 集群模式(多个Name Server)
# 192.168.1.100:9876
# 192.168.1.101:9876
# 192.168.1.102:9876

Broker配置

Master配置

点击查看完整代码实现
properties
# Master Broker配置
brokerClusterName=DefaultCluster
brokerName=broker-a
brokerId=0
namesrvAddr=192.168.1.100:9876;192.168.1.101:9876;192.168.1.102:9876
defaultTopicQueueNums=4
autoCreateTopicEnable=true
autoCreateSubscriptionGroup=true
listenPort=10911
deleteWhen=04
fileReservedTime=48
mapedFileSizeCommitLog=1073741824
mapedFileSizeConsumeQueue=300000
diskMaxUsedSpaceRatio=88
maxMessageSize=65536
brokerRole=SYNC_MASTER
flushDiskType=ASYNC_FLUSH

Slave配置

properties
# Slave Broker配置
brokerClusterName=DefaultCluster
brokerName=broker-a
brokerId=1
namesrvAddr=192.168.1.100:9876;192.168.1.101:9876;192.168.1.102:9876
defaultTopicQueueNums=4
autoCreateTopicEnable=true
autoCreateSubscriptionGroup=true
listenPort=10911
deleteWhen=04
fileReservedTime=48
brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH

消息存储

CommitLog设计

  • 统一存储 - 所有消息统一写入CommitLog
  • 顺序写入 - 磁盘顺序I/O提升性能
  • 零拷贝 - mmap内存映射技术
  • 刷盘策略 - 支持同步/异步刷盘

ConsumeQueue索引

ConsumeQueue文件格式:
├── Offset (8 bytes) - CommitLog中的偏移量
├── Size (4 bytes) - 消息大小
└── Tag HashCode (8 bytes) - 消息Tag哈希码

存储优化

  • 文件分片 - 按大小分割文件,提升查找效率
  • 索引机制 - 多级索引快速定位消息
  • 内存缓存 - 热点数据内存缓存
  • 数据压缩 - 可选的消息内容压缩

高级功能

消息过滤

java
// Tag过滤
consumer.setSubscription("TopicTest", "TagA || TagB");

// SQL92过滤
consumer.setSubscription("TopicTest", 
    MessageSelector.bySql("a BETWEEN 0 AND 3"));

消息轨迹

java
// 开启消息轨迹
DefaultMQProducer producer = new DefaultMQProducer("producer_group", true);
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group", true);

批量消息

java
// 批量发送消息
List<Message> messages = new ArrayList<>();
messages.add(new Message("TopicTest", "TagA", "OrderID001", "Hello world 0".getBytes()));
messages.add(new Message("TopicTest", "TagA", "OrderID002", "Hello world 1".getBytes()));
messages.add(new Message("TopicTest", "TagA", "OrderID003", "Hello world 2".getBytes()));

SendResult sendResult = producer.send(messages);

监控管理

管控台

  • 集群管理 - Broker、Name Server状态监控
  • 主题管理 - Topic创建、删除和配置修改
  • 消费管理 - 消费进度、消费者状态查看
  • 消息查询 - 按Key、MessageId查询消息

关键指标

bash
# 生产TPS
sh mqadmin updateSubGroup -n localhost:9876 -c DefaultCluster -g producer_group

# 消费TPS和延迟
sh mqadmin consumerProgress -n localhost:9876 -g consumer_group

# Broker运行状态
sh mqadmin brokerStatus -n localhost:9876 -b 192.168.1.100:10911

性能调优

JVM参数

bash
# Broker JVM配置
JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g -Xmn4g"
JAVA_OPT="${JAVA_OPT} -XX:+UseG1GC -XX:G1HeapRegionSize=16m"
JAVA_OPT="${JAVA_OPT} -XX:G1ReservePercent=25 -XX:InitiatingHeapOccupancyPercent=30"
JAVA_OPT="${JAVA_OPT} -XX:SoftRefLRUPolicyMSPerMB=0"

系统参数优化

bash
# 内核参数优化
echo 'vm.extra_free_kbytes = 2000000' >> /etc/sysctl.conf
echo 'vm.min_free_kbytes = 1000000' >> /etc/sysctl.conf
echo 'vm.max_map_count = 655360' >> /etc/sysctl.conf

# 文件句柄限制
echo '* soft nofile 655350' >> /etc/security/limits.conf
echo '* hard nofile 655350' >> /etc/security/limits.conf

磁盘优化

  • 使用SSD固态硬盘
  • 配置RAID0提升I/O性能
  • 设置合适的刷盘策略
  • 定期清理过期文件

应用场景

电商系统

  • 订单系统 - 订单状态变更通知
  • 库存系统 - 库存扣减消息
  • 支付系统 - 支付结果异步通知
  • 物流系统 - 物流状态更新推送

金融系统

  • 交易系统 - 交易消息可靠传递
  • 风控系统 - 实时风险事件通知
  • 清算系统 - 批量清算作业调度
  • 监管报送 - 合规数据上报

日志处理

  • 业务日志 - 应用日志统一收集
  • 操作审计 - 用户操作记录
  • 监控告警 - 系统异常事件推送
  • 数据同步 - 数据库变更同步

最佳实践

主题设计

  • Topic命名规范化,便于管理
  • 合理设置队列数量,一般为消费者数量的2-3倍
  • 根据业务特性选择消息类型
  • 设置合适的消息保留时间

生产者优化

  • 启用失败重试机制
  • 合理设置发送超时时间
  • 批量发送提升吞吐量
  • 监控发送成功率和延迟

消费者优化

  • 选择合适的消费模式(Push/Pull)
  • 合理设置消费线程数
  • 实现幂等消费逻辑
  • 监控消费延迟和积压

运维管理

  • 建立完善的监控体系
  • 制定故障应急预案
  • 定期检查磁盘使用情况
  • 备份重要配置和数据

Apache RocketMQ以其高可靠性、低延迟和丰富的企业级特性,成为构建大规模分布式系统消息基础设施的理想选择,特别适合对消息可靠性和一致性要求极高的金融、电商等关键业务场景。

正在精进