问题标签 [confluent-kafka-dotnet]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
337 浏览

apache-kafka - 使用 confluent-kafka-dotnet 连接到外部 Kafka 服务器失败

我需要使用 .Net 从外部服务器读取 Kafka 消息。作为第一步,我在本地机器上安装了 Kafka,然后编写了 .Net 代码。它按要求工作。然后,我搬到了云端,但代码不起作用。这是我的设置。

我在 Azure 上的 Windows VM(VM1:10.0.0.4)上部署了 Kafka 服务器。它已启动并正在运行。我创建了一个测试主题并使用 cmd 生成了一些消息。为了测试一切正常,我用 cmd 打开了一个消费者并收到了生成的消息。

然后,我使用 Visual Studio 部署了另一个 Windows VM(VM2、10.0.0.5)。两个虚拟机都部署在同一个虚拟网络上,因此我不必担心打开端口或任何其他网络配置。

然后,我复制了我的 Visual Studio 项目代码,然后将引导服务器的 IP 地址更改为指向 Kafka 服务器。然后它不起作用,我读到我必须更改 Kafka 的服务器配置,所以我打开了 server.properties 并将 listeners 属性修改为listeners=PLAINTEXT://10.0.0.4:9092. 它仍然不起作用。

我在网上搜索并尝试了许多技巧,但它不起作用。我认为首先要向外部服务器(vm1)提供凭证,可能还有其他一些配置。不幸的是, confluent的官方文档很短,示例很少。官方 GitHub上也没有我的案例。我玩过 Consumer Config 类中的“Sasl”属性,但也没有成功。

错误信息是:

%3|1622220986.498|失败|rdkafka#consumer-1| [thrd:10.0.0.4:9092/bootstrap]:10.0.0.4:9092/bootstrap:连接到 ipv4#10.0.0.4:9092 失败:未知错误(在 CONNECT 状态下 21038 毫秒后)错误:10.0.0.4:9092/bootstrap:连接到 ipv4#10.0.0.4:9092 失败:未知错误(在 CONNECT 状态下 21038 毫秒后)错误:1/1 代理已关闭

这是我的 .Net 核心代码:

0 投票
1 回答
20 浏览

confluent-kafka-dotnet - 以编程方式验证 Confluent Kafka 访问权限

我试图弄清楚是否有任何方法可以使用 Confluent Kafka lib for .NET 以编程方式检查提供的凭据是否具有对不同 Kafka 主题的读写访问权限。

我想做的基本上是在系统启动时进行冒烟测试,以验证给定的凭据是否正确。例如,当部署到具有不同设置的各种环境时。

设置一个完整的消费者或生产者,然后实际读取或写入数据似乎很麻烦且昂贵。

我认为可能有一些东西,例如AdminClient允许验证这一点,但我没有看到任何暗示这个方向的东西。

0 投票
1 回答
23 浏览

apache-kafka - 使用来自主题的最新消息并忽略所有先前发布的消息

假设我有一个主题“测试”已经有一堆消息,现在我想订阅这个主题。但我不想消耗在新订阅之前发布的所有消息。基本上我只想使用订阅后发布的最新消息。

confluent.kafka 库中是否有任何功能可以帮助我实现此功能。

提前致谢。

0 投票
0 回答
50 浏览

avro - Avro 指纹模式

我在 C# 中使用 Confluent kafka 来使用消息,这些消息被格式化为从中提取架构指纹的十六进制字符串。如何从 C# 中的模式指纹中获取模式?我错过了什么吗?

0 投票
0 回答
208 浏览

.net-core - 哪个是 Kafka 消费者增加吞吐量的最佳配置

我有一堆通过 Apache Kafka 集成的服务,每个服务都有它们的消费者和生产者,但是我面临着消费速度的放缓,就像当主题负载如此之大时,消费速度会减慢一样。

这是我的 kafka 消费者实现的示例:

这是我的制片人的一个例子:

可以看到,consumer 只是从 kafka topic 中读取消息,并将消息反序列化为 MediatR INotification 对象,然后发布到 handler

处理程序与数据库事务、redis 缓存读/写和推送通知一起使用

我的处理程序的一个例子:

但是当我用 2000 个请求以 15 秒的速度运行负载测试时,消费者开始变慢,大约需要 2 到 5 分钟来消耗所有 2000 个请求。

我想知道我是否删除了 MediatR 层并开始处理 Consumer 类中的进程,这将提高性能

或者如果有一些 Kafka 配置可以提高吞吐量,比如删除 In Sync 主题副本的 Acks,或者在一段时间后提交偏移量。

首先,我使用 MassTransit 库实现了 kafka,在发现这种缓慢的消耗率之后,我尝试将库更改为 Confluet.Kafka 只是为了删除 MassTransit 抽象层,如果它有改进的话,但仍然相同:

<PackageReference Include="Confluent.Kafka" Version="1.7.0" />

任何已经遇到同样问题的人可以帮助我吗?

OBS:我的 Kafka 在集群中运行,Kubernetes 中有 3 个代理,每个主题有 6 个分区和 3 个复制因子

0 投票
0 回答
485 浏览

c# - 将多个 Kafka 消费者作为 Worker 服务运行

我们有多个消费者(目前)订阅了一个具有 10 个分区的主题,并且是一个消费者组的一部分。

