我有一个从 MQTT 代理接收数据的拓扑,我希望 spout 的行为如下:
每 x 秒发出一批元组(或单个元组中的字符串列表)。我如何实现这一目标?我读了一些关于 Storm Trident 的文章,但它
IBatchSpout
似乎不允许我以特定的时间间隔批量发出元组。如果没有新数据进来,spout 应该怎么做?它不能阻塞线程,因为它是 Storm 的主线程,对吧?
我有一个从 MQTT 代理接收数据的拓扑,我希望 spout 的行为如下:
每 x 秒发出一批元组(或单个元组中的字符串列表)。我如何实现这一目标?我读了一些关于 Storm Trident 的文章,但它IBatchSpout
似乎不允许我以特定的时间间隔批量发出元组。
如果没有新数据进来,spout 应该怎么做?它不能阻塞线程,因为它是 Storm 的主线程,对吧?
你可以实现你自己的 MQTT spout。例如,看看MongoSpout。
重要的部分是nextTuple
方法。
调用此方法时,Storm 请求 Spout 将元组发送到输出收集器。这个方法应该是非阻塞的,所以如果 Spout 没有要发出的元组,这个方法应该返回。 nextTuple、ack 和 fail 都在 spout 任务的单个线程中的紧密循环中调用。当没有要发出的元组时,最好让 nextTuple 休眠一小段时间(比如一毫秒),以免浪费太多 CPU。
您不能一次等待指定的时间,但您可以实现nextTuple
它只偶尔发出一次元组。
private static final EMISSION_PERIOD = 2000; // 2 seconds
private long lastEmission;
@Override
public void nextTuple() {
if (lastEmission == null ||
lastEmission + EMISSION_PERIOD >= System.currentMillis()) {
List<Object> tuple = pollMQTT();
if (tuple != null) {
this.collector.emit(tuple);
return;
}
}
Utils.sleep(50);
}
请注意,我找到了一个开源MQTT spout。它看起来还没有准备好生产,但您可以将其用作起点。
除了 Christian,我还为 Storm 的 MQTT 客户端找到了这个实现。前面提到的链接仍未开发。