我应该实现一个三叉戟事务拓扑。我发现我可以使用 kafka 作为 spout 来使我的拓扑具有事务性。我发现https://github.com/nathanmarz/storm-contrib/tree/master/storm-kafka这是一个 kafka spout forstorm 但它不是事务性的。我还发现 https://github.com/nathanmarz/storm/blob/master/storm-core/src/jvm/storm/trident/spout/IOpaquePartitionedTridentSpout.java这是一个三叉戟 kafka 事务喷口。但我没有它的 Maven 源代码,也没有使用它的说明。只有使用 kafka 风暴喷口的说明。我还需要一个实现分区卡夫卡的指令。如果您有实现事务拓扑的经验,请帮助我!谢谢!
1 回答
你看过这个Kafka Spout吗?
事务性 spout 不足以保证您的拓扑是事务性的。OpaqueTridentKafkaSpout 实现以下属性:
- 给定 txid 的批次总是相同的。重播一个 txid 的批次将与第一次为该 txid 发出该批次完全相同的一组元组。
- 元组批次之间没有重叠(元组在一批或另一批中,从不多个)。
- 每个元组都在一个批次中(没有元组被跳过)。
但是如果你试图持久化一些计算,那么你必须实现一个事务状态,以便对每批元组执行一次对数据库的更新
为了实现一次性处理,您必须为您的计算保留批处理事务 ID 和先前的更新值。
让我们考虑文档中的示例:
您正在处理以下一批元组:["man"] ["man"] ["dog"] 与事务 id "3" 关联
然后在您的数据库中,您当前持有一些单词计数器作为键/值对:
人=> [计数=3,txid=1];狗 => [count=4, txid=3] ; 苹果 => [count=10, txid=2]
由于与键“dog”关联的 txid 与当前处理事务相同,因此您可以跳过此更新。元组“dog”已经为此事务更新。但其他元组的情况并非如此。更新数据库后如下所示:
人=> [计数=5,txid=3];狗 => [计数=4,txid=3];苹果 => [count=10, txid=2]
通过这种方式,Trident 能够比较交易 ID 和以前的值来决定是否必须执行更新。
看看地图状态!
基本上,事务拓扑由事务喷口和事务状态组成。
您可以在此页面上找到有关三叉戟状态的更多信息:http: //storm.incubator.apache.org/documentation/Trident-state
我希望这能帮到您。