1

如何将 MassTransit 配置为在使用 Avro 进行序列化/反序列化时使用 Confluent Kafka 主题进行生产和消费?我看到 Avro 序列化器/反序列化器在包中Confluent.SchemaRegistry.Serdes。一些代码示例将受到欢迎。

4

1 回答 1

2

要将 MassTransit 配置为使用 Avro,我的做法是使用生成的类文件 ( avrogen),然后配置生产者和主题端点,如下所示:

首先,您需要为模式注册表创建客户端:

var schemaRegistryClient = new CachedSchemaRegistryClient(new Dictionary<string, string>
{
    {"schema.registry.url", "localhost:8081"},
});

然后,您可以配置骑手:

services.AddMassTransit(x =>
{
    x.UsingInMemory((context, cfg) => cfg.ConfigureEndpoints(context));
    x.AddRider(rider =>
    {
        rider.AddConsumer<KafkaMessageConsumer>();

        rider.AddProducer<string, KafkaMessage>(Topic, context => context.MessageId.ToString())
            .SetKeySerializer(new AvroSerializer<string>(schemaRegistryClient).AsSyncOverAsync())
            .SetValueSerializer(new AvroSerializer<KafkaMessage>(schemaRegistryClient).AsSyncOverAsync());

        rider.UsingKafka((context, k) =>
        {
            k.Host("localhost:9092");

            k.TopicEndpoint<string, KafkaMessage>("topic-name", "consumer-group", c =>
            {
                c.SetKeyDeserializer(new AvroDeserializer<string>(schemaRegistryClient).AsSyncOverAsync());
                c.SetValueDeserializer(new AvroDeserializer<KafkaMessage>(schemaRegistryClient).AsSyncOverAsync());
                c.AutoOffsetReset = AutoOffsetReset.Earliest;
                c.ConfigureConsumer<KafkaMessageConsumer>(context);

                c.CreateIfMissing(m =>
                {
                    m.NumPartitions = 2;
                });
            });
        });
    });
});

您可以查看工作单元测试以查看更多详细信息。我可能应该将此添加到文档中。

我刚刚写了这个来回答这个问题,直到一个小时前我才使用 Avro。

另外,我使用Confluent 中的这篇文章来启动和运行。链接的docker-compose.yml单元测试项目中配置了所有需要的服务。

于 2021-02-01T03:48:50.647 回答