Kafka通讯(Confluent.Kafka版)
环境说明:
IDE:Visual Studio 2022
框架:.NET Framework 4.8
NuGet库:Confluent.Kafka 2.1.1

官方代码:
https://github.com/confluentinc/confluent-kafka-dotnet/
参考博客:
https://blog.csdn.net/weixin_30855761/article/details/96876699
https://blog.csdn.net/weixin_46867655/article/details/106998538
未解决问题:
1、无法直接获取到网络状态,但可以通过间接的方式获取到,比如:数据发送失败。
2、Message的value值只能为字符串类型,其他类型不行。
3、无法指定key值进行消费。
3、并发响应相对Kafka.Net较慢,博主这边体验有点差,不知道是不是网络不太好的原因!!
优点:
1、具备网络自动恢复,无需重复链接
2、可以设置GroupId
3、可以设置监控时间,已经获取监控日志
配置属性含义
属性含义 | 数据类型 | 备注 | 类型 |
BootstrapServers | string | kafka连接字符串地址 | 生产者 消费者 |
MessageTimeoutMs | int | 超时时间 | 生产者 |
GroupId | string | 组ID | 消费者 |
EnableAutoCommit | bool | true:自动确认 false:手动确认 | 消费者 |
EnableIdempotence | bool | 失败重试 true:重试 false:不重试 | 生产者 |
执行效果

代码:
using Confluent.Kafka;
using System;
using System.Threading.Tasks;
using System.Windows.Forms;
namespace KafkaDemo
{
public partial class Form1 : Form
{
public Form1()
{
InitializeComponent();
}
/// <summary>
/// 主题
/// </summary>
public string topic = "test";
public string server = "localhost:19092";
/// <summary>
/// 生产者
/// </summary>
public IProducer<string, string> producer;
private void Form1_Load(object sender, EventArgs e)
{
// 初始化生产者
var producerConfig = new ProducerConfig
{
// kafka连接字符串地址(生产者 消费者)
BootstrapServers = server,
// 超时时间
MessageTimeoutMs = 3000,
// 失败重试
EnableIdempotence = true
};
producer = new ProducerBuilder<string, string>(producerConfig).Build();
// 初始化消费者
var config = new ConsumerConfig
{
// // kafka连接字符串地址(生产者 消费者)
BootstrapServers = server,
// 组ID
GroupId = "GroupId",
// 自动确认
EnableAutoCommit = true,
// 监控周期
StatisticsIntervalMs = 10000,
// 客户端组会话和故障检测超时
// SessionTimeoutMs = 6000,
// 提取数据的偏移方式(按最早的)
AutoOffsetReset = AutoOffsetReset.Earliest,
// 数据采集完后触发
EnablePartitionEof = true
};
var consumer = new ConsumerBuilder<string, string>(config)
.SetErrorHandler((_, ex) =>
{
System.Diagnostics.Debug.WriteLine($"Error: {ex.Reason}");
})
.SetStatisticsHandler((_, json) =>
{
System.Diagnostics.Debug.WriteLine($"{DateTime.Now:yyyy-MM-dd HH:mm:ss} > 监控");
})
.SetPartitionsAssignedHandler((c, partitions) =>
{
System.Diagnostics.Debug.WriteLine($"Assigned partitions: [{string.Join(", ", partitions)}]");
})
.SetPartitionsRevokedHandler((c, partitions) =>
{
System.Diagnostics.Debug.WriteLine($"Revoking assignment: [{string.Join(", ", partitions)}]");
})
.Build();
consumer.Subscribe(topic);
// 消费者
new Task(() =>
{
while (true)
{
try
{
// 堵塞方式
var consumeResult = consumer.Consume();
if (consumeResult.IsPartitionEOF)
{
System.Diagnostics.Debug.WriteLine("触发EOF");
continue;
}
System.Diagnostics.Debug.WriteLine($"收到消息:{consumeResult.Message.Key}: {consumeResult.Message.Value}");
}
catch (ConsumeException ex)
{
System.Diagnostics.Debug.WriteLine($"接收错误: {ex.Error.Reason}");
}
}
}).Start();
}
private void button1_Click(object sender, EventArgs e)
{
new Task(() =>
{
try
{
var dr = producer.ProduceAsync(topic, new Message<string, string>
{
Key = "kafka_key",
Value = $"这是数据:{DateTime.Now.ToString("yyyy年MM月dd日 HH:mm:ss")}"
}).GetAwaiter().GetResult();
System.Diagnostics.Debug.WriteLine("------------消息发送成功!----------------");
}
catch (ProduceException<string, string> ex)
{
System.Diagnostics.Debug.WriteLine($"消息发送失败!;data:" + ex.Error.Reason);
}
}).Start();
}
}
}