0

我对多线程有点困惑。我目前正在使用 SinglaR 开发实时服务。这个想法是连接的用户可以向另一个用户请求数据。下面是请求和响应函数的概要。

考虑以下代码:

private readonly ConcurrentBag _sharedObejcts= new ConcurrentBag();

请求:

[...]

var sharedObject = new MyObject();
_sharedObejcts.Add(sharedObject);

ForwardRequestFireAndForget();

try
{
    await Task.Delay(30000, sharedObject.myCancellationToken);
}
catch
{
    return sharedObject.ResponseProperty;
}

_myConcurrentBag.TryTake(sharedObject);

[...]

响应:

[...]

var result = DoSomePossiblyVeryLengthyTaskHere();

var sharedObject = ConcurrentBag 
    .Where(x)
    .FirstOrDefault();

// The request has timed out so the object isn't there anymore.
if(sharedObject == null)
{
    return someResponse;
}

sharedObject.ResponseProperty = result;

// triggers the cancellation source
sharedObject.Cancel();

return someOtherResponse;

[...]

所以基本上向服务器发出请求,转发到另一台主机,该功能等待取消或超时。

其他主机调用响应函数,该函数添加repsonseObject和 触发器myCancellationToken

但是,我不确定这是否代表竞争条件。理论上,响应线程可以检索到sharedObject而另一个线程仍然位于 finally 块上吗?这意味着,请求已经超时,任务还没有从包中取出对象,这意味着数据不一致。

有哪些有保证的方法可以确保在通话后首先被Task.Delay()调用的是TryTake()通话?

4

1 回答 1

1

您不想让生产者取消消费者的等待。太多的责任混为一谈了。

相反,您真正想要的是生产者发送异步信号。这是通过TaskCompletionSource<T>. 消费者可以使用不完整的 TCS 添加对象,然后消费者可以(异步)等待该 TCS 完成(或超时)。然后生产者只是将其价值提供给 TCS。

像这样的东西:

class MyObject
{
  public TaskCompletionSource<MyProperty> ResponseProperty { get; } = new TaskCompletionSource<MyProperty>();
}


// request (consumer):

var sharedObject = new MyObject();
_sharedObejcts.Add(sharedObject);

ForwardRequestFireAndForget();

var responseTask = sharedObject.ResponseProperty.Task;
if (await Task.WhenAny(Task.Delay(30000), responseTask) != responseTask)
  return null;

_myConcurrentBag.TryTake(sharedObject);
return await responseTask;


// response (producer):

var result = DoSomePossiblyVeryLengthyTaskHere();
var sharedObject = ConcurrentBag 
    .Where(x)
    .FirstOrDefault();

// The request has timed out so the object isn't there anymore.
if(sharedObject == null)
  return someResponse;

sharedObject.ResponseProperty.TrySetResult(result);
return someOtherResponse;

上面的代码可以稍微清理一下;具体来说,让生产者拥有共享对象的“生产者视图”,而消费者拥有“消费者视图”,这两个接口都由相同的类型实现,这并不是一个坏主意。但是上面的代码应该给你一个大致的想法。

于 2020-12-23T00:53:49.363 回答