消息处理--同步

消息处理的开发过程有2个步骤:



消息处理器--同步版本

示例代码(骨架):

public class PrintMessageHandler : BaseMessageHandler<Product>
{
    public override void ProcessMessage(PipelineContext<Product> context)
    {
        //...具体的实现逻辑...
    }
}





开启消息订阅

注意:必须在程序初始时开户消息订阅

public static class AppInitializer
{
    public static void Init()
    {
        // 在这里填写 消息订阅 代码,可参考下面示例
    }
}

RabbitMQ版本

RabbitSubscriber.Start<Product, PrintMessageHandler>(new RabbitSubscriberArgs {
    SettingName = "连接参数名称XXXXX",
    QueueName = null,  // 队列名称是 Product类型的全名
    SubscriberCount = 2,  // 订阅者数量
    RetryCount = 5,    // 重试次数
    RetryWaitMilliseconds = 1000   // 重试的间隔时间
} );

Kafka版本

KafkaSubscriber.Start<Product, PrintMessageHandler>(new KafkaSubscriberArgs {
    SettingName = "连接参数名称XXXXX",
    GroupId = "消费组名称",
    Topic = null, // Topic 是 Product类型的全名
    SubscriberCount = 2,  // 订阅者数量
    RetryCount = 5,    // 重试次数
    RetryWaitMilliseconds = 1000   // 重试的间隔时间
} );

Redis版本

RedisSubscriber.Start<Product, PrintMessageHandler>(new RedisSubscriberArgs {
    SettingName = "连接参数名称XXXXX",
    Channel = null,   // null 表示使用Product的类型全名代替
    RetryCount = 5,    // 重试次数
    RetryWaitMilliseconds = 1000   // 重试的间隔时间
});

Pulsar版本

PulsarSubscriber.Start<Product, PrintMessageHandler>(new PulsarSubscriberArgs {
    SettingName = "连接参数名称XXXXX",
    Topic = "product",
    SubscriberCount = 2,  // 订阅者数量
    RetryCount = 5,    // 重试次数
    RetryWaitMilliseconds = 1000   // 重试的间隔时间
});

MMQ版本

// 创建一个内存队列实例,消息的生产方可以给它发送消息。
private static readonly MemoryMesssageQueue<Product> s_mmq = new MemoryMesssageQueue<Product>(MmqWorkMode.Sync);

internal static void StartSubscriber()
{
    MmqSubscriber.Start<Product, PrintMessageHandler>(new MmqSubscriberArgs<Product> {
        Queue = s_mmq,
        RetryCount = 0,    // 重试次数
        RetryWaitMilliseconds = 1000   // 重试的间隔时间
    });
}

小结:

  • XxxxSubscriber.Start方法的第一个类型参数是:消息的类型
  • 第二个类型参数是:MessageHandler类型
  • 也就是:将什么类型的消息交给什么类型的处理器来处理
  • XxxxSubscriber.Start方法都只有一个参数,XxxSubscriberArgs(请按F12查看它的成员)

注意事项

  • 队列服务在使用前必须先在配置服务中注册,SettingName是连接的名称
  • SubscriberCount是指订阅者数量,这个值 不是 越大越好!
  • 最终全部订阅者数量 = SubscriberCount * 进程数量



不要硬编码 SubscriberCount

注意:SubscriberCount 这个参数,不要硬编码在代码中,应该使用一个参数来定义,例如:

RabbitSubscriber.Start<string, InvokeLogHandler3>(new RabbitSubscriberArgs {
    SettingName = ResNames.Rabbit,
    QueueName = typeof(InvokeLog).GetQueueName(),
    SubscriberCount = LocalSettings.GetUInt("Mercury_InvokeLog_RabbitSubscriber_Count", 3)
});



SubscriberCount 不应该太大

虽然增加 SubscriberCount 可以提高消息的吞吐量,但是订阅者也是会消耗资源的。

以RabbitMQ为例,每个订阅者都会创建一个TCP连接到RabbitMQ,而且还会有多个后台线程,因此它是一个比较消耗资源的对象,太多的连接也会给RabbitMQ制造一定的压力,所以建议这个参数不要太多,例如:对于一个队列来说,10个就够大了。

如果希望大幅提高吞吐量,可以参考下面示例:

private static void StartSubscribers()
{
    RabbitSubscriberArgs args = new RabbitSubscriberArgs
    {
        SettingName = RabbitSettings.DataBus_Rabbit,
        QueueName = "databus.request.new",
        SubscriberCount = 1,    // 只需要 1 个消息订阅者
    };

    RabbitSubscriber.StartAsync<NHttpRequest, HttpRequestMessageHandler>(args);
}


public class HttpRequestMessageHandler : AsyncBaseMessageHandler<NHttpRequest>
{
    private static readonly SemaphoreSlim s_semaphore;

    public override bool EnableLog => false;  // 不使用 MessageHandler 的日志

    static HttpRequestMessageHandler()
    {
        int concurrentCount = LocalSettings.GetUInt("DatabusMH_ConcurrentCount", 50);  // 控制并发量的参数
        s_semaphore = new SemaphoreSlim(concurrentCount);

        Console2.Info($"DatabusMH_ConcurrentCount = {concurrentCount}");
    }

    public override async Task ProcessMessage(PipelineContext<NHttpRequest> context)
    {
        NHttpRequest request = context.MessageData;

        await s_semaphore.WaitAsync();   // 这里控制并发

        // 异步执行发送请求的操作,当前方法中 【不等待】
        _ = ThreadUtils.RunAsync("HttpRequestMessageHandler_ProcessMessage", async () => {

            try {
                await SendRquest(request);
            }
            finally {
                s_semaphore.Release();
            }
        });
    }

    private static readonly bool s_logErrorRequest = LocalSettings.GetBool("DatabusMH_LogErrorRequest");

    private static async Task SendRquest(NHttpRequest request)
    {
        // 开启日志
        using( CodeSnippetContext ctx = new CodeSnippetContext(typeof(HttpRequestMessageHandler), "SendRequest", s_messageHandlerPerformance) ) {
            ctx.SetAsLongTask();

            try {
                int result = await SendRquest0(request, ctx.OprLog);

                // 如果非正常结束,就把请求记录下来
                if( s_logErrorRequest && result == 1 && ctx.OprLog.Request.IsNullOrEmpty() ) {
                    ctx.OprLog.Request = GetLogText(request);
                }
            }
            catch( Exception ex ) {
                ctx.SetException(ex);
            }
        }
    }

    // ...... 省略一些无关代码
}

示例小结:

  • 采用 SemaphoreSlim 来控制并发数量,而不是增加消息订阅者数量
  • 每条消息采用.NET线程池异步处理
  • 由于HttpRequestMessageHandler.ProcessMessage 并没有实际处理消息,因此设置 EnableLog => false;
  • 在消息处理代码中,using( CodeSnippetContext ctx = ...), 可记录消息的处理过程