我正在对流中的消息进行批处理,希望在满足以下 3 个条件之一时关闭(处理)当前批次:
- 如果批处理程序无法添加传入的消息(出于批处理程序内部的任何逻辑)
- 如果收到消息之后 X 秒内没有新消息(基本上就是节流 Throttle 的作用)
- 如果消息持续到达,且相邻消息的间隔始终短于 X 秒,那么在 Y 秒后仍然关闭批次(节流上限)
我需要能够在运行时更改X和Y秒而不会丢失当前批次(无论是在配置更改时立即关闭还是通过关闭条件关闭)。条件函数和批处理函数不应在并行线程中运行。
我正在使用 Rx-Main 2.2.5。到目前为止,我想出了下面的解决方案,它似乎有效,但我认为可能存在更简单的 Reactive Extensions 解决方案?此外,在此解决方案中,如果批次是因“批处理程序无法添加此消息”这一条件而关闭的,capTimer 不会重新启动。
扩展方法:
public static class ObservableExtensions
{
    /// <summary>
    /// Offers every item of <paramref name="observable"/> to <paramref name="conditionFunc"/> and
    /// invokes <paramref name="capOrThrottleAction"/> whenever the current batch has to be closed:
    /// (a) <paramref name="conditionFunc"/> returns false for an item,
    /// (b) no item has arrived for <paramref name="throttleInSeconds"/> seconds, or
    /// (c) <paramref name="capTimeInSeconds"/> seconds have elapsed since the cap timer was armed.
    /// </summary>
    /// <param name="observable">Source sequence; it is subscribed to more than once.</param>
    /// <param name="throttleInSeconds">Quiet period, in seconds, after which the batch is closed.</param>
    /// <param name="capTimeInSeconds">Maximum time, in seconds, between arming the cap timer and closing the batch.</param>
    /// <param name="conditionFunc">Tries to add an item to the batch; returning false closes the batch and the item is offered again afterwards.</param>
    /// <param name="capOrThrottleAction">Processes (closes) the current batch.</param>
    /// <param name="onException">Receives the item being handled and any exception thrown while handling it.</param>
    /// <param name="fakeInstance">
    /// Marker value pushed through the pipeline to signal a timer-driven close. Any item equal to it
    /// is treated as a close signal, so it must not compare equal to a real item.
    /// </param>
    /// <returns>A disposable that releases both internal subscriptions and any pending cap timer.</returns>
    public static IDisposable ConditionalCappedThrottle<T>(this IObservable<T> observable,
        int throttleInSeconds,
        int capTimeInSeconds,
        Func<T, bool> conditionFunc,
        Action capOrThrottleAction,
        Action<T, Exception> onException,
        T fakeInstance = default(T))
    {
        // Null-safe equality: calling item.Equals(...) directly throws when the marker (and thus the
        // pushed close signal) is null, which is the default for reference types.
        var comparer = System.Collections.Generic.EqualityComparer<T>.Default;

        var closeSignals = new Subject<T>();
        var capTimerTicks = new Subject<long>();
        var throttleTicks = observable
            .Throttle(TimeSpan.FromSeconds(throttleInSeconds))
            .Select(c => 1L);

        // The SerialDisposable always holds the *current* cap timer, so the disposable returned to
        // the caller can cancel a timer that is created long after this method has returned.
        var capTimer = new SerialDisposable();
        var capTimerArmed = false;

        Action armCapTimer = () =>
        {
            if (capTimerArmed)
            {
                return;
            }

            capTimerArmed = true;
            capTimer.Disposable = Observable
                .Timer(TimeSpan.FromSeconds(capTimeInSeconds))
                .Subscribe(x => capTimerTicks.OnNext(1));
        };

        Action disarmCapTimer = () =>
        {
            // Assigning a new value disposes the previously held timer subscription.
            capTimer.Disposable = null;
            capTimerArmed = false;
        };

        var bufferTicks = observable
            // The first item after a close starts the cap countdown.
            .Do(c => armCapTimer())
            .Buffer(() => Observable.Amb(
                capTimerTicks
                    .Do(c => Console.WriteLine($"{DateTime.Now:mm:ss.fff} cap time tick closing buffer")),
                throttleTicks
                    .Do(c => Console.WriteLine($"{DateTime.Now:mm:ss.fff} throttle time tick closing buffer"))
            ))
            .Do(c => disarmCapTimer())
            // A timer may fire while nothing was buffered; there is nothing to close then.
            .Where(changes => changes.Any())
            .Subscribe(dataChanges => closeSignals.OnNext(fakeInstance));

        // NOTE(review): this handler relies on Merge delivering notifications one at a time so that
        // conditionFunc and capOrThrottleAction never overlap - confirm against the Rx documentation.
        var observableSubscriber = observable.Merge(closeSignals)
            .Subscribe(item =>
            {
                try
                {
                    var isCloseSignal = comparer.Equals(item, fakeInstance);
                    if (!isCloseSignal)
                    {
                        if (conditionFunc(item))
                        {
                            return;
                        }

                        Console.WriteLine($"{DateTime.Now:mm:ss.fff} condition false closing buffer");
                        disarmCapTimer();
                    }

                    capOrThrottleAction();

                    if (!isCloseSignal)
                    {
                        // The rejected item opens the next batch, so the cap countdown has to be
                        // restarted for it; otherwise a continuous stream would never hit the cap.
                        conditionFunc(item);
                        armCapTimer();
                    }
                }
                catch (Exception ex)
                {
                    onException(item, ex);
                }
            });

        return new CompositeDisposable(bufferTicks, observableSubscriber, capTimer);
    }
}
用法如下:
/// <summary>
/// Demo driver: a producer thread publishes messages at random intervals while the main thread
/// periodically re-subscribes with new, randomly chosen throttle and cap times.
/// </summary>
class Program
{
    static void Main(string[] args)
    {
        // Keep a strongly typed reference for the producer instead of casting messagesObs back.
        var source = new Subject<Message>();
        messagesObs = source;
        new Thread(() =>
        {
            while (true)
            {
                Thread.Sleep(NextRandom(3) * 1000);
                source.OnNext(new Message());
            }
        }).Start();
        while (true)
        {
            throttleTime = NextRandom(8) + 2;
            maxThrottleTime = NextRandom(10) + 20;
            Console.WriteLine($"{DateTime.Now:mm:ss.fff} resubscribing with {throttleTime} - {maxThrottleTime}");
            Subscribe();
            Thread.Sleep((NextRandom(10) + 60) * 1000);
        }
    }

