Skip to content

RabbitMQ

RabbitMQ是一个开源的消息代理和队列服务器,基于AMQP(Advanced Message Queuing Protocol)协议实现,以其灵活的路由、可靠性和易用性而广受欢迎。

核心概念

AMQP模型

  • Producer - 消息生产者,发送消息到Exchange
  • Exchange - 消息交换机,负责消息路由
  • Queue - 消息队列,存储待消费的消息
  • Consumer - 消息消费者,从Queue接收消息
  • Binding - 绑定关系,连接Exchange和Queue
  • Routing Key - 路由键,Exchange路由消息的依据

Exchange类型

Direct Exchange

直连交换机 - 精确匹配路由键
Producer -> Exchange(direct) -> Queue
           routing.key="order.create"

Topic Exchange

主题交换机 - 模式匹配路由键
Producer -> Exchange(topic) -> Queue1 (*.order.*)
                            -> Queue2 (user.#)

Fanout Exchange

扇形交换机 - 广播所有消息
Producer -> Exchange(fanout) -> Queue1
                             -> Queue2  
                             -> Queue3

Headers Exchange

头部交换机 - 基于消息头部属性路由
Producer -> Exchange(headers) -> Queue
           headers: {type="order", priority="high"}

基础使用

Java客户端示例

生产者

点击查看完整代码实现
java
import com.rabbitmq.client.*;

public class Producer {
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
             
            // 声明队列
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
            
            // 发送消息
            String message = "Hello World!";
            channel.basicPublish("", QUEUE_NAME, 
                MessageProperties.PERSISTENT_TEXT_PLAIN, 
                message.getBytes("UTF-8"));
                
            System.out.println(" [x] Sent '" + message + "'");
        }
    }
}

消费者

点击查看完整代码实现
java
import com.rabbitmq.client.*;

public class Consumer {
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + message + "'");
            
            // 手动确认消息
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        };
        
        // 设置手动确认模式
        channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {});
    }
}

消息可靠性

生产者确认

点击查看完整代码实现
java
// 开启发布确认模式
channel.confirmSelect();

// 同步确认
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
channel.waitForConfirmsOrDie(5000);

// 异步确认
channel.addConfirmListener(new ConfirmListener() {
    @Override
    public void handleAck(long deliveryTag, boolean multiple) {
        System.out.println("Message confirmed: " + deliveryTag);
    }
    
    @Override
    public void handleNack(long deliveryTag, boolean multiple) {
        System.out.println("Message rejected: " + deliveryTag);
    }
});

消息持久化

java
// 队列持久化
channel.queueDeclare("durable_queue", true, false, false, null);

// 消息持久化
channel.basicPublish("", "durable_queue", 
    MessageProperties.PERSISTENT_TEXT_PLAIN, 
    message.getBytes());

消费者确认

java
// 手动确认模式
channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {});

// 在消息处理成功后确认
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);

// 拒绝消息并重新入队
channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true);

// 拒绝消息不重新入队
channel.basicReject(delivery.getEnvelope().getDeliveryTag(), false);

集群部署

集群配置

bash
# 节点1配置
export RABBITMQ_NODE_NAME=rabbit@node1
export RABBITMQ_NODE_PORT=5672
export RABBITMQ_NODENAME=rabbit@node1

# 启动节点1
rabbitmq-server -detached

# 节点2加入集群
export RABBITMQ_NODE_NAME=rabbit@node2
rabbitmq-server -detached
rabbitmqctl stop_app
rabbitmqctl join_cluster rabbit@node1
rabbitmqctl start_app

高可用队列

java
// 声明镜像队列
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-ha-policy", "all");  // 所有节点镜像
// args.put("x-ha-policy", "exactly");
// args.put("x-ha-policy-params", 2);  // 指定镜像节点数

channel.queueDeclare("ha_queue", true, false, false, args);

负载均衡

点击查看完整代码实现
bash
# HAProxy配置示例
global
    daemon

defaults
    mode tcp
    timeout connect 5000ms
    timeout client 50000ms
    timeout server 50000ms

listen rabbitmq_cluster
    bind 0.0.0.0:5672
    mode tcp
    balance roundrobin
    server rabbit1 192.168.1.101:5672 check
    server rabbit2 192.168.1.102:5672 check
    server rabbit3 192.168.1.103:5672 check

高级功能

死信队列

java
// 声明主队列,配置死信交换机
Map<String, Object> args = new HashMap<>();
args.put("x-message-ttl", 60000);  // 消息TTL
args.put("x-dead-letter-exchange", "dlx");
args.put("x-dead-letter-routing-key", "dead");

channel.queueDeclare("main_queue", true, false, false, args);

