使用RabbitMQ

本文将介绍RabbitMQ的使用技巧,主要包含以下内容

  1. 在配置服务中注册连接
  2. 创建队列
  3. 发送消息
  4. 订阅消息
  5. 消息的延迟(定时)处理
  6. 多队列&多进程-自动配对订阅



参考链接

官方API参考: https://www.rabbitmq.com/dotnet-api-guide.html

官方教程: https://www.rabbitmq.com/tutorials/tutorial-one-dotnet.html



在配置服务中注册连接

在Nebula中使用RabbitMQ必须先在配置服务中注册RabbitMQ连接,
具体做法中在settings表中增加一条记录(也可通过AdminUI界面来操作),例如:

xx

字段说明

  • name: 连接名称,在代码中使用
  • value: 连接参数,可用数据成员参考下面RabbitOption代码
  • restype=1: 用于开启Venus监控
/// <summary>
/// RabbitMQ的连接信息
/// </summary>
public sealed class RabbitOption
{
    /// <summary>
    /// VHost,默认值:"/"
    /// </summary>
    public string VHost { get; set; } = "/";
    
    /// <summary>
    /// RabbitMQ服务地址。【必填】
    /// </summary>
    public string Server { get; set; }
    
    /// <summary>
    /// 连接端口(大于 0 时有效)
    /// </summary>
    public int Port { get; set; }

    /// <summary>
    /// HTTP协议的连接端口(大于 0 时有效)
    /// </summary>
    public int HttpPort { get; set; }
    
    /// <summary>
    /// 登录名。【必填】
    /// </summary>
    public string Username { get; set; }
    
    /// <summary>
    /// 登录密码。
    /// </summary>
    public string Password { get; set; }
}

参数值配置示例

server=RabbitHost;username=fishli;password=aaaaaaaaaaaa;vhost=nbtest





创建队列

下面示例代码演示了2种创建队列的方法,
它们都使用了默认的 amq.direct 交换机。

public static class RabbitInit1
{
    public static readonly string SettingName = "Rabbit-DEMO";


    /// <summary>
    /// 创建【强类型】的消息队列
    /// </summary>
    private static void CreateQueue1()
    {
        using( RabbitClient client = new RabbitClient(SettingName) ) {

            // 队列名称,bindingKey 二者相同,都是 Product类型的全名
            client.CreateQueueBind(typeof(Product));
        }
    }
}
public static class RabbitInit2
{
    public static readonly string QueueName = "DEMO-QUEUE";

    public static readonly string SettingName = "Rabbit-DEMO";

    /// <summary>
    /// 创建【自由名称】的消息队列
    /// </summary>
    private static void CreateQueue2()
    {
        using( RabbitClient client = new RabbitClient(SettingName) ) {

            // 队列名称,bindingKey 二者相同,都是 "DEMO-QUEUE"
            client.CreateQueueBind(QueueName);
        }
    }
}



CreateQueueBind方法签名如下:

/// <summary>
/// 创建队列并绑定到交换机
/// </summary>
/// <param name="queue">队列名称</param>
/// <param name="exchange">交换机名称,默认值: "amq.direct"</param>
/// <param name="bindingKey">从交换机到队列的映射标记</param>
/// <param name="argument">调用QueueDeclare时传递的argument参数</param>
public virtual void CreateQueueBind(string queue, string exchange = null, string bindingKey = null, IDictionary<string, object> argument = null)

/// <summary>
/// 创建队列并绑定到交换机
/// </summary>
/// <param name="dataType">消息的数据类型,最终创建的队列名称就是消息数据类型的全名,bindingKey与队列同名</param>
/// <param name="exchange">交换机名称,默认值: "amq.direct"</param>
/// <param name="argument">调用QueueDeclare时传递的argument参数</param>
public void CreateQueueBind(Type dataType, string exchange = null, IDictionary<string, object> argument = null)





发送消息

可参考以下代码:

internal class RabbitProducer : BaseController
{
    /// <summary>
    /// 将数据发送到队列,【强类型】方式
    /// </summary>
    public void SendData1(Product product)  // 方法 1
    {
        // 如果只是     【偶尔】    需要发送消息,可以这样调用

        using( RabbitClient client = new RabbitClient(RabbitInit.SettingName) ) {

            // 往 RabbitMQ 发送一条消息,routingKey就是参数的类型全名
            client.SendMessage(product);
        }
    }

