消息处理器
无论使用哪种队列服务,所有消息最终是由 MessageHandler 来处理。
特性说明及要求:
- MessageHandler的实例随队列订阅者一直存在,直到进程退出。
- 一个消息订阅者对应一个MessageHandler,开启多个订阅者可实现并行处理效果。
- MessageHandler中所有数据成员在生命周期内持续可用,但由于实例隔离,会比静态成员更安全。
- MessageHandler的实例化由框架来完成,自行实例化是没有意义的。
- MessageHandler 不必关注消息是从哪里来,只需要处理它就可以了。
- MessageHandler 响应消息管道的各个阶段,可以将复杂的逻辑拆分,有助于代码结构清晰易懂。
- MessageHandler在执行时如果出现异常,默认会有异常处理,并且会启动重试机制,重新执行整个管道流程。
- 由于消息管道的重试机制,以及消息未及时提交等等原因,消息有可能被重复发送到MessageHandler来处的,因此MessageHandler要实现 【幂等操作】
示例代码
/// <summary>
/// 消息处理器。
/// 它不关心消息从哪里来,只管处理特定类型的消息即可。
/// </summary>
public class PrintMessageHandler : BaseMessageHandler<Product>
{
public override void ProcessMessage(PipelineContext<Product> context)
{
// 获取消息对象
Product message = context.MessageData;
string json = $"ProductID={message.ProductID},,{message.ProductName},,,{message.Remark}";
string text = $"{_count.ToString().PadLeft(3, ' ')}; {json}";
Console2.Info(text);
}
}
小结:
- 必须从BaseMessageHandler<T>继承
- 类型参数T就是消息的数据类型
- 访问 context.MessageData 可获取消息对象
异常处理
BaseMessageHandler基类提供了3个与异常相关的方法用于定制行为(可以保持默认行为):
- OnError:当消息处理过程中发生异常时会被调用。
默认行为:不处理。
说明:每条消息在处理过程中都会产生一条OprLog日志,如果有异常是会有记录的。 - IsNeedRetry:如果异常没有被处理,这个方法用于判断当前消息是否需要重试。
默认行为:会根据异常类别做出判断。 - ProcessDeadMessage:在消息最终无法处理时调用。
默认行为:将消息写入 temp\deadmsg 目录下,每个消息一个文件,即死消息文件。
死消息
消息管道默认有异常处理和重试机制,以下2种情况下消息会无法处理,形成死消息:
- 重试一直失败,达到最大次数
- 遇到的异常类型是不需要重试的异常,例如:ValidationException,Htt400
此时消息将被放弃重试,执行以下过程:
- 将消息写入 .\temp\deadmsg 目录下,每个消息一个文件。
- 向MQ服务端发送确认信号(表示消息已处理)
- 获取下一条消息,并继续处理
死消息文件示例