0

如何一次在生产者中编写消息并与消费者每分钟读取 1 条消息?

我可以使用的配置属性

在此处输入图像描述

注意:请“max.poll.records”注意我不能使用该方法

我的消费阶层:

var settings = ConfigurationManager.KafkaSettings.Topics[Topics.FaturaKaydetViaTp];

LogManager.Logger.Debug("Consumer initiating for {topic}", settings.TopicName);

using (var consumer = new ConsumerBuilder<Ignore, MailMessage>(consumerConfig).SetValueDeserializer(new ObjectDeserializer<MailMessage>()).Build())
{
    LogManager.Logger.Debug("Consumer initiated");

    LogManager.Logger.Debug("Subscribing for {topic}", settings.TopicName);
    
    consumer.Subscribe(settings.TopicName);

    try
    {
        while (true)
        {
            try
            {
                
                var cr = consumer.Consume();

                LogManager.Logger.Debug("Message received for '{topic}' at: '{topicPartitionOffset}'.", settings.TopicName, cr.TopicPartitionOffset);
                if (HandleOnMessage(cr.Value))
                    if (ConfigurationManager.KafkaSettings.AutoCommit == false)
                        consumer.Commit(cr);
            }
            catch (ConsumeException e)
            {
                LogManager.Logger.Fatal(e, "ConsumeException");
            }
        }
    }
    catch (OperationCanceledException)
    {
        // Ensure the consumer leaves the group cleanly and final offsets are committed.
        consumer.Close();
    }
}
4

3 回答 3

2

一般策略需要您暂停消费者,让运行循环的线程休眠,然后重新订阅/恢复消费者。

这需要从设置开始PartitionsAssignedHandlerConsumerBuilder然后保存返回的分区分配,因为Consumer.Pause()andConsumer.Resume()方法需要这些。

IEnumerable<TopicPartition> partitions; // TODO: assign this from handler
using (var consumer = ... ) { // set PartitionsAssignedHandler in ConsumerBuilder and set partitions above. 

        consumer.Subscribe(settings.TopicName);
        while (true)
        {
            try
            {
                // TODO: Might need to check if currently paused, somehow
                List<TopicPartitionError> resumeErrors = consumer.Resume(partitons);
                // TODO: handle resume errors

                var cr = consumer.Consume(); // This already gets one record from the assigned, resumed partitions

                LogManager.Logger.Debug("Message received for '{topic}' at: '{topicPartitionOffset}'.", cr.Topic, cr.TopicPartitionOffset);

                // Pause and sleep
                consumer.Pause(partitons);
                Thread.Sleep(1000 * 60); // 1 minute
            }
            catch (ConsumeException e)
            {
                LogManager.Logger.Fatal(e, "ConsumeException");
            }
        }

请记住,如果您运行多个实例,在暂停和恢复时总是会重新平衡,这会导致处理显着延迟,可能导致每分钟消耗超过一个。

于 2022-01-07T16:43:41.887 回答
1

首先,您需要设置max.poll.records1,否则您将获取每个consumer.poll(Duration).

Duration传递给consumer.poll(…)不强制等待,它的工作方式不同。从文档:

如果有可用记录,此方法会立即返回。否则,它将等待通过的超时。如果超时,将返回一个空记录集

要每分钟获取一个,您需要在短时间内轮询,而不是等待 60 秒(不准确),或者使用其他一些工具以 60 秒的间隔进行轮询。

于 2022-01-07T11:54:01.213 回答
0

根据您使用的 API,consumer.poll() 方法接受持续时间(以毫秒为单位)的参数。

Java中的示例代码:

ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(60000));

文档中的更多信息:https ://kafka.apache.org/26/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html

编辑:添加代码后。

consumer.consume 方法也接受 duration.ms 的参数

支持文档:https ://docs.confluent.io/5.0.0/clients/confluent-kafka-dotnet/api/Confluent.Kafka.Consumer.html#Confluent_Kafka_Consumer_Consume_Confluent_Kafka_Message__System_Int32 _

于 2022-01-07T11:30:11.190 回答