
I want to use Kinesis Data Analytics (or some other AWS managed service) to batch records based on a filter criterion. The idea is that as records come in, we'd start a session window and batch any matching records for 15 minutes.

The stagger window is exactly what we want, except we don't want to aggregate the data; we just want all the records returned together.

Ideally...

100 records spread over 15 min. (20 matching criteria) with first one at 10:02
                                 |
                                 v
At 10:17, the 20 matching records would be sent to the destination

I tried doing something like this:

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
    "device_id" INTEGER, 
    "child_id" INTEGER, 
    "domain" VARCHAR(32),
    "category_id" INTEGER,
    "posted_at" DOUBLE,
    "block" TIMESTAMP
);

-- Create pump to insert into output 
CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"

-- Select all columns from source stream
SELECT STREAM 
    "device_id", 
    "child_id", 
    "domain", 
    "category_id", 
    "posted_at",
    FLOOR("SOURCE_SQL_STREAM_001".ROWTIME TO MINUTE) as block
FROM "SOURCE_SQL_STREAM_001"
WHERE "category_id" = 888815186
WINDOWED BY STAGGER (
    PARTITION BY "child_id", FLOOR("SOURCE_SQL_STREAM_001".ROWTIME TO MINUTE) 
    RANGE INTERVAL '15' MINUTE);

For every column that is not part of an aggregation, I keep getting the error:

From line 6, column 5 to line 6, column 12: Expression 'domain' is not being used in PARTITION BY sub clause of WINDOWED BY clause

Kinesis Firehose was a suggested solution, but it is a blind window across all `child_id` values, so it could split one session across multiple batches, which is exactly what I'm trying to avoid.

Any suggestions? It feels like this might not be the right tool for the job.


1 Answer


Try `LAST_VALUE("domain") AS domain` in the SELECT clause.
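A sketch of how the pump from the question might look with that suggestion applied, assuming `LAST_VALUE` is available as an aggregate in the Kinesis Data Analytics SQL dialect: every column not listed in `PARTITION BY` is wrapped in an aggregate, which is what the STAGGER window's restriction requires. Note this still emits one row per partition per window rather than every matching record.

```
CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM
    -- Columns outside PARTITION BY must be aggregated in a STAGGER window
    LAST_VALUE("device_id") AS "device_id",
    "child_id",
    LAST_VALUE("domain") AS "domain",
    LAST_VALUE("category_id") AS "category_id",
    LAST_VALUE("posted_at") AS "posted_at",
    -- This expression appears in PARTITION BY, so it can be selected as-is
    FLOOR("SOURCE_SQL_STREAM_001".ROWTIME TO MINUTE) AS "block"
FROM "SOURCE_SQL_STREAM_001"
WHERE "category_id" = 888815186
WINDOWED BY STAGGER (
    PARTITION BY "child_id", FLOOR("SOURCE_SQL_STREAM_001".ROWTIME TO MINUTE)
    RANGE INTERVAL '15' MINUTE);
```

If you need all 20 matching records individually rather than one aggregated row, an aggregate like `LAST_VALUE` won't get you there on its own; it only resolves the compiler error.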

Answered 2020-04-08T19:52:32.530