消息处理--同步
消息处理的开发过程有2个步骤:
- 开发 消息处理器
- 开启消息订阅
消息处理器--同步版本
示例代码(骨架):
public class PrintMessageHandler : BaseMessageHandler<Product>
{
public override void ProcessMessage(PipelineContext<Product> context)
{
//...具体的实现逻辑...
}
}
开启消息订阅
注意:必须在程序初始时开户消息订阅
public static class AppInitializer
{
public static void Init()
{
// 在这里填写 消息订阅 代码,可参考下面示例
}
}
RabbitMQ版本
RabbitSubscriber.Start<Product, PrintMessageHandler>(new RabbitSubscriberArgs {
SettingName = "连接参数名称XXXXX",
QueueName = null, // 队列名称是 Product类型的全名
SubscriberCount = 2, // 订阅者数量
RetryCount = 5, // 重试次数
RetryWaitMilliseconds = 1000 // 重试的间隔时间
} );
Kafka版本
KafkaSubscriber.Start<Product, PrintMessageHandler>(new KafkaSubscriberArgs {
SettingName = "连接参数名称XXXXX",
GroupId = "消费组名称",
Topic = null, // Topic 是 Product类型的全名
SubscriberCount = 2, // 订阅者数量
RetryCount = 5, // 重试次数
RetryWaitMilliseconds = 1000 // 重试的间隔时间
} );
Redis版本
RedisSubscriber.Start<Product, PrintMessageHandler>(new RedisSubscriberArgs {
SettingName = "连接参数名称XXXXX",
Channel = null, // null 表示使用Product的类型全名代替
RetryCount = 5, // 重试次数
RetryWaitMilliseconds = 1000 // 重试的间隔时间
});
Pulsar版本
PulsarSubscriber.Start<Product, PrintMessageHandler>(new PulsarSubscriberArgs {
SettingName = "连接参数名称XXXXX",
Topic = "product",
SubscriberCount = 2, // 订阅者数量
RetryCount = 5, // 重试次数
RetryWaitMilliseconds = 1000 // 重试的间隔时间
});
MMQ版本
// 创建一个内存队列实例,消息的生产方可以给它发送消息。
private static readonly MemoryMesssageQueue<Product> s_mmq = new MemoryMesssageQueue<Product>(MmqWorkMode.Sync);
internal static void StartSubscriber()
{
MmqSubscriber.Start<Product, PrintMessageHandler>(new MmqSubscriberArgs<Product> {
Queue = s_mmq,
RetryCount = 0, // 重试次数
RetryWaitMilliseconds = 1000 // 重试的间隔时间
});
}
小结:
- XxxxSubscriber.Start方法的第一个类型参数是:消息的类型
- 第二个类型参数是:MessageHandler类型
- 也就是:将什么类型的消息交给什么类型的处理器来处理
- XxxxSubscriber.Start方法都只有一个参数,XxxSubscriberArgs(请按F12查看它的成员)
注意事项
- 队列服务在使用前必须先在配置服务中注册,SettingName是连接的名称
- SubscriberCount是指订阅者数量,这个值 不是 越大越好!
- 最终全部订阅者数量 = SubscriberCount * 进程数量