1

我正在使用KsqlDb具有以下形式的表格:

KSQL-DB 查询
create table currency (id integer,name varchar) with (kafka_topic='currency',partitions=1,value_format='avro');

C# 模型

public class Currency
{
    public int Id{get;set;}
    public string Name{get;set;}
}

现在我想知道我应该如何使用 Confluent 库在 C# 中从这个主题写入/读取数据:

写作

 IProducer<int, Currency> producer=....

 Currency cur=new Currency();

 Message<int,Currency> message = new Message<int, Currency>
            {
                Key = msg.Id,
                Timestamp = new Timestamp(DateTime.UtcNow, TimestampType.CreateTime),
                Value = msg
            };
 DeliveryResult<int,Currency> delivery =  await this.producer.ProduceAsync(topic,message);

阅读

IConsumer<int,Currency> iconsumer = new ConsumerBuilder<int, Currency>(config)
                .SetKeyDeserializer(Deserializers.Int32) //i assume i need to use the id from my dto
                .SetValueDeserializer(...) //what deserializer
                .Build();

ConsumeResult<int,Currency> result = consumer.Consume();

Currency message =  // what deserializer JsonSerializer.Deserialize<Currency>(result.Message.Value);

我不知道该怎么做,所以我尝试寻找序列化程序。我找到了这个库AvroSerializer,但我不知道作者在哪里获取schema.

有关如何读取/写入与我的ksqldb模型匹配的特定主题的任何帮助?

更新

经过一些研究和一些答案后,我开始使用schemaRegistry

var config = new ConsumerConfig
            {
                GroupId = kafkaConfig.ConsumerGroup,
                BootstrapServers = kafkaConfig.ServerUrl,
                AutoOffsetReset = AutoOffsetReset.Earliest
            };
var schemaRegistryConfig = new SchemaRegistryConfig
            {
                Url = kafkaConfig.SchemaRegistryUrl
            };
var schemaRegistry = new CachedSchemaRegistryClient(schemaRegistryConfig);

IConsumer<int,Currency> consumer = new ConsumerBuilder<int, Currency>(config)
            .SetKeyDeserializer(new AvroDeserializer<int>(schemaRegistry).AsSyncOverAsync())
            .SetValueDeserializer(new AvroDeserializer<Currency>(schemaRegistry).AsSyncOverAsync())
            .Build();

ConsumeResult<int, Currency> result = consumer.Consume();

现在我收到另一个错误:

期望长度为 5 字节或更长的数据帧,但总数据大小为 4 字节

正如有人好心指出的那样,我似乎只从模式注册表中检索了 id。

我怎样才能 : insert into currency (id,name) values (1,3)并在 C# 中将其作为 POCO 检索(如上所列)?

更新 2

在我找到这个源程序之后,似乎由于某种原因我无法将消息发布到表中。

发送消息时没有错误,但它没有发布到 Kafka。

4

2 回答 2

0

我通过定义我的自定义序列化程序解决了这个问题,从而实现了ISerializer<T>andIDeserializer<T>接口,它们的腹部只是 or 的包装System.Text.Json.JsonSerializerNewtonsoftJson

串行器

public class MySerializer:ISerializer<T>
{
     byte[] Serialize(T data, SerializationContext context)
     {
          var str=System.Text.Json.JsonSerializer.Serialize(data); //you can also use Newtonsoft here
          var bytes=Encoding.UTF8.GetBytes(str);
          return bytes;
     }
}

用法

   var config = new ConsumerConfig
                {
                    GroupId = kafkaConfig.ConsumerGroup,
                    BootstrapServers = kafkaConfig.ServerUrl,
                    AutoOffsetReset = AutoOffsetReset.Earliest
                };
    IConsumer<int,Currency> consumer = new ConsumerBuilder<int, Currency>(config)
            .SetValueDeserializer(new MySerializer<Currency>())
            .Build();

ConsumeResult<int, Currency> result = consumer.Consume();

附言

在我实现了接口之后,我什至没有在这里使用模式注册表

于 2021-08-05T01:39:10.447 回答
0

我找到了这个库 AvroSerializer ,但我不知道作者从哪里获取架构。

不清楚为什么需要使用 Confluent 以外的库,但他们是从 Schema Registry中获得的。您可以使用CachedSchemaRegistryClient它轻松获取模式字符串,但您不应该在代码中使用它,因为反序列化器将自行从注册表下载。

如果您参考特定 Avro 消费examples/confluent-kafka-dotnetrepo,您可以看到它们User从文件生成类User.avsc,这似乎正是您想要在这里做的,Currency而不是自己编写

于 2021-07-30T05:40:00.110 回答