0

我对 Kafka “本地:值序列化错误”有疑问。

var producerConfig = new ProducerConfig
        {
            BootstrapServers = bootStrapService,
            MessageTimeoutMs = 5000
        };
        producerConfig.SecurityProtocol = SecurityProtocol.SaslPlaintext;
        producerConfig.SaslMechanism = SaslMechanism.Plain;
        producerConfig.SaslUsername = userName;
        producerConfig.SaslPassword = password;
        var schemaRegistryConfig = new SchemaRegistryConfig
        {
            Url = schemaUrl
        };
        
        var avroSerializerConfig = new AvroSerializerConfig
        {
            // optional Avro serializer properties:
            BufferBytes = 100
        };
 using (var schemaRegistry = new CachedSchemaRegistryClient(schemaRegistryConfig))
        using (var producer = new ProducerBuilder<string, LogMessages>(producerConfig)
            .SetValueSerializer(new AvroSerializer<LogMessages>(schemaRegistry, avroSerializerConfig).AsSyncOverAsync())
            .SetErrorHandler((_, e) => Logger.Info($"Error: {e.Reason}"))
            .Build())
        {
            string inspectionNo = historyCard.CardMain.INSPECTIONNO;
            Logger.Info($"Producer {producer.Name} producing on topic {topicName};inspectionno:{inspectionNo}");
            await producer.ProduceAsync("log-messages",
               new Message<string, LogMessages>
               {
                   Key = Guid.NewGuid().ToString(),
                   Value = new LogMessages
                   {
                       IP = "192.168.0.1",
                       Message = "a test message 2"
                   }
               });
            }
4

0 回答 0