RabbitMQ开发建议

主要包含以下内容

  1. 配置RabbitMQ连接
  2. 消息队列的创建
  3. 发送消息
  4. 发送消息使用建议
  5. 订阅消息
  6. 处理消息
  7. 幂等操作
  8. 一条消息多次使用
  9. 订阅方性能优化
  10. 指标监控



强烈建议

  • 尽量使用 direct 交换机,即:exchange 参数保持为 null
  • 队列名称,bindingKey,routingKey 三者保持相同,建议使用类型全名
  • 应用程序初始化时创建必需的队列,不管是发送消息还是订阅消息
  • 创建队列时,new RabbitClient(...)     不要     指定第2个参数!
  • RabbitMQ的连接配置名称放在一个 static readonly string 变量中,不要在代码中复制粘贴,也不要再做成参数!
  • 订阅者的数量不能硬编码,请使用 LocalSettings.GetUInt("MsgDataType_RabbitSubscriber_Count", 10)



配置RabbitMQ连接

RabbitMQ在使用时,会有一些与连接相关的参数,例如,服务地址,端口,用户名,密码之类的参数。
为了安全,请将这些信息保存到配置服务中。
这个时候,我们需要给配置项取一个连接名称。

建议:“连接名称” 要能体现出属于哪个应用。

例如: "Nebula_Metis_Rabbit"

这样做有3个好处:

  • 可以防止名称重复
  • 便于识别连接的用途
  • 便于以后实现队列迁移



消息队列的创建

基本概念介绍:

  • 在消息订阅时,会要求指定要订阅哪个队列,因此队列必须要提前创建。
  • 在消息发送时,消息发送到交换机就完事了,在RabbitMQ内部会有一个路由机制转发到队列,所以严格意义来说,发送方是不需要关心队列是否存在的。

建议:不管是发送方还是订阅方,在程序初始化时,都应该提前将队列创建好。

队列的创建方法:

public static class RabbitInit
{
    public static readonly string QueueName = "mq111111111";

    public static readonly string SettingName = "Nebula_Metis_Rabbit";


    /// <summary>
    /// 创建【强类型】的消息队列
    /// </summary>
    private static void CreateQueue1()
    {
        using( RabbitClient client = new RabbitClient(SettingName) ) {

            // 创建队列,队列名称是 Product类型的全名
            client.CreateQueueBind(typeof(Product));
        }
    }

    /// <summary>
    /// 创建【自由名称】的消息队列
    /// </summary>
    private static void CreateQueue2()
    {
        using( RabbitClient client = new RabbitClient(SettingName) ) {

            // 创建队列,队列名称由参数指定
            client.CreateQueueBind(QueueName);
        }
    }
}

注意:

  • CreateQueueBind方法的传入参数



发送消息

/// <summary>
/// 将数据发送到队列,【强类型】方式
/// </summary>
public void SendData1(Product product)  // 方法 1
{
    // 如果只是偶尔需要发送消息,可以这样调用

    using( RabbitClient client = new RabbitClient(RabbitInit.SettingName) ) {

        // 往 RabbitMQ 发送一条消息,队列名称就是参数的类型全名
        client.SendMessage(product);
    }
}

public void SendData2(Product product)  // 方法 2
{
    // 往 RabbitMQ 发送一条消息,队列名称就是参数的类型全名
    // 它会在整个应用程序内部维护一个共享连接,在多次调用时性能会更好。

    this.SendRabbitMessage(RabbitInit.SettingName, product);
}

public void SendData3(Product product)  // 方法 3
{
    // RabbitClient构造方法的第二个参数是一个 “共享连接” 的名称,
    // 默认值是null,表示连接仅在 RabbitClient 内部使用,RabbitClient离开作用域后将关闭连接。

    // 如果指定了连接名称,那么连接将会保持,在下次构造RabbitClient时将会重用已存在的连接。
    // 共享连接的名称建议使用 nameof(类型名称) ,表示连接仅供某个类型共用。

    using( RabbitClient client = new RabbitClient(RabbitInit.SettingName, nameof(RabbitProducer)) ) {

        client.SendMessage(product);
    }
}

/// <summary>
/// 将数据发送到队列,【自由名称】方式
/// </summary>
public void SendData4(Product product)
{
    using( RabbitClient client = new RabbitClient(RabbitInit.SettingName, nameof(RabbitProducer)) ) {

        // 往 RabbitMQ 发送一条消息,队列名称由第3个参数指定
        client.SendMessage(product, null, RabbitInit.QueueName);
    }
}

以上4种方式都可行,根据注释来选择。

它们都只提供了二个必需参数:

  • SettingName: 表示 RabbitMQ 在配置服务中的连接配置项
  • product: 消息对象,发送时将根据变量的类型名称,找到对应的队列名称



发送消息使用建议

Nebula提供4种消息发送方式:

  • 方法1:整体速度最差,主要慢在与RabbitMQ的连接上,用于偶尔发送一条消息的场景,每次调用结束后会关闭连接。
  • 方法2:其实是“方法3”的包装版本,将“连接名称”固定了而已。
  • 方法3:允许指定共享连接名称,如果程序有多个地方需要消息时,可以通过指定连接名称来减少访问冲突。
  • 方法4:指定队列名称(其实是路由名称),用于“自由队列”

