首先,我对 Storm/Trident 有点陌生,而且我已经在一个问题上苦苦挣扎了好几个小时。
我所拥有的是一个带有一个分区的 Kafka 主题。生产者每 x 毫秒向该主题发送元组。TransactionalTridentKafkaSpout 从这个主题中读取,一些 Trident 操作员处理它们。整个拓扑在本地模式下运行(远程模式尚未测试)。
拓扑的主要代码是:
TransactionalTridentKafkaSpout spout = new TransactionalTridentKafkaSpout(spoutConf);
TridentTopology topology = new TridentTopology();
Stream inStream = topology.newStream("kafka-spout", spout).parallelismHint(4);
TridentState state1=inStream
.groupBy(new Fields(ID_FIELD))
.persistentAggregate(new MemoryMapState.Factory(), new Fields(ID_FIELD, FIELD1, FIELD2, FIELD3), new CustomCombinerAgg1(), new Fields(COMB_AGG_1_FIELD))
.parallelismHint(4);
state1.newValuesStream().groupBy(new Fields(ID_FIELD)).
persistentAggregate(new MemoryMapState.Factory(), new Fields(ID_FIELD, COMB_AGG_1_FIELD), new CustomCombinerAgg2(), new Fields(COMB_AGG_2_FIELD))
.parallelismHint(4);
state1.newValuesStream().filter(new Fields(ID_FIELD, COMB_AGG_1_FIELD), new CustomBaseFilter1());
inStream.groupBy(new Fields(ID_FIELD))
.persistentAggregate(new MemoryMapState.Factory(), new Fields(ID_FIELD, FIELD1, FIELD2), new CustomCombinerAgg3(), new Fields(COMB_AGG_3_FIELD));
inStream.groupBy(new Fields(ID_FIELD))
.persistentAggregate(new MemoryMapState.Factory(), new Fields(ID_FIELD, FIELD1, FIELD2, FIELD3), new CustomCombinerAgg4(), new Fields(COMB_AGG_4_FIELD))
.newValuesStream().filter(new Fields(ID_FIELD, COMB_AGG_4_FIELD), new CustomBaseFilter2());
现在我遇到的问题是生产者的消息间隔越低,执行的一些操作符就越少。
例如,如果生产者以 100 毫秒的间隔发送 200 个元组,每个运算符正确处理所有 200 个元组,但如果间隔设置为 20 毫秒,则运算符处理 / 仅针对以下数量的元组执行:
CustomCombinerAgg1:200
CustomCombinerAgg2:50
CustomBaseFilter1:50
CustomCombinerAgg3:150
CustomCombinerAgg4:180
CustomBaseFilter2:60
据我了解(事务性)Trident 保证只处理一次,并且只有在前一个元组被完全处理后,才应该从 spout 中获取一批新的元组。这似乎不是这里的情况,而是第一个运算符 CustomCombinerAgg1 决定了速度,然后后面的运算符不能在给定时间内处理所有元组?
我期望的是,每个元组都正确执行每个运算符,并且一旦所有运算符都处理了元组/批处理,就会获取下一个运算符。使用 Trident 不应该是这种情况吗?难道我做错了什么?我怎样才能实现这种行为?
Trident 甚至如何知道元组何时已被完全处理?据我所知,您必须 ack() Storm 中的元组,但 Trident 运算符没有 OutputCollector,因此无法调用 ack()?我的问题是否与此有关?
谢谢。