0

我正在对流中的消息进行批处理,希望在满足以下 3 个条件中的任意一个时关闭当前批次:

  1. 批处理程序无法添加传入的消息(出于批处理程序内部逻辑的任何原因)
  2. 收到消息之后,X 秒内没有新消息到达(基本上就是节流 Throttle 的作用)
  3. 消息持续不断地到达,且间隔始终小于 X 秒,那么在 Y 秒后仍然要关闭批次(节流上限)

我需要能够在运行时更改 X 和 Y 的秒数,并且不丢失当前批次(配置更改时立即关闭当前批次,或者等它按关闭条件自然关闭,都可以)。条件函数和批处理函数不能在多个线程上并发执行。

我使用的是 Rx-Main 2.2.5。目前我想出了下面的解决方案,它看起来可以工作,但我觉得用 Reactive Extensions 应该有更简单的实现方式?另外,在这个方案中,如果批次是因为“批处理器无法添加此消息”而关闭的,capTimer 不会重新启动。

扩展方法:

/// <summary>
/// Reactive Extensions helpers for batching a message stream.
/// </summary>
public static class ObservableExtensions
{
    /// <summary>
    /// Subscribes to <paramref name="observable"/> and drives a batch with it. Every message is offered to
    /// <paramref name="conditionFunc"/>; <paramref name="capOrThrottleAction"/> is invoked to close the batch when
    /// (1) <paramref name="conditionFunc"/> returns <c>false</c>, (2) no message arrives for
    /// <paramref name="throttleInSeconds"/> seconds, or (3) <paramref name="capTimeInSeconds"/> seconds elapse
    /// after the cap timer was started.
    /// </summary>
    /// <remarks>
    /// The cap timer is started by the first message that arrives while no cap timer is running, and is
    /// restarted when a batch is closed because <paramref name="conditionFunc"/> rejected a message (the
    /// rejected message is re-offered after the close and therefore opens the next batch).
    /// <para>
    /// Timer-driven closes are signalled by pushing <paramref name="fakeInstance"/> through the same pipeline
    /// as real messages, so a real message that equals <paramref name="fakeInstance"/> is treated as a close
    /// signal and is never offered to <paramref name="conditionFunc"/>.
    /// </para>
    /// </remarks>
    /// <typeparam name="T">Message type.</typeparam>
    /// <param name="observable">Source message stream.</param>
    /// <param name="throttleInSeconds">Quiet period, in seconds, after which the batch is closed.</param>
    /// <param name="capTimeInSeconds">Delay, in seconds, of the cap timer that closes the batch.</param>
    /// <param name="conditionFunc">Tries to add a message to the batch; returns <c>false</c> if it cannot.</param>
    /// <param name="capOrThrottleAction">Closes/processes the current batch.</param>
    /// <param name="onException">
    /// Receives the message being handled and any exception thrown by <paramref name="conditionFunc"/> or
    /// <paramref name="capOrThrottleAction"/>.
    /// </param>
    /// <param name="fakeInstance">Sentinel value used internally to signal a timer-driven close.</param>
    /// <returns>
    /// A disposable that tears down both internal subscriptions and stops whichever cap timer is running at
    /// the time it is disposed.
    /// </returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="observable"/>, <paramref name="conditionFunc"/> or
    /// <paramref name="capOrThrottleAction"/> is <c>null</c>.
    /// </exception>
    public static IDisposable ConditionalCappedThrottle<T>(this IObservable<T> observable,
        int throttleInSeconds,
        int capTimeInSeconds,
        Func<T, bool> conditionFunc,
        Action capOrThrottleAction,
        Action<T, Exception> onException,
        T fakeInstance = default(T))
    {
        if (observable == null)
        {
            throw new ArgumentNullException(nameof(observable));
        }
        if (conditionFunc == null)
        {
            throw new ArgumentNullException(nameof(conditionFunc));
        }
        if (capOrThrottleAction == null)
        {
            throw new ArgumentNullException(nameof(capOrThrottleAction));
        }

        // Carries the close sentinel into the same pipeline as the real messages.
        Subject<T> buffer = new Subject<T>();
        var capTimerObservable = new Subject<long>();
        var throttleTimerObservable = observable.Throttle(TimeSpan.FromSeconds(throttleInSeconds)).Select(c => 1L);
        IDisposable maxBufferTimer = null;

        Action startCapTimer = () =>
        {
            maxBufferTimer = Observable.Timer(TimeSpan.FromSeconds(capTimeInSeconds))
                .Subscribe(x => capTimerObservable.OnNext(1));
        };
        Action stopCapTimer = () =>
        {
            maxBufferTimer?.Dispose();
            maxBufferTimer = null;
        };

        var bufferTicks = observable
            .Do(c =>
            {
                // The first message seen while no cap timer is running starts one.
                if (maxBufferTimer == null)
                {
                    startCapTimer();
                }
            })
            .Buffer(() => Observable.Amb(
                capTimerObservable
                .Do(c => Console.WriteLine($"{DateTime.Now:mm:ss.fff} cap time tick closing buffer")),
                throttleTimerObservable
                .Do(c => Console.WriteLine($"{DateTime.Now:mm:ss.fff} throttle time tick closing buffer"))
                ))
            .Do(c => stopCapTimer())
            .Where(changes => changes.Any())
            .Subscribe(dataChanges => buffer.OnNext(fakeInstance));

        var observableSubscriber = observable.Merge(buffer)
            .Subscribe(subject =>
            {
                try
                {
                    // object.Equals is null-safe: with the default (null) sentinel, subject.Equals(...)
                    // would throw and the close action would never run.
                    bool isCloseSignal = object.Equals(subject, fakeInstance);
                    if (!isCloseSignal)
                    {
                        if (conditionFunc(subject))
                        {
                            return;
                        }
                        Console.WriteLine($"{DateTime.Now:mm:ss.fff} condition false closing buffer");
                        // The rejected message opens the next batch, so the cap must be measured from now.
                        stopCapTimer();
                        startCapTimer();
                    }
                    capOrThrottleAction();
                    if (!isCloseSignal)
                    {
                        // Re-offer the rejected message to the freshly emptied batch.
                        conditionFunc(subject);
                    }
                }
                catch (Exception ex)
                {
                    onException(subject, ex);
                }
            });

        // Stop the timer through a callback so the timer that is live at disposal time is the one stopped,
        // and include bufferTicks so that subscription is not leaked.
        return new CompositeDisposable(
            bufferTicks,
            observableSubscriber,
            Disposable.Create(stopCapTimer));
    }
}

