消息处理器

无论使用哪种队列服务,所有消息最终是由 MessageHandler 来处理。

特性说明及要求:

  1. MessageHandler的实例随队列订阅者一直存在,直到进程退出。
  2. 一个消息订阅者对应一个MessageHandler,开启多个订阅者可实现并行处理效果。
  3. MessageHandler中所有数据成员在生命周期内持续可用,但由于实例隔离,会比静态成员更安全。
  4. MessageHandler的实例化由框架来完成,自行实例化是没有意义的。
  5. MessageHandler 不必关注消息是从哪里来,只需要处理它就可以了。
  6. MessageHandler 响应消息管道的各个阶段,可以将复杂的逻辑拆分,有助于代码结构清晰易懂。
  7. MessageHandler在执行时如果出现异常,默认会有异常处理,并且会启动重试机制,重新执行整个管道流程。
  8. 由于消息管道的重试机制,以及消息未及时提交等等原因,消息有可能被重复发送到MessageHandler来处的,因此MessageHandler要实现 【幂等操作】



示例代码

/// <summary>
/// 消息处理器。
/// 它不关心消息从哪里来,只管处理特定类型的消息即可。
/// </summary>
public class PrintMessageHandler : BaseMessageHandler<Product>
{
    public override void ProcessMessage(PipelineContext<Product> context)
    {
        // 获取消息对象
        Product message = context.MessageData;

        string json = $"ProductID={message.ProductID},,{message.ProductName},,,{message.Remark}";
        string text = $"{_count.ToString().PadLeft(3, ' ')}; {json}";

        Console2.Info(text);
    }
}

小结:

  • 必须从BaseMessageHandler<T>继承
  • 类型参数T就是消息的数据类型
  • 访问 context.MessageData 可获取消息对象



异常处理

BaseMessageHandler基类提供了3个与异常相关的方法用于定制行为(可以保持默认行为):

  • OnError:当消息处理过程中发生异常时会被调用。
    默认行为:不处理。
    说明:每条消息在处理过程中都会产生一条OprLog日志,如果有异常是会有记录的。
  • IsNeedRetry:如果异常没有被处理,这个方法用于判断当前消息是否需要重试。
    默认行为:会根据异常类别做出判断。
  • ProcessDeadMessage:在消息最终无法处理时调用。
    默认行为:将消息写入 temp\deadmsg 目录下,每个消息一个文件,即死消息文件。



死消息

消息管道默认有异常处理和重试机制,以下2种情况下消息会无法处理,形成死消息:

  • 重试一直失败,达到最大次数
  • 遇到的异常类型是不需要重试的异常,例如:ValidationException,Htt400

此时消息将被放弃重试,执行以下过程:

  • 将消息写入 .\temp\deadmsg 目录下,每个消息一个文件。
  • 向MQ服务端发送确认信号(表示消息已处理)
  • 获取下一条消息,并继续处理

死消息文件示例

xx