2

我正在尝试使用 Apache Flink ML 包的 StochasticOutlierSelection 模型。

我无法弄清楚如何将它与 Kafka 作为数据源一起使用,我知道它需要一个 DataSet 而不是 DataStream,但我似乎无法将我的 Kafka DataStream 窗口化为一个 DataSet。

有没有办法可以将我的流视为一系列小型数据集。例如,有没有办法说流中匹配模式的每 10 个元素(按元素唯一 ID 滑动窗口)将它们视为固定大小的 DataSet 并检测此固定大小数据集中的任何异常值?

我要创建的场景是:

数据源 -> Kafka 主题 1 -> Flink 预处理 -> Kafka 主题 2 -> Flink Groups By ID -> 组上的异常值检测

我已经有一个可以进行预处理的工作实现,并且希望 Flink 能够满足我的要求?

4

1 回答 1

1

我想你可以创建一个基于计数的全局窗口并使用 ExecutionEnvironment 来获取一个数据集。像下面这样的东西可能会起作用(getResult 将返回一个 DataSet):


      stream.
      keyBy(...).
      window(GlobalWindows.create).
      trigger(CountTrigger.of(10)).
      aggregate(new MyAggregator()).
      ...

    class MyAggregator extends AggregateFunction[..., ..., ...] {  

      var valueList: List[LabeledVector] = List[LabeledVector]()    

      override def createAccumulator(): MyAggregator = new MyAggregator()
      override def add(value: .., accumulator: MyAggregator): ... = ...
      override def merge(agg1: MyAggregator, agg2: MyAggregator): ... = ...
      override def getResult(accumulator: MyAggregator): ... = {
        ExecutionEnvironment.getExecutionEnvironment.fromCollection(valueList)
      }
    }
于 2019-08-13T08:48:35.263 回答