
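I want to implement a multi-threaded Kafka consumer in .NET Core, as shown in the diagram below. In Java, we can use the poll method to retrieve multiple records at once and then get the records for each partition. I want to do the same thing in .NET.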

[image: multi-threaded Kafka consumer diagram]

Java implementation:

while (!stopped.get()) {
     // Fetch whatever records arrive within 100 ms.
     ConsumerRecords<String, String> records = consumer.poll(Duration.of(100, ChronoUnit.MILLIS));
     // Hand each partition's records to its own task so per-partition order is preserved.
     records.partitions().forEach(partition -> {
         List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
         Task task = new Task(partitionRecords);
         executor.submit(task);
         activeTasks.put(partition, task);
     });
}

My consumer code in .NET Core:

        using var consumer = new ConsumerBuilder<string, string>(
            configuration.AsEnumerable())
            .SetErrorHandler((_, eb) => Console.WriteLine($"Error: {eb.Reason}"))
            .SetStatisticsHandler((_, json) => Console.WriteLine($"Statistics: {json}"))
            .SetPartitionsAssignedHandler((c, partitions) =>
            {
                Console.WriteLine($"Assigned Partitions: [{string.Join(", ", partitions)}]");
            })
            .SetPartitionsRevokedHandler((c, partitions) =>
            {
                Console.WriteLine($"Revoking Assignment: [{string.Join(", ", partitions)}]");
            })
            .Build();
        consumer.Subscribe(topicList);
        try
        {
            while (true)
            {
                var cr = consumer.Consume(cts.Token);
                Console.WriteLine($"Consumed event from topic {cr.Topic} with value {cr.Message.Value}. Offset: {cr.Offset}, Partition: {cr.Partition.Value}");
            }
        }
        catch (OperationCanceledException)
        {
            // Ctrl-C was pressed.
        }
        finally
        {
            consumer.Close();
        }
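
For comparison with the Java loop, here is a minimal sketch of one way to approximate poll-style batching with Confluent.Kafka, which hands out a single ConsumeResult per Consume call. It assumes that Consume(TimeSpan) returns null when the timeout elapses. PollBatch, Run, Process, the 100 ms timeout and the batch size of 500 are hypothetical names and values chosen for illustration, not part of the library or of the code above.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;

static class BatchConsumer
{
    // Hypothetical helper: waits up to `timeout` for the first message, then
    // drains messages that are already buffered locally, up to maxBatch.
    public static List<ConsumeResult<string, string>> PollBatch(
        IConsumer<string, string> consumer, TimeSpan timeout, int maxBatch)
    {
        var batch = new List<ConsumeResult<string, string>>();
        var wait = timeout;
        while (batch.Count < maxBatch)
        {
            var cr = consumer.Consume(wait);   // null when nothing arrives in time
            if (cr == null) break;
            wait = TimeSpan.Zero;              // do not block again within this batch
            if (cr.IsPartitionEOF) continue;   // only emitted if EnablePartitionEof is set
            batch.Add(cr);
        }
        return batch;
    }

    public static void Run(IConsumer<string, string> consumer, CancellationToken token)
    {
        while (!token.IsCancellationRequested)
        {
            var batch = PollBatch(consumer, TimeSpan.FromMilliseconds(100), 500);

            // One task per partition, so records of a partition are handled in order.
            var tasks = batch
                .GroupBy(cr => cr.TopicPartition)
                .Select(g => Task.Run(() => Process(g.Key, g.ToList())))
                .ToArray();

            // Simplest option: finish the whole batch before polling again.
            Task.WaitAll(tasks);
        }
    }

    // Hypothetical per-partition worker; replace with real processing.
    static void Process(TopicPartition partition, List<ConsumeResult<string, string>> records)
    {
        foreach (var cr in records)
            Console.WriteLine($"{partition}: offset {cr.Offset}, value {cr.Message.Value}");
    }
}
```

Unlike the Java version, which submits tasks and keeps polling, this sketch blocks until every partition task finishes. That keeps it short, but slow processing could exceed max.poll.interval.ms, so a closer port would probably track active tasks per partition and use Pause and Resume on the consumer.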