    static Random random = new Random();
    // Guards 'random': it is used from the main thread, the producer thread and subscription callbacks.
    static readonly object randomGate = new object();
    static int throttleTime = 3;
    static int maxThrottleTime = 10;
    static IDisposable messagesSub;
    static IObservable<Message> messagesObs;

    /// <summary>
    /// Returns a non-negative random integer below <paramref name="maxValue"/>.
    /// System.Random is not thread-safe, so every access goes through a lock.
    /// </summary>
    static int NextRandom(int maxValue)
    {
        lock (randomGate)
        {
            return random.Next(maxValue);
        }
    }

    /// <summary>
    /// Drops the current subscription, processes whatever batch is pending and subscribes again
    /// using the current values of <c>throttleTime</c> and <c>maxThrottleTime</c>.
    /// </summary>
    static void Subscribe()
    {
        messagesSub?.Dispose();
        // NOTE(review): messages published between Dispose() and the new subscription are not
        // observed by anyone - confirm this gap is acceptable.
        BatchProcess();
        messagesSub = messagesObs.ConditionalCappedThrottle(
            throttleTime,
            maxThrottleTime,
            TryAddToBatch,
            BatchProcess,
            (msg, ex) => { },
            new FakeMessage());
    }

    /// <summary>
    /// Simulates a batcher that randomly rejects a message.
    /// </summary>
    /// <returns>true when the message was "added"; false when it was rejected.</returns>
    static bool TryAddToBatch(Message msg)
    {
        if (NextRandom(100) > 85)
        {
            Console.WriteLine($"{DateTime.Now:mm:ss.fff} can't add to batch");
            return false;
        }
        else
        {
            Console.WriteLine($"{DateTime.Now:mm:ss.fff} added to batch");
            return true;
        }
    }

    /// <summary>
    /// Simulates two seconds of batch processing.
    /// </summary>
    static void BatchProcess()
    {
        Console.WriteLine($"{DateTime.Now:mm:ss.fff} Processing");
        Thread.Sleep(2000);
        Console.WriteLine($"{DateTime.Now:mm:ss.fff} Done Processing");
    }
}
/// <summary>
/// A message travelling through the demo stream; carries no data.
/// </summary>
public class Message
{
}

