问题标签 [librdkafka]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
apache-kafka - node-rdkafka - 调试设置为全部,但我只看到代理传输失败
我正在尝试连接到 kafka 服务器。身份验证基于 GSSAPI。
这是我的 test_kafka.js:
logging - librdkafka.redist 的日志是什么意思
Confluent.Kafka
这是从与的librdkafka.redist
依赖关系相关的控制台日志中提取的。
我试图了解每个管道分隔部分是什么。谁能告诉我?我找不到任何关于它的信息。我假设第二列是某种时间戳,第三列是状态,第四列是错误消息。
apache-kafka - 消息在 kafka 生产者和消费者之间消失
我有一个非常简单的嵌入式 kafka 应用程序:一个生产者线程和两个编写顶级 postgres db 的消费者线程。这三个线程在一个进程中运行。我正在使用 librdkafka 来实现我的消费者和生产者,并将 apache-kafka 作为代理运行。消息大小约为 2kB。我有两个计数器:一个在我写入时递增(rd_kafka_produce),另一个在我读取时递增(rd_kafka_consume_batch)。如果我的生产者运行得足够快(超过 30000 条消息/秒),生产者计数器最终会比消费者计数器大得多(如果我运行 30 秒,则为 15% 左右)。所以我在某处丢失了消息。我的第一个问题是如何调试这样的问题?第二个是这个问题最可能的原因是什么,我该如何解决?
c++ - Spring Kafka:ReplyingKafkaTemplate 回复的 C++ 生产者
我使用 ReplyingKafkaTemplate 向 Kafka 发送请求并接收回复。请求的消费者是一个 C++ 程序。处理请求后,其回复由 C++ 应用程序生成。现在我的问题是关于回复的格式,因为spring-side中回复的消费者似乎使用相关ID来找到相应的回复。有人可以帮我解决这个问题吗?
python - 为什么即使传递了错误的代理主机名,librdkafka 也不会抛出任何错误/异常?
我正在编写使用confluent_kafka api(内部使用librdkafka)来计算kafka吞吐量的python应用程序,但是在这个过程中我观察到,即使我传递了虚拟代理名称,应用程序也不会抛出任何错误,所以我缺少一些东西或者这是api中的错误。还有什么可以更好地处理这个问题?
当我通过传递不正确的代理来测试此应用程序时,没有错误/异常:
plugins - 未调用 librdkafka 注册 oauthbearer_token_refresh_cb 的自定义插件
我正在尝试编写自定义插件,librdkafka
该插件将提供oauthbearer_token_refresh_cb
.
在我们公司,我们使用带有自定义 OUTHBEARER sasl 机制的 kafka,该机制在 java/kotlin 中运行良好,在 .Net 中也运行良好。下一步是实现相同的,librdkafka
因为我们需要它用于 Clickhouse DB 的 kafka 连接器实现,它使用librdkafka
.
我尝试使用以下插件实现:
自定义oauth.c
编译它:
制作共享库:
通过以下方式试用kafkacat
:
结果是:
如果我注释掉之后发生的所有 printf-s,rd_kafka_conf_set_oauthbearer_token_refresh_cb
那么我不会得到% ERROR: No such configuration property: "plugin.library.paths" (plugin lib-oauth-cb.so)
但是也不起作用,并且我设置 oauth 持有者令牌的回调永远不会被调用。这是输出:
笔记:
- 这个 kafka 集群有 3 个不同的监听器
- 端口 9092 PLAINTEXT 协议(尝试过 kafkacat 并且可以正常工作)
- 端口 9093 SSL 协议(尝试过 kafkacat 并且可以正常工作)
- 端口 9094 SASL_SSL 协议,使用 OAUTHBEARER 作为 sasl 机制
- 我已经在我的 Mac 上安装
librdkafka
和kafkacat
使用brew
,据我所知,librdkafka 的版本是1.5.0
问题
为什么我的回调
oauth_refresh
从未被调用(没有============== Initializing oauthbearer config
......印在stdout
)如果后面
No such configuration property: "plugin.library.paths" (plugin lib-oauth-cb.so)
有一些,为什么我会收到错误printf
rd_kafka_conf_set_oauthbearer_token_refresh_cb
- 只要
rd_kafka_conf_set_oauthbearer_token_refresh_cb
调用之后就不会出现这个错误printf
- 即使多次设置也不会导致此错误:
printf
rd_kafka_conf_set_oauthbearer_token_refresh_cb
printf
rd_kafka_conf_set_oauthbearer_token_refresh_cb
printf
导致此错误的副作用是什么?- 这部分问题不太重要,因为我不打算保留那些 printf-s,部分
1)
很重要
- 只要
目标
- 我的目标是拥有我们 OUTHBEARER 方案的小型可插拔库,它可以插入任何使用的东西
librdkafka
(最终目标是Clickhouse的 kafka 连接器) - 我使用
kafkacat
它只是因为它是一种易于试验的工具 - 关键是可插入性,因此无需重新编译某些使用 librdkafka 的应用程序即可使用自定义刷新 oauth 令牌逻辑。
编辑
我有几个问题:
- conf_init 的签名:
- 需要是:
rd_kafka_resp_err_t conf_init(rd_kafka_conf_t *conf, const char *path, char *errstr, size_t errstr_size)
- 代替:
void conf_init(rd_kafka_conf_t *conf, const char *path, char *errstr, size_t errstr_size)
- 需要是:
- 在我的示例中设置 oauthbearer 令牌时,需要改为
nowMs + someTTL
过期0
- 我最终使用rdkafka_example.c 而不是安装
kafkacat
,因为我安装的最新版本的 kafka catbrew
是1.5.0
- 当与这个旧版本运行时,
kafkacat
我遇到了以下错误kafkacat(76875,0x11b279dc0) malloc: *** error for object 0x10bad7bd0: pointer being freed was not allocated kafkacat(76875,0x11b279dc0) malloc: *** set a breakpoint in malloc_error_break to debug
- 当与这个旧版本运行时,
- 我让它在消费者模式下工作,
-C
但不是在打印元数据模式下-L
,因为 with-L
模式没有调用rd_kafka_poll
,所以我的回调从未被调用。- 启用开关后,我最终添加
rd_kafka_poll(rk, 5000);
了rdkafka_example.c-L
- 启用开关后,我最终添加
c++ - 使用 rdkafkacpp.h 的 C++ 中的基本 Kafka Consumer
我对使用 C++ 的 rdkafkacpp.h Kafka 库相当陌生。
我参考了一些可用的在线资源,关于如何在 Windows 上设置 Kafka 消费者,使用提供的标准 Kafka 环境设置 .bat 文件:https : //kafka.apache.org/quickstart能够测试从生产者终端发送的消息是否出现在消费者终端上)
我已经阅读了 Kafka 的基本理论并浏览了以下文档:https ://docs.confluent.io/4.0.0/clients/librdkafka
在阅读完这些基本内容后,我尝试使用内置库函数编写示例代码,如下所示。但是,我在尝试设置“conf”的引导服务器属性的代码行收到运行时异常。该异常是访问冲突异常。“在 mykafka.exe 中的 0x00007FFB878EC3F9 (msvcr120.dll) 处引发异常:0xC0000005:访问冲突读取位置 0x0000008E5A900000。”
我怀疑使用库函数的 Kafka 消费者“实现”的顺序错过了一些步骤,或者必须重新排序。
我试图保持实现简单,我的本地机器上只有一个生产者(localhost:9092),只有这个消费者(mykafka.exe)。此外,一个主题“快速启动事件”已在终端上启动。
任何帮助是极大的赞赏!
PS:Visual Studio 2019 用于此代码开发。
node.js - 如何避免 HighlevelProducer rdkafka 中的数据丢失
使用 npm librdkafka 我正在向 Kafka 生成大量数据。我正在使用 HighlevelProducer。对于 50K 消息,此配置工作正常。但是对于像 100K 消息这样的大容量,会有数据丢失。根据配置,数据丢失从 200-1K 不等。尝试了各种配置,但无法找到达到 100% 准确度的最佳配置。任何建议都是有帮助的。下面是我的配置。
go - Librdkafka Go 消费者和 websocket 问题
我正在尝试编写一些使用 librdkafka Kafka Consumer 的简单 Go 代码,使用 Kafka 主题上的消息并将该消息发布到 HTTP(升级到 websocket)端点。类似于以下内容
Kafka Topic(myTopic) --> Go --> 客户端使用 HTTP 但升级到 Websockets 并保持开放连接并通过 Go 代码使用来自 Kafka 主题的所有消息。这是我到目前为止所尝试的。虽然我能够使用有关该主题的消息以及单独测试我的 websockets 代码,但我无法将两者拼接在一起。我无法弄清楚如何将 Kafka 消息传递给升级的 websocket 连接。
在这个阶段,我将所有代码保存在一个文件中,这违反了最佳实践。一旦我得到它的工作,我将重构代码。
go - librdkafka 和 confluent-kafka-go 是否支持基于 JKS 的 SSL 配置?
我们可以配置 librdkafka 和 confluet-kafka-go 使用 jks 文件吗