使用 Kafka

本文将介绍Kafka的使用技巧,主要包含以下内容

  1. 在配置服务中注册连接
  2. 发送消息
  3. 订阅消息
  4. 注意事项



在配置服务中注册连接

在使用 Kafka 前,请先在配置服务中注册连接参数,可参考下图:

xx



发送消息

示例代码:

[HttpPost]
[Route("send1.aspx")]
public int Send1(Product product)
{
    int index = Interlocked.Increment(ref s_index);
    product.Remark = $"Kafka message - {index}, {DateTime.Now.ToTimeString()}";

    using( KafkaClient client = new KafkaClient("Nebula.Log.Kafka") ) {
        return client.SendMessageNoWait(product);
    }
}



订阅消息

KafkaSubscriber.Start<Product, PrintMessageHandler>(new KafkaSubscriberArgs {
    SettingName = "Nebula.Log.Kafka",   // 连接的配置名称
    GroupId = null, // null 表示使用当前应用的名称 
    Topic = null,   // null 表示使用Product的类型全名代替
    CommitPeriod = 1,  // 自动提交周期,1 表示每处理1条消息就提交一次
    RetryCount = 0,    // 重试次数
    RetryWaitMilliseconds = 1000   // 重试的间隔时间
});

订阅消息之后,如果收到消息会交给 PrintMessageHandler 来处理,

后面的过程请参考:消息管道



注意事项

Kafka在整个消息处理过程中很有可能会导致消息丢失:

  1. KafkaClient.SendMessageNoWait 方法,它只是将消息放入客户端的缓冲区队列,并不是立即发送, 而是由另外的后台线程执行发送,这样做的好处是: 异步可提高吞吐量,但是在应用程序重启/停止的时候,极有可能出现消息丢失。
  2. 服务端在收到消息后,由于没有严格的事务保证,也是有可能导致消息丢失的
  3. 消息在服务端只是写入日志文件,并不关心客户端的处理进度(不是常规的队列概念),在消息达到清理时间后,即使消息没有被处理,也会被清除

对于订阅者(消费端)来说,请注意:

  1. KafkaSubscriber默认采用自动提交方式,可以设置 EnableAutoCommit = false 来使用【批量提交】
  2. 通常为了提高所谓的吞吐量,会采用【批量提交】方式,例如:CommitPeriod = 10 , 如果前面8条消息处理成功,处理第9条时程序重启, 当重新启动订阅后,前面的8条消息会重新发给订阅者,因为必须要实现【幂等操作】




参考链接