问题标签 [apache-kafka-streams]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
0 回答
607 浏览

java - KStream - 将窗口计数转换为新的字符串,字符串主题

我有一个键控数字流,它们会运行到(Windowed).countByKey()中,然后在.forEach().

我想采用 Windowed 值,将它们与 start 和 end 值配对并将其放入一个新流中。我可以使用“传统”制作人来做到这一点,但我希望将所有工作都保存在一个应用程序中(而不是让第二个应用程序来处理新主题中的值)。

.forEach()KTable <Windowed<String>, Long>的形式出现,我没有看到明显的运算符将其(通过.to().through())链接到KStream<String, String>主题。

这可能吗?这个问题有意义吗?

0 投票
1 回答
1219 浏览

java - Kafka KStream - 显着的启动延迟

我遇到了基于KStreams的应用程序的问题:它会运行一次,当我停止/重新启动时,它会“卡住”并且在我删除它创建的各种主题之前不会再继续。这不会每次都发生,但经常发生。

通常,当我将新(er)版本复制到工作 VM 时(出于速度原因,在与 kafka 集群相同的子网中)时,会发生这种情况。

当它被楔入时,我会看到;

  1. “连接”: org.apache.zookeeper.ZooKeeper - Initiating client connection
  2. “客户”:[StreamThread-1] INFO o.a.k.s.p.internals.StreamTask - Creating restoration consumer client
  3. “Ping”:我会看到这些,应用程序不会正常关闭。它必须被杀死。

在任何这些情况下,消息通常会无限重复(嗯 - 至少在午餐+会议期间一直重复。IE 太长了)。

在这种情况发生之前,该应用程序正在“干净地”关闭。

我究竟做错了什么?


编辑:

最近一次 - 20 分钟后,我收到一连串错误:

org.apache.kafka.common.errors.TimeoutException:包含 101 条记录的批次由于超时而过期,同时向代理请求元数据

其次是:

org.apache.kafka.clients.consumer.CommitFailedException:提交无法完成,因为该组已经重新平衡并将分区分配给另一个成员

--> 这是一个很好的技巧,因为没有其他成员

0 投票
0 回答
460 浏览

siddhi - Siddhi CEP 库执行计划可以针对 Kafka Streams 运行吗?

我想用 KafkaStreams 运行 Siddhi CEP 库,但看起来它已经有了自己的“流”概念。如何插入 KafkaStreams 以启用 Siddhi 执行计划在 KafkaStreams 上运行?

编辑解决 Dilini 的评论>>

参考: Kafka Streams 文档

Kafka Streams 目前没有与我发现的 CEP 框架紧密集成。例如,Apache Flink 拥有自己的 Flink CEP。因此,我设想使用 Siddhi CEP 作为 Kafka Streams 之上的抽象层,并在这两种技术之间进行紧密集成。例如,

  1. 创建 Siddhi CEP 流将自动创建 Kafka 主题和 Kafka 流以流式传输发布到该主题的事件。
  2. 创建 Siddhi CEP 输出流确实会创建 Kafka 主题并发布到它
  3. Siddhi CEP 的“事件表”可能是本地的 Kafka Streams“状态存储”或 Kafka 主题,因为它们本质上是一个复制的分区数据存储。这不是使用 RDBMS 或内存中的事件表等。
  4. Siddhi CEP 执行计划复杂的事件查询将转发到 Kafka Streams 处理器 API,这些 API 执行 map()、filter(),并加入 Kafka Streams 以检查模式等。

我试图理解为什么我想要多个“流”框架,特别是如果我的环境中已经有 Kafka Streams。

0 投票
1 回答
475 浏览

complex-event-processing - 实时规则编辑和KafkaStreams

我要求允许“业务”用户创建和编辑从输入流中摄取的数据的规则。规则必须对业务友好,并且不能有任何开发周期。这是我在想的一个例子:

由于气象站和连接的车辆流数据被摄取,我需要应用此规则,但如果规则更改为检测到的 PctWipersOn = 75%,则必须实时应用该规则,无需重新部署或重新启动。它需要数据驱动。

Siddhi CEP (WSO2 CEP)似乎只支持“部署”规则。是否有任何其他 CEP 产品可以满足我的需求,特别是如果它们与 KafkaStreams 配合得很好?

如果我必须自己动手,是否有针对此问题推荐的“流”设计模式?

0 投票
2 回答
19656 浏览

apache-kafka - 卡夫卡流并发?

我有一些基本的 Kafka Streaming 代码,可以从一个主题读取记录,进行一些处理,然后将记录输出到另一个主题。

Kafka 流如何处理并发?一切都在一个线程中运行吗?我没有在文档中看到这一点。