// 声明死信队列
channel.exchangeDeclare("dlx", "direct");
channel.queueDeclare("dead_letter_queue", true, false, false, null);
channel.queueBind("dead_letter_queue", "dlx", "dead");

延迟队列

点击查看完整代码实现
java
// 使用TTL + 死信实现延迟队列
Map<String, Object> delayArgs = new HashMap<>();
delayArgs.put("x-message-ttl", 30000);  // 30秒后过期
delayArgs.put("x-dead-letter-exchange", "process_exchange");
delayArgs.put("x-dead-letter-routing-key", "process");

channel.queueDeclare("delay_queue", true, false, false, delayArgs);

// 或使用RabbitMQ延迟插件
channel.exchangeDeclare("delay_exchange", "x-delayed-message", 
    true, false, Collections.singletonMap("x-delayed-type", "direct"));

// 发送延迟消息
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
    .headers(Collections.singletonMap("x-delay", 30000))
    .build();
channel.basicPublish("delay_exchange", "routing_key", properties, message.getBytes());

消息优先级

java
// 声明优先级队列
Map<String, Object> args = new HashMap<>();
args.put("x-max-priority", 10);
channel.queueDeclare("priority_queue", true, false, false, args);

// 发送优先级消息
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
    .priority(5)
    .build();
channel.basicPublish("", "priority_queue", properties, message.getBytes());

性能优化

连接池配置

点击查看完整代码实现
java
// Spring Boot配置
spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    connection-timeout: 15000
    publisher-confirms: true
    publisher-returns: true
    listener:
      simple:
        acknowledge-mode: manual
        concurrency: 5
        max-concurrency: 10
        prefetch: 10

批量操作

java
// 批量发送消息
List<String> messages = Arrays.asList("msg1", "msg2", "msg3");
for (String message : messages) {
    channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
}
channel.waitForConfirmsOrDie();

// 批量消费消息  
channel.basicQos(10);  // 预取10条消息

内存和磁盘优化

bash
# rabbitmq.conf配置
vm_memory_high_watermark.relative = 0.6
disk_free_limit.relative = 2.0
cluster_partition_handling = autoheal

监控管理

Management插件

bash
# 启用管理插件
rabbitmq-plugins enable rabbitmq_management

# 访问管理界面
http://localhost:15672
用户名: guest
密码: guest

监控指标

bash
# 查看队列状态
rabbitmqctl list_queues name messages consumers

# 查看交换机状态  
rabbitmqctl list_exchanges

# 查看绑定关系
rabbitmqctl list_bindings

# 查看连接状态
rabbitmqctl list_connections

Prometheus监控

yaml
# prometheus配置
global:
  scrape_interval: 15s

scrape_configs:
  - job_name: 'rabbitmq'
    static_configs:
      - targets: ['localhost:15692']

应用模式

工作队列模式

java
// 多个消费者处理任务队列
// 轮询分发,负载均衡
channel.basicQos(1);  // 每次只取一条消息
channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {});

发布/订阅模式

java
// 使用fanout交换机广播消息
channel.exchangeDeclare("logs", "fanout");

// 生产者发布消息
channel.basicPublish("logs", "", null, message.getBytes());

// 消费者订阅消息
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, "logs", "");
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});

RPC模式

点击查看完整代码实现
java
// RPC服务端
channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
channel.basicQos(1);

DeliverCallback deliverCallback = (consumerTag, delivery) -> {
    String response = processRequest(new String(delivery.getBody(), "UTF-8"));
    
    // 发送响应
    AMQP.BasicProperties replyProps = new AMQP.BasicProperties.Builder()
        .correlationId(delivery.getProperties().getCorrelationId())
        .build();
        
    channel.basicPublish("", delivery.getProperties().getReplyTo(), replyProps, 
        response.getBytes("UTF-8"));
    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
};

channel.basicConsume(RPC_QUEUE_NAME, false, deliverCallback, consumerTag -> {});

最佳实践

消息设计

  • 消息体保持小巧,大文件用引用方式
  • 使用合适的消息格式(JSON、Avro等)
  • 设计幂等的消费逻辑
  • 合理设置消息过期时间

队列管理

  • 合理命名队列和交换机
  • 设置适当的队列长度限制
  • 配置死信队列处理异常消息
  • 定期清理无用的队列

性能调优

  • 合理设置预取数量(prefetch)
  • 使用持久连接和通道复用
  • 批量发送和消费消息
  • 监控系统资源使用情况

可靠性保证

  • 开启生产者确认机制
  • 使用手动确认消费模式
  • 配置消息和队列持久化
  • 部署高可用集群

RabbitMQ凭借其完善的AMQP协议支持、灵活的路由机制和丰富的企业级功能,成为构建可靠消息系统的优秀选择,特别适合需要复杂路由逻辑和高可靠性保证的应用场景。

正在精进