使用 Pulsar
本文将介绍Pulsar的使用技巧,主要包含以下内容
- 在配置服务中注册连接
- 发送消息
- 订阅消息
在配置服务中注册连接
在使用 Pulsar 前,请先在配置服务中注册连接参数,可参考下图:
4个参数项的含意:
- ServiceUrl: 用发送和订阅消息,具体格式要求请参考DotPulsar的要求
- WebServiceUrl: REST的访问地址,如果不指定Venus将不监控它的可用性
- AuthToken:认证用的 JWT Token
- RetryIntervalMs: The time to wait before retrying an operation or a reconnect. 可以不指定!
发送消息
示例代码:
private async Task<string> Send0(Product product, string topic)
{
RuntimeData.PulsarCount.Increment();
int index = Interlocked.Increment(ref s_index);
product.Remark = $"Pulsar message - {index}, {DateTime.Now.ToTimeString()}";
await using( NPulsarClient client = new NPulsarClient("Pulsar_test", topic) ) {
var messageId = await client.SendMessageAsync(product);
return messageId.ToString();
}
}
订阅消息
PulsarSubscriber.StartAsync<Product, PrintMessageHandler2>(new PulsarSubscriberArgs {
SettingName = "Pulsar_test", // 连接的配置名称
Topic = "product",
SubscriberCount = 2, // 订阅者数量
RetryCount = 5, // 重试次数
RetryWaitMilliseconds = 1000 // 重试的间隔时间
});
订阅消息之后,如果收到消息会交给 PrintMessageHandler 来处理,
后面的过程请参考:消息管道
压缩解压缩
相关解释
- Pulsar服务端不处理压缩与解压缩
- 压缩由发送端实现,可选择压缩算法
- 解压缩由消费端在读取消息时判断,并自动调用对应的解压缩算法
Pulsar客户端支持4种压缩算法,可参考DotPulsar.CompressionType枚举,
相应的算法依赖包采用反射的方式加载,在运行时如果找不到合适的算法包会出现异常,
所以建议在程序部署时先添加必要的引用。
NPulsarClient默认使用Lz4压缩算法,如果需要使用其它算法,可在构造时指定:
/// <summary>
/// 构造方法
/// </summary>
/// <param name="settingName">配置服务中的Kafka连接名称</param>
/// <param name="topic">消息主题</param>
/// <param name="compressionType">压缩算法</param>
public NPulsarClient(string settingName, string topic, CompressionType compressionType = CompressionType.Lz4)
参考链接
-
Pulsar C# client: https://pulsar.apache.org/docs/zh-CN/client-libraries-dotnet/
-
Client Compression: https://github.com/apache/pulsar-dotpulsar/wiki/Compression