Skip to content

RabbitMQ 是什么?

本身支持很多的协议:AMQP,XMPP, SMTP, STOMP,也正是如此,使的它变的非常重量级,更适合于企业级的开发。

实现了一个 Broker 构架,这意味着消息在发送给客户端时先在中心队列排队,对路由(Routing)、负载均衡(Load balance)或者数据持久化都有很好的支持。

三大组件

  • 交换器 (Exchange):生产者将消息发送到交换器,由交换器将消息路由到一个或者多个队列中。当路由不到时,或返回给生产者或直接丢弃。
  • 队列 (Queue):用来存储消息的数据结构,位于硬盘或内存中。
    • 多个消费者可以订阅同一队列,这时队列中的消息会被平摊(轮询)给多个消费者进行处理,而不是每个人消费,避免消息被重复消费。
    • 一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。
    • RabbitMQ的消息只能存放在队列中。
  • 绑定 (Binding):一套规则,交换机通过将队列的 BindingKey(绑定键) 和 RoutingKey(路由键) 比较之后确定将消息投递给哪个队列。
    • 一个交换机可以绑定多个队列
    • 同一个队列可以与同一个交换机进行多个绑定
    • 不通队列可以用相同的 BindingKey 和同一个交换机进行绑定
    • 不一定生效(fanout模式会无视)

direct 类型交换器

  • Broker:可以看做 RabbitMQ 的服务节点。一般情况下一个 Broker 可以看做一个 RabbitMQ 服务器。
  • 信道(Channel):是生产者、消费者与 RabbitMQ 通信的渠道,信道是建立在 TCP 链接上的虚拟链接,且每条 TCP 链接上的信道数量没有限制。就是说 RabbitMQ 在一条 TCP 链接上建立成百上千个信道来达到多个线程处理,这个 TCP 被多个线程共享,每个信道在 RabbitMQ 都有唯一的 ID,保证了信道私有性,每个信道对应一个线程使用。

RabbitMQ 不支持队列层面的广播消费,如果有广播消费的需求,需要在其上进行二次开发,这样会很麻烦,不建议这样做。

RabbitMQ 核心概念?

RabbitMQ 的整体模型架构如下:

图1-RabbitMQ 的整体模型架构

消息由 2 部分组成:

  • 消息头(或者说是标签 Label):由一系列的可选属性组成包括 routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等
    • 生产者把消息交由 RabbitMQ 后,RabbitMQ 会根据消息头把消息发送给感兴趣的 Consumer(消费者)。
  • 消息体(或者说payload)
    • 消费者只会使用消息体,会丢弃消息头。

交换器类型

有 4 种类型:

  • direct(默认)

    • 它会把消息路由到那些 Bindingkey 与 RoutingKey 完全匹配的 Queue 中。
    • 常用在处理有优先级的任务,根据任务的优先级把消息发送到对应的队列,这样可以指派更多的资源去处理高优先级的队列。
  • fanout

    • 它会把所有发送到该 Exchange 的消息路由到所有与它绑定的 Queue 中,不需要做任何判断操作,所以 fanout 类型是所有的交换机类型里面速度最快的。fanout 类型常用来广播消息。
  • topic

    • 也是匹配 BindingKey 和 RoutingKey,不过不是完全匹配
    • topic 类型两个 key 为一个点号“.”分隔的字符串(被点号“.”分隔开的每一段独立的字符串称为一个单词)
    • BindingKey 中可以存在两种特殊字符串“*”和“#”,用于做模糊匹配,其中“*”用于匹配一个单词,“#”用于匹配多个单词(可以是零个)
    • 更加灵活
  • headers

    • 根据消息内容的header属性而匹配,性能不好,基本不用

什么是死信队列?如何导致的?

死信队列(Dead-Letter-Exchange DLX):当消息在一个队列中变成死信 (dead message) 之后,它能被重新发送到死信队列。

导致的死信的几种原因

  • 消息被拒(Basic.Reject /Basic.Nack) 且 requeue = false
  • 消息 TTL 过期。
  • 队列满了,无法再添加。

什么是延迟队列?RabbitMQ 怎么实现延迟队列?

延迟队列:消息存储特定的时间,被发送以后等待特定时间后,消费者才能拿到这个消息进行消费。

RabbitMQ 本身是没有延迟队列的,要实现延迟消息,一般有两种方式:

  1. 通过 RabbitMQ 的死信交换机(Exchange)和消息的存活时间 TTL(Time To Live)。
  2. 在 RabbitMQ 3.5.7 及以上的版本提供了一个插件(rabbitmq-delayed-message-exchange)来实现延迟队列功能。同时,插件依赖 Erlang/OPT 18.0 及以上。

什么是优先级队列?

RabbitMQ 自 V3.5.0 有优先级队列实现,优先级高的队列会先被消费。

