使用 MMQ

MMQ 即 Memory Message Queue,它是一种进程内的消息队列,由ClownFish提供。

它有以下特点:

  • 性能好
  • 不做持久化



创建MMQ实例

示例代码如下:

/// <summary>
/// 同步版本的 MMQ
/// </summary>
private static readonly MemoryMesssageQueue<Product> s_syncMMQ 
                            = new MemoryMesssageQueue<Product>(MmqWorkMode.Sync);

/// <summary>
/// 异步版本的 MMQ
/// </summary>
private static readonly MemoryMesssageQueue<Product> s_asyncMMQ 
                            = new MemoryMesssageQueue<Product>(MmqWorkMode.Async);



发送消息

示例代码-同步版本:

[HttpPost]
[Route("send-sync.aspx")]
public int Send1(Product product)
{
    int index = Interlocked.Increment(ref s_index);
    product.Remark = $"MMQ message - {index}, {DateTime.Now.ToTimeString()}";

    s_syncMMQ.Write(product);   // 注意这行

    return index;
}

示例代码-异步版本:

[HttpPost]
[Route("send-async.aspx")]
public async Task<int> Send2(Product product)
{
    int index = Interlocked.Increment(ref s_index);
    product.Remark = $"MMQ message - {index}, {DateTime.Now.ToTimeString()}";

    await s_asyncMMQ.WriteAsync(product);   // 注意这行

    return index;
}



订阅消息

// 同步版本
MmqSubscriber.Start<Product, PrintMessageHandler>(new MmqSubscriberArgs<Product> {
    Queue = s_syncMMQ,
    SubscriberCount = 2,
    RetryCount = 0,    // 重试次数
    RetryWaitMilliseconds = 1000   // 重试的间隔时间
});

// 异步版本
MmqSubscriber.StartAsync<Product, PrintMessageHandler2>(new MmqSubscriberArgs<Product> {
    Queue = s_asyncMMQ,
    SubscriberCount = 2,
    RetryCount = 0,    // 重试次数
    RetryWaitMilliseconds = 1000   // 重试的间隔时间
});

订阅消息之后,如果收到消息会交给 PrintMessageHandler 来处理,

后面的过程请参考:消息管道