4

我希望使用 kafka 仅读取 spark 流中的最新消息,但它也会获取过去的数据

如何在 KafkaUtil 中为 spark 设置 auto.offset.reset

JavaPairReceiverInputDStream<String, String> messages =
            KafkaUtils.createStream(jssc, args[0], args[1], topicMap);

如何将 conf 设置为仅获取当前消息。请举一些例子。

提前谢谢,还有另一个线程

但还不够,请帮帮我。提前致谢。

4

1 回答 1

7

您需要从 KafkaUtils 对象中使用此方法:

 def createStream[K, V, U <: Decoder[_], T <: Decoder[_]](
      jssc: JavaStreamingContext,
      keyTypeClass: Class[K],
      valueTypeClass: Class[V],
      keyDecoderClass: Class[U],
      valueDecoderClass: Class[T],
      kafkaParams: JMap[String, String],
      topics: JMap[String, JInt],
      storageLevel: StorageLevel
    )

根据 Spark 版本,您不能使用 java。有一个错误

如果您使用的是 Spark 1.1.0,则需要在kafkaParams参数中添加此属性:

“auto.offset.reset”、“最大”

另一种解决方法是随机生成一个groupId前缀,但这很糟糕。

于 2014-10-02T14:26:15.150 回答