如果它是单线程的,我想要多线程处理的选项来处理大量数据。

如果它是多线程的,我需要了解它是如何工作的以及如何处理资源,比如 SQL 数据库连接应该在不同的处理线程中共享。

相对于其他选项(Spark、Akka、Samza、Storm 等),是否不建议将 Kafka 的内置流 API 用于大容量场景?

0 投票
1 回答
21707 浏览

java-native-interface - 为什么 Apache Kafka Streams 使用 RocksDB 以及如何改变它?

在研究 Apache Kafka 0.9 和 0.10 的新功能时,我们使用了 KStreams 和 KTables。有一个有趣的事实是 Kafka 在内部使用 RocksDB。请参阅Kafka Streams 简介:流处理变得简单。RocksDB 不是用 JVM 兼容语言编写的,因此需要仔细处理部署,因为它需要额外的共享库(依赖于操作系统)。

这里有一些简单的问题:

  • 为什么 Apache Kafka Streams 使用 RocksDB?
  • 怎么可能改变它?

我曾试图寻找答案,但我只看到隐含的原因,即 RocksDB 对于每秒数百万次操作范围内的操作非常快。

另一方面,我看到一些用 Java 编码的 DB,也许端到端它们可以做到这一点,而且它们不会通过 JNI。

0 投票
3 回答
1234 浏览

apache-kafka - 使用 Kafka Streams 聚合流数据

我正在使用这样的代码向 Kafka 发送消息:

我想用 Kafka Streams (0.10.0.1) 计算最后一小时内的消息总数。我试过了:

我对 Kafka/Streams 很陌生。我该怎么做?

0 投票
1 回答
3471 浏览

apache-kafka - Flink Kafka Stream 相对于 Spark Kafka Stream 的优势?Flink 上的 Kafka Stream 呢?

在 Spark Stream 中,我们为几乎实时的微批处理设置批处理间隔。在 Flink (DataStream) 或 Storm 中,stream 是实时的,所以我猜没有批处理间隔这个概念。

在kafka中,消费者在拉,我想象Spark使用batch interval参数从Kafka broker中拉出消息,那么Flink和Storm是怎么做的呢?我想象 Flink 和 Storm 在快速循环中拉取 Kafka 消息以形成实时流源,如果是这样,如果我将 Spark 批处理间隔设置为小,例如 100ms、50ms 甚至更小,我们和 Spark 之间是否存在显着差异流媒体和 Flink 还是 Storm?

同时,在 Spark 中,如果流数据很大,batch 间隔太小,我们可能会遇到有大量数据等待处理的情况,因此会发生变化,我们会看到 OutOfMemory 发生。它会发生在 Flink 或 Storm 中吗?

我已经实现了一个应用程序来进行主题到主题的转换,转换很容易,但是源数据可能很大(考虑它是一个物联网应用程序)。我的原始实现由reactive-kafka支持,它在我的独立 Scala/Akka 应用程序中运行良好。我没有实现要集群的应用程序,因为如果我需要它,Flink/Storm/Spark 已经在那里了。然后我找到了 Kafka Stream,对我来说,从客户端使用的角度来看,它类似于 reactive-akka。那么,如果我在独立应用程序或微服务中使用 Kafka Stream 或 reactive-kafka,我们是否需要关注客户端代码的可靠性/可用性?

0 投票
1 回答
9403 浏览

apache-kafka-streams - 如何将自定义 StateStore 添加到 Kafka Streams DSL 处理器?

对于我的 Kafka 流应用程序之一,我需要同时使用 DSL 和处理器 API 的功能。我的流媒体应用流程是

聚合后,我需要向接收器发送一条聚合消息。所以我定义我的拓扑如下

我定义了一个自定义StateStore并将其注册到我的处理器,如下所示

当我运行应用程序时,我得到java.lang.NullPointerException

org.apache.kafka.streams.processor.internals.ProcessorStateManager 的 org.apache.kafka.streams.state.internals.MeteredKeyValueStore.flush(MeteredKeyValueStore.java:167) 的线程“StreamThread-18”java.lang.NullPointerException 中的异常.flush(ProcessorStateManager.java:332) 在 org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:252) 在 org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread .java:446) 在 org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:422) 的 org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:434) ) 在 org.apache.kafka.streams.processor.internals 的 org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:340)。StreamThread.run(StreamThread.java:218)

知道这里出了什么问题吗?

0 投票
3 回答
12374 浏览

apache-kafka - Is Kafka Stream StateStore global over all instances or just local?

In Kafka Stream WordCount example, it uses StateStore to store word counts. If there are multiple instances in the same consumer group, the StateStore is global to the group, or just local to an consumer instance?

Thnaks