使用 Kafka
本文将介绍Kafka的使用技巧,主要包含以下内容
- 在配置服务中注册连接
- 发送消息
- 订阅消息
- 注意事项
在配置服务中注册连接
在使用 Kafka 前,请先在配置服务中注册连接参数,可参考下图:
发送消息
示例代码:
[HttpPost]
[Route("send1.aspx")]
public int Send1(Product product)
{
int index = Interlocked.Increment(ref s_index);
product.Remark = $"Kafka message - {index}, {DateTime.Now.ToTimeString()}";
using( KafkaClient client = new KafkaClient("Nebula.Log.Kafka") ) {
return client.SendMessageNoWait(product);
}
}
订阅消息
KafkaSubscriber.Start<Product, PrintMessageHandler>(new KafkaSubscriberArgs {
SettingName = "Nebula.Log.Kafka", // 连接的配置名称
GroupId = null, // null 表示使用当前应用的名称
Topic = null, // null 表示使用Product的类型全名代替
CommitPeriod = 1, // 自动提交周期,1 表示每处理1条消息就提交一次
RetryCount = 0, // 重试次数
RetryWaitMilliseconds = 1000 // 重试的间隔时间
});
订阅消息之后,如果收到消息会交给 PrintMessageHandler 来处理,
后面的过程请参考:消息管道
注意事项
Kafka在整个消息处理过程中很有可能会导致消息丢失:
- KafkaClient.SendMessageNoWait 方法,它只是将消息放入客户端的缓冲区队列,并不是立即发送, 而是由另外的后台线程执行发送,这样做的好处是: 异步可提高吞吐量,但是在应用程序重启/停止的时候,极有可能出现消息丢失。
- 服务端在收到消息后,由于没有严格的事务保证,也是有可能导致消息丢失的
- 消息在服务端只是写入日志文件,并不关心客户端的处理进度(不是常规的队列概念),在消息达到清理时间后,即使消息没有被处理,也会被清除
对于订阅者(消费端)来说,请注意:
- KafkaSubscriber默认采用自动提交方式,可以设置 EnableAutoCommit = false 来使用【批量提交】
- 通常为了提高所谓的吞吐量,会采用【批量提交】方式,例如:CommitPeriod = 10 , 如果前面8条消息处理成功,处理第9条时程序重启, 当重新启动订阅后,前面的8条消息会重新发给订阅者,因为必须要实现【幂等操作】