问题标签 [librdkafka]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
879 浏览

apache-kafka - node-rdkafka - 调试设置为全部,但我只看到代理传输失败

我正在尝试连接到 kafka 服务器。身份验证基于 GSSAPI。

这是我的 test_kafka.js:

0 投票
1 回答
374 浏览

logging - librdkafka.redist 的日志是什么意思

Confluent.Kafka这是从与的librdkafka.redist依赖关系相关的控制台日志中提取的。

我试图了解每个管道分隔部分是什么。谁能告诉我?我找不到任何关于它的信息。我假设第二列是某种时间戳,第三列是状态,第四列是错误消息。

0 投票
0 回答
30 浏览

apache-kafka - 消息在 kafka 生产者和消费者之间消失

我有一个非常简单的嵌入式 kafka 应用程序:一个生产者线程和两个编写顶级 postgres db 的消费者线程。这三个线程在一个进程中运行。我正在使用 librdkafka 来实现我的消费者和生产者,并将 apache-kafka 作为代理运行。消息大小约为 2kB。我有两个计数器:一个在我写入时递增(rd_kafka_produce),另一个在我读取时递增(rd_kafka_consume_batch)。如果我的生产者运行得足够快(超过 30000 条消息/秒),生产者计数器最终会比消费者计数器大得多(如果我运行 30 秒,则为 15% 左右)。所以我在某处丢失了消息。我的第一个问题是如何调试这样的问题?第二个是这个问题最可能的原因是什么,我该如何解决?

0 投票
1 回答
39 浏览

c++ - Spring Kafka:ReplyingKafkaTemplate 回复的 C++ 生产者

我使用 ReplyingKafkaTemplate 向 Kafka 发送请求并接收回复。请求的消费者是一个 C++ 程序。处理请求后,其回复由 C++ 应用程序生成。现在我的问题是关于回复的格式,因为spring-side中回复的消费者似乎使用相关ID来找到相应的回复。有人可以帮我解决这个问题吗?

0 投票
0 回答
119 浏览

python - 为什么即使传递了错误的代理主机名,librdkafka 也不会抛出任何错误/异常?

我正在编写使用confluent_kafka api(内部使用librdkafka)来计算kafka吞吐量的python应用程序,但是在这个过程中我观察到,即使我传递了虚拟代理名称,应用程序也不会抛出任何错误,所以我缺少一些东西或者这是api中的错误。还有什么可以更好地处理这个问题?

当我通过传递不正确的代理来测试此应用程序时,没有错误/异常:

0 投票
0 回答
406 浏览

plugins - 未调用 librdkafka 注册 oauthbearer_token_refresh_cb 的自定义插件

我正在尝试编写自定义插件,librdkafka该插件将提供oauthbearer_token_refresh_cb.

在我们公司,我们使用带有自定义 OUTHBEARER sasl 机制的 kafka,该机制在 java/kotlin 中运行良好,在 .Net 中也运行良好。下一步是实现相同的,librdkafka因为我们需要它用于 Clickhouse DB 的 kafka 连接器实现,它使用librdkafka.

我尝试使用以下插件实现:

自定义oauth.c

编译它:

制作共享库:

通过以下方式试用kafkacat

结果是:

如果我注释掉之后发生的所有 printf-s,rd_kafka_conf_set_oauthbearer_token_refresh_cb那么我不会得到% ERROR: No such configuration property: "plugin.library.paths" (plugin lib-oauth-cb.so)

但是也不起作用,并且我设置 oauth 持有者令牌的回调永远不会被调用。这是输出:

笔记:

  • 这个 kafka 集群有 3 个不同的监听器
    • 端口 9092 PLAINTEXT 协议(尝试过 kafkacat 并且可以正常工作)
    • 端口 9093 SSL 协议(尝试过 kafkacat 并且可以正常工作)
    • 端口 9094 SASL_SSL 协议,使用 OAUTHBEARER 作为 sasl 机制
  • 我已经在我的 Mac 上安装librdkafkakafkacat使用brew,据我所知,librdkafka 的版本是1.5.0

问题

  1. 为什么我的回调oauth_refresh从未被调用(没有============== Initializing oauthbearer config......印在stdout

  2. 如果后面No such configuration property: "plugin.library.paths" (plugin lib-oauth-cb.so)有一些,为什么我会收到错误printfrd_kafka_conf_set_oauthbearer_token_refresh_cb

    • 只要rd_kafka_conf_set_oauthbearer_token_refresh_cb调用之后就不会出现这个错误printf
    • 即使多次设置也不会导致此错误:
      • printf
      • rd_kafka_conf_set_oauthbearer_token_refresh_cb
      • printf
      • rd_kafka_conf_set_oauthbearer_token_refresh_cb
    • printf导致此错误的副作用是什么?
    • 这部分问题不太重要,因为我不打算保留那些 printf-s,部分1)很重要

