问题标签 [confluent-kafka-dotnet]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
214 浏览

docker-compose - 在 GitLab 或 Travis 等云 CI 服务器上的 docker-compose Stack 中,Confluent Kafka 客户端的正确用法是什么?

我是新的 Kafka 用户,并设法让 docker-compose 堆栈在本地工作,以从 ASP.NET Core 3.1 测试服务成功运行功能测试。这与 Kafa、Zookeeper 和 Rest-Proxy 服务存在于同一网络上的同一 docker-compose 堆栈中。

如果主题尚不存在,SUT 和测试使用.NET Core 客户端在启动时创建主题。

一旦我尝试在远程 GitLab.com CI 服务器上运行这个 docker-compose 堆栈,测试就会在创建主题时挂起。日志(见下文)显示 .NET 客户端正在连接到kafka:19092docker-compose 堆栈中的正确内部服务。kafka 服务有一些活动开始创建主题,然后它被阻止。我应该在日志中看到一条消息,确认主题创建。

.NET 客户端创建 Kafka 主题

如何配置 kafka 代理和 rest 代理 docker-compose 服务以在外部 CI 服务器上运行?这可能吗?

下面包括 docker-compose 堆栈和 GitLab CI 环境的详细信息...

创建泊坞窗网络

docker-compose 堆栈与 zookeeper、kafka、rest-proxy 和 ASP.NET Core 3.1 用于集成测试服务

GitLab.com CI Docker 网络环境

Gitab.com CI 服务器日志

0 投票
1 回答
757 浏览

asp.net-core - 在 Docker Compose 环境中运行时,ASP.NET Core WebApplicationFactory CreateClient 在 GitLab CI 上启动服务器阻塞

我的 ASP.NET Core 3.1 API 有一个功能测试项目,它使用 WebApplicationFactory 为 API 创建测试服务器。测试在本地和本地 docker-compose 环境中通过。

但是,当在同一 docker-compose 环境中的 GitLab CI 服务器上运行时,WebApplication 工厂的CreateClient方法是阻塞的。当测试服务器启动时,confluent Kafka Admin Service 被阻塞,即没有显示确认创建主题的确认日志消息。我在 GitLab 上创建了一个小项目来突出这个问题。

问题似乎出在 WebApplication 测试服务器和 Confluent Kafka 上,因为我创建了一个 docker-compose 堆栈,它在 GitLab CI 上启动被测软件 WebApp 并成功启动。

被测软件包含后台/托管服务:

  • Kafka Admin Service 创建主题 - 使用 WebAppicationFactory 测试服务器时,这会在 CI 服务器上阻塞
  • 卡夫卡消费者
  • MqttKafkaBridge

它还使用 Autofac 并启动 SignalR Hub。

WebApplicationFactory在 Gitlab 或 Travis 等远程 CI 服务器上使用时,是否有人遇到过类似的问题?

是因为WebApplicationFactory.CreateClient()创建了一个TestServer作为本地主机运行的?

使用 WebApplicationFactory 进行测试

为被测软件创建一个 WebApplicationFactory 并在创建后显示控制台日志消息。在 CI 服务器上运行时,在创建工厂客户端后不会显示控制台消息。

启动.cs

包含 Kafka 和 Mqtt 的后台服务

向 Kafka 发送创建主题请求的后台服务 - 使用 WebApplicationFactory 测试服务器时阻塞

远程运行相同的 docker-compose 堆栈的 GitLab 管道

为管道变量创建 .env 文件并启动 docker-compose 堆栈

GitLab CI 作业输出

已通过 CI 上的调试读取并显示正确的测试环境变量

码头工人撰写

在同一网络中包含 Kafka、Zookeeper 和 WebApp(Source+Tests) 服务。如果更改 WebApp 服务器的命令以运行待测软件 WebApp,则在 CI 上成功运行。只有在远程 GitLan CI 服务器上使用 WebApplicationFactory 测试服务器时才会遇到此问题。

0 投票
2 回答
643 浏览

apache-kafka - 为什么消费者在重启后会读取来自 Kafka 主题的所有消息?

我使用融合的 .net 客户端。订阅者总是在重启后(订阅者服务重启)读取来自 Kafka 主题的所有消息。如何提交消费者已经实现并从中读取的偏移量?也许一些消费者配置可以帮助......

0 投票
1 回答
737 浏览

c# - 无法使用 Confluent .Net 客户端连接到 Azure 事件中心

我正在尝试使用 Confluent .Net 客户端 (1.5.2) 连接到 Azure 事件中心,但我不断收到 SSL 握手失败错误。

我的 Confluent ProducerConfig 如下所示:

当我构建 ProducerClient 时,它无法连接 SSL 握手失败:

