我在使用 Kafka 生产者发送消息时遇到了 “Local: Value serialization error”(本地:值序列化错误),相关代码如下:
// Producer connection settings: broker list, a 5000 ms message timeout, and
// SASL/PLAIN credentials sent over the SaslPlaintext security protocol.
var producerConfig = new ProducerConfig
{
    BootstrapServers = bootStrapService,
    MessageTimeoutMs = 5000,
    SecurityProtocol = SecurityProtocol.SaslPlaintext,
    SaslMechanism = SaslMechanism.Plain,
    SaslUsername = userName,
    SaslPassword = password
};

// Schema registry endpoint handed to the registry client.
// NOTE(review): no registry credentials are configured here — confirm the registry allows anonymous access.
var schemaRegistryConfig = new SchemaRegistryConfig { Url = schemaUrl };

// Optional Avro serializer properties.
var avroSerializerConfig = new AvroSerializerConfig { BufferBytes = 100 };
// Builds a short-lived producer with an Avro value serializer and sends a single
// LogMessages record. The schema registry client and the producer are both
// disposed when the using scope ends.
using (var schemaRegistry = new CachedSchemaRegistryClient(schemaRegistryConfig))
using (var producer = new ProducerBuilder<string, LogMessages>(producerConfig)
    // The async serializer is registered directly because the message is sent with
    // ProduceAsync; wrapping it in AsSyncOverAsync() would make serialization block.
    .SetValueSerializer(new AvroSerializer<LogMessages>(schemaRegistry, avroSerializerConfig))
    .SetErrorHandler((_, e) => Logger.Info($"Error: {e.Reason}"))
    .Build())
{
    string inspectionNo = historyCard.CardMain.INSPECTIONNO;

    // NOTE(review): this log line reports topicName, but the message below is sent to
    // the hard-coded "log-messages" topic — confirm which one is intended.
    Logger.Info($"Producer {producer.Name} producing on topic {topicName};inspectionno:{inspectionNo}");

    try
    {
        await producer.ProduceAsync("log-messages",
            new Message<string, LogMessages>
            {
                Key = Guid.NewGuid().ToString(),
                Value = new LogMessages
                {
                    IP = "192.168.0.1",
                    Message = "a test message 2"
                }
            });
    }
    catch (ProduceException<string, LogMessages> ex)
    {
        // The error reason (e.g. "Local: Value serialization error") only names the
        // failing stage; the underlying cause is carried on InnerException, so log it
        // before propagating the failure unchanged.
        Logger.Info($"Produce failed: {ex.Error.Reason}; cause: {ex.InnerException}");
        throw;
    }
}