I'm trying to create a topic (an Event Hub) programmatically from the Kafka interface using AdminClient.CreateTopicsAsync. This works when connecting to Kafka, but not to Event Hub. I'm running into the following error:

Default partition count (KIP-464) not supported by broker, requires broker version <= 2.4.0

using Confluent.Kafka;
using Confluent.Kafka.Admin;

var adminClient = 
    new AdminClientBuilder(
        new[] {
            ("sasl.mechanism","PLAIN"),
            ("security.protocol","SASL_SSL"),
            ("bootstrap.servers", Address),
            ("sasl.username", "$ConnectionString"),
            ("sasl.password", ConnectionString),

        }.Select((kvp) => new KeyValuePair<string, string>(kvp.Item1, kvp.Item2))
    )
    .Build();

await adminClient.CreateTopicsAsync(new[] {
    new TopicSpecification {
        Name = "test-topic"
    }
});

It complains that using a default number of partitions is not supported, but as far as I can tell, I can't provide one as the underlying librdkafka does not support it. The only information I could find by googling this is that someone in 2021 did make it work.

1 Answer

This code works for me on both Kafka and Event Hub.

        using (var kafkaProducer = new ProducerBuilder<string, string>(producerConfig)
             .Build())
        {
            using (var adminClient = new DependentAdminClientBuilder(kafkaProducer.Handle).Build())
            {
                var metaData = adminClient.GetMetadata(TimeSpan.FromSeconds(5));
                var topicInfo = metaData.Topics.Where(tp => string.Equals(fullTopicName, tp.Topic, StringComparison.OrdinalIgnoreCase)).FirstOrDefault();

                if (topicInfo == null)
                {
                    var t = new Confluent.Kafka.Admin.TopicSpecification
                    {
                        Name = fullTopicName,
                        // Explicit partition count (at least 2 in this test setup).
                        NumPartitions = kafkaTestConfig.CreateTopicOptions.NumPartitions,
                        // Explicit replication factor (at least 1).
                        ReplicationFactor = kafkaTestConfig.CreateTopicOptions.ReplicationFactor,

                        Configs = kafkaTestConfig.CreateTopicOptions.DynamicConfigs,
                    };
                    var o = new CreateTopicsOptions { OperationTimeout = TimeSpan.FromMilliseconds(_timeout), };
                    AssertES.True(adminClient.CreateTopicsAsync(new List<Confluent.Kafka.Admin.TopicSpecification> { t, }
                    , o).Wait(_timeout), "Failed to create topic in time: " + fullTopicName);
                }
            }
        }
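The part that seems to matter for the error in the question is that `NumPartitions` and `ReplicationFactor` are set explicitly. Both default to -1 in `TopicSpecification`, which asks the broker to apply its own defaults (KIP-464), and the error message suggests the Event Hub Kafka endpoint does not accept that. Below is a minimal sketch of the same idea without the test scaffolding. The class and method names (`TopicCreator`, `CreateTopicAsync`), the partition count of 2, the replication factor of 1 and the 30 second timeout are assumptions for illustration, not values taken from the question.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;
using Confluent.Kafka;
using Confluent.Kafka.Admin;

// Hypothetical helper class; the name is illustrative only.
public static class TopicCreator
{
    public static async Task CreateTopicAsync(
        string bootstrapServers, string connectionString, string topicName)
    {
        // Same settings as in the question, expressed with the typed config class.
        var config = new AdminClientConfig
        {
            BootstrapServers = bootstrapServers,
            SecurityProtocol = SecurityProtocol.SaslSsl,
            SaslMechanism = SaslMechanism.Plain,
            SaslUsername = "$ConnectionString",
            SaslPassword = connectionString,
        };

        using var adminClient = new AdminClientBuilder(config).Build();

        var spec = new TopicSpecification
        {
            Name = topicName,
            // Set both values explicitly instead of leaving the -1 defaults.
            NumPartitions = 2,      // assumed value, adjust to your needs
            ReplicationFactor = 1,  // assumed value, adjust to your needs
        };

        try
        {
            await adminClient.CreateTopicsAsync(
                new[] { spec },
                new CreateTopicsOptions { OperationTimeout = TimeSpan.FromSeconds(30) });
        }
        catch (CreateTopicsException e)
            when (e.Results.All(r => r.Error.Code == ErrorCode.TopicAlreadyExists))
        {
            // The topic already exists, which is fine for an idempotent setup step.
        }
    }
}
```

It could be called as `await TopicCreator.CreateTopicAsync(Address, ConnectionString, "test-topic");` using the `Address` and `ConnectionString` values from the question.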
Answered 2022-02-22T20:18:07.323