I have a set of services integrated through Apache Kafka, each with its own consumers and producers, but I'm facing a slowdown in consumption: when the topic is under heavy load, the consume rate drops.

Here is an example of my Kafka consumer implementation:
public class Consumer : BackgroundService
{
    private readonly KafkaConfiguration _kafkaConfiguration;
    private readonly ILogger<Consumer> _logger;
    private readonly IConsumer<Null, string> _consumer;
    private readonly IMediator _mediator;

    public Consumer(
        KafkaConfiguration kafkaConfiguration,
        ILogger<Consumer> logger,
        IServiceScopeFactory provider
    )
    {
        _logger = logger;
        _kafkaConfiguration = kafkaConfiguration;
        _mediator = provider.CreateScope().ServiceProvider.GetRequiredService<IMediator>();

        var consumerConfig = new ConsumerConfig
        {
            GroupId = "order-service",
            BootstrapServers = kafkaConfiguration.ConnectionString,
            SessionTimeoutMs = 6000,
            ConsumeResultFields = "none",
            QueuedMinMessages = 1000000,
            SecurityProtocol = SecurityProtocol.Plaintext,
            AutoOffsetReset = AutoOffsetReset.Earliest,
            EnableAutoOffsetStore = false,
            FetchWaitMaxMs = 100,
            AutoCommitIntervalMs = 1000
        };

        _consumer = new ConsumerBuilder<Null, string>(consumerConfig).Build();
    }

    protected override Task ExecuteAsync(CancellationToken stoppingToken)
    {
        new Thread(() => StartConsumingAsync(stoppingToken)).Start();
        return Task.CompletedTask;
    }

    public async Task StartConsumingAsync(CancellationToken cancellationToken)
    {
        _consumer.Subscribe("orders");

        while (!cancellationToken.IsCancellationRequested)
        {
            try
            {
                var consumedResult = _consumer.Consume(cancellationToken);
                if (consumedResult == null) continue;

                var messageAsEvent = JsonSerializer.Deserialize<OrderReceivedIntegrationEvent>(consumedResult.Message.Value);
                await _mediator.Publish(messageAsEvent, CancellationToken.None);
            }
            catch (Exception e)
            {
                _logger.LogCritical($"Error {e.Message}");
            }
        }
    }
}
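As the loop shows, each message is processed end-to-end (deserialize, publish to MediatR, await the whole handler) before the next `Consume` call, so handler latency directly caps the consume rate. One idea I had was to decouple consuming from handling with a bounded queue; a rough sketch of what I mean (untested, using `System.Threading.Channels`; the field and type names are just reused from my Consumer above):

```csharp
// Sketch only: consume fast on one thread, hand payloads to a few workers via a bounded channel.
private readonly Channel<string> _queue = Channel.CreateBounded<string>(10_000);

public async Task StartConsumingAsync(CancellationToken cancellationToken)
{
    _consumer.Subscribe("orders");

    // A handful of workers drain the channel concurrently instead of one message at a time.
    var workers = Enumerable.Range(0, 4)
        .Select(_ => Task.Run(() => HandleLoopAsync(cancellationToken)))
        .ToArray();

    while (!cancellationToken.IsCancellationRequested)
    {
        var consumedResult = _consumer.Consume(cancellationToken);
        if (consumedResult == null) continue;

        // Blocks only when the channel is full, which gives back-pressure.
        await _queue.Writer.WriteAsync(consumedResult.Message.Value, cancellationToken);
    }

    _queue.Writer.Complete();
    await Task.WhenAll(workers);
}

private async Task HandleLoopAsync(CancellationToken cancellationToken)
{
    await foreach (var payload in _queue.Reader.ReadAllAsync(cancellationToken))
    {
        var messageAsEvent = JsonSerializer.Deserialize<OrderReceivedIntegrationEvent>(payload);
        await _mediator.Publish(messageAsEvent, CancellationToken.None);
    }
}
```

I'm aware this would lose per-message ordering and complicate offset management, so I'm not sure it's the right direction.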
And here is an example of my producer:
public class Producer
{
    protected readonly IProducer<Null, string> Producer;

    protected Producer(string host)
    {
        var producerConfig = new ProducerConfig
        {
            BootstrapServers = host,
            Acks = Acks.Leader
        };

        Producer = new ProducerBuilder<Null, string>(producerConfig).Build();
    }

    public void Produce(InitialOrderCreatedIntegrationEvent message)
    {
        var messageSerialized = JsonSerializer.Serialize(message);
        Producer.Produce("orders", new Message<Null, string> { Value = messageSerialized });
    }
}
As you can see, the consumer just reads a message from the Kafka topic, deserializes it into a MediatR INotification object, and publishes it to its handler.
The handler works with database transactions, Redis cache reads/writes, and push notifications.
An example of one of my handlers:
public override async Task Handle(OrderReceivedIntegrationEvent notification, CancellationToken cancellationToken)
{
    try
    {
        // Get the order from the database
        var order = await _orderRepository.GetOrderByIdAsync(notification.OrderId.ToString());
        order.EditOrder(default, notification.Price);
        order.ChangeOrderStatus(notification.Status, notification.RejectReason);

        // Commit the transaction
        if (await _uow.Commit())
        {
            var cacheModificationRequest = _mapper.Map<CacheOrdersModificationRequestedIntegrationEvent>(order);
            // Send a MediatR notification to update the cached order in Redis
            await _bus.Publish(cacheModificationRequest, cancellationToken);
        }
    }
    catch (Exception e)
    {
        _logger.LogInformation($"Error {e.Message}");
    }
}
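Since the handler mixes a database read, a commit, and a cache publish, I was thinking of adding rough per-message timing to see which step dominates. Something like this sketch (`Stopwatch` is from `System.Diagnostics`; the log line is just mine, and the fields are the same ones my handler already uses):

```csharp
// Rough per-message timing to find the slow step (database read vs commit).
var sw = Stopwatch.StartNew();

var order = await _orderRepository.GetOrderByIdAsync(notification.OrderId.ToString());
var dbReadMs = sw.ElapsedMilliseconds;

order.EditOrder(default, notification.Price);
order.ChangeOrderStatus(notification.Status, notification.RejectReason);

var committed = await _uow.Commit();
var commitMs = sw.ElapsedMilliseconds - dbReadMs;

_logger.LogInformation($"Order {notification.OrderId}: read={dbReadMs}ms commit={commitMs}ms committed={committed}");
```

I haven't gathered numbers from this yet, so I don't know where the time actually goes.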
But when I run a load test of 2000 requests over 15 seconds, the consumer starts to slow down and takes around 2 to 5 minutes to consume all 2000 messages.
I'm wondering whether removing the MediatR layer and handling the processing directly in the Consumer class would improve performance,
or whether there are Kafka configuration changes that could increase throughput, such as dropping the acks from in-sync replicas, or only committing offsets after some interval.
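Concretely, these are the kinds of settings I had in mind (the values are guesses on my part, not tuned or measured):

```csharp
// Throughput-oriented settings I'm considering; values are assumptions, not benchmarks.
var consumerConfig = new ConsumerConfig
{
    GroupId = "order-service",
    BootstrapServers = kafkaConfiguration.ConnectionString,
    // Auto-commit stored offsets in the background every second.
    EnableAutoCommit = true,
    AutoCommitIntervalMs = 1000,
    // Store offsets manually only after a message is actually handled.
    EnableAutoOffsetStore = false,
    // Larger fetches per broker round-trip.
    FetchMinBytes = 64 * 1024,
    FetchWaitMaxMs = 100
};

var producerConfig = new ProducerConfig
{
    BootstrapServers = host,
    // Weaker acks trade durability for latency (this is what I meant by dropping the replica acks).
    Acks = Acks.None,
    // Batch messages for a few milliseconds before sending.
    LingerMs = 5
};
```

One thing I noticed while writing this: with `EnableAutoOffsetStore = false`, I believe the consumer also has to call `_consumer.StoreOffset(consumedResult)` after handling each message, otherwise nothing ever gets committed, which my current consumer config above may already be suffering from.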
For context, I first implemented Kafka using the MassTransit library, and after noticing this slow consume rate I switched to Confluent.Kafka just to remove the MassTransit abstraction layer and see if that helped, but the result was the same:
<PackageReference Include="Confluent.Kafka" Version="1.7.0" />
Has anyone run into the same problem and can help me?

Note: my Kafka runs as a cluster of 3 brokers in Kubernetes, with 6 partitions per topic and a replication factor of 3.