用法:

/// <summary>
/// Console demo: a producer thread publishes messages at random intervals while the main thread
/// periodically re-subscribes with new random throttle/cap timings.
/// </summary>
class Program
{
    static void Main(string[] args)
    {
        // Keep a typed reference for the producer so no `as` cast (and possible null dereference) is needed.
        var source = new Subject<Message>();
        messagesObs = source;
        new Thread(() =>
        {
            while (true)
            {
                Thread.Sleep(NextRandom(3) * 1000);
                source.OnNext(new Message());
            }
        }).Start();

        while (true)
        {
            throttleTime = NextRandom(8) + 2;
            maxThrottleTime = NextRandom(10) + 20;
            Console.WriteLine($"{DateTime.Now:mm:ss.fff} resubscribing with {throttleTime} - {maxThrottleTime}");
            Subscribe();
            Thread.Sleep((NextRandom(10) + 60) * 1000);
        }
    }

    static Random random = new Random();
    // System.Random is not thread-safe and is used from both the producer thread and the main thread.
    static readonly object randomGate = new object();
    static int throttleTime = 3;
    static int maxThrottleTime = 10;
    static IDisposable messagesSub;
    static IObservable<Message> messagesObs;

    /// <summary>Returns a non-negative random integer below <paramref name="maxValue"/>, under a lock.</summary>
    static int NextRandom(int maxValue)
    {
        lock (randomGate)
        {
            return random.Next(maxValue);
        }
    }

    /// <summary>
    /// Drops the previous subscription, processes the current batch, then subscribes again using the
    /// current <c>throttleTime</c> and <c>maxThrottleTime</c> values.
    /// </summary>
    static void Subscribe()
    {
        messagesSub?.Dispose();
        BatchProcess();
        messagesSub = messagesObs.ConditionalCappedThrottle(
            throttleTime,
            maxThrottleTime,
            TryAddToBatch,
            BatchProcess,
            // Report failures instead of silently swallowing them.
            (msg, ex) => Console.WriteLine($"{DateTime.Now:mm:ss.fff} error: {ex}"),
            new FakeMessage());
    }

