3

设想:

我有来自传感器的事件流。事件可以是T-typeJ-Type

  • T 类事件有事件发生的时间戳。
  • J 型事件具有开始和结束时间戳。

根据 J-Type 事件的开始和结束时间戳,对时间范围内的所有 T-type 事件应用聚合逻辑并将结果写入 DB。

为此,我创建了一个自定义触发器,它在收到 J-Type 事件时触发。在我的自定义 ProcessWindowFunction 中,我正在执行聚合逻辑和时间检查。

但是,可能存在一种情况,即 T 型事件不在当前 J 型事件的时间范围内。在这种情况下,应该在清除当前窗口之前将 T 型事件推送到下一个窗口。

流窗口

想到的解决方案:

  1. 在自定义窗口处理函数中,将未处理的 T 型事件推送到 Kinesis 流(源)中。(最坏情况解决方案)

  2. 使用 FIRE 代替 FIRE_AND_PURGE,以在整个运行时维护状态。使用元素迭代器删除已处理的元素。(不推荐,保持无限窗口)

想知道,是否有任何方法可以将未处理的事件直接推送回输入流(没有运动)。(重新排队)

或者

有什么方法可以在 keyBy 上下文中维护状态,以便我们对这些未处理的数据(之前或)与窗口元素一起执行计算。

4

1 回答 1

3

这里有两个解决方案。它们的基本行为或多或少是等效的,但您可能会发现其中一个更容易理解、维护或测试。

至于您的问题,不,没有办法循环回(重新排队)未使用的事件而不将它们推回 Kinesis。但是只要坚持到需要它们就可以了。

解决方案 1:使用 RichFlatMapFunction

当 T 型事件到达时,将它们附加到一个ListState对象。当 J 型事件到达时,从列表中收集所有匹配的 T 型事件到输出,并更新列表以仅保留那些将属于以后的 J 型事件的 T 型事件。

解决方案 2:使用带有自定义 Trigger 和 Evictor 的 GlobalWindows

除了您已经完成的工作之外,实现一个Evictor(在窗口被 FIREd 之后)从窗口中仅删除 J 型事件和所有匹配的 T 型事件。

更新:清除陈旧密钥/失效传感器的状态

使用解决方案 1,您可以使用状态 TTL安排清除与死键关联的任何非活动状态。或者您可以使用 aKeyedProcessFunction而不是 a RichFlatMapFunction,并使用计时器来完成同样的事情。

使用窗口 API 管理陈旧密钥的状态可能不那么简单,但对于解决方案 2,我相信您可以扩展自定义触发器以包含将清除窗口的超时。如果您在 中使用了全局状态ProcessWindowFunction,则需要依靠状态 TTL 来清理它。

于 2020-02-21T06:53:45.723 回答