我已经用两个不同的事件中心尝试过这个,但没有运气。有人可以帮助我理解为什么这会失败以及我能做些什么来让它工作吗?

谢谢!

0 投票
0 回答
1496 浏览

asp.net-core - 为什么 Kafka .NET 客户端消费者使用 confluent/cp-kafka 阻塞?为什么它可以正确使用 lensio:fast-data-dev?

我正在基于 Debian 3.1-buster 的 docker compose 堆栈中运行 Xunit 功能测试,Confluent Kafka .NET Client v1.5.3 连接到代理confluentinc/cp-kafka:6.0.1。我对卡夫卡还很陌生......

架构如下图所示:

建筑学

我正在使用 xUnit 进行测试,并且有一个类夹具,它在测试集合/类的生命周期内启动一个进程内通用 Kestrel 主机。我正在使用进程内通用主机,因为我有一个使用 websockets 的附加 signalR 服务。据我了解,WebApplicationFactory仅在内存中,不使用网络套接字。

通用主机包含一个 Kafka 生产者和消费者。生产者是使用该Produce方法生产的单例服务。消费者使用取消令牌BackgroundService运行Consume循环(请参见下面的进一步清单)。消费者具有以下配置:

  • 启用自动提交:真
  • EnableAutoOffsetStore: 假
  • AutoOffsetReset:AutoOffsetReset.Latest

它是具有 3 个分区的单个消费者。group.initial.rebalance.delay配置为 1000ms

测试生成一个线程,该线程发送事件以触发生产者将数据发布到 Kafka 主题。然后测试会等待一个时间延迟,ManualResetEvent以便让消费者有时间处理主题数据。

消费者的问题是阻塞

当我在 docker-compose 环境中运行测试时,我可以从日志(包括在下面)中看到:

  • 生产者和消费者连接到同一个代理和主题
  • 生产者将数据发送到主题,但消费者正在阻塞

xUnit 和进程内 Kestrel 主机在与 kafka 服务相同的网络中的 docker-compose 服务中运行。Kafka 生产者能够成功地将数据发布到 kafka 主题,如下面的日志所示。

我创建了一个额外的 docker-compose 服务,它运行一个 python 客户端消费者。这使用轮询循环来使用在运行测试时发布的数据。数据由 Python 客户端使用。

有没有人知道为什么消费者会在这个环境中阻止以帮助查找故障?xUnit 测试中执行的等待会阻止由 xUnit 固定装置启动的进程内 Kestrel 主机吗?

如果我在 MacOS Catalina 10.15.7 上本地运行 Kestrel 主机,在 docker-compose 中连接到 Kafka(图像:lensio:fast-data-dev-2.5.1-L0),它会成功生成和使用。

更新 - 使用 lensio 图像工作 的本地 docker-compose 使用 docker image for lensio:fast-data-dev-2.5.1-L0。这使用 Apache Kafka 2.5.1 和 Confluent 组件 5.5.1。我也试过:

  • 降级到 Confluent Kafka 图像 5.5.1
  • 将 .Net Confluent 客户端升级到 1.5.3

结果保持不变,生产者生产良好,但消费者阻止。

lensio:fast-data-dev-2.5.1-LO配置和会导致阻塞的confluent/cp图像有什么区别?

我已将工作中的 docker-compose 配置标记到此查询的末尾。

更新 - 当 group.initial.rebalance.delay 为 0ms 时适用于 confluent/cp-kafka 图像

最初group.initial.rebalance.delay是 1ms,与lensio:fast-data-dev-2.5.1-LO图像相同。confluent/cp-kafka 图像上的 1ms 设置表现出阻塞行为。

如果我将其更改group.initial.rebalance.delay为 0ms,则 confluent/cp-kafka 图像不会发生阻塞。

与confluent-kafka-dotnet 客户端一起使用时, lensio:fast-data-dev-2.5.1-LO映像是否在docker -compose 开发环境中提供更好的性能?

测试

测试输出 - 生产者已在主题上生成数据,但消费者尚未消费

卡夫卡消费者

Kestrel 主机配置

码头工人组成堆栈

适用于 lensio:fast-data-dev 图像。为什么?

0 投票
0 回答
436 浏览

.net-core - Confluent Kafka 消费者仅在更改 groupId 后才消费消息

我有一个使用 Confluent.Kafka 的 .Net 核心控制台应用程序。我建立了一个消费者来消费来自特定主题的消息。该应用程序旨在每天运行几次,使用指定主题的消息并处理它们。

我花了一段时间来理解消费者的行为,但只有当它的 groupId 是一个以前从未使用过的消息时,它才会使用消息。每次我更改消费者的 groupId - 消费者都会获取订阅主题中的消息。但是在接下来的运行中——使用相同的 groupId——consumer.Consume 返回 null。

