我正在使用KsqlDb
具有以下形式的表格:
KSQL-DB 查询
create table currency (id integer,name varchar) with (kafka_topic='currency',partitions=1,value_format='avro');
C# 模型
public class Currency
{
public int Id{get;set;}
public string Name{get;set;}
}
现在我想知道我应该如何使用 Confluent 库在 C# 中从这个主题写入/读取数据:
写作
IProducer<int, Currency> producer=....
Currency cur=new Currency();
Message<int,Currency> message = new Message<int, Currency>
{
Key = msg.Id,
Timestamp = new Timestamp(DateTime.UtcNow, TimestampType.CreateTime),
Value = msg
};
DeliveryResult<int,Currency> delivery = await this.producer.ProduceAsync(topic,message);
阅读
IConsumer<int,Currency> iconsumer = new ConsumerBuilder<int, Currency>(config)
.SetKeyDeserializer(Deserializers.Int32) //i assume i need to use the id from my dto
.SetValueDeserializer(...) //what deserializer
.Build();
ConsumeResult<int,Currency> result = consumer.Consume();
Currency message = // what deserializer JsonSerializer.Deserialize<Currency>(result.Message.Value);
我不知道该怎么做,所以我尝试寻找序列化程序。我找到了这个库AvroSerializer,但我不知道作者在哪里获取schema
.
有关如何读取/写入与我的ksqldb
模型匹配的特定主题的任何帮助?
更新
经过一些研究和一些答案后,我开始使用schemaRegistry
var config = new ConsumerConfig
{
GroupId = kafkaConfig.ConsumerGroup,
BootstrapServers = kafkaConfig.ServerUrl,
AutoOffsetReset = AutoOffsetReset.Earliest
};
var schemaRegistryConfig = new SchemaRegistryConfig
{
Url = kafkaConfig.SchemaRegistryUrl
};
var schemaRegistry = new CachedSchemaRegistryClient(schemaRegistryConfig);
IConsumer<int,Currency> consumer = new ConsumerBuilder<int, Currency>(config)
.SetKeyDeserializer(new AvroDeserializer<int>(schemaRegistry).AsSyncOverAsync())
.SetValueDeserializer(new AvroDeserializer<Currency>(schemaRegistry).AsSyncOverAsync())
.Build();
ConsumeResult<int, Currency> result = consumer.Consume();
现在我收到另一个错误:
期望长度为 5 字节或更长的数据帧,但总数据大小为 4 字节
正如有人好心指出的那样,我似乎只从模式注册表中检索了 id。
我怎样才能 : insert into currency (id,name) values (1,3)
并在 C# 中将其作为 POCO 检索(如上所列)?
更新 2
在我找到这个源程序之后,似乎由于某种原因我无法将消息发布到表中。
发送消息时没有错误,但它没有发布到 Kafka。