问题标签 [confluent-kafka-dotnet]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
0 回答
148 浏览

asp.net-core - 如何提高kafka使用confluent-kafka-dotnet中消费者消费消息的速度?

测试结果图 当我使用confluent.kafka-net时,我想保证kafka消息在.netcore中的顺序,我创建了一个mschannel,

_msChannel = Channel.CreateUnbounded();

收听时,我得到了消费结果和收到的消息,并且

当消息没有错误时,我使用message.Ack()如下:

结果是kafka的consumer很慢,我发现原因是partition的consumer和resume,耗费了太多的时间,但是我没有更好的办法来处理这种情况。

在我的服务器测试中,消费者每分钟消耗约 300 条记录。

0 投票
1 回答
41 浏览

.net - Kafka .NET 客户端自定义时间戳

我一直在使用 Kafka .NET 客户端,一切正常,但现在时间戳是自动设置的,我需要的是能够发送我自己的时间戳。

这是我用来发送数据的代码。如何覆盖时间戳?

我正在使用这个包https://docs.confluent.io/clients-confluent-kafka-dotnet

0 投票
0 回答
249 浏览

c# - .NetCore 中的多线程 Kafka Consumer

我想在 .NetCore 中实现多线程 Kafka 消费者,如下图所示。在java中,我们可以使用 Poll 方法检索多条记录,并且可以获取每个分区的记录。我想在.net 中实现同样的事情。

在此处输入图像描述

Java 实现。

我在 .net 核心中的消费者代码。

0 投票
0 回答
44 浏览

c# - KafkaConsumer 在检索消息时永远挂起

我想使用 confluent kafka 库从主题中提取消息,但是 var cr = c.Consume(cts.Token); 在这一行中,应用程序等待无限长的时间并且没有给出任何错误。让你永远等待。

我的源代码如下

0 投票
1 回答
79 浏览

apache-kafka - 使用 .NET Core 在 Kafka 中使用由 OTEL 代理生成的 Protobuf 序列化日志消息

我正在使用 OpenTelemetry 收集器代理将日志发送到 Kafka 流层。我想在 .NET Kafka 消费者中使用这些日志消息(目前),但我遇到了一些问题。OTEL 似乎使用了 Protobuf 序列化,这使得它有点棘手。注意:最终我想通过 Kafka Connect 将日志发送到 Elasticsearch,但每次只有一步......

首先,是否可以让 OpenTelemetry Kafka 导出器使用 JSON 序列化?如前所述,它看起来默认使用 Protobuf,并且似乎没有用于日志的 JSON 序列化选项 - 请参阅OTEL Kafka 导出器文档

在此处输入图像描述

或者,如何使用 OpenTelemetry 收集器代理使用 Protobuf 序列化发布到 Kafka 的日志消息?

OTEL 配置如下:

我的消费者应用程序是 .NET Core。到目前为止,我刚刚从confluent-kafka-dotnet GitHub 页面获得了一个基本的生产者/消费者示例(所有这些都适用于简单消息,但不适用于 OTEL 代理发布的 Protobuf 消息)。我设法在这里找到了一个 Protobuf 消费者示例,但是我将如何生成 proto 类来反序列化 OTEL 日志。我有点迷失在这里...

0 投票
0 回答
24 浏览

c# - 如何在 C# confluent kafka 中订阅具有不同模式的两个不同主题?

我有一个消费者,想订阅两个具有不同模式的不同主题。我需要像下面这样配置这个主题之一:

但是我的另一个主题不需要任何 SchemaRegistryBasicAuthCredentials。(那么我认为这将是两个不同的消费者生成器)。是否可以与一个消费者一起订阅两个不同的主题?

0 投票
1 回答
66 浏览

apache-kafka - Apache Kafka 的自定义连接器

我正在寻找为 Apache Kafka 编写一个自定义连接器以连接到 SQL 数据库以获取 CDC 数据。我想编写一个自定义连接器,这样我就可以使用一个连接器连接到多个数据库,因为所有市场连接器只为每个连接器提供一个数据库。

第一个问题:是否可以使用一个自定义连接器连接到多个数据库?另外,在那个自定义连接器中,我可以定义数据应该转到哪些主题吗?

第二个问题:我可以在 .NET 中编写自定义连接器还是必须是 Java?有没有我可以查看 .net 中数据库的 CDC 自定义连接器的示例?

0 投票
0 回答
42 浏览

apache-kafka - 无法使用 .net 核心应用程序中的 p12 文件连接到 Kafka

我有一个需要使用 SSL 协议连接到 Kafka 的 .net 核心应用程序。我有一个 p12 文件,我需要将其用于与代理进行身份验证,但我无法这样做。正在使用的生产者配置是

0 投票
2 回答
307 浏览

apache-kafka - Kafka 消费者无法读取所有可用分区

我们注意到我们的一个产品主题(6 个分区)存在一个奇怪的问题,其中我们的消费者(dotnet 核心,只有 1 个实例)只能从 3 个分区(0、1、3)读取。这显然会影响应用程序的行为,因为消费者缺少来自其他 3 个分区(2、4、5)的消息。我们能够验证主题配置没有问题。由于消息过期而怀疑存在偏移提交问题,我们删除了该主题中的所有消息(通过将保留期更改为一个小数字),但这并没有解决问题。我们尝试了多次重启消费者应用程序,每次分区分配都保持不变(0,1,3),这让我们相信其他分区有问题。我们正在考虑创建一个新主题的想法。想法,我会把它贴在这里检查我们是否遗漏了什么。任何输入表示赞赏。谢谢。

0 投票
1 回答
60 浏览

apache-kafka - kafka confluent .NET 1.8.2 如何在消费者配置中覆盖 max.poll.interval.ms?

我的应用程序必须在特定时间或触发时使用来自 kafka 主题的消息。因此,当我的消费者坐在那里而不消费消息时,它会从消费者组中删除。

我的理解是,一旦超过最大轮询间隔,消费者本身就会向代理发送请求以从消费者组中删除。此配置设置是否在 Confluent Kafka .NET API 版本 1.8.2 中公开?