这种行为似乎与同一组消费者之间的重新平衡有关。但我不明白为什么——因为消费者应该只存在于整个应用程序运行时间。在离开应用程序之前,我调用了 consumer.close() 和 consumer.Dispose()。这些应该破坏消费者,以便在下一次运行时,当我创建消费者时,它将再次成为指定 groupId 上的第一个和单个消费者。但正如我所说,事实并非如此。

我知道有关于该主题的消息 - 我通过命令行检查它。而且我还确保主题只有 1 个分区。

最奇怪的是,我有另一个 .net 核心控制台应用程序,它执行相同的过程 - 并且完全没有问题。

我附上了 2 个应用程序的代码。

工作应用程序 - 总是消耗:

类 ConsumerHelper.cs

不工作的应用程序 - 仅在将使用者 groupId 更改为从未使用过的组后第一次运行时使用:

类 Program.cs

类 ConsumerHelper.cs

0 投票
1 回答
948 浏览

polly - 如何实现等待重试瞬态故障处理策略?

我对 Kafka 和Polly 还很陌生。我正在寻求有关如何在将 Admin Client 与 Kakfa Confluent .NET 客户端一起使用时实现故障恢复的建议。如果在启动 Blazor 服务器 Web 应用程序期间主题不存在,我正在使用管理客户端创建主题。

最初,我尝试使用polly来实现一个简单的等待和重试策略,如下所示。我希望这会重试创建主题操作以进行可配置的尝试次数。在每次重试尝试之间有一个短暂的可配置等待延迟。如果所有重试尝试都已用尽,则发出致命错误信号并且应用程序正常退出。

等待和重试策略

使用管理客户端实例的等待和重试策略来创建主题

日志和分析

当我尝试运行它时,我可以从下面的日志中看到尝试了一次重试。但是,不会尝试后续重试。日志突出显示rdkafka库检测到并抑制了相同的错误。

我认为这就是日志中没有显示后续重试尝试的原因,即底层rdkafka库轮询线程隐藏了后续本地超时错误并继续尝试连接到代理。随后,.NET 客户端无法引发异常,因为它没有收到失败通知。这意味着 polly 卡在等待并运行第二次尝试?

Confluent Kafka .NET 库在第一次尝试时抛出的错误是ErrorCode.Local_TimedOut 。我认为这对应于rdkafka:error local timeout ?? 经过调查,我发现以下与瞬态故障处理相关的AdminClient配置属性:

这让我尝试了 Confluent Kafka 内置的瞬态故障处理:

在启动过程中稍等片刻后,我启动了 Kafka 代理。下面的调试日志显示rdkafka线程检测到主代理已启动并正在运行。但是,未执行创建主题管理操作。

调试日志

由于空间限制,可在pastebin上查看。

问题

是否可以配置rdkafka,以便在尝试将AdminClient连接到代理时不会抑制相同的错误?

在上面的例子中,一旦rdkafka poll 线程最终检测到 broker 启动并运行,为什么AdminClient创建主题操作没有完成?

如何使用带/不带 Polly 的 Confluent Kafka .NET 为请求在代理上创建主题的 Kafka AdminClient实施重试和等待失败策略?

更新

创建一个小型控制台应用程序,可在pastebin获得。

如果在每次重试尝试时创建一个新的 AdminClient,并且在检测到故障后最终启动代理,那么它就可以工作。

但是,如果每次重试尝试都重复使用相同的 AdminClient 实例,并且在检测到故障后最终启动代理,则程序会阻塞。我认为这是因为 rdkafka 库正在为客户端抑制多个本地超时错误。它只通知 Confluent Kafka .NET 初始故障检测。

有没有更好的方法,而不是在每次重试时创建一个新的 AdminClient 实例?

0 投票
1 回答
111 浏览

.net - Kafka 消费者读取整个分区后会发生什么?

我确实有一个在 Windows 服务中使用的 .Net 代码,该服务具有初始化消费者对象和使用 Kafka 的过程。

我想知道从分区中消耗完所有消息后会发生什么。仅供参考:有 3 个分区和 4 个节点用于消费信息。

0 投票
1 回答
393 浏览

apache-kafka - 使用 MassTransit 和 Kafka 配置 Avro

如何将 MassTransit 配置为在使用 Avro 进行序列化/反序列化时使用 Confluent Kafka 主题进行生产和消费?我看到 Avro 序列化器/反序列化器在包中Confluent.SchemaRegistry.Serdes。一些代码示例将受到欢迎。

0 投票
0 回答
64 浏览

c# - 获取 Confluent.kafka nuget 包中主题的生产者列表

是否可以获得已向主题发送消息的生产者列表?我正在为 C# 使用 Confluent.Kafka nuget 包。