RabbitMQ开发建议
主要包含以下内容
- 配置RabbitMQ连接
- 消息队列的创建
- 发送消息
- 发送消息使用建议
- 订阅消息
- 处理消息
- 幂等操作
- 一条消息多次使用
- 订阅方性能优化
- 指标监控
强烈建议
- 尽量使用 direct 交换机,即:exchange 参数保持为 null
- 队列名称,bindingKey,routingKey 三者保持相同,建议使用类型全名
- 应用程序初始化时创建必需的队列,不管是发送消息还是订阅消息
- 创建队列时,new RabbitClient(...) 不要 指定第2个参数!
- RabbitMQ的连接配置名称放在一个 static readonly string 变量中,不要在代码中复制粘贴,也不要再做成参数!
- 订阅者的数量不能硬编码,请使用 LocalSettings.GetUInt("MsgDataType_RabbitSubscriber_Count", 10)
配置RabbitMQ连接
RabbitMQ在使用时,会有一些与连接相关的参数,例如,服务地址,端口,用户名,密码之类的参数。
为了安全,请将这些信息保存到配置服务中。
这个时候,我们需要给配置项取一个连接名称。
建议:“连接名称” 要能体现出属于哪个应用。
例如: "Nebula_Metis_Rabbit"
这样做有3个好处:
- 可以防止名称重复
- 便于识别连接的用途
- 便于以后实现队列迁移
消息队列的创建
基本概念介绍:
- 在消息订阅时,会要求指定要订阅哪个队列,因此队列必须要提前创建。
- 在消息发送时,消息发送到交换机就完事了,在RabbitMQ内部会有一个路由机制转发到队列,所以严格意义来说,发送方是不需要关心队列是否存在的。
建议:不管是发送方还是订阅方,在程序初始化时,都应该提前将队列创建好。
队列的创建方法:
public static class RabbitInit
{
public static readonly string QueueName = "mq111111111";
public static readonly string SettingName = "Nebula_Metis_Rabbit";
/// <summary>
/// 创建【强类型】的消息队列
/// </summary>
private static void CreateQueue1()
{
using( RabbitClient client = new RabbitClient(SettingName) ) {
// 创建队列,队列名称是 Product类型的全名
client.CreateQueueBind(typeof(Product));
}
}
/// <summary>
/// 创建【自由名称】的消息队列
/// </summary>
private static void CreateQueue2()
{
using( RabbitClient client = new RabbitClient(SettingName) ) {
// 创建队列,队列名称由参数指定
client.CreateQueueBind(QueueName);
}
}
}
注意:
- CreateQueueBind方法的传入参数
发送消息
/// <summary>
/// 将数据发送到队列,【强类型】方式
/// </summary>
public void SendData1(Product product) // 方法 1
{
// 如果只是偶尔需要发送消息,可以这样调用
using( RabbitClient client = new RabbitClient(RabbitInit.SettingName) ) {
// 往 RabbitMQ 发送一条消息,队列名称就是参数的类型全名
client.SendMessage(product);
}
}
public void SendData2(Product product) // 方法 2
{
// 往 RabbitMQ 发送一条消息,队列名称就是参数的类型全名
// 它会在整个应用程序内部维护一个共享连接,在多次调用时性能会更好。
this.SendRabbitMessage(RabbitInit.SettingName, product);
}
public void SendData3(Product product) // 方法 3
{
// RabbitClient构造方法的第二个参数是一个 “共享连接” 的名称,
// 默认值是null,表示连接仅在 RabbitClient 内部使用,RabbitClient离开作用域后将关闭连接。
// 如果指定了连接名称,那么连接将会保持,在下次构造RabbitClient时将会重用已存在的连接。
// 共享连接的名称建议使用 nameof(类型名称) ,表示连接仅供某个类型共用。
using( RabbitClient client = new RabbitClient(RabbitInit.SettingName, nameof(RabbitProducer)) ) {
client.SendMessage(product);
}
}
/// <summary>
/// 将数据发送到队列,【自由名称】方式
/// </summary>
public void SendData4(Product product)
{
using( RabbitClient client = new RabbitClient(RabbitInit.SettingName, nameof(RabbitProducer)) ) {
// 往 RabbitMQ 发送一条消息,队列名称由第3个参数指定
client.SendMessage(product, null, RabbitInit.QueueName);
}
}
以上4种方式都可行,根据注释来选择。
它们都只提供了二个必需参数:
- SettingName: 表示 RabbitMQ 在配置服务中的连接配置项
- product: 消息对象,发送时将根据变量的类型名称,找到对应的队列名称
发送消息使用建议
Nebula提供4种消息发送方式:
- 方法1:整体速度最差,主要慢在与RabbitMQ的连接上,用于偶尔发送一条消息的场景,每次调用结束后会关闭连接。
- 方法2:其实是“方法3”的包装版本,将“连接名称”固定了而已。
- 方法3:允许指定共享连接名称,如果程序有多个地方需要消息时,可以通过指定连接名称来减少访问冲突。
- 方法4:指定队列名称(其实是路由名称),用于“自由队列”
使用建议:
- 在程序初始化时,如果需要创建队列,使用“方法1” (用完就关闭的连接)
- 如果消息的发送量非常少,也推荐使用“方法1”
- 在Controller/Action中 推荐使用 “方法2”
- 其它地方可以使用“方法3”或者“方法4”,但是连接名不要乱取,每个名字意味着会产生一个长连接(永不关闭)
订阅消息
可参考以下代码:
/// <summary>
/// 订阅消息,【强类型】方式
/// </summary>
public static void Start1()
{
// 创建队列订阅者,并开启监听
RabbitSubscriber.Start<Product, PrintMessageHandler>(new RabbitSubscriberArgs {
SettingName = RabbitInit.SettingName,
QueueName = null, // ### 注意这个参数:队列名称是 Product类型的全名
SubscriberCount = LocalSettings.GetUInt("Product_RabbitSubscriber_Count", 1)
});
}
/// <summary>
/// 订阅消息,【自由名称】方式
/// </summary>
public static void Start2()
{
// 创建队列订阅者,并开启监听
RabbitSubscriber.StartAsync<Product, PrintMessageHandler2>(new RabbitSubscriberArgs {
SettingName = RabbitInit.SettingName,
QueueName = "mq111111111", // ### 注意这个参数
SubscriberCount = LocalSettings.GetUInt("Product_RabbitSubscriber_Count", 1)
});
}
注意:订阅者的数量一定可配置!
说明:
- 每个订阅者不仅包含一个Rabbit连接,还包含了一个 “消息处理管道"
- 在 消息处理管道 中,还包含了与消息类型匹配的 MessageHandler
- 消息处理管道 内部有重试和异常处理机制,MessageHandler 可以不用关心
处理消息
请在项目中创建一个Messages目录,然后参考下图创建 MessageHandler
消息最终是由 MessageHandler来处理的。
幂等操作
幂等操作是什么,请自行搜索,这里不解释。
幂等操作常规实现方法(方法1)
- 为消息指定一个GUID成员,例如: public Guid MessageGuid {get; set;}
- 对应的数据库表中,为 MessageGuid 创建【唯一索引】
- 在消息处理时
- 打开数据库事务
- 调用 entity.Insert2() 扩展方法将消息入库,判断返回值:是否为重复插入,
- 如果数据行已存在,可以调用 EndProcess() 来结束当前管道过程。
- 否则按以下步骤执行
- 处理消息(执行具体的业务过程)
- 提交数据库事务
方法2
如果:
- 业务过程比较简单,例如:只有一个操作
- 或者业务操作过程不支持回退(取消)
那么可以不使用上面的方法, 而是直接在处理消息!
因为消息在处理结束后,ClownFish 会给RabbitMQ发送一个ACK信号,此时消息将会删除,也不会被重复处理。
消息的幂等操作由于涉及到消息队列这个外部服务,因此需要分布式事务来保证一致性,但是,
分布式事务本质上都不是完美的,上面二种做法都存在极小概率的缺陷,所有不必纠结!
一条消息多次使用
场景举例:
客户端上传了一条日志,在服务端需要做三个处理
- 持久化到MySQL
- 同时写一份到Elasticsearch
- 实时数据分析
实现方法:
- 创建三个队列,设置相同的 Routing key ,消息将同时发送到三个队列,然后分开处理。
订阅方性能优化
对于消息的订阅方而言,提升消息的吞吐量有2种方法:
- 增加订阅者数量
- 优化MessageHandler的代码性能
订阅者数量不可能无限增加,所以优化代码性能是很有必要的,超出性能阀值会记录到性能日志中。
订阅者 数量如何确定?
- 订阅者的数量不要太大,每个订阅者对应着4个后台线程(线程池), 所以每增加1个订阅者,也会增加4个线程,太多无用的线程对性能提升没有任何帮助,反而浪费CPU时间,
- 判断队列是否有压力的最简单办法就是打开RabbitMQ的管理界面,切换到 Queues 页, 查看队列的 Message/Total 列,如果一段时间内(1分钟),这个数量没有上升趋势, 就表示消息订阅者的数量是足够的。
- 订阅者数量较小也不行,会导致消息的大量堆积,也会让MQ越来越慢。
所以,订阅者的数量以 “消息不堆积” 为原则。
这也是为什么订阅者的数量要做成配置参数的原因。
“消息不堆积” 举例说明
这张图片很明显 incoming 远大于deliver,因此 Total 越来越大,也就是消息堆积越来越多。
在这张图片中,incoming和deliver基本上相当,结果就是 Total 持续保持在较低水平,表示消息没有堆积。
指标监控
使用消息队列的应用程序,至少应该实现对队列的监控统计,主要有3个指标:
- 消息积压数量
- 当天消息处理总量
- 消息吞吐量
例如:
实现过程可参考: 业务指标监控开发