目标

  • 我的目标是拥有我们 OUTHBEARER 方案的小型可插拔库,它可以插入任何使用的东西librdkafka(最终目标是Clickhouse的 kafka 连接器)
  • 我使用kafkacat它只是因为它是一种易于试验的工具
  • 关键是可插入性,因此无需重新编译某些使用 librdkafka 的应用程序即可使用自定义刷新 oauth 令牌逻辑。

编辑

我有几个问题:

  • conf_init 的签名:
    • 需要是:rd_kafka_resp_err_t conf_init(rd_kafka_conf_t *conf, const char *path, char *errstr, size_t errstr_size)
    • 代替:void conf_init(rd_kafka_conf_t *conf, const char *path, char *errstr, size_t errstr_size)
  • 在我的示例中设置 oauthbearer 令牌时,需要改为nowMs + someTTL过期0
  • 我最终使用rdkafka_example.c 而不是安装kafkacat,因为我安装的最新版本的 kafka catbrew1.5.0
    • 当与这个旧版本运行时,kafkacat我遇到了以下错误kafkacat(76875,0x11b279dc0) malloc: *** error for object 0x10bad7bd0: pointer being freed was not allocated kafkacat(76875,0x11b279dc0) malloc: *** set a breakpoint in malloc_error_break to debug
  • 我让它在消费者模式下工作,-C但不是在打印元数据模式下-L,因为 with-L模式没有调用rd_kafka_poll,所以我的回调从未被调用。
0 投票
0 回答
830 浏览

c++ - 使用 rdkafkacpp.h 的 C++ 中的基本 Kafka Consumer

我对使用 C++ 的 rdkafkacpp.h Kafka 库相当陌生。

我参考了一些可用的在线资源,关于如何在 Windows 上设置 Kafka 消费者,使用提供的标准 Kafka 环境设置 .bat 文件:https : //kafka.apache.org/quickstart能够测试从生产者终端发送的消息是否出现在消费者终端上)

我已经阅读了 Kafka 的基本理论并浏览了以下文档:https ://docs.confluent.io/4.0.0/clients/librdkafka

在阅读完这些基本内容后,我尝试使用内置库函数编写示例代码,如下所示。但是,我在尝试设置“conf”的引导服务器属性的代码行收到运行时异常。该异常是访问冲突异常。“在 mykafka.exe 中的 0x00007FFB878EC3F9 (msvcr120.dll) 处引发异常:0xC0000005:访问冲突读取位置 0x0000008E5A900000。”

我怀疑使用库函数的 Kafka 消费者“实现”的顺序错过了一些步骤,或者必须重新排序。

我试图保持实现简单,我的本地机器上只有一个生产者(localhost:9092),只有这个消费者(mykafka.exe)。此外,一个主题“快速启动事件”已在终端上启动。

任何帮助是极大的赞赏!

PS:Visual Studio 2019 用于此代码开发。

0 投票
0 回答
136 浏览

node.js - 如何避免 HighlevelProducer rdkafka 中的数据丢失

使用 npm librdkafka 我正在向 Kafka 生成大量数据。我正在使用 HighlevelProducer。对于 50K 消息,此配置工作正常。但是对于像 100K 消息这样的大容量,会有数据丢失。根据配置,数据丢失从 200-1K 不等。尝试了各种配置,但无法找到达到 100% 准确度的最佳配置。任何建议都是有帮助的。下面是我的配置。

0 投票
0 回答
106 浏览

go - Librdkafka Go 消费者和 websocket 问题

我正在尝试编写一些使用 librdkafka Kafka Consumer 的简单 Go 代码,使用 Kafka 主题上的消息并将该消息发布到 HTTP(升级到 websocket)端点。类似于以下内容

Kafka Topic(myTopic) --> Go --> 客户端使用 HTTP 但升级到 Websockets 并保持开放连接并通过 Go 代码使用来自 Kafka 主题的所有消息。这是我到目前为止所尝试的。虽然我能够使用有关该主题的消息以及单独测试我的 websockets 代码,但我无法将两者拼接在一起。我无法弄清楚如何将 Kafka 消息传递给升级的 websocket 连接。

在这个阶段,我将所有代码保存在一个文件中,这违反了最佳实践。一旦我得到它的工作,我将重构代码。

0 投票
1 回答
208 浏览

go - librdkafka 和 confluent-kafka-go 是否支持基于 JKS 的 SSL 配置?

我们可以配置 librdkafka 和 confluet-kafka-go 使用 jks 文件吗