0

我正在使用 Debezium 对我的数据库执行 CDC 以在 Kafka 中创建消息。使用 kafdrop 和 OffsetExplorer 之类的工具,我可以看到消息keyvalue。但是,在使用 Confluent.Kafka 库的 .NET 框架应用程序中,当我使用消息时,消息始终为空。

如何使用 Confluent.Kafka 库检索消息密钥?

这是VS项目的代码:

using System;
using Confluent.Kafka;
using System.Threading;

namespace KafkaConsumer
{
    class Program
    {
        static void Main(string[] args)
        {
            var config = new ConsumerConfig
            {
                BootstrapServers = "kakfa:9092,localhost:9093",
                GroupId = "foo",
                AutoOffsetReset = AutoOffsetReset.Earliest,
                
            };

            var topics = "CAC_connector.dbo.sessionLogs";
            bool cancelled = false;

            // Define the cancellation token.
            CancellationTokenSource source = new CancellationTokenSource();
            CancellationToken cancellationToken = source.Token;

            using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build())
            {
                consumer.Subscribe(topics);

                while (!cancelled)
                {
                    var cr = consumer.Consume(cancellationToken);
                    Console.WriteLine($"Consumed record with key {cr.Message.Key} and value {cr.Message.Value.Substring(0, 96)}");
                }

                consumer.Close();
            }
        }
    }
}

干杯,凯莉

4

1 回答 1

1

正如 OneCricketeer 指出的那样,我最初是在构建 ConsumerBuilder 以忽略密钥。当我将构造函数更改为ConsumerBuilder<string, string>消息中包含的键时。

于 2021-07-21T17:33:32.853 回答