使用 Pulsar

本文将介绍Pulsar的使用技巧,主要包含以下内容

  1. 在配置服务中注册连接
  2. 发送消息
  3. 订阅消息



在配置服务中注册连接

在使用 Pulsar 前,请先在配置服务中注册连接参数,可参考下图:

xx

4个参数项的含意:

  • ServiceUrl: 用发送和订阅消息,具体格式要求请参考DotPulsar的要求
  • WebServiceUrl: REST的访问地址,如果不指定Venus将不监控它的可用性
  • AuthToken:认证用的 JWT Token
  • RetryIntervalMs: The time to wait before retrying an operation or a reconnect. 可以不指定!



发送消息

示例代码:

private async Task<string> Send0(Product product, string topic)
{
    RuntimeData.PulsarCount.Increment();

    int index = Interlocked.Increment(ref s_index);
    product.Remark = $"Pulsar message - {index}, {DateTime.Now.ToTimeString()}";

    await using( NPulsarClient client = new NPulsarClient("Pulsar_test", topic) ) {
        var messageId = await client.SendMessageAsync(product);
        return messageId.ToString();
    }
}



订阅消息

PulsarSubscriber.StartAsync<Product, PrintMessageHandler2>(new PulsarSubscriberArgs {
    SettingName = "Pulsar_test",   // 连接的配置名称
    Topic = "product",
    SubscriberCount = 2,  // 订阅者数量
    RetryCount = 5,    // 重试次数
    RetryWaitMilliseconds = 1000   // 重试的间隔时间
});

订阅消息之后,如果收到消息会交给 PrintMessageHandler 来处理,

后面的过程请参考:消息管道




压缩解压缩

相关解释

  • Pulsar服务端不处理压缩与解压缩
  • 压缩由发送端实现,可选择压缩算法
  • 解压缩由消费端在读取消息时判断,并自动调用对应的解压缩算法

Pulsar客户端支持4种压缩算法,可参考DotPulsar.CompressionType枚举,
相应的算法依赖包采用反射的方式加载,在运行时如果找不到合适的算法包会出现异常,
所以建议在程序部署时先添加必要的引用。

NPulsarClient默认使用Lz4压缩算法,如果需要使用其它算法,可在构造时指定:

/// <summary>
/// 构造方法
/// </summary>
/// <param name="settingName">配置服务中的Kafka连接名称</param>
/// <param name="topic">消息主题</param>
/// <param name="compressionType">压缩算法</param>
public NPulsarClient(string settingName, string topic, CompressionType compressionType = CompressionType.Lz4)




参考链接