由于这些消费者不断地收听消息,我们决定在 .NET Core 3.1 中使用工作服务模板。由于这个想法是每个进程有一个消费者,每个消费者都有自己的专用工作者服务。它们已经在 prod 中运行了几个星期,它们似乎正在按预期消费和处理消息。

这是一种矫枉过正的方法吗?部署一项服务并让它创建 10 个消费者实例并在单独的线程上启动它们会更有效吗?

例如:

非常感谢您的贡献。

0 投票
1 回答
346 浏览

c# - 使用 Confluent.Kafka 消费者库的 Kafka 消息密钥为空

我正在使用 Debezium 对我的数据库执行 CDC 以在 Kafka 中创建消息。使用 kafdrop 和 OffsetExplorer 之类的工具,我可以看到消息keyvalue。但是,在使用 Confluent.Kafka 库的 .NET 框架应用程序中,当我使用消息时,消息始终为空。

如何使用 Confluent.Kafka 库检索消息密钥?

这是VS项目的代码:

干杯,凯莉

0 投票
2 回答
476 浏览

c# - 如何使用融合平台将 ksql avro 格式序列化/反序列化为 c#

我正在使用KsqlDb具有以下形式的表格:

KSQL-DB 查询
create table currency (id integer,name varchar) with (kafka_topic='currency',partitions=1,value_format='avro');

C# 模型

现在我想知道我应该如何使用 Confluent 库在 C# 中从这个主题写入/读取数据:

写作

阅读

我不知道该怎么做,所以我尝试寻找序列化程序。我找到了这个库AvroSerializer,但我不知道作者在哪里获取schema.

有关如何读取/写入与我的ksqldb模型匹配的特定主题的任何帮助?

更新

经过一些研究和一些答案后,我开始使用schemaRegistry

现在我收到另一个错误:

期望长度为 5 字节或更长的数据帧,但总数据大小为 4 字节

正如有人好心指出的那样,我似乎只从模式注册表中检索了 id。

我怎样才能 : insert into currency (id,name) values (1,3)并在 C# 中将其作为 POCO 检索(如上所列)?

更新 2

在我找到这个源程序之后,似乎由于某种原因我无法将消息发布到表中。

发送消息时没有错误,但它没有发布到 Kafka。

0 投票
0 回答
59 浏览

.net - 如何处理提交两个事务?

我目前处于需要提交两个事务的情况,一个指向数据库(postgres,与 npgsql 库交互),另一个在 kafka 消息总线上(与 confluent.kafka 库交互)。

两者都支持在事务中发送/插入数据,但我目前在确定我应该如何提交数据库事务和消息总线事务时遇到问题,因为我需要对另一个执行回滚以防它失败。

我最初的策略是做一个

但这不包括向 kafka 提交更改失败、触发回滚的情况。

无论如何,我能以某种方式确保这两种情况吗?

0 投票
0 回答
692 浏览

.net - 托管 Kafka AWS (MSK) 代理验证失败的 SSL 身份验证

我正在尝试使用 AWS 托管的 Kafka 实例 (MSK) 创建一个 Kafka 客户端应用程序(生产者和消费者)。代理到代理的通信和客户端到代理的通信也通过集群中的 TLS 配置为安全的。CA 是 AWS 私有 CA,因为这是 MSK 唯一支持客户端通过 TLS 进行代理身份验证的 CA。

问题背景:AWS 官方文档 ( https://docs.aws.amazon.com/msk/latest/developerguide/msk-authentication.html#msk-authentication-client ) 步骤更倾向于 Java 世界并处理客户端信任库和作为 jks 的密钥库。但是 .Net 客户端不使用 Java JKS 容器格式(https://github.com/mhowlett/confluent-kafka-dotnet/tree/security/examples/Security)。

  • 服务器身份验证客户端:这部分我能够解决。由于 jks 只是一个数据存储,因此在按照上面的 aws doc 创建 Keystore 之后,我运行了一些额外的 keytool 和 openssl 命令来显式提取客户端证书和密钥。我能够使用它成功地生成和使用消息。

  • 但是,为了让客户端验证服务器,我需要将 ssl.ca.location 设置为 CA 根证书。从私有 CA(用作 MSK 实例的 CA)我已经下载了默认为 pem 格式的根 CA(注意:这仅包含证书详细信息,没有密钥详细信息)。使用以下命令将其转换为 .crt:

    add pem to truststore : keytool -keystore kafka.client.truststore.jks -alias CARoot -importcert -file Certificate.pem

    get cert from truststore : keytool -export -alias CARoot -keystore kafka.client.truststore.jks -rfc -file ca-root.crt

将上述 ca-root.crt 用于 ca 位置似乎不起作用。它抛出错误代理验证失败。客户端机器是 Windows,但这应该不是问题,因为除了通过证书位置 ssl.ca.location 我还尝试在机器中安装证书但仍然没有运气。我看到以下错误:

我是否缺少客户端验证服务器部分的内容?

更新:我也能够解决代理验证问题。以上官方 AWS 文档依赖 Java 证书进行 Broker 验证身份验证。我对其进行了扩展以从 jks 派生 perm 和进一步的 .crt,并将生成的 .crt 用作 ssl.ca.location 的 CA,并且它可以工作。但是,生成的证书中有太多证书,并且不清楚哪个证书用于验证 MSK 代理证书。另外为什么从私有 AWS CA 下载的 CA 证书不起作用?如果有人可以在这些方面提供帮助,那将非常有帮助。还请建议是否有针对该问题的替代标准解决方案。