RabbitMQ基础概念

本文将介绍 RabbitMQ 相关的开发基础,主要包含以下内容

  1. RabbitMQ基础介绍
  2. RabbitMQ工作模型
  3. 交换机使用建议
  4. 消息持久化
  5. 死信队列
  6. Virtual Hosts
  7. 连接的生命周期



RabbitMQ基础介绍

一个完整的RabbitMQ消息流转过程如下图:

xx

其中包含以下对象:

  • Producers消息生产者:负责产生消息,发送到RabbitMQ
  • Exchanges交换机 :接收消息,并投递到与之绑定的队列
  • Queues消息队列:负责存储消息
  • Consumers消息消费者:负责处理消息

这里需要注意

  • 消息是发送到交换机,而不是队列
  • 交换机与队列是独立工作的,它们之间存在一种映射关系(路由规则)
  • 一条消息经过交换机,可以复制成多份,最终发送到不同的队列
  • 一个队列可供多个订阅者订阅(并行消费消息)

Exchange
生产者将消息发送到Exchange(交换器,下图中的X),由Exchange将消息路由到一个或多个Queue中(或者丢弃)。
xx

Routing key
生产者在将消息发送给Exchange的时候,一般会指定一个routing key,来指定这个消息的路由规则,而这个routing key需要与Exchange Type及binding key联合使用才能最终生效。 在Exchange Type与binding key固定的情况下(在正常使用时一般这些内容都是固定配置好的),我们的生产者就可以在发送消息给Exchange时,通过指定routing key来决定消息流向哪里。 RabbitMQ为routing key设定的长度限制为255 bytes。

Binding
RabbitMQ中通过Binding将Exchange与Queue关联起来,这样RabbitMQ就知道如何正确地将消息路由到指定的Queue了。
xx

Binding key
在绑定(Binding)Exchange与Queue的同时,一般会指定一个binding key;消费者将消息发送给Exchange时,一般会指定一个routing key;当binding key与routing key相匹配时,消息将会被路由到对应的Queue中。 在绑定多个Queue到同一个Exchange的时候,这些Binding允许使用相同的binding key。binding key 并不是在所有情况下都生效,它依赖于Exchange Type,比如fanout类型的Exchange就会无视binding key,而是将消息路由到所有绑定到该Exchange的Queue。

Exchange Types
RabbitMQ常用的Exchange Type有fanout、direct、topic、headers这四种(AMQP规范里还提到两种Exchange Type,分别为system与自定义,这里不予以描述),下面分别进行介绍。

fanout
fanout类型的Exchange路由规则非常简单,它会把所有发送到该Exchange的消息路由到所有与它绑定的Queue中。
xx

direct
direct类型的Exchange路由规则也很简单,它会把消息路由到那些binding key与routing key完全匹配的Queue中。
xx

topic
前面讲到direct类型的Exchange路由规则是完全匹配binding key与routing key,但这种严格的匹配方式在很多情况下不能满足实际业务需求。topic类型的Exchange在匹配规则上进行了扩展,它与direct类型的Exchage相似,也是将消息路由到binding key与routing key相匹配的Queue中,但这里的匹配规则有些不同,它约定: routing key为一个句点号“. ”分隔的字符串 (我们将被句点号“. ”分隔开的每一段独立的字符串称为一个单词),如“stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit” binding key与routing key一样也是句点号“. ”分隔的字符串 binding key中可以存在两种特殊字符“”与“#”,用于做模糊匹配,其中“”用于匹配一个单词,“#”用于匹配多个单词(可以是零个)
xx

headers
headers类型的Exchange不依赖于routing key与binding key的匹配规则来路由消息,而是根据发送的消息内容中的headers属性进行匹配。 在绑定Queue与Exchange时指定一组键值对,当消息发送到Exchange时,RabbitMQ会取到该消息的headers(也是一个键值对的形式),对比其中的键值对是否完全匹配Queue与Exchange绑定时指定的键值对,如果完全匹配则消息会路由到该Queue,否则不会路由到该Queue。



RabbitMQ工作模型

xx

  • Broker: 接受客户端连接,实现AMQP实体服务。
  • Vhost: 虚拟主机,一个broker里可以有多个vhost,用作不同用户的权限分离。
  • Exchange: 消息交换机,它指定消息按什么规则,路由到哪个队列。
  • Queue: 消息的载体,每个消息都会被投到一个或多个队列。
  • Binding: 绑定,它的作用就是把exchange和queue按照路由规则绑定起来.
  • Routing Key: 路由关键字,exchange根据这个关键字进行消息投递。
  • Producer: 消息生产者,就是投递消息的程序.
  • Consumer: 消息消费者,就是接受消息的程序.
  • Connection: 客户端与某个broker的网络连接。
  • Channel: 消息通道,在客户端的每个连接里,可建立多个channel.


交换机使用建议

  • 对于大多数情况下,使用RabbitMQ自带的 amq.direct 就足够了,不需要自行创建交换机
  • direct将也支持将一份消息发送到多个队列(复制),因此fanout并不是必要的
  • 为了简化开发及排查过程,建议用消息的类型名称做为 bindingkey



消息持久化

