0

TLDR:当我或我直接创建的任务在其上发出 DML 时,我的流会被消耗。但是当它是由存储过程创建的任务时,流不会被消耗。

我有一个行为符合预期的流,当我选择它时我可以看到它有数据: SELECT SYSTEM$STREAM_HAS_DATA('ANALYTICS_DB.schema.stream_name');

使用创建它的相同角色,我使用流:

INSERT INTO ANALYTICS_DB.schema.table
(stuff, last_checked, column_max)
SELECT 'RECORD_MODIFIED_AT', current_timestamp, max(RECORD_MODIFIED_AT)
FROM (SELECT * FROM ANALYTICS_DB.schema.stream_name);

我再次选择 SYSTEM$STREAM_HAS_DATA,一切都很好。

现在,我将其捆绑到一个任务中:

CREATE TASK IF NOT EXISTS ANALYTICS_DB.schema.table_test
            WAREHOUSE = wh
            SCHEDULE = 'USING CRON * * * * * Etc/UTC'
            COMMENT = 'Checking when was the last time tables got updated'
        WHEN -- conditional check if the stream has new data
            SYSTEM$STREAM_HAS_DATA('ANALYTICS_DB.schema.stream_name')
        AS -- same previous query
    INSERT INTO ANALYTICS_DB.schema.table
    (stuff, last_checked, column_max)
    SELECT 'RECORD_MODIFIED_AT', current_timestamp, max(MODIFIED_AT)
    FROM FROM ANALYTICS_DB.schema.stream_name;

大约一分钟后,我再次检查了我的流,一切都很好,它在按计划运行时会消耗流。

创建任务的存储过程。

我的 SQL 部分:

create PROCEDURE IF NOT EXISTS ANALYTICS_DB.schema.create_tasks()
    returns string
    language javascript
    EXECUTE AS CALLER
as

和 javascript 部分(为了读者,修剪到重要的部分)。它运行良好,创建任务,任务按计划运行,发出查询但不消耗流。因此我的 max() 计算是在一个不断增长的表上完成的。

$$
// trimmed some stuff here getting the data

    while (result_set.next())
    {
        var lagschema = result_set.getColumnValue(1);
        var lagtable = result_set.getColumnValue(2);
        var lagcolumn = result_set.getColumnValue(3);

        var sql_task = `CREATE TASK IF NOT EXISTS schema.ppw_freshness_schema.stream_name
            WAREHOUSE = wh
            SCHEDULE = 'USING CRON */5 * * * * Etc/UTC'
            COMMENT = 'Checking when was the last update'
        WHEN 
            SYSTEM$STREAM_HAS_DATA('ANALYTICS_DB.schema.stream_name')
        AS
                INSERT INTO ANALYTICS_DB.schema.table
    (stuff, last_checked, column_max)
    SELECT 'RECORD_MODIFIED_AT', current_timestamp, max(RECORD_MODIFIED_AT)
    FROM FROM ANALYTICS_DB.schema.stream_name;`;

        var create_task = snowflake.createStatement({sqlText: sql_task});
        create_task.execute();
        var start_task = snowflake.createStatement({sqlText: `ALTER TASK IF EXISTS schema.ppw_freshness_schema.stream_name RESUME;`});
        start_task.execute();
    }
// error handling
    $$;

请参阅下面我通过存储过程创建的任务是如何每次运行的,因为它永远不会清空流。一旦我手动创建相同的任务,它就可以清空流,最后在没有新数据时跳过运行(这是想要的行为)。

任务运行

4

1 回答 1

0

没有什么可以提供关于这里问题的线索,因为它在于流本身的命名。所以完全是我的错误。最重要的是,我正在运行的测试使用了一个非常活跃的表,因此它掩盖了流实际上按预期执行的事实。

于 2020-07-22T13:00:29.647 回答