    public void SendData2(Product product)  // 方法 2
    {
        // 往 RabbitMQ 发送一条消息,routingKey就是参数的类型全名
        // 它会在整个应用程序内部维护一个共享连接,在多次调用时性能会更好。

        this.SendRabbitMessage(RabbitInit.SettingName, product);
    }

    public void SendData3(Product product)  // 方法 3
    {
        // RabbitClient构造方法的第二个参数是一个 “共享连接” 的名称,

        // 默认值是null,表示连接仅在 RabbitClient 内部使用,RabbitClient离开作用域后将关闭连接。

        // 如果指定了连接名称,那么连接将会保持,在下次构造RabbitClient时将会重用已存在的连接。
        // 共享连接的名称建议使用 nameof(类型名称) ,表示连接仅供某个类型共用。

        using( RabbitClient client = new RabbitClient(RabbitInit.SettingName, nameof(RabbitProducer)) ) {

            client.SendMessage(product);
        }
    }

    /// <summary>
    /// 将数据发送到队列,指定routingKey
    /// </summary>
    public void SendData4(Product product)
    {
        using( RabbitClient client = new RabbitClient(RabbitInit.SettingName, nameof(RabbitProducer)) ) {

            // 往 RabbitMQ 发送一条消息,routingKey由第3个参数指定
            client.SendMessage(product, null, RabbitInit.QueueName);
        }
    }

}



SendMessage方法签名如下:

/// <summary>
/// 往队列中发送一条消息。
/// </summary>
/// <param name="data">要发送的消息数据</param>
/// <param name="exchange">交换机名称,默认值: "amq.direct"</param>
/// <param name="routingKey">消息的路由键</param>
/// <param name="basicProperties"></param>
/// <returns>消息体长度</returns>
public int SendMessage(object data, string exchange = null, string routingKey = null, IBasicProperties basicProperties = null)



RabbitClient构造方法签名如下:

/// <summary>
/// 构造方法
/// </summary>
/// <param name="settingName">配置服务中的Rabbit连接名称</param>
/// <param name="connectionName">
/// 连接名称,
/// 如果不指定,表示连接在使用结束后关闭, 
/// 如果指定,那么连接将会一直打开,供后续同名的connectionName使用。</param>
public RabbitClient(string settingName, string connectionName = null)





订阅消息

可参考以下代码:

public class RabbitConsumer
{
    /// <summary>
    /// 订阅消息,【强类型】方式
    /// </summary>
    public static void Start1()
    {
        // 创建队列订阅者,并开启监听
        RabbitSubscriber.Start<Product, PrintMessageHandler>(new RabbitSubscriberArgs {
            SettingName = RabbitInit.SettingName,
            QueueName = null,  // 队列名称是 Product类型的全名
            SubscriberCount = LocalSettings.GetUInt("RabbitSubscriber.Count", 2)
        });
    }

    /// <summary>
    /// 订阅消息,【自由名称】方式
    /// </summary>
    public static void Start2()
    {
        // 创建队列订阅者,并开启监听
        RabbitSubscriber.Start<Product, PrintMessageHandler>(new RabbitSubscriberArgs {
            SettingName = RabbitInit.SettingName,
            QueueName = RabbitInit.QueueName,
            SubscriberCount = LocalSettings.GetUInt("RabbitSubscriber.Count", 2)
        });
    }
}





消息的延迟(定时)处理

实现思路:

  • 创建一个队列 WaitRetry,为它设置 死信队列 ReSend
  • 发送消息时指定过期时间,
  • 消息进入队列后,由于没有订阅者来消息,会在过期后进入死信队列
  • 订阅者订阅死信队列,获取到【延迟】的消息

注意:WaitRetry/ReSend 只是示例代码中的名称,并没有强制要求!

可参考下图:
xx

示例代码:

private static void CreateRetryMQ()
{
    //  “等待重试” 队列
    string waitRetryQueue = Res.GetWaitRetryQueue();

    // “重试” 队列
    string resendQueue = Res.GetReSendQueue();

    using( RabbitClient client = Res.CreateRabbitClient() ) {

        // 为 waitRetryQueue 配置 死信队列
        Dictionary<string, object> args = new Dictionary<string, object>();
        args["x-dead-letter-exchange"] = Exchanges.Direct;
        args["x-dead-letter-routing-key"] = resendQueue;
        
        client.CreateQueueBind(waitRetryQueue, null, null, args);                
        client.CreateQueueBind(resendQueue);
    }

    // 创建队列的消息订阅者
    RabbitSubscriber.Start<MessageX, ReSendHandler>(new RabbitSubscriberArgs {
        SettingName = Res.RabbitSettingName,
        QueueName = resendQueue,
    });
}

