我正在使用 Debezium 对我的数据库执行 CDC 以在 Kafka 中创建消息。使用 kafdrop 和 OffsetExplorer 之类的工具,我可以看到消息key和value。但是,在使用 Confluent.Kafka 库的 .NET 框架应用程序中,当我使用消息时,消息键始终为空。
如何使用 Confluent.Kafka 库检索消息密钥?
这是VS项目的代码:
using System;
using Confluent.Kafka;
using System.Threading;
namespace KafkaConsumer
{
class Program
{
static void Main(string[] args)
{
var config = new ConsumerConfig
{
BootstrapServers = "kakfa:9092,localhost:9093",
GroupId = "foo",
AutoOffsetReset = AutoOffsetReset.Earliest,
};
var topics = "CAC_connector.dbo.sessionLogs";
bool cancelled = false;
// Define the cancellation token.
CancellationTokenSource source = new CancellationTokenSource();
CancellationToken cancellationToken = source.Token;
using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build())
{
consumer.Subscribe(topics);
while (!cancelled)
{
var cr = consumer.Consume(cancellationToken);
Console.WriteLine($"Consumed record with key {cr.Message.Key} and value {cr.Message.Value.Substring(0, 96)}");
}
consumer.Close();
}
}
}
}
干杯,凯莉