我有一个场景,如果查询的一部分匹配一个事件,我想从数据存储中获取一些其他事件来测试查询的其余部分
例如。“如果 JANE DOE 从我的商店购买,她在过去 3 年里是否还买了其他东西”之类的东西。
Flink、Storm 或 WSO2 是否提供对此类复杂事件处理的支持?
我有一个场景,如果查询的一部分匹配一个事件,我想从数据存储中获取一些其他事件来测试查询的其余部分
例如。“如果 JANE DOE 从我的商店购买,她在过去 3 年里是否还买了其他东西”之类的东西。
Flink、Storm 或 WSO2 是否提供对此类复杂事件处理的支持?
Flink 可以做到这一点,但它要求您从您关心的最早(例如 3 年前)开始处理所有事件,以便您可以为每个客户构建状态。然后,Flink 允许您管理此状态(通常使用 RocksDB),这样您就不必在面对系统故障时重播所有事件。
如果您无法重放所有历史记录,那么通常您会将其放入具有您需要的可扩展性和性能特征的其他存储(Cassandra/HBase、Elasticsearch 等),然后使用 Flink 的异步函数支持来查询它当您收到新事件时。
WSO2 流处理器让您使用它的时间增量分析功能实现此类功能。要实现您提到的场景,您可以将客户到达时触发的事件提供给称为“聚合”的构造。当您继续向聚合提供事件时,它将随着时间的推移汇总数据,并将保存在配置的持久性存储中,例如 DB。
您可以查询此聚合以获取给定时间段的状态。例如,以下查询将获取 2014-2015 年的名称、购买的商品总数和平均交易价值
from CustomerSummaryRetrievalStream as b join CustoemrAggregation as a
on a.name == b.name
within "2014-01-01 00:00:00 +05:30", "2015-01-01 00:00:00 +05:30"
per “years”
select a.name, a.total, a.avgTxValue
insert into CustomerSummaryStream;