此时可以在RabbitMQ的管理界面看到:
xx


发送消息代码
public void SendRetryMessage(MessageX message, int waitMilliseconds)
{
    using( RabbitClient client = Res.CreateRabbitClient() ) {

        // 注意:消息发送的目标队列 是没有订阅者的,
		// 消息会一直等到 “过期” 被移到 “死信队列”,在哪里才会被处理。
        IBasicProperties properties = client.GetBasicProperties();
        properties.Expiration = waitMilliseconds.ToString();

        string routing = Res.GetWaitRetryQueue();
        client.SendMessage(message, null, routing, properties);
    }
}

剩下的事情就是订阅 ReSend 队列并处理消息,这里忽略这个过程……





多队列&多进程-自动配对订阅

回顾一下 简单的队列使用场景
假如我们有一个数据类型 XMessage
我们可以为它创建一个队列,名字也叫 XMessage,
然后我们可以用下面的代码来订阅这个队列:

RabbitSubscriberArgs args = new RabbitSubscriberArgs{
     SettingName = "Rabbit连接的配置名称",
     QueueName = "XMessage",
     SubscriberCount = 3
};
RabbitSubscriber.Start<XMessage, XMessageHandler>(args);

如果我们将程序运行2个实例,那么对于 XMessage 队列,它将有6个订阅者(2个实例 * 3个订阅线程),
所有消息将会均匀分配到6个订阅者。




多队列&多进程--使用场景
假设:XMessage类型的消息,由于种种原因需要分裂成多个“相同的”队列,
例如按租户ID取模,之后需要将同一个租户的所有消息固定发送到与它对应的队列。
此时进程(容器)运行多个实例,如何保证每个队列只被一个进程订阅(尤其要考虑进程重启)?


多队列场景示例
XMessage队列需要创建10个,分别是 XMessage1, XMessage2,,,,,XMessage10
它们仍然接收 XMessage 这个类型的消息,只是按租户ID将它们分开而已,
也就是说:

  • 如果XMessage消息中,tenantid = my596d74ca5eeb1,那么这条消息就投递到XMessage1队列,
  • 如果XMessage消息中,tenantid = my596d74ca5eeb2,那么这条消息就投递到XMessage2队列,
  • 如果XMessage消息中,tenantid = my596d74ca5eeb3,那么这条消息就投递到XMessage3队列,

此时我们的程序运行5个实例,那么订阅代码如何编写?
因为容器内的进程运行起来是一模一样的,不可能在代码或者配置中指定队列名称!


解决方案
  • 使用分布式锁,在锁范围内找到没有被订阅的队列,然后订阅它!

参考代码

/// <summary>
/// 多队列-多进程 自动配对订阅实现方案,
/// 假如有 10 个队列,5个进程,此时每个进程将订阅2个队列
/// </summary>
public static void Demo()
{
    // 此方案的大致思路是:
    // 1,先获取一个分布式锁,保证多个进程互斥(只有一个进程能进入执行)
    // 2,获取所有队列信息,检查将要订阅的队列是否有订阅者,如果队列没有订阅者,就订阅它


    // 创建一个分布式锁,锁将保证多个进程时,只有一个进程能进入执行
    using( GlobalLock locker = new GlobalLock("一个特殊的锁标识", TimeSpan.FromSeconds(30)) ) {

        RabbitMonitorClient client = new RabbitMonitorClient("rabbitmq-连接名称");

        // 获取所有队列信息,并过滤【特定】
        List<QueueInfo> list = client.GetQueues(10_000).Where(x => x.Name.StartsWith("队列名称前缀")).ToList();

        // 记录当前进程已订阅的队列数量
        int count = 0;

        foreach( var q in list ) {

            // 如果某个队列没有订阅者,就表示当前进程可以订阅它
            if( q.Consumers == 0 && count < 2 ) {

                // 订阅某个队列
                RabbitSubscriberArgs args = new RabbitSubscriberArgs {
                    SettingName = "Rabbit连接配置名称",
                    QueueName = q.Name,
                    SubscriberCount = 3
                };
                RabbitSubscriber.Start<XMessage, XMessageHanlder>(args);
                count++;
            }
        }
    }
}