消息处理--异步

消息处理的开发过程有2个步骤:



消息处理器--异步版本

示例代码(骨架):

public class PrintMessageHandler : AsyncBaseMessageHandler<Product>
{
    public override Task ProcessMessage(PipelineContext<Product> context)
    {
        //...具体的实现逻辑...
    }
}





启动消息订阅

注意:必须在程序初始时开户消息订阅

public static class AppInitializer
{
    public static void Init()
    {
        // 在这里填写 消息订阅 代码,可参考下面示例
    }
}

RabbitMQ版本

RabbitSubscriber.StartAsync<Product, PrintMessageHandler>(new RabbitSubscriberArgs {
    // 所有参数与同步版本一致,这里省略
} );

Kafka版本

KafkaSubscriber.StartAsync<Product, PrintMessageHandler>(new KafkaSubscriberArgs {
    // 所有参数与同步版本一致,这里省略
} );

Redis版本

RedisSubscriber.StartAsync<Product, PrintMessageHandler>(new RedisSubscriberArgs {
    // 所有参数与同步版本一致,这里省略
});

Pulsar版本

PulsarSubscriber.StartAsync<Product, PrintMessageHandler2>(new PulsarSubscriberArgs {
    // 所有参数与同步版本一致,这里省略
});

MMQ版本

// 创建一个支持异步操作的内存队列实例,消息的生产方可以给它发送消息。
private static readonly MemoryMesssageQueue<Product> s_mmq = new MemoryMesssageQueue<Product>(MmqWorkMode.Async);

internal static void StartSubscriber()
{
    MmqSubscriber.StartAsync<Product, PrintMessageHandler>(new MmqSubscriberArgs<Product> {
        // 所有参数与同步版本一致,这里省略
    });
}



小结:

  • XxxxSubscriber.StartAsync方法的第一个类型参数是:消息的类型
  • 第二个类型参数是:MessageHandler类型
  • 也就是:将什么类型的消息交给什么类型的处理器来处理
  • XxxxSubscriber.StartAsync方法都只有一个参数,XxxSubscriberArgs(请按F12查看它的成员)

注意事项

  • 队列服务在使用前必须先在配置服务中注册,SettingName是连接的名称
  • SubscriberCount是指订阅者数量,这个值 不是 越大越好!
  • 最终全部订阅者数量 = SubscriberCount * 进程数量