RabbitMQ
RabbitMQ是一个开源的消息代理和队列服务器,基于AMQP(Advanced Message Queuing Protocol)协议实现,以其灵活的路由、可靠性和易用性而广受欢迎。
核心概念
AMQP模型
- Producer - 消息生产者,发送消息到Exchange
- Exchange - 消息交换机,负责消息路由
- Queue - 消息队列,存储待消费的消息
- Consumer - 消息消费者,从Queue接收消息
- Binding - 绑定关系,连接Exchange和Queue
- Routing Key - 路由键,Exchange路由消息的依据
Exchange类型
Direct Exchange
直连交换机 - 精确匹配路由键
Producer -> Exchange(direct) -> Queue
routing.key="order.create"1
2
3
2
3
Topic Exchange
主题交换机 - 模式匹配路由键
Producer -> Exchange(topic) -> Queue1 (*.order.*)
-> Queue2 (user.#)1
2
3
2
3
Fanout Exchange
扇形交换机 - 广播所有消息
Producer -> Exchange(fanout) -> Queue1
-> Queue2
-> Queue31
2
3
4
2
3
4
Headers Exchange
头部交换机 - 基于消息头部属性路由
Producer -> Exchange(headers) -> Queue
headers: {type="order", priority="high"}1
2
3
2
3
基础使用
Java客户端示例
生产者
点击查看完整代码实现
java
import com.rabbitmq.client.*;
public class Producer {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// 声明队列
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
// 发送消息
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME,
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + message + "'");
}
}
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
消费者
点击查看完整代码实现
java
import com.rabbitmq.client.*;
public class Consumer {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
// 手动确认消息
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
};
// 设置手动确认模式
channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {});
}
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
消息可靠性
生产者确认
点击查看完整代码实现
java
// 开启发布确认模式
channel.confirmSelect();
// 同步确认
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
channel.waitForConfirmsOrDie(5000);
// 异步确认
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleAck(long deliveryTag, boolean multiple) {
System.out.println("Message confirmed: " + deliveryTag);
}
@Override
public void handleNack(long deliveryTag, boolean multiple) {
System.out.println("Message rejected: " + deliveryTag);
}
});1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
消息持久化
java
// 队列持久化
channel.queueDeclare("durable_queue", true, false, false, null);
// 消息持久化
channel.basicPublish("", "durable_queue",
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes());1
2
3
4
5
6
7
2
3
4
5
6
7
消费者确认
java
// 手动确认模式
channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {});
// 在消息处理成功后确认
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
// 拒绝消息并重新入队
channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true);
// 拒绝消息不重新入队
channel.basicReject(delivery.getEnvelope().getDeliveryTag(), false);1
2
3
4
5
6
7
8
9
10
11
2
3
4
5
6
7
8
9
10
11
集群部署
集群配置
bash
# 节点1配置
export RABBITMQ_NODE_NAME=rabbit@node1
export RABBITMQ_NODE_PORT=5672
export RABBITMQ_NODENAME=rabbit@node1
# 启动节点1
rabbitmq-server -detached
# 节点2加入集群
export RABBITMQ_NODE_NAME=rabbit@node2
rabbitmq-server -detached
rabbitmqctl stop_app
rabbitmqctl join_cluster rabbit@node1
rabbitmqctl start_app1
2
3
4
5
6
7
8
9
10
11
12
13
14
2
3
4
5
6
7
8
9
10
11
12
13
14
高可用队列
java
// 声明镜像队列
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-ha-policy", "all"); // 所有节点镜像
// args.put("x-ha-policy", "exactly");
// args.put("x-ha-policy-params", 2); // 指定镜像节点数
channel.queueDeclare("ha_queue", true, false, false, args);1
2
3
4
5
6
7
2
3
4
5
6
7
负载均衡
点击查看完整代码实现
bash
# HAProxy配置示例
global
daemon
defaults
mode tcp
timeout connect 5000ms
timeout client 50000ms
timeout server 50000ms
listen rabbitmq_cluster
bind 0.0.0.0:5672
mode tcp
balance roundrobin
server rabbit1 192.168.1.101:5672 check
server rabbit2 192.168.1.102:5672 check
server rabbit3 192.168.1.103:5672 check1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
高级功能
死信队列
java
// 声明主队列,配置死信交换机
Map<String, Object> args = new HashMap<>();
args.put("x-message-ttl", 60000); // 消息TTL
args.put("x-dead-letter-exchange", "dlx");
args.put("x-dead-letter-routing-key", "dead");
channel.queueDeclare("main_queue", true, false, false, args);
// 声明死信队列
channel.exchangeDeclare("dlx", "direct");
channel.queueDeclare("dead_letter_queue", true, false, false, null);
channel.queueBind("dead_letter_queue", "dlx", "dead");1
2
3
4
5
6
7
8
9
10
11
12
2
3
4
5
6
7
8
9
10
11
12
延迟队列
点击查看完整代码实现
java
// 使用TTL + 死信实现延迟队列
Map<String, Object> delayArgs = new HashMap<>();
delayArgs.put("x-message-ttl", 30000); // 30秒后过期
delayArgs.put("x-dead-letter-exchange", "process_exchange");
delayArgs.put("x-dead-letter-routing-key", "process");
channel.queueDeclare("delay_queue", true, false, false, delayArgs);
// 或使用RabbitMQ延迟插件
channel.exchangeDeclare("delay_exchange", "x-delayed-message",
true, false, Collections.singletonMap("x-delayed-type", "direct"));
// 发送延迟消息
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.headers(Collections.singletonMap("x-delay", 30000))
.build();
channel.basicPublish("delay_exchange", "routing_key", properties, message.getBytes());1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
消息优先级
java
// 声明优先级队列
Map<String, Object> args = new HashMap<>();
args.put("x-max-priority", 10);
channel.queueDeclare("priority_queue", true, false, false, args);
// 发送优先级消息
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.priority(5)
.build();
channel.basicPublish("", "priority_queue", properties, message.getBytes());1
2
3
4
5
6
7
8
9
10
2
3
4
5
6
7
8
9
10
性能优化
连接池配置
点击查看完整代码实现
java
// Spring Boot配置
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
connection-timeout: 15000
publisher-confirms: true
publisher-returns: true
listener:
simple:
acknowledge-mode: manual
concurrency: 5
max-concurrency: 10
prefetch: 101
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
批量操作
java
// 批量发送消息
List<String> messages = Arrays.asList("msg1", "msg2", "msg3");
for (String message : messages) {
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
}
channel.waitForConfirmsOrDie();
// 批量消费消息
channel.basicQos(10); // 预取10条消息1
2
3
4
5
6
7
8
9
2
3
4
5
6
7
8
9
内存和磁盘优化
bash
# rabbitmq.conf配置
vm_memory_high_watermark.relative = 0.6
disk_free_limit.relative = 2.0
cluster_partition_handling = autoheal1
2
3
4
2
3
4
监控管理
Management插件
bash
# 启用管理插件
rabbitmq-plugins enable rabbitmq_management
# 访问管理界面
http://localhost:15672
用户名: guest
密码: guest1
2
3
4
5
6
7
2
3
4
5
6
7
监控指标
bash
# 查看队列状态
rabbitmqctl list_queues name messages consumers
# 查看交换机状态
rabbitmqctl list_exchanges
# 查看绑定关系
rabbitmqctl list_bindings
# 查看连接状态
rabbitmqctl list_connections1
2
3
4
5
6
7
8
9
10
11
2
3
4
5
6
7
8
9
10
11
Prometheus监控
yaml
# prometheus配置
global:
scrape_interval: 15s
scrape_configs:
- job_name: 'rabbitmq'
static_configs:
- targets: ['localhost:15692']1
2
3
4
5
6
7
8
2
3
4
5
6
7
8
应用模式
工作队列模式
java
// 多个消费者处理任务队列
// 轮询分发,负载均衡
channel.basicQos(1); // 每次只取一条消息
channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {});1
2
3
4
2
3
4
发布/订阅模式
java
// 使用fanout交换机广播消息
channel.exchangeDeclare("logs", "fanout");
// 生产者发布消息
channel.basicPublish("logs", "", null, message.getBytes());
// 消费者订阅消息
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, "logs", "");
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});1
2
3
4
5
6
7
8
9
10
2
3
4
5
6
7
8
9
10
RPC模式
点击查看完整代码实现
java
// RPC服务端
channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
channel.basicQos(1);
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String response = processRequest(new String(delivery.getBody(), "UTF-8"));
// 发送响应
AMQP.BasicProperties replyProps = new AMQP.BasicProperties.Builder()
.correlationId(delivery.getProperties().getCorrelationId())
.build();
channel.basicPublish("", delivery.getProperties().getReplyTo(), replyProps,
response.getBytes("UTF-8"));
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
};
channel.basicConsume(RPC_QUEUE_NAME, false, deliverCallback, consumerTag -> {});1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
最佳实践
消息设计
- 消息体保持小巧,大文件用引用方式
- 使用合适的消息格式(JSON、Avro等)
- 设计幂等的消费逻辑
- 合理设置消息过期时间
队列管理
- 合理命名队列和交换机
- 设置适当的队列长度限制
- 配置死信队列处理异常消息
- 定期清理无用的队列
性能调优
- 合理设置预取数量(prefetch)
- 使用持久连接和通道复用
- 批量发送和消费消息
- 监控系统资源使用情况
可靠性保证
- 开启生产者确认机制
- 使用手动确认消费模式
- 配置消息和队列持久化
- 部署高可用集群
RabbitMQ凭借其完善的AMQP协议支持、灵活的路由机制和丰富的企业级功能,成为构建可靠消息系统的优秀选择,特别适合需要复杂路由逻辑和高可靠性保证的应用场景。
