1

首先,我对 Storm/Trident 有点陌生,而且我已经在一个问题上苦苦挣扎了好几个小时。

我所拥有的是一个带有一个分区的 Kafka 主题。生产者每 x 毫秒向该主题发送元组。TransactionalTridentKafkaSpout 从这个主题中读取,一些 Trident 操作员处理它们。整个拓扑在本地模式下运行(远程模式尚未测试)。

拓扑的主要代码是:

TransactionalTridentKafkaSpout spout = new TransactionalTridentKafkaSpout(spoutConf);
TridentTopology topology = new TridentTopology();
Stream inStream = topology.newStream("kafka-spout", spout).parallelismHint(4);

TridentState state1=inStream
    .groupBy(new Fields(ID_FIELD))
    .persistentAggregate(new MemoryMapState.Factory(), new Fields(ID_FIELD, FIELD1, FIELD2, FIELD3), new CustomCombinerAgg1(), new Fields(COMB_AGG_1_FIELD))
    .parallelismHint(4);

state1.newValuesStream().groupBy(new Fields(ID_FIELD)).
    persistentAggregate(new MemoryMapState.Factory(), new Fields(ID_FIELD, COMB_AGG_1_FIELD), new CustomCombinerAgg2(), new Fields(COMB_AGG_2_FIELD))
    .parallelismHint(4);

state1.newValuesStream().filter(new Fields(ID_FIELD, COMB_AGG_1_FIELD), new CustomBaseFilter1());

inStream.groupBy(new Fields(ID_FIELD))
    .persistentAggregate(new MemoryMapState.Factory(), new Fields(ID_FIELD, FIELD1, FIELD2), new CustomCombinerAgg3(), new Fields(COMB_AGG_3_FIELD));

inStream.groupBy(new Fields(ID_FIELD))
    .persistentAggregate(new MemoryMapState.Factory(), new Fields(ID_FIELD, FIELD1, FIELD2, FIELD3), new CustomCombinerAgg4(), new Fields(COMB_AGG_4_FIELD))
    .newValuesStream().filter(new Fields(ID_FIELD, COMB_AGG_4_FIELD), new CustomBaseFilter2());

现在我遇到的问题是生产者的消息间隔越低,执行的一些操作符就越少。
例如,如果生产者以 100 毫秒的间隔发送 200 个元组,每个运算符正确处理所有 200 个元组,但如果间隔设置为 20 毫秒,则运算符处理 / 仅针对以下数量的元组执行:
CustomCombinerAgg1:200
CustomCombinerAgg2:50
CustomBaseFilter1:50
CustomCombinerAgg3:150
CustomCombinerAgg4:180
CustomBaseFilter2:60

据我了解(事务性)Trident 保证只处理一次,并且只有在前一个元组被完全处理后,才应该从 spout 中获取一批新的元组。这似乎不是这里的情况,而是第一个运算符 CustomCombinerAgg1 决定了速度,然后后面的运算符不能在给定时间内处理所有元组?

我期望的是,每个元组都正确执行每个运算符,并且一旦所有运算符都处理了元组/批处理,就会获取下一个运算符。使用 Trident 不应该是这种情况吗?难道我做错了什么?我怎样才能实现这种行为?
Trident 甚至如何知道元组何时已被完全处理?据我所知,您必须 ack() Storm 中的元组,但 Trident 运算符没有 OutputCollector,因此无法调用 ack()?我的问题是否与此有关?

谢谢。

4

0 回答 0