I'd like to use Kinesis Data Analytics (or some other AWS managed service) to batch records based on a filter criterion. The idea is that as records come in, we start a session window and batch any matching records for 15 minutes.

Stagger windows are exactly what we want, except that we don't want to aggregate the data; we just want all the matching records returned together.

Ideally...
100 records spread over 15 min. (20 matching criteria) with first one at 10:02
|
v
At 10:17, the 20 matching records would be sent to the destination
I've tried something like this:
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
    "device_id"   INTEGER,
    "child_id"    INTEGER,
    "domain"      VARCHAR(32),
    "category_id" INTEGER,
    "posted_at"   DOUBLE,
    "block"       TIMESTAMP
);

-- Create pump to insert into output
CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
-- Select all columns from source stream
SELECT STREAM
    "device_id",
    "child_id",
    "domain",
    "category_id",
    "posted_at",
    FLOOR("SOURCE_SQL_STREAM_001".ROWTIME TO MINUTE) AS "block"
FROM "SOURCE_SQL_STREAM_001"
WHERE "category_id" = 888815186
WINDOWED BY STAGGER (
    PARTITION BY "child_id", FLOOR("SOURCE_SQL_STREAM_001".ROWTIME TO MINUTE)
    RANGE INTERVAL '15' MINUTE);
But for every column that isn't aggregated, I keep getting this error:
From line 6, column 5 to line 6, column 12: Expression 'domain' is not being used in PARTITION BY sub clause of WINDOWED BY clause
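If I understand the stagger-window rules correctly, every selected column has to be either aggregated or listed in the `PARTITION BY` sub-clause, so the only kind of query I can see compiling is an aggregating one along these lines (with a correspondingly different destination stream schema) — which collapses the records into summary rows instead of passing them through:

```sql
-- A variant that compiles: non-aggregated columns move into PARTITION BY
-- and everything else is aggregated. This emits one summary row per
-- child_id per window rather than the individual matching records.
CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM
    "child_id",
    COUNT(*)          AS "record_count",
    MIN("posted_at")  AS "first_posted_at",
    FLOOR("SOURCE_SQL_STREAM_001".ROWTIME TO MINUTE) AS "block"
FROM "SOURCE_SQL_STREAM_001"
WHERE "category_id" = 888815186
WINDOWED BY STAGGER (
    PARTITION BY "child_id", FLOOR("SOURCE_SQL_STREAM_001".ROWTIME TO MINUTE)
    RANGE INTERVAL '15' MINUTE);
```

That gives me counts instead of the 20 individual records, which defeats the purpose.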
Kinesis Firehose was a suggested solution, but its buffering window is blind to `child_id`, so it could split a single session across multiple batches, which is exactly what I want to avoid.
Any suggestions? It feels like this may not be the right tool for the job.