2

我有一个来自 API 的消息列表,我需要映射此列表中的元素,将它们发送给生产者并一一阅读。多个用户可以同时发送消息列表。我需要按顺序映射列表,而不是在一个列表完全完成之前阅读另一个列表。由于当前结构中的每个请求都会创建一个新的生产者,因此我无法按顺序读取传入的消息。如何通过创建单个生产者来做到这一点?

public class MailController : BaseController
    {
        [HttpPost("fatura")]
        //[AllowAnonymous]
        public bool fatura([FromBody] List<MailMessage> mailMessages)
        {
            FaturaProducer.Produce(mailMessages);
            return default;
        }
}

public class FaturaProducer : FProducer<MailMessage>
    {
        public static Task Produce(List<MailMessage> data)
        {
           Produce(ConfigurationManager.KafkaSettings.Topics[Topics.FaturaKaydetViaTp].TopicName, data);
           return default;
        }
    }

public class FProducer<T> : ProducerBase2
    {
        private static readonly ObjectSerializer<T> serializer = new ObjectSerializer<T>();
        //public static IProducer<Null, T> producer = new ProducerBuilder<Null, T>(producerConfig).SetValueSerializer(serializer).Build();
        public static async Task<DeliveryResult<Null, T>> Produce(string topic,List<T> data)
        {
            try
            {
                LogManager.Logger.Debug("Producer initiating for {topic}", topic);

                using (var producer = new ProducerBuilder<Null, T>(producerConfig).SetValueSerializer(serializer).Build())
                {
                    LogManager.Logger.Debug("Producer initiated");

                    LogManager.Logger.Debug("Producing async");

                    try
                    {
                        foreach (var item in data)
                        {
                            producer.Poll(TimeSpan.FromSeconds(5));
                            await producer.ProduceAsync(topic, new Message<Null, T> { Value = item });
                        }
                       
                    }
                    catch (ProduceException<Null, string> ex)
                    {
                        LogManager.Logger.Fatal(ex, "Delivery failed: {reason}", ex.Error.Reason);
                        throw;
                    }
                }
            }
            catch (Exception ex)
            {
                LogManager.Logger.Fatal(ex, "Producer failed for {topic}", topic);
            }

            return null;
        }

    }

在此处输入图像描述

4

1 回答 1

1

目前,您正在遍历列表,调用您的 Produce 函数,并为每条消息创建一个生产者

或者,创建一个接受整个 list的新 Produce 方法,然后用 block 包装你的 foreach 循环,using (var producer以便所有消息只使用一个生产者,记住你不应该return在第一个循环项上。无论如何,您没有对 DeliveryResult 做任何事情,因此请使该方法无效。

于 2022-01-06T14:14:32.977 回答