0

我正在使用 Apache Flink 来预测来自 Twitter 的流。

代码在 Scala 中实现

我的问题是,我从 DataSet API 训练的 SVM 模型需要一个 DataSet 作为 predict() 方法的输入。

我已经在这里看到了一个问题,用户说,您需要编写一个自己的 MapFunction 来在工作开始时读取模型(参考:使用 scala 在 Flink 中进行实时流预测

但我无法编写/理解这段代码。

即使我在 StreamingMapFunction 中得到模型。我仍然需要一个 DataSet 作为参数来预测结果。

我真的希望有人可以向我展示/解释这是如何完成的。

Flink 版本:1.9 Scala 版本:2.11 Flink-ML:2.11

val strEnv = StreamExecutionEnvironment.getExecutionEnvironment
val env = ExecutionEnvironment.getExecutionEnvironment

//this is my Model including all the terms to calculate the tfidf-values and to create a libsvm
val featureVectorService = new FeatureVectorService
        featureVectorService.learnTrainingData(labeledData, false)

//reads the created libsvm
val trainingData: DataSet[LabeledVector] = MLUtils.readLibSVM(env, "...")
        val svm = SVM()
                .setBlocks(env.getParallelism)
                .setIterations(100)
                .setRegularization(0.001)
                .setStepsize(0.1)
                .setSeed(42)
//learning
svm.fit(trainingData)

//this is my twitter stream - text should be predicted later
val streamSource: DataStream[String] = strEnv.addSource(new TwitterSource(params.getProperties))

//the texts i want to transform to tfidf using the service upon and give it the svm to predict
val tweets: DataStream[(String, String)] = streamSource
                .flatMap(new SelectEnglishTweetWithCreatedAtFlatMapper)

4

1 回答 1

2

因此,目前作为其中SVM一部分的 FlinkML 不支持流式 API。这就是为什么SVM只接受DataSet。这个想法不是使用 FlinkML,而是使用 scala 或 java 中可用的一些 SVM 库。然后您可以读取模型,例如从文件中读取。问题是您必须自己实现大部分逻辑。

您提到的帖子中的评论或多或少说的是完全相同的事情。

于 2019-10-24T10:56:32.743 回答