
I have an Azure Stream Analytics job that takes in some raw events, transforms them, and writes them to a few different outputs. I'm getting the following error:

Maximum Event Hub receivers exceeded. Only 5 receivers per partition are allowed. Please use a dedicated consumer group for this input. If there are multiple queries using the same input, please use the WITH clause to share your input.

This is strange, because I use a common table expression (the WITH clause) at the start to pick up all the data, and after that I never touch the Event Hub again. Here is the query:

WITH 
ODSMeasurements AS (
    SELECT 
        collectionTimestamp,
        analogValues,
        digitalValues,
        type,
        translationTable
    FROM EventhubODSMeasurements
), 
-- Combine analog and digital measurements
CombineAnalogAndDigital AS (
    SELECT 
        CAST(CONCAT(SUBSTRING(ODS.collectionTimestamp, 1, 10), ' ', SUBSTRING(ODS.collectionTimestamp, 12, 12)) AS datetime) AS "TimeStamp",
        ROUND(AV.PropertyValue.value / (CAST(TT.ConversionFactor AS float)), 5) AS "ValueNumber",
        NULL AS "ValueBit",
        CAST(TT.MeasurementTypeId AS bigint) AS "MeasurementTypeId",
        TT.MeasurementTypeName AS "MeasurementName",
        TT.PartName AS "PartName",
        CAST(TT.ElementId AS bigint) AS "ElementId",
        TT.ElementName AS "ElementName",
        TT.ObjectName AS "ObjectName",
        TT.LocationName AS "LocationName",
        CAST(TT.TranslationTableId AS bigint) AS "TranslationTableId",
        ODS.Type AS "Status"
    FROM ODSMeasurements ODS
    CROSS APPLY GetRecordProperties(analogValues) AS AV
    INNER JOIN SQLTranslationTable TT
    ON 
        TT.Tag = AV.PropertyName AND
        CAST(TT.Version as bigint) = ODS.translationTable.version AND
        TT.Name = ODS.translationTable.name

    UNION

    SELECT  
        CAST(CONCAT(SUBSTRING(ODS.collectionTimestamp, 1, 10), ' ', SUBSTRING(ODS.collectionTimestamp, 12, 12)) AS datetime) AS "TimeStamp",
        CAST(-9999.00000 AS float) AS "ValueNumber",
        CAST(DV.PropertyValue.value AS nvarchar(max)) AS "ValueBit",
        CAST(TT.MeasurementTypeId AS bigint) AS "MeasurementTypeId",
        TT.MeasurementTypeName AS "MeasurementName",
        TT.PartName AS "PartName",
        CAST(TT.ElementId AS bigint) AS "ElementId",
        TT.ElementName AS "ElementName",
        TT.ObjectName AS "ObjectName",
        TT.LocationName AS "LocationName",
        CAST(TT.TranslationTableId AS bigint) AS "TranslationTableId",
        ODS.Type AS "Status"
    FROM ODSMeasurements ODS
    CROSS APPLY GetRecordProperties(digitalValues) AS DV
    INNER JOIN SQLTranslationTable TT
    ON 
        TT.Tag = DV.PropertyName AND
        CAST(TT.Version as bigint) = ODS.translationTable.version AND
        TT.Name = ODS.translationTable.name
)

-- Output data
SELECT *
INTO DatalakeHarmonizedMeasurements
FROM CombineAnalogAndDigital
PARTITION BY TranslationTableId

SELECT *
INTO FunctionsHarmonizedMeasurements
FROM CombineAnalogAndDigital

SELECT Timestamp, ValueNumber, CAST(ValueBit AS bit) AS ValueBit, ElementId, MeasurementTypeId, CAST(TranslationTableId AS bigint) AS TranslationTableId
INTO SQLRealtimeMeasurements
FROM CombineAnalogAndDigital

SELECT *
INTO EventHubHarmonizedMeasurements
FROM CombineAnalogAndDigital
PARTITION BY TranslationTableId

This is the Event Hub input I'm using:

{
    "Name": "EventhubODSMeasurements",
    "Type": "Data Stream",
    "DataSourceType": "Event Hub",
    "EventHubProperties": {
        "ServiceBusNamespace": "xxx",
        "EventHubName": "xxx",
        "SharedAccessPolicyName": "xxx",
        "SharedAccessPolicyKey": null,
        "ConsumerGroupName": "streamanalytics",
        "AuthenticationMode": "ConnectionString"
    },
    "DataSourceCredentialDomain": "xxx",
    "Serialization": {
        "Type": "Json",
        "Encoding": "UTF8"
    },
    "PartitionKey": null,
    "CompressionType": "None",
    "ScriptType": "Input"
}

I'm also using a dedicated consumer group for this. As far as I can tell I'm doing everything right. Does anyone know what's going on?
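
For reference, the dedicated consumer group was created roughly like the sketch below (Python management SDK; this assumes azure-identity and azure-mgmt-eventhub are installed, and the subscription, resource group and namespace names are placeholders, so the exact parameter shapes may differ between SDK versions):

# Sketch: create the dedicated consumer group that the Stream Analytics input uses.
# All names below are placeholders, not the real resources.
from azure.identity import DefaultAzureCredential
from azure.mgmt.eventhub import EventHubManagementClient

credential = DefaultAzureCredential()
eh_client = EventHubManagementClient(credential, "<subscription-id>")

# Each consumer group has its own quota of 5 concurrent receivers per partition,
# which is why the error message asks for a dedicated group for this input.
eh_client.consumer_groups.create_or_update(
    resource_group_name="<resource-group>",
    namespace_name="<eventhub-namespace>",
    event_hub_name="<eventhub-name>",
    consumer_group_name="streamanalytics",
    parameters={},
)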

Edit: I enabled diagnostic logging, and it says:

Exceeded the maximum number of allowed receivers per partition in a consumer group which is 5. List of connected receivers - nil, nil, nil, nil, nil.
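
For context on what actually counts toward that limit: every active reader on a partition within a consumer group is one receiver, whether it comes from a Stream Analytics input or from a plain SDK client. A minimal sketch of such a reader with the azure-eventhub Python package (connection string and hub name are placeholders), just to make the accounting concrete:

# Sketch: a plain SDK reader on the same consumer group. Each instance of this
# opens one receiver per partition of the "streamanalytics" consumer group and
# therefore counts against the 5-receivers-per-partition quota, exactly like a
# Stream Analytics input does.
from azure.eventhub import EventHubConsumerClient

client = EventHubConsumerClient.from_connection_string(
    conn_str="<eventhub-connection-string>",
    consumer_group="streamanalytics",
    eventhub_name="<eventhub-name>",
)

def on_event(partition_context, event):
    print(partition_context.partition_id, event.body_as_str())

with client:
    # Blocks and reads from the start of each partition until interrupted.
    client.receive(on_event=on_event, starting_position="-1")

So anything else attached to the streamanalytics consumer group, such as another job or a test client left running, eats into the same budget.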

1 Answer

It turns out the problem was PEBKAC. There was another job that was accidentally pointing at the same input Event Hub.
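
For anyone debugging the same error: one way to find the extra reader is to list every Stream Analytics job in the resource group and print which Event Hub and consumer group each input points at. A rough sketch with the Python management SDK (assumes azure-identity and azure-mgmt-streamanalytics; the property names on the datasource objects are my assumption from the SDK models and may differ between versions):

# Sketch: enumerate all jobs in a resource group and show which Event Hub /
# consumer group each input reads from, to spot two jobs sharing the same one.
# All resource names are placeholders.
from azure.identity import DefaultAzureCredential
from azure.mgmt.streamanalytics import StreamAnalyticsManagementClient

credential = DefaultAzureCredential()
sa_client = StreamAnalyticsManagementClient(credential, "<subscription-id>")
resource_group = "<resource-group>"

for job in sa_client.streaming_jobs.list_by_resource_group(resource_group):
    for job_input in sa_client.inputs.list_by_streaming_job(resource_group, job.name):
        datasource = job_input.properties.datasource
        # Only Event Hub inputs carry a consumer group; reference inputs (e.g. SQL) do not.
        consumer_group = getattr(datasource, "consumer_group_name", None)
        if consumer_group is not None:
            print(job.name, job_input.name,
                  getattr(datasource, "event_hub_name", None), consumer_group)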

answered 2021-09-14T23:54:27.070