使用RabbitMQ
本文将介绍RabbitMQ的使用技巧,主要包含以下内容
- 在配置服务中注册连接
- 创建队列
- 发送消息
- 订阅消息
- 消息的延迟(定时)处理
- 多队列&多进程-自动配对订阅
参考链接
官方API参考: https://www.rabbitmq.com/dotnet-api-guide.html
官方教程: https://www.rabbitmq.com/tutorials/tutorial-one-dotnet.html
在配置服务中注册连接
在Nebula中使用RabbitMQ必须先在配置服务中注册RabbitMQ连接,
具体做法中在settings表中增加一条记录(也可通过AdminUI界面来操作),例如:
字段说明
- name: 连接名称,在代码中使用
- value: 连接参数,可用数据成员参考下面RabbitOption代码
- restype=1: 用于开启Venus监控
/// <summary>
/// RabbitMQ的连接信息
/// </summary>
public sealed class RabbitOption
{
/// <summary>
/// VHost,默认值:"/"
/// </summary>
public string VHost { get; set; } = "/";
/// <summary>
/// RabbitMQ服务地址。【必填】
/// </summary>
public string Server { get; set; }
/// <summary>
/// 连接端口(大于 0 时有效)
/// </summary>
public int Port { get; set; }
/// <summary>
/// HTTP协议的连接端口(大于 0 时有效)
/// </summary>
public int HttpPort { get; set; }
/// <summary>
/// 登录名。【必填】
/// </summary>
public string Username { get; set; }
/// <summary>
/// 登录密码。
/// </summary>
public string Password { get; set; }
}
参数值配置示例
server=RabbitHost;username=fishli;password=aaaaaaaaaaaa;vhost=nbtest
创建队列
下面示例代码演示了2种创建队列的方法,
它们都使用了默认的 amq.direct 交换机。
public static class RabbitInit1
{
public static readonly string SettingName = "Rabbit-DEMO";
/// <summary>
/// 创建【强类型】的消息队列
/// </summary>
private static void CreateQueue1()
{
using( RabbitClient client = new RabbitClient(SettingName) ) {
// 队列名称,bindingKey 二者相同,都是 Product类型的全名
client.CreateQueueBind(typeof(Product));
}
}
}
public static class RabbitInit2
{
public static readonly string QueueName = "DEMO-QUEUE";
public static readonly string SettingName = "Rabbit-DEMO";
/// <summary>
/// 创建【自由名称】的消息队列
/// </summary>
private static void CreateQueue2()
{
using( RabbitClient client = new RabbitClient(SettingName) ) {
// 队列名称,bindingKey 二者相同,都是 "DEMO-QUEUE"
client.CreateQueueBind(QueueName);
}
}
}
CreateQueueBind方法签名如下:
/// <summary>
/// 创建队列并绑定到交换机
/// </summary>
/// <param name="queue">队列名称</param>
/// <param name="exchange">交换机名称,默认值: "amq.direct"</param>
/// <param name="bindingKey">从交换机到队列的映射标记</param>
/// <param name="argument">调用QueueDeclare时传递的argument参数</param>
public virtual void CreateQueueBind(string queue, string exchange = null, string bindingKey = null, IDictionary<string, object> argument = null)
/// <summary>
/// 创建队列并绑定到交换机
/// </summary>
/// <param name="dataType">消息的数据类型,最终创建的队列名称就是消息数据类型的全名,bindingKey与队列同名</param>
/// <param name="exchange">交换机名称,默认值: "amq.direct"</param>
/// <param name="argument">调用QueueDeclare时传递的argument参数</param>
public void CreateQueueBind(Type dataType, string exchange = null, IDictionary<string, object> argument = null)
发送消息
可参考以下代码:
internal class RabbitProducer : BaseController
{
/// <summary>
/// 将数据发送到队列,【强类型】方式
/// </summary>
public void SendData1(Product product) // 方法 1
{
// 如果只是 【偶尔】 需要发送消息,可以这样调用
using( RabbitClient client = new RabbitClient(RabbitInit.SettingName) ) {
// 往 RabbitMQ 发送一条消息,routingKey就是参数的类型全名
client.SendMessage(product);
}
}
public void SendData2(Product product) // 方法 2
{
// 往 RabbitMQ 发送一条消息,routingKey就是参数的类型全名
// 它会在整个应用程序内部维护一个共享连接,在多次调用时性能会更好。
this.SendRabbitMessage(RabbitInit.SettingName, product);
}
public void SendData3(Product product) // 方法 3
{
// RabbitClient构造方法的第二个参数是一个 “共享连接” 的名称,
// 默认值是null,表示连接仅在 RabbitClient 内部使用,RabbitClient离开作用域后将关闭连接。
// 如果指定了连接名称,那么连接将会保持,在下次构造RabbitClient时将会重用已存在的连接。
// 共享连接的名称建议使用 nameof(类型名称) ,表示连接仅供某个类型共用。
using( RabbitClient client = new RabbitClient(RabbitInit.SettingName, nameof(RabbitProducer)) ) {
client.SendMessage(product);
}
}
/// <summary>
/// 将数据发送到队列,指定routingKey
/// </summary>
public void SendData4(Product product)
{
using( RabbitClient client = new RabbitClient(RabbitInit.SettingName, nameof(RabbitProducer)) ) {
// 往 RabbitMQ 发送一条消息,routingKey由第3个参数指定
client.SendMessage(product, null, RabbitInit.QueueName);
}
}
}
SendMessage方法签名如下:
/// <summary>
/// 往队列中发送一条消息。
/// </summary>
/// <param name="data">要发送的消息数据</param>
/// <param name="exchange">交换机名称,默认值: "amq.direct"</param>
/// <param name="routingKey">消息的路由键</param>
/// <param name="basicProperties"></param>
/// <returns>消息体长度</returns>
public int SendMessage(object data, string exchange = null, string routingKey = null, IBasicProperties basicProperties = null)
RabbitClient构造方法签名如下:
/// <summary>
/// 构造方法
/// </summary>
/// <param name="settingName">配置服务中的Rabbit连接名称</param>
/// <param name="connectionName">
/// 连接名称,
/// 如果不指定,表示连接在使用结束后关闭,
/// 如果指定,那么连接将会一直打开,供后续同名的connectionName使用。</param>
public RabbitClient(string settingName, string connectionName = null)
订阅消息
可参考以下代码:
public class RabbitConsumer
{
/// <summary>
/// 订阅消息,【强类型】方式
/// </summary>
public static void Start1()
{
// 创建队列订阅者,并开启监听
RabbitSubscriber.Start<Product, PrintMessageHandler>(new RabbitSubscriberArgs {
SettingName = RabbitInit.SettingName,
QueueName = null, // 队列名称是 Product类型的全名
SubscriberCount = LocalSettings.GetUInt("RabbitSubscriber.Count", 2)
});
}
/// <summary>
/// 订阅消息,【自由名称】方式
/// </summary>
public static void Start2()
{
// 创建队列订阅者,并开启监听
RabbitSubscriber.Start<Product, PrintMessageHandler>(new RabbitSubscriberArgs {
SettingName = RabbitInit.SettingName,
QueueName = RabbitInit.QueueName,
SubscriberCount = LocalSettings.GetUInt("RabbitSubscriber.Count", 2)
});
}
}
消息的延迟(定时)处理
实现思路:
- 创建一个队列 WaitRetry,为它设置 死信队列 ReSend
- 发送消息时指定过期时间,
- 消息进入队列后,由于没有订阅者来消息,会在过期后进入死信队列
- 订阅者订阅死信队列,获取到【延迟】的消息
注意:WaitRetry/ReSend 只是示例代码中的名称,并没有强制要求!
可参考下图:
示例代码:
private static void CreateRetryMQ()
{
// “等待重试” 队列
string waitRetryQueue = Res.GetWaitRetryQueue();
// “重试” 队列
string resendQueue = Res.GetReSendQueue();
using( RabbitClient client = Res.CreateRabbitClient() ) {
// 为 waitRetryQueue 配置 死信队列
Dictionary<string, object> args = new Dictionary<string, object>();
args["x-dead-letter-exchange"] = Exchanges.Direct;
args["x-dead-letter-routing-key"] = resendQueue;
client.CreateQueueBind(waitRetryQueue, null, null, args);
client.CreateQueueBind(resendQueue);
}
// 创建队列的消息订阅者
RabbitSubscriber.Start<MessageX, ReSendHandler>(new RabbitSubscriberArgs {
SettingName = Res.RabbitSettingName,
QueueName = resendQueue,
});
}
此时可以在RabbitMQ的管理界面看到:
发送消息代码
public void SendRetryMessage(MessageX message, int waitMilliseconds)
{
using( RabbitClient client = Res.CreateRabbitClient() ) {
// 注意:消息发送的目标队列 是没有订阅者的,
// 消息会一直等到 “过期” 被移到 “死信队列”,在哪里才会被处理。
IBasicProperties properties = client.GetBasicProperties();
properties.Expiration = waitMilliseconds.ToString();
string routing = Res.GetWaitRetryQueue();
client.SendMessage(message, null, routing, properties);
}
}
剩下的事情就是订阅 ReSend 队列并处理消息,这里忽略这个过程……
多队列&多进程-自动配对订阅
回顾一下 简单的队列使用场景
假如我们有一个数据类型 XMessage
我们可以为它创建一个队列,名字也叫 XMessage,
然后我们可以用下面的代码来订阅这个队列:
RabbitSubscriberArgs args = new RabbitSubscriberArgs{
SettingName = "Rabbit连接的配置名称",
QueueName = "XMessage",
SubscriberCount = 3
};
RabbitSubscriber.Start<XMessage, XMessageHandler>(args);
如果我们将程序运行2个实例,那么对于 XMessage 队列,它将有6个订阅者(2个实例 * 3个订阅线程),
多队列&多进程--使用场景
假设:XMessage类型的消息,由于种种原因需要分裂成多个“相同的”队列,
例如按租户ID取模,之后需要将同一个租户的所有消息固定发送到与它对应的队列。
此时进程(容器)运行多个实例,如何保证每个队列只被一个进程订阅(尤其要考虑进程重启)?
多队列场景示例
XMessage队列需要创建10个,分别是 XMessage1, XMessage2,,,,,XMessage10
它们仍然接收 XMessage 这个类型的消息,只是按租户ID将它们分开而已,
也就是说:
- 如果XMessage消息中,tenantid = my596d74ca5eeb1,那么这条消息就投递到XMessage1队列,
- 如果XMessage消息中,tenantid = my596d74ca5eeb2,那么这条消息就投递到XMessage2队列,
- 如果XMessage消息中,tenantid = my596d74ca5eeb3,那么这条消息就投递到XMessage3队列,
此时我们的程序运行5个实例,那么订阅代码如何编写?
因为容器内的进程运行起来是一模一样的,不可能在代码或者配置中指定队列名称!
解决方案
- 使用分布式锁,在锁范围内找到没有被订阅的队列,然后订阅它!
参考代码
/// <summary>
/// 多队列-多进程 自动配对订阅实现方案,
/// 假如有 10 个队列,5个进程,此时每个进程将订阅2个队列
/// </summary>
public static void Demo()
{
// 此方案的大致思路是:
// 1,先获取一个分布式锁,保证多个进程互斥(只有一个进程能进入执行)
// 2,获取所有队列信息,检查将要订阅的队列是否有订阅者,如果队列没有订阅者,就订阅它
// 创建一个分布式锁,锁将保证多个进程时,只有一个进程能进入执行
using( GlobalLock locker = new GlobalLock("一个特殊的锁标识", TimeSpan.FromSeconds(30)) ) {
RabbitMonitorClient client = new RabbitMonitorClient("rabbitmq-连接名称");
// 获取所有队列信息,并过滤【特定】
List<QueueInfo> list = client.GetQueues(10_000).Where(x => x.Name.StartsWith("队列名称前缀")).ToList();
// 记录当前进程已订阅的队列数量
int count = 0;
foreach( var q in list ) {
// 如果某个队列没有订阅者,就表示当前进程可以订阅它
if( q.Consumers == 0 && count < 2 ) {
// 订阅某个队列
RabbitSubscriberArgs args = new RabbitSubscriberArgs {
SettingName = "Rabbit连接配置名称",
QueueName = q.Name,
SubscriberCount = 3
};
RabbitSubscriber.Start<XMessage, XMessageHanlder>(args);
count++;
}
}
}
}