4

我有一个从 MQTT 代理接收数据的拓扑,我希望 spout 的行为如下:

  1. 每 x 秒发出一批元组(或单个元组中的字符串列表)。我如何实现这一目标?我读了一些关于 Storm Trident 的文章,但它IBatchSpout似乎不允许我以特定的时间间隔批量发出元组。

  2. 如果没有新数据进来,spout 应该怎么做?它不能阻塞线程,因为它是 Storm 的主线程,对吧?

4

2 回答 2

2

你可以实现你自己的 MQTT spout。例如,看看MongoSpout

重要的部分是nextTuple方法。

调用此方法时,Storm 请求 Spout 将元组发送到输出收集器。这个方法应该是非阻塞的,所以如果 Spout 没有要发出的元组,这个方法应该返回。 nextTuple、ack 和 fail 都在 spout 任务的单个线程中的紧密循环中调用。当没有要发出的元组时,最好让 nextTuple 休眠一小段时间(比如一毫秒),以免浪费太多 CPU。

您不能一次等待指定的时间,但您可以实现nextTuple它只偶尔发出一次元组。

private static final EMISSION_PERIOD = 2000; // 2 seconds
private long lastEmission;

@Override
public void nextTuple() {
    if (lastEmission == null ||
            lastEmission + EMISSION_PERIOD >= System.currentMillis()) {
        List<Object> tuple = pollMQTT();
        if (tuple != null) {
            this.collector.emit(tuple);
            return;
        }
    }
    Utils.sleep(50);
}

请注意,我找到了一个开源MQTT spout。它看起来还没有准备好生产,但您可以将其用作起点。

于 2014-10-28T21:47:12.857 回答
1

除了 Christian,我还为 Storm 的 MQTT 客户端找到了这个实现。前面提到的链接仍未开发。

于 2015-01-20T22:23:21.793 回答