0

我遇到了以下代码的问题:

    public IActionResult Consume(string topic)
    {
        try
        {
            var config = new ConsumerConfig
            {
                BootstrapServers = "kafka-01:9092,kafka-02:9092,kafka-03:9092",
                SecurityProtocol = SecurityProtocol.SaslPlaintext,
                SaslMechanism = SaslMechanism.ScramSha256,
                SaslUsername = "my-consumer",
                SaslPassword = "pass",
                GroupId = "my-test-group"
            };

            using IConsumer<Null, string> consumer = new ConsumerBuilder<Null, string>(config).Build();
            consumer.Subscribe(topic);

            var cts = new CancellationTokenSource();
            cts.CancelAfter(TimeSpan.FromSeconds(60));

            ConsumeResult<Null, string> result = consumer.Consume(cts.Token);
            consumer.Close();

            return Ok(result.Message.Value);
        }
        catch (OperationCanceledException cancelEx)
        {
            return NoContent();
        }
        catch (ConsumeException consumeEx)
        {
            return BadRequest(consumeEx.ToString());
        }
    }

但是消费者不工作!正如我在 Debug 中看到的,它会冻结,直到 cancelationToken 过期,抛出 OperationCancelledException,我没有收到任何结果。

生产者似乎工作,它不会冻结或返回错误(使用类似的配置)。

为什么不起作用?

4

1 回答 1

0

默认情况下,消费者从主题的末尾开始并等待新消息

如果要消费现有数据,创建一个新的 groupId 并设置AutoOffsetReset = AutoOffsetReset.Earliest

于 2021-08-19T14:20:34.973 回答