1

我有以下流分析输入:

{ "ID":"DEV-001-Test",
  "TMSMUTC":"2021-10-14T14:00:00.000",
  "MSGTYP":"TELEMETRY",
  "THING":[
           {
            "TMSDUTC":"2021-10-14T13:00:00.000",
            "DATA":[
                {
                  "TAGID":"TAGB",
                  "VALUE":30
                },
                {
                  "TAGID":"TAGX",
                  "VALUE":[30.34,245.65,30.34,245.65,245.65,30.34]
                }
               ]
           }
          ]
}

其中“TAGX”的值数组表示从时间戳“TMSDUTC”:“2021-10-14T13:00:00.000”开始,每 10 分钟从传感器记录一小时的值。我想知道如何进行一个可以给我类似输出的查询:

输出

我的主要疑问是如何从时间戳创建 10 分钟的序列并将值交叉应用到它。

4

1 回答 1

0

这是一个很好的!请注意,我强烈建议您在处理这些查询时使用 VSCode 和ASA 扩展。由于本地测试,开发人员的体验比门户网站要好得多,您还可以通过 npm 包对查询进行单元测试。

我做了以下假设:

  • THING 是单个记录的数组。如果不是这样,请告诉我
  • [已编辑] TMSDUTC 在适用时需要根据数组中每个项目的位置增加 10 分钟(TAGX)

有了这个,这里是查询。它被分成多个代码块来解释流程,但我也将它整个粘贴在最后一个代码块中。

首先,我们将所有必填字段带到第一级。它使事情更容易阅读,但不仅如此。GetArrayElement需要一个数组来交叉应用,但 GetArrayElement(单数)在编译时不返回类型。使用中间查询步骤可以解决这个问题。

WITH things AS (
    SELECT
        ID,
        GetArrayElement(THING,0).TMSDUTC AS TMSDUTC,
        MSGTYP AS MessageType,
        GetArrayElement(THING,0).DATA AS DATA
    FROM [input]
),

然后我们展开DATA

dataAll AS (
    SELECT
        T.ID,
        T.TMSDUTC,
        T.MessageType,
        D.ArrayValue.TAGID AS Tag,
        D.ArrayValue.Value AS [Value]
    FROM things T
    CROSS APPLY GetArrayElements(T.DATA) AS D
),

VALUE然后我们为具有类型数组(在您的示例中为 TAGX)的记录创建一个子集。在这里,我通过在运行时检测类型来避免对每个标签进行硬编码。这些记录将需要在接下来的步骤中进行另一轮数组处理。

dataArrays AS (
    SELECT
        A.ID,
        A.TMSDUTC,
        A.MessageType,
        A.Tag,
        A.[Value]
    FROM dataAll A
    WHERE GetType(A.[Value]) = 'array'
),

现在我们可以专注于扩展VALUE这些记录。请注意,我们不能一次性完成(过滤上面的数组和下面CROSS APPLY),因为 GetArrayElement在过滤完成之前会检查类型。

[编辑]为了增加TMSDUTC,我们在其数组中每个项目的索引上使用 DATEADD(ArrayIndex/ArrayValue 都是从数组扩展中返回的,请参见下面的文档)。

dataArraysExpanded AS (
    SELECT
        A.ID,
        DATEADD(minute,10*V.ArrayIndex,A.TMSDUTC) AS TMSDUTC,
        A.MessageType,
        A.Tag,
        V.ArrayValue AS [Value]
    FROM dataArrays A
    CROSS APPLY GetArrayElements(A.[Value]) AS V
),

我们将所有内容结合在一起:

newSchema AS (
    SELECT ID, TMSDUTC, MessageType, Tag, [Value] FROM dataAll WHERE GetType([Value]) != 'array'
        UNION
    SELECT ID, TMSDUTC, MessageType, Tag, [Value] FROM dataArraysExpanded
)

最后将所有内容插入目的地:

SELECT
    *
INTO myOutput
FROM newSchema

[已编辑]请注意,唯一保证结果集的顺序是时间戳定义的顺序。如果多个记录出现在同一个时间戳上,默认情况下不保证顺序。在这里,在查询结束时,所有新创建的事件仍然在原始事件的时间戳上加上时间戳。如果您现在需要在新生成的 TMSDUTC 上应用时间逻辑,则需要将这些记录输出到事件中心,并使用 TIMESTAMP BY TMSDUTC 将它们加载到另一个作业中。目前时间戳只能在查询的第一步直接更改。

这里使用什么:

  • GetArrayElement(单数):文档
  • WITH又名公用表表达式(CTE):doc
  • CROSS APPLY + GetArrayElements : docdoc,加上非常好的参考
  • 获取类型:文档

整个事情更容易复制/粘贴:

WITH things AS (
    SELECT
        ID,
        GetArrayElement(THING,0).TMSDUTC AS TMSDUTC,
        MSGTYP AS MessageType,
        GetArrayElement(THING,0).DATA AS DATA
    FROM [input]
),
dataAll AS (
    SELECT
        T.ID,
        T.TMSDUTC,
        T.MessageType,
        D.ArrayValue.TAGID AS Tag,
        D.ArrayValue.Value AS [Value]
    FROM things T
    CROSS APPLY GetArrayElements(T.DATA) AS D
),
dataArrays AS (
    SELECT
        A.ID,
        A.TMSDUTC,
        A.MessageType,
        A.Tag,
        A.[Value]
    FROM dataAll A
    WHERE GetType(A.[Value]) = 'array'
),
dataArraysExpanded AS (
    SELECT
        A.ID,
        DATEADD(minute,10*V.ArrayIndex,A.TMSDUTC) AS TMSDUTC,
        A.MessageType,
        A.Tag,
        V.ArrayValue AS [Value]
    FROM dataArrays A
    CROSS APPLY GetArrayElements(A.[Value]) AS V
),
newSchema AS (
    SELECT ID, TMSDUTC, MessageType, Tag, [Value] FROM dataAll WHERE GetType([Value]) != 'array'
        UNION
    SELECT ID, TMSDUTC, MessageType, Tag, [Value] FROM dataArraysExpanded
)
SELECT
    *
INTO myOutput
FROM newSchema
于 2021-10-15T00:38:13.893 回答