    /// <summary>Simulated batch add: rejects the message when a random draw from 0..99 is above 85.</summary>
    static bool TryAddToBatch(Message msg)
    {
        if (NextRandom(100) > 85)
        {
            Console.WriteLine($"{DateTime.Now:mm:ss.fff} can't add to batch");
            return false;
        }
        else
        {
            Console.WriteLine($"{DateTime.Now:mm:ss.fff} added to batch");
            return true;
        }
    }

    /// <summary>Simulated batch processing that takes two seconds.</summary>
    static void BatchProcess()
    {
        Console.WriteLine($"{DateTime.Now:mm:ss.fff} Processing");
        Thread.Sleep(2000);
        Console.WriteLine($"{DateTime.Now:mm:ss.fff} Done Processing");
    }
}
/// <summary>Message type published by the demo producer.</summary>
public class Message
{
}

/// <summary>Message subtype that the demo passes as the close sentinel.</summary>
public class FakeMessage : Message
{
}

我希望能够通过的测试场景:

/// <summary>
/// Timing scenarios the batching operator is expected to satisfy. The expectations are stated in
/// comments only; nothing here asserts them.
/// </summary>
public class Test
{
    // Pause that is 100 ms shorter than 6 seconds (the initial throttleTime value).
    const int JustUnderSixSecondsMs = 6 * 1000 - 100;

    static Subject<Base> sub = new Subject<Base>();
    static int maxTime = 19;
    static int throttleTime = 6;

    static void Emit(Base message) => sub.OnNext(message);

    static void Pause(int milliseconds) => Thread.Sleep(milliseconds);

    // Batcher.Process must always be awaited before any following Batcher.Add call.
    static void MaxTime()
    {
        // Batcher.Add must be called for every emitted message.
        Emit(new A());
        Pause(JustUnderSixSecondsMs);
        Emit(new A());
        Pause(JustUnderSixSecondsMs);
        Emit(new A());
        Pause(JustUnderSixSecondsMs);
        // Process must be called after 19 seconds = maxTime.
    }

    static void Throttle()
    {
        // Batcher.Add must be called for every emitted message.
        Emit(new A());
        Pause(JustUnderSixSecondsMs);
        Emit(new A());
        Pause(JustUnderSixSecondsMs);
        // Process must be called after 5.9 + 5.9 + 6 seconds = throttleTime.
    }

    static void Condition()
    {
        // Batcher.Add must be called for every emitted message.
        Emit(new A());
        Pause(JustUnderSixSecondsMs);
        Emit(new B());
        // Process must be called because Batcher.Add will return false.
        // Batcher.Add(B) must be called after Process.
    }

    static void MaxTimeOrThorttleNotTickingRandomly()
    {
        Emit(new A());
        // Process is called by the throttle condition in 6 seconds.
        Pause(100 * 1000);
        // Process is not called for the remaining 94 seconds.
        Emit(new A());
        // Process is called by the throttle condition in 6 seconds.
    }

    static void Resub()
    {
        for (int i = 0; i < 5; i++)
        {
            Emit(new A());
        }
        maxTime = 15;
        throttleTime = 3;
        // Process is called.
        // Resubscribes with the new timing conditions.
        Emit(new A());
        // Process is called by the throttle condition in 3 seconds.
    }
}

/// <summary>
/// Batch that accepts elements of a single runtime type at a time.
/// </summary>
public class Batcher
{
    // Runtime type of the elements in the current batch; null while the batch is empty.
    private Type currentType;

    /// <summary>
    /// Tries to add <paramref name="element"/> to the current batch.
    /// </summary>
    /// <param name="element">Element to add.</param>
    /// <returns>
    /// <c>true</c> when the batch is empty or already holds elements of the same runtime type;
    /// otherwise <c>false</c>.
    /// </returns>
    public bool Add(Base element)
    {
        Type incoming = element.GetType();
        bool batchIsEmpty = currentType == null;
        if (!batchIsEmpty && incoming != currentType)
        {
            return false;
        }

        currentType = incoming;
        return true;
    }

    /// <summary>Resets the batch so the next element of any type is accepted.</summary>
    public void Process() => currentType = null;
}

/// <summary>Common base type of the elements used in the test scenarios.</summary>
public class Base
{
}

/// <summary>First concrete element type.</summary>
public class A : Base
{
}

/// <summary>Second concrete element type.</summary>
public class B : Base
{
}
4

0 回答 0