Kafka通讯(Kafka-Net版)
环境说明:
IDE:Visual Studio 2022
框架:.NET Framework 4.8
NuGet库:kafka-net 0.9.0.65

官方代码:
https://github.com/Jroland/kafka-net
参考博客:
https://blog.csdn.net/weixin_30855761/article/details/96876699
https://blog.csdn.net/xinlingjun2007/article/details/80295332
未解决问题:
1、无法获取网络链接状态。
2、Message的value值只能为字符串类型,其他类型不行。
3、无法指定key值进行消费。
优点:
1、具备网络自动恢复,无需重复链接
2、操作简单
3、并发响应相对Confluent.Kafka较快
执行效果如下:

源码如下:
using KafkaNet;
using KafkaNet.Common;
using KafkaNet.Model;
using KafkaNet.Protocol;
using System;
using System.Linq;
using System.Threading.Tasks;
using System.Windows.Forms;
using Message = KafkaNet.Protocol.Message;
namespace KafkaDemo
{
public partial class Form1 : Form
{
/// <summary>
/// 主题
/// </summary>
public string topic = "test";
/// <summary>
/// 链接
/// </summary>
private Producer client { get; set; }
public Form1()
{
InitializeComponent();
}
private void Form1_Load(object sender, EventArgs e)
{
// 创建Kafka客户端
var options = new KafkaOptions(new Uri("http://SERVER1:9092"), new Uri("http://SERVER2:9092"));
var router = new BrokerRouter(options);
client = new Producer(router);
// 创建消费者
new Task(() =>
{
// 这种方式无法获取带KEY的数据
//var consumer = new Consumer(new ConsumerOptions("test", router));
// 这种方式可以获取到带KEY的数据
var offsets = client.GetTopicOffsetAsync(topic).Result;
var consumer = new Consumer(new ConsumerOptions(topic, router),
offsets.Select(x => new OffsetPosition(x.PartitionId, x.Offsets.Max())).ToArray());
// 只获取key值为“kafka_key”的数据
foreach (var message in consumer.Consume().Where(x => x.Key.ToUtf8String() == "kafka_key"))
{
string value = message.Value.ToUtf8String();
string log = $"接收到数据:{message.Meta.PartitionId},{message.Meta.Offset},{message.Key.ToUtf8String()},{value}";
System.Diagnostics.Debug.WriteLine(log);
}
}).Start();
}
private void button1_Click(object sender, EventArgs e)
{
client.SendMessageAsync(topic, new[] { new Message("这是数据", "kafka_key") }).Wait(2000);
System.Diagnostics.Debug.WriteLine("发送结束");
}
}
}