3

我有一堆通过 Apache Kafka 集成的服务,每个服务都有它们的消费者和生产者,但是我面临着消费速度的放缓,就像当主题负载如此之大时,消费速度会减慢一样。

这是我的 kafka 消费者实现的示例:

    public class Consumer : BackgroundService
    {
        private readonly KafkaConfiguration _kafkaConfiguration;
        private readonly ILogger<Consumer> _logger;
        private readonly IConsumer<Null, string> _consumer;
        private readonly IMediator _mediator;

        public Consumer(
            KafkaConfiguration kafkaConfiguration,
            ILogger<Consumer> logger,
            IServiceScopeFactory provider
        )
        {
            _logger = logger;
            _kafkaConfiguration = kafkaConfiguration;
            _mediator = provider.CreateScope().ServiceProvider.GetRequiredService<IMediator>();

            var consumerConfig = new ConsumerConfig
            {
                GroupId = "order-service",
                BootstrapServers = kafkaConfiguration.ConnectionString,
                SessionTimeoutMs = 6000,
                ConsumeResultFields = "none",
                QueuedMinMessages = 1000000,
                SecurityProtocol = SecurityProtocol.Plaintext,
                AutoOffsetReset = AutoOffsetReset.Earliest,
                EnableAutoOffsetStore = false,
                FetchWaitMaxMs = 100,
                AutoCommitIntervalMs = 1000
            };

            _consumer = new ConsumerBuilder<Null, string>(consumerConfig).Build();
        }

        protected override Task ExecuteAsync(CancellationToken stoppingToken)
        {
            new Thread(() => StartConsumingAsync(stoppingToken)).Start();
            return Task.CompletedTask;
        }

        public async Task StartConsumingAsync(CancellationToken cancellationToken)
        {
            _consumer.Subscribe("orders");

            while (!cancellationToken.IsCancellationRequested)
            {
                try
                {
                    var consumedResult = _consumer.Consume(cancellationToken);

                    if (consumedResult == null) continue;

                    var messageAsEvent = JsonSerializer.Deserialize<OrderReceivedIntegrationEvent>(consumedResult.Message.Value);

                    await _mediator.Publish(messageAsEvent, CancellationToken.None);
                }
                catch (Exception e)
                {
                    _logger.LogCritical($"Error {e.Message}");
                }
            }
        }

这是我的制片人的一个例子:

public class Producer
    {
        protected readonly IProducer<Null, string> Producer;

        protected Producer(string host)
        {
            var producerConfig = new ProducerConfig
            {
                BootstrapServers = host,
                Acks = Acks.Leader
            };

            Producer = new ProducerBuilder<Null, string>(producerConfig).Build();
        }

        public void Produce(InitialOrderCreatedIntegrationEvent message)
        {
            var messageSerialized = JsonSerializer.Serialize(message);
            Producer.Produce("orders", new Message<Null, string> {Value = messageSerialized});
        }
    }

可以看到,consumer 只是从 kafka topic 中读取消息,并将消息反序列化为 MediatR INotification 对象,然后发布到 handler

处理程序与数据库事务、redis 缓存读/写和推送通知一起使用

我的处理程序的一个例子:

public override async Task Handle(OrderReceivedIntegrationEvent notification, CancellationToken cancellationToken)
        {
            try
            {
                // Get order from database
                var order = await _orderRepository.GetOrderByIdAsync(notification.OrderId.ToString());
                
                order.EditOrder(default, notification.Price);
                
                order.ChangeOrderStatus(notification.Status, notification.RejectReason);
                
                // commit the transaction
                if (await _uow.Commit())
                {
                    var cacheModificationRequest = _mapper.Map<CacheOrdersModificationRequestedIntegrationEvent>(order);
                    
                    // send mediatr notification to change cache information in Redis
                    await _bus.Publish(cacheModificationRequest, cancellationToken);
                }
            }
            catch (Exception e)
            {
                _logger.LogInformation($"Error {e.Message}");
            }
        }

但是当我用 2000 个请求以 15 秒的速度运行负载测试时,消费者开始变慢,大约需要 2 到 5 分钟来消耗所有 2000 个请求。

我想知道我是否删除了 MediatR 层并开始处理 Consumer 类中的进程,这将提高性能

或者如果有一些 Kafka 配置可以提高吞吐量,比如删除 In Sync 主题副本的 Acks,或者在一段时间后提交偏移量。

首先,我使用 MassTransit 库实现了 kafka,在发现这种缓慢的消耗率之后,我尝试将库更改为 Confluet.Kafka 只是为了删除 MassTransit 抽象层,如果它有改进的话,但仍然相同:

<PackageReference Include="Confluent.Kafka" Version="1.7.0" />

任何已经遇到同样问题的人可以帮助我吗?

OBS:我的 Kafka 在集群中运行,Kubernetes 中有 3 个代理,每个主题有 6 个分区和 3 个复制因子

4

0 回答 0