问题标签 [apache-kafka-streams]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
2971 浏览

concurrency - 在 Kafka 流中执行异步转换

假设我有两个 Kafka 主题,AB。我正在尝试开发一个系统,从A中提取记录,对每个记录应用转换,然后将转换后的记录发布到B。在这种情况下,转换涉及通过 HTTP 调用 REST 端点。

作为 Kafka 的新手,我很高兴看到 Kafka Streams 项目已经解决了这类问题(consume-transform-publish)。不幸的是,我发现 Kafka 流中的转换是阻塞操作。本能地,我尝试以非阻塞、异步的方式调用 HTTP 端点。

这是否意味着 Kafka Streams 在这种情况下无法工作?这是否意味着我必须恢复以阻塞方式调用 REST 端点?这甚至是 Kafka Streams 可以接受的模式吗?基于流的数据处理对我来说还是比较新的,所以我并不完全熟悉它的并发模型。

0 投票
2 回答
506 浏览

apache-kafka - 我可以在 HDP 2.4 平台上安装 Confluent

我正在尝试通过 HDP 为 Kafka Streams 安装 Confluent,这可能是不可能的,你们能否建议我该怎么做

0 投票
1 回答
2948 浏览

apache-kafka - Kafka-streams 状态目录 io 错误

流运行一段时间后出现以下错误?我找不到谁负责创建 .sst 文件?

环境:

卡夫卡版本 0.10.0-cp1

斯卡拉 2.11.8

0 投票
3 回答
7272 浏览

apache-kafka - Kafka Streams:如何写入主题?

在 Kafka Streams 中,产生/写入流的规范方式是什么?在 Spark 中,有一个自定义接收器,它作为来自任意数据源的长时间运行的适配器。Kafka Streams 中的等价物是什么?

具体来说,我不是在问如何从一个主题转换到另一个主题。文档对此非常清楚。我想了解如何编写我的工人,这些工人将在一系列转换到 Kafka 中进行第一次写入。

我希望能够做到

但是现有的文档都没有显示这一点?我错过了什么吗?

0 投票
2 回答
2654 浏览

java - kafka ktable - 通过java访问rocksdb

今天早上我一直在阅读有关ktables的信息,希望能够实现滚动窗口键值存储。我可以看到最新版本的 kafka 似乎暗示这是可能的,但我更想知道从“外部”应用程序访问键值数据。

假设我实现了一个kstreams应用程序,它正在使用来自一个主题的日志数据(或类似的数据),并愉快地开窗、聚合和生产到另一个主题。现在我想看看这个来自其他进程的键值数据。文档提示数据由RocksDB存储。我可以从“外部”读取这个作为对所述数据库的调用吗?或者这些数据是否仅作为虚拟构造提供给 kstreams 应用程序?

0 投票
1 回答
652 浏览

apache-kafka - 使用 Kafka Streams 进行工作分配

我正在使用 Kafka Streams 对 Kafka 主题进行并发工作。

流具有以下形式

我已经设置KStreams了 15 个工作线程,但似乎工作在线程之间没有正确平衡(或根本没有平衡)。我的设置可能有问题吗?我原以为工作会在工作线程之间平均分配,但似乎情况并非如此。

来自 jvisualvm 的快照

0 投票
2 回答
1483 浏览

java - Confluent Kafka Streaming 示例不起作用

我尝试在以下位置运行 kafka-streams 示例:https ://github.com/confluentinc/examples/tree/master/kafka-streams

分支“kafka-0.10.0.0-cp-3.0.0”之类的命令应该是“开箱即用mvn compile” 。mvn test

在此处输入图像描述

我收到一条错误消息:

完整的问题在这里

如何让 Kafka 流式传输示例正常工作?

0 投票
2 回答
1105 浏览

apache-kafka - Kafka:在不同版本之间复制主题?

我有两个经纪人。第一次运行 0.9,第二次运行 0.10

各种工作人员和守护进程在两个代理上消费和产生消息。

对于一个应用程序,我需要来自正在使用KStreams并连接到 0.10 代理的应用程序的 0.9 主题消耗品的消息。

有没有一种简单的方法可以将一个主题从 0.9 复制到 0.10?还是使用 0.10 客户端连接到 0.9?我不想不得不将两个版本都塞进同一个罐子里。仅使用 0.10 客户端使用 0.9 代理似乎不起作用。

0 投票
2 回答
8741 浏览

apache-kafka - 删除未使用的 kafka 消费组

我正在使用带有压缩主题的 Apache Kafka 0.10 作为分布式缓存同步机制。当应用程序启动时,它会生成一个特定于实例的consumer group id. 随着实例的添加和删除以实现水平可伸缩性,显然我们得到了大量不应再次使用的组 ID。

我确信这是 and 的完美用例KStreamsKTables但出于智力原因以及KStreamsandKTables被定义为 0.10 中的 alpha 质量,我自己尝试这样做。

是否有我可以使用的 Kafka API 调用可以删除现有的消费者组,知道它不应该再次使用?

由于 Zookeeper 在 0.10 版本中没有维护消费者偏移量,有没有办法使用 Kafka 删除消费者组?

0 投票
1 回答
437 浏览

apache-kafka - Joining two Kafka KTable results in a Nullpointer in RocksDB

I am trying to join two Kafka Stream DSL KTable using:

I have made sure that both the keys and the values are not null:

But the application reports that there is a null pointer in the RocksDB layer concerning the partition.

[2016-07-17 21:58:04,682] ERROR User provided listener org.apache.kafka.streams.processor.internals.StreamThread$1 for group streams-persons2 failed on partition assignment (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) java.lang.NullPointerException at org.rocksdb.RocksDB.put(RocksDB.java:432) at org.apache.kafka.streams.state.internals.RocksDBStore.putInternal(RocksDBStore.java:299) at org.apache.kafka.streams.state.internals.RocksDBStore.access$200(RocksDBStore.java:62) at org.apache.kafka.streams.state.internals.RocksDBStore$3.restore(RocksDBStore.java:206) at org.apache.kafka.streams.processor.internals.ProcessorStateManager.restoreActiveState(ProcessorStateManager.java:245) at org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:210) at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.register(ProcessorContextImpl.java:116) at org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:202)