我正在尝试将 Kafka 与 Heron 拓扑集成。但是,我找不到最新版本的 Heron (0.17.5) 的任何示例。是否有任何可以共享的示例或有关如何实现自定义 Kafka Spout 和 Kafka Bolt 的任何建议?
编辑1:
我相信KafkaSpout和KafkaBolt在 Heron 中被有意弃用,以便为新的Streamlet API 让路。我目前正在查看是否可以使用 Streamlet API构建KafkaSource和KafkaSink 。但是,当我尝试在Source中创建KafkaConsumer时,出现以下异常。
Caused by: java.io.NotSerializableException: org.apache.kafka.clients.consumer.KafkaConsumer
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at com.twitter.heron.api.utils.Utils.serialize(Utils.java:97)
编辑2:
修复了上述问题。我正在初始化KafkaConsumer
错误的构造函数。在方法中初始化相同的setup()
方法修复了它。