3

所以我要解决的问题如下:

  • 我需要一个以特定频率发出消息的数据源
  • 有 N 个神经网络需要单独处理每条消息
  • 汇总所有神经网络的输出,并且仅当收集了每个消息的所有 N 个输出时,才应将消息声明为已完全处理
  • 最后,我应该测量一条消息被完全处理所花费的时间(从它发出到收集该消息的所有 N 个神经网络输出之间的时间)

我很好奇如何使用火花流处理这样的任务。

我当前的实现使用了 3 种类型的组件:一个自定义接收器和两个实现 Function 的类,一个用于神经网络,一个用于最终聚合器。

概括地说,我的应用程序构建如下:

JavaReceiverInputDStream<...> rndLists = jssc.receiverStream(new JavaRandomReceiver(...));

Function<JavaRDD<...>, Void> aggregator = new JavaSyncBarrier(numberOfNets);

for(int i = 0; i < numberOfNets; i++){
    rndLists.map(new NeuralNetMapper(neuralNetConfig)).foreachRDD(aggregator);
}

不过,我遇到的主要问题是它在本地模式下的运行速度比提交到 4 节点集群时要快。

我的实施一开始是错误的还是这里发生了其他事情?

这里还有一个完整的帖子http://apache-spark-user-list.1001560.n3.nabble.com/Developing-a-spark-streaming-application-td12893.html详细介绍了这三个中的每一个的实现前面提到的组件。

4

1 回答 1

5

似乎可能有很多重复的对象实例化和序列化。后者可能会影响您在集群中的性能。

你应该只尝试一次实例化你的神经网络。您必须确保它们是可序列化的。您应该使用flatMap而不是多个maps + union。这些方面的东西:

// Initialize neural net first
List<NeuralNetMapper> neuralNetMappers = new ArrayList<>(numberOfNets);
for(int i = 0; i < numberOfNets; i++){
    neuralNetMappers.add(new NeuralNetMapper(neuralNetConfig));
}

// Then create a DStream applying all of them
JavaDStream<Result> neuralNetResults = rndLists.flatMap(new FlatMapFunction<Item, Result>() {
    @Override
    public Iterable<Result> call(Item item) {
        List<Result> results = new ArrayList<>(numberOfNets);
        for (int i = 0; i < numberOfNets; i++) {
            results.add(neuralNetMappers.get(i).doYourNeuralNetStuff(item));
        }
        return results;
    }
});

// The aggregation stuff
neuralNetResults.foreachRDD(aggregator);

如果您负担得起以这种方式初始化网络,您可以节省大量时间。此外,union您在链接帖子中包含的内容似乎是不必要的,并且正在惩罚您的表现:a flatMapwill do。

最后,为了进一步调整集群中的性能,您可以使用 Kryo 序列化程序

于 2014-09-10T13:23:01.000 回答