2

我想知道是否可以使用 Sparks Streaming 将滑动窗口级联到另一个。

因此,例如,我每 1 秒就有一次计数。我想总结 5、15 和 30 秒的窗口。我想知道是否可以将 5 秒的窗口结果用于 15 秒的结果,将 15 秒的结果用于 30 秒。

目的是避免为最长窗口的长度存储所有输入的 1 秒更新(因为这里的粒度无关紧要)。相反,我们以与我们需要的频率相匹配的频率重用 Dstream。

这是和示例:

    JavaPairDStream< String, Double >  test = input; 
    JavaPairDStream< String, Double >  test1 = input;
    // 5s:
    test = test.reduceByKeyAndWindow(new SumReducer(), new Duration(5000), new Duration(1000));  
    test1 = test1.reduceByKeyAndWindow(new SumReducer(), new Duration(5000), new Duration(5000));
    // 15s
    test = test1.reduceByKeyAndWindow(new SumReducer(), new Duration(15000), new Duration(5000));
    test1 = test1.reduceByKeyAndWindow(new SumReducer(), new Duration(15000), new Duration(15000));
    // 30s
    test = test1.reduceByKeyAndWindow(new SumReducer(), new Duration(30000), new Duration(15000));
    test.print();

我试过了,但没有打印出来。

4

1 回答 1

2

批处理间隔

窗口长度滑动间隔必须是批处理间隔的乘积。为了避免竞争条件(例如在 10 秒窗口中发出三个 5 秒的总和),批处理间隔必须大于计算时间。我在这里假设批处理间隔为 1000 毫秒。

例子

JavaPairDStream<String, Double> stream = input; 

// A: 5s sum every 5s
stream5sCount = stream.reduceByKeyAndWindow(
    new SumReducer(), new Duration(5000), new Duration(5000));  

// B: 15s sum every 5s
stream15sCount = stream5sCount.reduceByKeyAndWindow(
    new SumReducer(), new Duration(15000), new Duration(5000));

// C: 30s sum every 15s
stream30sCount = stream15sCount
    .reduceByKeyAndWindow(new SumReducer(), new Duration(30000), new Duration(15000))
    .map(new DivideBy(3));

stream30sCount.print();

解释

(对于两个动作 A 和 B,其中 B 减少 A:B 的 windowLength / A 的 slideInterval = B 的输入元组数。)

  1. 每 5 秒 A 总结 5 个元组。
  2. 每 5 秒 B 根据 (3*5=) 15 个原始元组总结 A 的最后 (15/5=) 3 个结果。
  3. 每 30 秒 C 根据 (6*3*5=) 90 个原始元组总结 B 的最后 (30/5=) 6 个结果!元组将被多次求和,因为 B 的窗口间隔大于其滑动间隔。
  4. 映射器更正计算错误。

校正步骤

我假设您的实际应用程序不像字数那么容易。之后您需要一个反函数来修复重复错误。您还可以尝试在 C 之前解决问题(在字数示例中,可以更早地划分)。另一种解决方案是跟踪已处理的元组并仅聚合析取元组。这取决于您的用例。

于 2014-11-25T20:02:27.427 回答