我正在使用带有 TPL 数据流的 .net 响应式扩展。这是我的管道:
我从某个外部源获取数据点作为流,然后使用数据流 TransformBlocks 转换数据点。在此之后,我使用 Rx 缓冲区缓冲流式点 1 秒,最后我使用数据流 Actionblock 将这些缓冲的数据点发布到 REST 端点。
我想对瞬态错误重试 REST post 操作。我应该在哪里重试:
- 缓冲后?
- 内部动作块?
- 重试时连续流式传输会发生什么?我不想错过任何数据。
我正在使用带有 TPL 数据流的 .net 响应式扩展。这是我的管道:
我从某个外部源获取数据点作为流,然后使用数据流 TransformBlocks 转换数据点。在此之后,我使用 Rx 缓冲区缓冲流式点 1 秒,最后我使用数据流 Actionblock 将这些缓冲的数据点发布到 REST 端点。
我想对瞬态错误重试 REST post 操作。我应该在哪里重试:
只是脱离提供的高级概述,我认为在决赛中重试是最容易的ActionBlock
。您ActionBlock
会在结束前发布并确认成功。根据数据量,这种方法可以让您根据需要处理尽可能多ActionBlock
的 s,而不必过多担心丢弃的项目,实际上不应该有任何问题。如果单个或多个ActionBlock
实例的发布失败,您的项目仍将根据您设置缓冲区和输入队列的方式进行流式传输和分发,以等待他们在线上的机会。
编辑
只是一些伪代码,但这将需要一批数据点,即 IEnumerable,并尝试将它们发布五次。有限容量将导致处理程序的每个实例排队 1000 个批次,并行性将在操作块之间分配批次。可选地,Buffer
可以在 之前添加一个无界的ActionBlock
来保存所有传入的批次。您需要注意您的生产者(流)不会大大超出您的消费者 REST 服务的运行速度。
public void ConfigureFinalActionBlock() {
var dataPointBuffer = new BufferBlock<IEnumerable<Datapoint>>(new DataflowBlockOptions() {
BoundedCapacity = DataflowBlockOptions.Unbounded
});
var options = new ExecutionDataflowBlockOptions() {
BoundedCapacity = 1000,
MaxDegreeOfParallelism = Environment.ProcessorCount
};
var restBlock = new ActionBlock<IEnumerable<Datapoint>>(async (data) => {
var success = false;
var attempts = 0;
while (!success && attempts < 5) {
attempts++;
success = await MyApiPostAsync(data);
}
}, options);
dataPointBuffer.LinkTo(restBlock, new DataflowLinkOptions() {
PropagateCompletion = true
});