/// <summary>
/// Marker subtype of <see cref="Message"/>; the demo passes an instance of it as the
/// fake (close-signal) instance.
/// </summary>
public class FakeMessage : Message
{
}
我希望能够通过的测试:
/// <summary>
/// Scenario sketches describing the expected batching behaviour. Each method only publishes
/// messages with specific timing; the expected outcome is stated in comments, not asserted.
/// </summary>
public class Test
{
    static Subject<Base> sub = new Subject<Base>();
    static int maxTime = 19;
    static int throttleTime = 6;

    // Batcher.Process must always have finished before any subsequent Batcher.Add is called.

    /// <summary>
    /// Messages keep arriving less than throttleTime apart, so only the cap can close the batch.
    /// </summary>
    static void MaxTime()
    {
        // For each OnNext, Batcher.Add must be called.
        sub.OnNext(new A());
        Thread.Sleep(6 * 1000 - 100);
        sub.OnNext(new A());
        Thread.Sleep(6 * 1000 - 100);
        sub.OnNext(new A());
        Thread.Sleep(6 * 1000 - 100);
        // A fourth message at 17.7 seconds is required: without it the throttle would already
        // fire at 11.8 + 6 = 17.8 seconds, i.e. before the 19 second cap.
        sub.OnNext(new A());
        // Process must be called at 19 seconds = maxTime.
    }

    /// <summary>
    /// The stream goes quiet after the second message, so the throttle closes the batch.
    /// </summary>
    static void Throttle()
    {
        // For each OnNext, Batcher.Add must be called.
        sub.OnNext(new A());
        Thread.Sleep(6 * 1000 - 100);
        sub.OnNext(new A());
        Thread.Sleep(6 * 1000 - 100);
        // Process must be called 6 seconds (= throttleTime) after the last message,
        // i.e. at about 5.9 + 6 seconds from the start.
    }

    /// <summary>
    /// A message of a different type is rejected by the batcher, which closes the batch.
    /// </summary>
    static void Condition()
    {
        // For each OnNext, Batcher.Add must be called.
        sub.OnNext(new A());
        Thread.Sleep(6 * 1000 - 100);
        sub.OnNext(new B());
        // Process must be called because Batcher.Add will return false.
        // Batcher.Add(B) must be called again after Process.
    }

    /// <summary>
    /// Neither timer may trigger Process while no messages are pending.
    /// </summary>
    static void MaxTimeOrThorttleNotTickingRandomly()
    {
        sub.OnNext(new A());
        // Process is called by the throttle condition after 6 seconds.
        Thread.Sleep(1000 * 100);
        // Process is not called during the remaining 94 seconds.
        sub.OnNext(new A());
        // Process is called by the throttle condition after 6 seconds.
    }

    /// <summary>
    /// Changing the timings at run time must flush the current batch and apply the new values.
    /// </summary>
    static void Resub()
    {
        sub.OnNext(new A());
        sub.OnNext(new A());
        sub.OnNext(new A());
        sub.OnNext(new A());
        sub.OnNext(new A());
        maxTime = 15;
        throttleTime = 3;
        // Process is called.
        // Resubscribes with the new timing conditions.
        sub.OnNext(new A());
        // Process is called by the throttle condition after 3 seconds.
    }
}
/// <summary>
/// Collects elements of a single runtime type per batch.
/// </summary>
public class Batcher
{
    // Runtime type of the elements in the current batch; null while the batch is empty.
    private Type batchingType;

    /// <summary>
    /// Tries to add <paramref name="element"/> to the current batch.
    /// </summary>
    /// <returns>
    /// true when the batch is empty or already holds elements of exactly the same runtime type;
    /// false otherwise, in which case the batch is left unchanged.
    /// </returns>
    public bool Add(Base element)
    {
        Type incomingType = element.GetType();

        if (batchingType != null && incomingType != batchingType)
        {
            return false;
        }

        batchingType = incomingType;
        return true;
    }

    /// <summary>
    /// Closes the current batch so that the next added element determines the batch type.
    /// </summary>
    public void Process()
    {
        batchingType = null;
    }
}
/// <summary>
/// Common base type of the elements used in the test scenarios.
/// </summary>
public class Base
{
}

/// <summary>
/// First concrete element type.
/// </summary>
public class A : Base
{
}

/// <summary>
/// Second concrete element type, distinct from <see cref="A"/>.
/// </summary>
public class B : Base
{
}