TLDR:当我或我直接创建的任务在其上发出 DML 时,我的流会被消耗。但是当它是由存储过程创建的任务时,流不会被消耗。
我有一个行为符合预期的流,当我选择它时我可以看到它有数据:
SELECT SYSTEM$STREAM_HAS_DATA('ANALYTICS_DB.schema.stream_name');
使用创建它的相同角色,我使用流:
INSERT INTO ANALYTICS_DB.schema.table
(stuff, last_checked, column_max)
SELECT 'RECORD_MODIFIED_AT', current_timestamp, max(RECORD_MODIFIED_AT)
FROM (SELECT * FROM ANALYTICS_DB.schema.stream_name);
我再次选择 SYSTEM$STREAM_HAS_DATA,一切都很好。
现在,我将其捆绑到一个任务中:
CREATE TASK IF NOT EXISTS ANALYTICS_DB.schema.table_test
WAREHOUSE = wh
SCHEDULE = 'USING CRON * * * * * Etc/UTC'
COMMENT = 'Checking when was the last time tables got updated'
WHEN -- conditional check if the stream has new data
SYSTEM$STREAM_HAS_DATA('ANALYTICS_DB.schema.stream_name')
AS -- same previous query
INSERT INTO ANALYTICS_DB.schema.table
(stuff, last_checked, column_max)
SELECT 'RECORD_MODIFIED_AT', current_timestamp, max(MODIFIED_AT)
FROM FROM ANALYTICS_DB.schema.stream_name;
大约一分钟后,我再次检查了我的流,一切都很好,它在按计划运行时会消耗流。
创建任务的存储过程。
我的 SQL 部分:
create PROCEDURE IF NOT EXISTS ANALYTICS_DB.schema.create_tasks()
returns string
language javascript
EXECUTE AS CALLER
as
和 javascript 部分(为了读者,修剪到重要的部分)。它运行良好,创建任务,任务按计划运行,发出查询但不消耗流。因此我的 max() 计算是在一个不断增长的表上完成的。
$$
// trimmed some stuff here getting the data
while (result_set.next())
{
var lagschema = result_set.getColumnValue(1);
var lagtable = result_set.getColumnValue(2);
var lagcolumn = result_set.getColumnValue(3);
var sql_task = `CREATE TASK IF NOT EXISTS schema.ppw_freshness_schema.stream_name
WAREHOUSE = wh
SCHEDULE = 'USING CRON */5 * * * * Etc/UTC'
COMMENT = 'Checking when was the last update'
WHEN
SYSTEM$STREAM_HAS_DATA('ANALYTICS_DB.schema.stream_name')
AS
INSERT INTO ANALYTICS_DB.schema.table
(stuff, last_checked, column_max)
SELECT 'RECORD_MODIFIED_AT', current_timestamp, max(RECORD_MODIFIED_AT)
FROM FROM ANALYTICS_DB.schema.stream_name;`;
var create_task = snowflake.createStatement({sqlText: sql_task});
create_task.execute();
var start_task = snowflake.createStatement({sqlText: `ALTER TASK IF EXISTS schema.ppw_freshness_schema.stream_name RESUME;`});
start_task.execute();
}
// error handling
$$;
请参阅下面我通过存储过程创建的任务是如何每次运行的,因为它永远不会清空流。一旦我手动创建相同的任务,它就可以清空流,最后在没有新数据时跳过运行(这是想要的行为)。