可以通过x-max-priority参数来实现优先级队列。不过,当消费速度大于生产速度且 Broker 没有堆积的情况下,优先级显得没有意义。

RabbitMQ 有哪些工作模式?

  • 简单模式

    • 一个生产者发送消息到一个队列,一个消费者从这个队列中接收消息。这是最简单的工作模式,没有交换机(Exchange)的参与,消息直接发送到队列。
    • 没有交换机,适用于简单的消息传递场景,如聊天系统。
  • work 工作模式

    • 一个生产者发送消息到一个队列,多个消费者监听这个队列并争抢消息进行消费。
    • 消费者可以设置预取一定的消息
    • 对应于交换机的 direct 类型
  • pub/sub 发布订阅模式

    • 生产者将消息发送到交换机(Exchange),交换机将消息广播到所有绑定的队列中,每个队列中的消费者都可以接收到消息。这种模式允许一个消息被多个消费者消费。
    • 适用于需要广播消息的场景,如邮件群发。
    • 对应于交换机的 fanout 类型
  • Routing 路由模式

    • 生产者将消息发送到交换机,交换机根据路由键(Routing Key)将消息发送到对应的队列中。
    • 对应于交换机的 direct 类型
  • Topic 主题模式

    • 和路由模式类似,可以使用通配符
    • 对应于交换机的 topic 类型

如何保证消息的可靠性?

消息到 MQ 的过程中搞丢,MQ 自己搞丢,MQ 到消费过程中搞丢。

  • 生产者到 RabbitMQ:

    • 事务机制:借鉴数据库的事物,可以进行批量提交和回滚,性能低
    • Confirm 机制:RabbitMQ 通过发送一个确认(ack)或否定(nack)消息给生产者,,以告知生产者消息是否已经被成功接收并存储
    • 注意:事务机制和 Confirm 机制是互斥的,两者不能共存,会导致 RabbitMQ 报错。
  • RabbitMQ 自身:

    • 持久化:将消息存储在磁盘上,需要确保队列和交换机也被设置为持久化。
    • 设置工作模式为镜像模式
  • RabbitMQ 到消费者:

    • basicAck 机制:消费者接收到消息后,需要向RabbitMQ发送一个确认(ack)消息来告知RabbitMQ该消息已经被成功处理。只有当RabbitMQ收到消费者的确认消息后,它才会从队列中删除该消息,以确保消息只被处理一次。
    • 死信队列:失败或者超时的消息存放在死信队列,之后通过消息补偿机制(如重新发送、告警等)集中处理这些失败的消息

如何保证 RabbitMQ 消息的顺序性?

  • 拆分多个 queue(消息队列),每个 queue(消息队列) 一个 consumer(消费者),就是多一些 queue (消息队列)而已,确实是麻烦点;
  • 或者就一个 queue (消息队列)但是对应一个 consumer(消费者),然后这个 consumer(消费者)内部用内存队列做排队,然后分发给底层不同的 worker 来处理。

如何保证 RabbitMQ 高可用的?

RabbitMQ 是基于主从(非分布式)做高可用性的

单机模式

Demo 级别的,一般就是你本地启动了玩玩儿的?,没人生产用单机模式。

普通集群模式

  • 启动多个 RabbitMQ 实例。但是创建的 queue,只会放在一个 RabbitMQ 实例上(即所有数据放在一个实例上),但是每个实例都同步 queue 的元数据(元数据可以认为是 queue 的一些配置信息,通过元数据,可以找到 queue 所在实例)。

  • 消费时,如果连接到了另外一个实例,那么那个实例会从 queue 所在实例上拉取数据过来。

  • 这方案主要是提高吞吐量的,就是说让集群中多个节点来服务某个 queue 的读写操作。

镜像集群模式

这种模式,才是所谓的 RabbitMQ 的高可用模式。

  • 每个实例拥有 queue 的全部数据
  • 每次写消息到 queue 的时候,都会自动把消息同步到多个实例的 queue 上。
  • 在创建 queue 的时候可以指定应用这个策略实现这种模式
  • 任何一个机器宕机了,其他机器有完整数据,不会导致数据的丢失。
  • 坏处在于:
    • 第一,性能开销太大,消息需要同步到所有机器上,导致网络带宽压力和消耗很重

RabbitMQ 一个 queue 的数据都是放在一个节点里的(不会有纵向分表的形式)。

Rabbit的模式是单体queue可以不同的

如何解决消息队列的延时以及过期失效问题?

  • 解决延时:设置过期时间TTL,超时会被RabbitMQ 给清理掉,或者使用延时队列
  • 解决过期失效:
    • 配置死信队列,超时消息丢入死信队列,后续可以采用消息重新写入
    • 扩展消费者实例

正在精进