我有一个来自 API 的消息列表,我需要映射此列表中的元素,将它们发送给生产者并一一阅读。多个用户可以同时发送消息列表。我需要按顺序映射列表,而不是在一个列表完全完成之前阅读另一个列表。由于当前结构中的每个请求都会创建一个新的生产者,因此我无法按顺序读取传入的消息。如何通过创建单个生产者来做到这一点?
public class MailController : BaseController
{
[HttpPost("fatura")]
//[AllowAnonymous]
public bool fatura([FromBody] List<MailMessage> mailMessages)
{
FaturaProducer.Produce(mailMessages);
return default;
}
}
public class FaturaProducer : FProducer<MailMessage>
{
public static Task Produce(List<MailMessage> data)
{
Produce(ConfigurationManager.KafkaSettings.Topics[Topics.FaturaKaydetViaTp].TopicName, data);
return default;
}
}
public class FProducer<T> : ProducerBase2
{
private static readonly ObjectSerializer<T> serializer = new ObjectSerializer<T>();
//public static IProducer<Null, T> producer = new ProducerBuilder<Null, T>(producerConfig).SetValueSerializer(serializer).Build();
public static async Task<DeliveryResult<Null, T>> Produce(string topic,List<T> data)
{
try
{
LogManager.Logger.Debug("Producer initiating for {topic}", topic);
using (var producer = new ProducerBuilder<Null, T>(producerConfig).SetValueSerializer(serializer).Build())
{
LogManager.Logger.Debug("Producer initiated");
LogManager.Logger.Debug("Producing async");
try
{
foreach (var item in data)
{
producer.Poll(TimeSpan.FromSeconds(5));
await producer.ProduceAsync(topic, new Message<Null, T> { Value = item });
}
}
catch (ProduceException<Null, string> ex)
{
LogManager.Logger.Fatal(ex, "Delivery failed: {reason}", ex.Error.Reason);
throw;
}
}
}
catch (Exception ex)
{
LogManager.Logger.Fatal(ex, "Producer failed for {topic}", topic);
}
return null;
}
}