我想在 .NetCore 中实现多线程 Kafka 消费者,如下图所示。在java中,我们可以使用 Poll 方法检索多条记录,并且可以获取每个分区的记录。我想在.net 中实现同样的事情。
Java 实现。
while (!stopped.get()) {
ConsumerRecords<String, String> records = consumer.poll(Duration.of(100, ChronoUnit.MILLIS));
records.partitions().forEach(partition -> {
List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
Task task = new Task(partitionRecords);
executor.submit(task);
activeTasks.put(partition, task);
});
}
我在 .net 核心中的消费者代码。
using var consumer = new ConsumerBuilder<string, string>(
configuration.AsEnumerable())
.SetErrorHandler((_, eb) => Console.WriteLine($"Error: {eb.Reason}"))
.SetStatisticsHandler((_, json) => Console.WriteLine($"Statistics: {json}"))
.SetPartitionsAssignedHandler((c, partitions) =>
{
Console.WriteLine($"Assigned Partitions: [{string.Join(", ", partitions)}]");
})
.SetPartitionsRevokedHandler((c, partitions) =>
{
Console.WriteLine($"Revoking Assignment: [{string.Join(", ", partitions)}]");
})
.Build();
consumer.Subscribe(topicList);
try
{
while (true)
{
var cr = consumer.Consume(cts.Token);
Console.WriteLine($"Consumed event from topic {cr.Topic} with value {cr.Message.Value}. Offset: {cr.Offset}, Partition: {cr.Partition.Value}");
}
}
catch (OperationCanceledException)
{
// Ctrl-C was pressed.
}
finally
{
consumer.Close();
}