3

我已经实现了一个 EventGrid 触发器来响应 Blob 存储事件,其逻辑简化如下:

public static async void Run(
    JObject eventGridEvent,
    TraceWriter log,
    ExecutionContext context)
{
    string eventContent = ParseEvent(eventGridEvent);

    HttpClient client = GetProxyClient();
    HttpResponseMessage response = await client.GetAsync("blabla/" + eventContent);
    string responseContent = await response.Content.ReadAsStringAsync();
    log.Info("Here is the response :" + responseContent);
}

外部 API 响应时间不长(1 秒或更短),并且我的主机配置设置为默认值(因此允许无限数量的并发调用)。

同时添加多个 blob(从 2 个 blob 开始)时,我在日志中收到很多重复的事件(脚本正在快速逐个上传 blob,中间没有等待时间)。

我觉得这可能是因为我从不承认收到事件,而且我不知道我是否应该在我的代码中执行此操作,或者 EventGrid 触发器是否自动执行此操作。

确认事件处理的逻辑应该在 EventGrid 触发器(Http 200 响应)中实现还是自动处理?

如果不是,我还应该收到重复的事件吗?通常,在上传单个 blob 时,我会收到 3-4 次事件。

我问这个问题的原因是,当使用 Http Trigger 并返回 400 响应时,我也会得到重复的事件,这是有道理的,因为我不承认已经正确处理了事件。但是,当我返回 200 响应时,我不会收到重复的事件。

谢谢

4

2 回答 2

5

您无需执行任何特殊操作即可向 Event Grid 指示成功。如果您的函数执行成功(不抛出异常),触发器将自动响应成功状态码。

于 2018-05-16T11:55:23.690 回答
1

您可以尝试使用Advanced Filter. data.api String ends with FlushWithClose我的 Azure 函数在 blob 上传时执行多次的原因是因为AppendFile为 blob 上传执行的每个操作都创建了一条 EventGrid 消息。

我发现(通过反复试验)Azure 数据工厂使用一系列 API 调用将单个 Blob 写入 Blob 存储。

最终看起来像这样:

  • CreateFilePath
  • LeaseFile
  • AppendFile
  • AppendFile
  • AppendFile(每个追加都会放置一块 blob,直到 blob 完成)
  • FlushFile(这是文件已完成的实际指示;因此上面显示了高级过滤器)
  • LeaseFile

下面是一个示例查询,您可以自己查看此上传流程:

  • 注意:您需要Uri上传到 blob 容器的示例文件
//==================================================//
// Author: Eric
// Created: 2021-05-26 0900 
// Query: ADF-to-Blob Storage reference flow
// Purpose: 
// To provide a reference flow of ADF-to-Blob Storage
// file uploads
//==================================================//
// Assign variables
//==================================================//
let varStart = ago(10d);
let varEnd = now();
let varStorageAccount = '<storageaccountname>';
let varStatus = 'Success';
let varSampleUri = 'https://<storageaccountname>.dfs.core.windows.net/<containername>/<parentfolder1>%2F<parentfolder2>%2F<samplefilename.extension>'
//==================================================//
// Filter table
//==================================================//
StorageBlobLogs
| where TimeGenerated between (varStart .. varEnd)
  and AccountName == varStorageAccount
  and StatusText == varStatus
  and split(Uri, '?')[0] == varSampleUri
//==================================================//
// Group and parse results
//==================================================//
| summarize 
  count() by OperationName,
  CorrelationId,
  TimeGenerated,
  UserAgent = tostring(split(UserAgentHeader, ' ')[0]),
  RequesterAppId,
  AccountName, 
  ContainerName = tostring(split(tostring(parse_url(url_decode(Uri))['Path']), '/')[1]),
  FileName = tostring(split(tostring(parse_url(url_decode(Uri))['Path']), '/')[-1]),
  ChunkSize = format_bytes(RequestBodySize, 2, 'MB'),
  StatusCode,
  StatusText
| order by TimeGenerated asc

从不同来源(Azure 数据工厂、Azure 存储资源管理器、Python/C# SDK、Azure 门户等)上传示例并查看它们使用的不同 API 方法很有趣。事实上,您可能要这样做才能拨入您的日志记录和警报。

太糟糕了,这些方法没有跨工具标准化,因为这个特定的问题是你自己发现的一个巨大的痛苦!

同样,在这种情况下,EventGridAdvanced Filters是您的朋友。

于 2021-05-26T19:10:29.507 回答