使用建议:

  • 在程序初始化时,如果需要创建队列,使用“方法1” (用完就关闭的连接)
  • 如果消息的发送量非常少,也推荐使用“方法1”
  • 在Controller/Action中 推荐使用 “方法2”
  • 其它地方可以使用“方法3”或者“方法4”,但是连接名不要乱取,每个名字意味着会产生一个长连接(永不关闭)



订阅消息

可参考以下代码:

/// <summary>
/// 订阅消息,【强类型】方式
/// </summary>
public static void Start1()
{
    // 创建队列订阅者,并开启监听
    RabbitSubscriber.Start<Product, PrintMessageHandler>(new RabbitSubscriberArgs {
        SettingName = RabbitInit.SettingName,
        QueueName = null,  // ### 注意这个参数:队列名称是 Product类型的全名
        SubscriberCount = LocalSettings.GetUInt("Product_RabbitSubscriber_Count", 1)
    });
}

/// <summary>
/// 订阅消息,【自由名称】方式
/// </summary>
public static void Start2()
{
    // 创建队列订阅者,并开启监听
    RabbitSubscriber.StartAsync<Product, PrintMessageHandler2>(new RabbitSubscriberArgs {
        SettingName = RabbitInit.SettingName,
        QueueName = "mq111111111",  //  ### 注意这个参数
        SubscriberCount = LocalSettings.GetUInt("Product_RabbitSubscriber_Count", 1)
    });
}

注意:订阅者的数量一定可配置!

说明:

  • 每个订阅者不仅包含一个Rabbit连接,还包含了一个 “消息处理管道"
  • 在 消息处理管道 中,还包含了与消息类型匹配的 MessageHandler
  • 消息处理管道 内部有重试和异常处理机制,MessageHandler 可以不用关心



处理消息

请在项目中创建一个Messages目录,然后参考下图创建 MessageHandler

xx


xx

消息最终是由 MessageHandler来处理的。



幂等操作

幂等操作是什么,请自行搜索,这里不解释。

幂等操作常规实现方法(方法1)

  • 为消息指定一个GUID成员,例如: public Guid MessageGuid {get; set;}
  • 对应的数据库表中,为 MessageGuid 创建【唯一索引】
  • 在消息处理时
    • 打开数据库事务
    • 调用 entity.Insert2() 扩展方法将消息入库,判断返回值:是否为重复插入,
      • 如果数据行已存在,可以调用 EndProcess() 来结束当前管道过程。
      • 否则按以下步骤执行
        • 处理消息(执行具体的业务过程)
        • 提交数据库事务

方法2
如果:

  • 业务过程比较简单,例如:只有一个操作
  • 或者业务操作过程不支持回退(取消)

那么可以不使用上面的方法, 而是直接在处理消息!
因为消息在处理结束后,ClownFish 会给RabbitMQ发送一个ACK信号,此时消息将会删除,也不会被重复处理。


消息的幂等操作由于涉及到消息队列这个外部服务,因此需要分布式事务来保证一致性,但是,
分布式事务本质上都不是完美的,上面二种做法都存在极小概率的缺陷,所有不必纠结!



一条消息多次使用

场景举例:
客户端上传了一条日志,在服务端需要做三个处理

  1. 持久化到MySQL
  2. 同时写一份到Elasticsearch
  3. 实时数据分析

实现方法:

  • 创建三个队列,设置相同的 Routing key ,消息将同时发送到三个队列,然后分开处理。



订阅方性能优化

对于消息的订阅方而言,提升消息的吞吐量有2种方法:

  • 增加订阅者数量
  • 优化MessageHandler的代码性能

订阅者数量不可能无限增加,所以优化代码性能是很有必要的,超出性能阀值会记录到性能日志中。


订阅者 数量如何确定?

  1. 订阅者的数量不要太大,每个订阅者对应着4个后台线程(线程池), 所以每增加1个订阅者,也会增加4个线程,太多无用的线程对性能提升没有任何帮助,反而浪费CPU时间,
  2. 判断队列是否有压力的最简单办法就是打开RabbitMQ的管理界面,切换到 Queues 页, 查看队列的 Message/Total 列,如果一段时间内(1分钟),这个数量没有上升趋势, 就表示消息订阅者的数量是足够的。
  3. 订阅者数量较小也不行,会导致消息的大量堆积,也会让MQ越来越慢。

所以,订阅者的数量以 “消息不堆积” 为原则。
这也是为什么订阅者的数量要做成配置参数的原因。



“消息不堆积” 举例说明
xx
这张图片很明显 incoming 远大于deliver,因此 Total 越来越大,也就是消息堆积越来越多。

xx

在这张图片中,incoming和deliver基本上相当,结果就是 Total 持续保持在较低水平,表示消息没有堆积。



指标监控

使用消息队列的应用程序,至少应该实现对队列的监控统计,主要有3个指标:

  1. 消息积压数量
  2. 当天消息处理总量
  3. 消息吞吐量

例如:
xx

实现过程可参考: 业务指标监控开发