问题标签 [kafkajs]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
0 回答
162 浏览

ssl - 无法连接到 AWS MSK 并为客户端代理通信 kafkajs 启用了 sasl_scram 和 TLS 加密

无法连接到 AWS MSK 并为客户端代理通信 kafkajs 启用了 sasl_scram 和 TLS 加密

出现错误

0 投票
0 回答
125 浏览

javascript - 将 Kafka 事件的消息发送到套接字

我正在尝试创建一个应用程序(在 nodejs 中)以将 kafka 事件流式传输到套接字连接。我使用的库是:kafkajs 和 socket.io

我可以通过以下方式对一个套接字连接执行相同的操作:

我上面所做的是每次客户端连接并发送消息启动流时,我都会创建一个消费者连接,将其订阅到一个主题,并且每次主题收到一条消息时,我都会将其发送回客户端。

这适用于一个客户端,一旦第二个客户端连接,我就会得到预期的消费者错误已经在运行。

如何在套接字范围之外启动消费者(以便它在应用程序加载时启动)并在套接字内部接收任何消息流时(仅当有可用的套接字连接时)?

我对此很陌生,任何帮助将不胜感激。

0 投票
0 回答
370 浏览

apache-kafka - NestJS / Kafka - 自动重试不会在指定次数停止

我正在使用 NestJS 和 Kafka 来发布和订阅事件。当消费者抛出异常时,使用 KafkaJS 的自动重试器继续重试超出配置中指定的重试次数时,我遇到了问题。

配置

然后我们根据 NestJS 社区的建议使用了异常过滤。

过滤器被击中,但我们有一个计数和一个计时器。控制台日志如下所示:

任何建议或更改将不胜感激!

0 投票
0 回答
143 浏览

nestjs - 如何在 Nest.js 中为 prc 配置中间件

我的 nest.js 应用程序在DemoConsumerModule. 的代码片段DemoConsumerController

我想用中间件步骤丰富事件监听器(只是玩方法):

要激活中间件,我必须在模块中注册一个:

问题

问题是我的中间件不起作用。可能的问题很少。forRoutes如果跳过配置,则无法激活中间件。forRoutes消耗http配置,rpc所以我不确定是否可以针对可能的情况正确配置它。

附言

详细信息@EventPattern('demo-topic')

  • 运输Transport.KAFKA
  • "contextType":"rpc";
0 投票
0 回答
729 浏览

javascript - 当我尝试连接到 AWS MSK 代理时连接超时

我在 MSK 中创建了一个包含 2 个代理的集群,但我无法使用 kafkajs 客户端进行连接。我不断收到此连接超时错误

{"level":"ERROR","timestamp":"2021-07-29T13:35:35.625Z","logger":"kafkajs","message":"[Connection] 连接超时","broker": "{broker1}","clientId":"order-app"} {"level":"ERROR","timestamp":"2021-07-29T13:35:35.627Z","logger":"kafkajs", "message":"[BrokerPool] 连接种子代理失败,尝试列表中的另一个代理:连接超时","re​​tryCount":0,"retryTime":251}

有关集群配置的更多信息,我使用了无访问控制方法并启用了 TLS 加密(纯文本不是)。我尝试了端口 9092 和 9094。我将入站规则设置为所有流量只是为了进行测试,但它似乎仍然超时。还尝试启用和禁用 ssl,但结果相同。

0 投票
0 回答
401 浏览

node.js - 如何用 jest 测试在 kafka 中发送消息?

我有一个像这样的发送消息 kafka 我该如何测试这个功能jest,因为如果我们想运行 jest,我们必须运行 serviceKafkazookeeper。我能做什么 ?

我曾尝试过类似的东西:

它失败了,因为服务没有运行。

0 投票
1 回答
263 浏览

node.js - Kafkajs - 获取统计信息(滞后)

在我们的应用程序中,我们为 kafkanest.js使用kafkajs客户端。我们需要获得机会监控统计数据。指标之一是lag

试图弄清楚kafkajs是否提供任何有趣的东西。(有效载荷中最有趣的是:、、、、、、timestampoffsetbatchContext.firstOffsetbatchContext.firstTimestampbatchContext.maxTimestamp

问题

是否有任何想法如何记录lag由提供的值和其他统计信息kafkajs

我是否应该考虑实施自己的统计监视器以在使用kafka.js客户端的节点应用程序中收集所需的信息?

新细节 1

按照我可以获得的文档batch.highWatermark,在哪里

batch.highWatermark是主题分区中最后提交的偏移量。它可用于计算滞后。

我可以得到像下一个这样的信息:

那么有什么想法可以batch.highWatermark在标签计算中使用吗?

0 投票
1 回答
450 浏览

node.js - 关于在 node.js 中设计一个 Kafka Consumer 并使其具有幂等性的问题

我们正在尝试编写一个订阅特定主题的消费者,并在满足数据中的几个条件时进行一些处理。然而,处理中的一个操作只能执行一次,为了简单起见,让我们认为这是一个非幂等的 POST http 请求。

以下是其他一些注意事项:-

  • 消费者需要在 node.js 中编码并使用 kafkajs 库,
  • 消费者需要在 Kubernetes 上运行,
  • 副本的数量将等于生产者副本的数量
  • 我们将为我们的消费者使用一个消费者组

我们在想,为了让这个消费者幂等,我们也许可以做这样的事情:-

“将消息标记为已处理”基本上是将一些细节捕获到关系数据库(如 Postgres),偏移量、时间戳和其他一些细节,以确保我们捕获的键允许我们唯一地识别记录

  1. 以上看起来对使消费者幂等有好处吗?
  2. 您还能建议哪些其他替代方案效果更好?

除了上述之外,还有一些关于上述场景中数据库处理最佳实践的问题:-

  1. 假设我有 3 个 k8s 节点,每个节点都有 3 个消费者 pod 运行,本质上提供了 9 个单线程 kafka 消费者。这是正确的理解吗?

  2. 现在由于这些线程中的每一个都将执行数据库插入/读取,那么在池和客户端之间使用什么会更好(假设 node-postgres 库)?

  3. 似乎如果我们在一天的开始时打开一个客户端连接并让它一直存在到一天结束,它应该对我们有用。这是一个好方法还是一个非常糟糕的设计?

  4. 如果我们对这 9 个运行的消费者的每条消息进行处理,我们是否会通过使用池获得任何好处。

附加假设:-

  • 交通时间:东部时间上午 7:00 开始,白天接载,美国晚上逐渐减少。东部时间凌晨 2:00 到东部时间早上 6:00 之间没有交通。
  • 平均:- 美国白天每秒 1 条消息,
  • 最大:- 在美国白天,每秒 5 条消息,持续时间短。
  • 延迟容忍度:在正常情况下,消费者中的 POST 可以延迟不超过消息发布时间的 5 分钟。

感谢您耐心阅读并阅读此内容。为这篇文章的长度道歉。

0 投票
1 回答
101 浏览

javascript - 节点将十进制转换为字节 - Avro Schema Kafka / Kafkajs

我正在使用http://kafka.js.org/向 Kafka 生成消息,我正在处理的模式之一是十进制字节类型:

我正在寻找一些关于如何将 Node 中的小数转换为字节值以匹配上述模式的指导。

提前致谢。

0 投票
1 回答
201 浏览

javascript - 尽管主题和生产者都存在,但无法发送 Kafka 消息

我在 TypeScript 中使用本地的 KafkaJS 库,使用单个 kafka 代理。我已成功连接生产者,已验证我的主题已创建,并且正在生成消息:

现在当我去发送消息时:

这是我回来的错误: