1

我正在编写一个托管在 Service Fabric 中的有状态服务。该服务的工作是使用来自外部队列的消息,转换它们并将它们放置到我们自己的消息传递系统中。根据供应商文档,吞吐量可以达到 6k 消息/秒。

我已将服务配置为多个分区以分散消息负载,每个分区都有最少 2/最多 3 个副本。为了从故障中恢复,我可以订阅供应商队列并传入我希望接收消息的时间戳。为此,我将存储在服务状态下处理的最后一条消息的时间戳。由于消息量很大,我决定在计时器上“保存”(并允许下游消息的潜在重复)

这是时间调用的代码:

private async void _timer_Elapsed(object sender, ElapsedEventArgs e)
    {
        var saveRetryPolicy = Policy
            .Handle<Exception>()
            .WaitAndRetryAsync(5, retryAttempt =>
                TimeSpan.FromSeconds(Math.Pow(2, retryAttempt))
            );

        await saveRetryPolicy.ExecuteAsync(async () =>
        {
            using (var tx = _stateManager.CreateTransaction())
            {
                var state = await _stateManager.TryGetAsync<IReliableDictionary<string, long>>(TimestampStateName);

                if (state.HasValue)
                {
                    await state.Value.AddOrUpdateAsync(tx, TimestampStateName, _lastTXTimestamp,
                        (s, l) => _lastTXTimestamp);

                    await tx.CommitAsync();
                }
                else
                {
                    var s =
                        await _stateManager.GetOrAddAsync<IReliableDictionary<string, long>>(tx, TimestampStateName);

                    await tx.CommitAsync();
                    _timer_Elapsed(this, null);
                }
            }
        });
    }

每次尝试持久化时,我都会在每个分区上收到“System.Fabric.FabricNotPrimaryException”错误。

我已经包含了重试策略(由 Polly Retry 提供),因为在类似的问题上有评论建议这样做。这没有任何效果,只是延长了报告错误之前的时间。

我是否误解了应该如何使用 SF 的一些基本内容?这对我来说似乎是一个简单的用例。

4

1 回答 1

2

评论回复:

确保不要在所有副本上启动计时器,而仅在主副本上启动。

于 2016-11-22T12:56:59.840 回答