队列和交换机有一个创建时候指定的标志durable, durable的唯一含义就是具有这个标志的队列和交换机会在重启之后重新建立,它不表示说在队列中的消息会在重启后恢复

消息持久化包括3部分

  1. exchange持久化,在声明时指定durable => true hannel.ExchangeDeclare(ExchangeName, "direct", durable: true, autoDelete: false, arguments: null);//声明消息队列,且为可持久化的

  2. queue持久化,在声明时指定durable => true channel.QueueDeclare(QueueName, durable: true, exclusive: false, autoDelete: false, arguments: null);//声明消息队列,且为可持久化的

  3. 消息持久化,在投递时指定delivery_mode => 2 channel.basicPublish("", queueName, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());

如果exchange和queue都是持久化的,那么它们之间的binding也是持久化的,如果exchange和queue两者之间有一个持久化,一个非持久化,则不允许建立绑定.

注意:一旦创建了队列和交换机,就不能修改其标志了, 例如,创建了一个non-durable的队列,然后想把它改变成durable的,唯一的办法就是删除这个队列然后重现创建。



死信队列

死信队列介绍

  • 死信队列:DLX,dead-letter-exchange
  • 利用DLX,当消息在一个队列中变成死信 (dead message) 之后,它能被重新publish到另一个Exchange,这个Exchange就是DLX

消息变成死信有以下几种情况

  • 消息被拒绝(basic.reject / basic.nack),并且requeue = false
  • 消息TTL过期
  • 队列达到最大长度

死信处理过程

  • DLX也是一个正常的Exchange,和一般的Exchange没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性。
  • 当这个队列中有死信时,RabbitMQ就会自动的将这个消息重新发布到设置的Exchange上去,进而被路由到另一个队列。
  • 可以监听这个队列中的消息做相应的处理。

参考链接:https://www.jianshu.com/p/986ee5eb78bc



Virtual Hosts

Introduction
RabbitMQ is multi-tenant system: connections, exchanges, queues, bindings, user permissions, policies and some other things belong to virtual hosts, logical groups of entities. If you are familiar with virtual hosts in Apache or server blocks in Nginx, the idea is similar. There is, however, one important difference: virtual hosts in Apache are defined in the configuration file; that's not the case with RabbitMQ: virtual hosts are created and deleted using rabbitmqctl or HTTP API instead.

Logical and Physical Separation
Virtual hosts provide logical grouping and separation of resources. Separation of physical resources is not a goal of virtual hosts and should be considered an implementation detail.

For example, resource permissions in RabbitMQ are scoped per virtual host. A user doesn't have global permissions, only permissions in one or more virtual hosts. User tags can be considered global permissions but they are an exception to the rule.

Therefore when talking about user permissions it is very important to clarify what virtual host(s) they apply to.


小结:

  • 如果应用程序使用的队列数量比较多,可以考虑采用这种方式来隔离,方便管理。
  • 也可以用于隔离多套测试环境。


连接的生命周期

Connections are meant to be long-lived. The underlying protocol is designed and optimized for long running connections. That means that opening a new connection per operation, e.g. a message published, is unnecessary and strongly discouraged as it will introduce a lot of network roundtrips and overhead.

Channels are also meant to be long-lived but since many recoverable protocol errors will result in channel closure, channel lifespan could be shorter than that of its connection. Closing and opening new channels per operation is usually unnecessary but can be appropriate. When in doubt, consider reusing channels first.

Channel-level exceptions such as attempts to consume from a queue that does not exist will result in channel closure. A closed channel can no longer be used and will not receive any more events from the server (such as message deliveries). Channel-level exceptions will be logged by RabbitMQ and will initiate a shutdown sequence for the channel (see below).


连接应该是长久的。底层协议是为长时间运行的连接而设计和优化的。这意味着每次操作打开一个新连接, 例如发布一条消息,是不必要的,并且强烈反对这样做,因为这会引入大量的网络往返和开销。

通道也意味着寿命很长,但是由于许多可恢复的协议错误将导致通道关闭,通道的寿命可能比连接的寿命短。 每次操作关闭和打开新的通道通常是不必要的,但可以是适当的。当有疑问时,首先考虑重用通道。

通道级异常,比如试图从不存在的队列中消费,将导致通道关闭。关闭的通道将不再被使用, 并且将不再从服务器接收任何事件(如消息传递)。通道级异常会被RabbitMQ记录下来, 并且会启动通道的关闭序列(见下文)。


在Nebula中

  • 订阅者使用的长连接,且支持中断后重新连接
  • 生产者也支持长连接,用名称来区分维护,
  • new RabbitClient时,如果不指定连接名(第2个参数),连接将在RabbitClient结束时释放。



高可用

RabbitMQ的高可用涉及以下方面:

  • 安装部署
    • 集群复制:https://www.rabbitmq.com/clustering.html
  • 参数配置
    • 镜像设置:https://www.rabbitmq.com/ha.html
    • 队列策略:https://www.rabbitmq.com/parameters.html#policies
    • 持久化: 交换机,队列,消息
  • 客户端
    • 连接自动恢复:https://www.rabbitmq.com/dotnet-api-guide.html#recovery
    • 消息确认:Message acknowledgment

强烈建议 创建一个通用的队列策略,例如:

xx


显示效果:

xx