0

我想用 RxJava 实现一个 EventBus,我需要粘性事件。我知道我可以使用 BehaviorSubject,但它只缓存最后一个发出的项目,而我想缓存所有按其类型(类名)不同的事件。还有另一种选择 - ReplaySubject 但是它有一个开销 - 它包含所有发出的元素。有没有办法创建某种类型的 ReplaySubject 只包含唯一的类型元素?

4

2 回答 2

1

我只用一个流无法成功解决这个问题。
所以我有一个按事件类型的流,并且只公开一个合并的流。

open class Event(val name: String)
class Event1(name: String) : Event(name)
class Event2(name: String) : Event(name)

val event1Subject = BehaviorSubject.create<Event1>()
val event2Subject = BehaviorSubject.create<Event2>()

event1Subject.onNext(Event1("event1 a"))
event1Subject.onNext(Event1("event1 b"))
event2Subject.onNext(Event2("event2 a"))
event2Subject.onNext(Event2("event2 b"))

Observable.merge(event1Subject, event2Subject)
        .subscribe { println(it.name) }

控制台输出:

event1 b
event2 b
于 2020-09-11T09:20:31.683 回答
1

我不相信有任何干净的解决方案。但是,您也许可以完成这项工作。

  1. BehaviorSubject<>为每个事件类型创建一个。使用 aConcurrentMap将每个传入事件分派到正确的Subject.
  2. 按照事件到达的顺序维护这些主题的列表。
  3. 当接收到新订阅时,将创建一个可观察的对象,它是所有主题的合并,第一个主题列表已经接收到事件,然后是那些没有按任何顺序排列的主题。

这是一些可能澄清上述内容的代码。未经测试。

// The subscription operation will perform a merge of the two lists
Map<EventType, BehaviorSubject<Event>> map = new ConcurrentHashMap<>();
List<BehaviorSubject<Event>> listOfUnseenEvents = new ArrayList<>();
List<BehaviorSubject<Event>> listOfSeenEvents = new ArrayList<>();
// ...
listOfUnseenEvents = map.values().asList();

public Observable<Event> busSub() {
  List<BehaviorSubject<Event>> allEvents = new ArrayList<>();
  synchronized ( map ) {
    allEvents.addAll( listOfSeenEvents );
    allEvents.addAll( listOfUnseenEvents );
  }
  return Observable.merge( allEvents );
}

// receive an event and dispatch it
eventSource
  .subscribe( event -> processEvent( event ) );

public void processEvent( Event event ) {
    BehaviorSubject<Event> eSubject = map.get( event.getEventType() );
    synchronized ( map ) {
      if ( containsEventType( listOfSeenEvents, event.getEventType() ) ) {
        removeEventType( listOfSeenEvents, event.getEventType() );
      } else {
        removeEventType( listOfUnseenEvents, event.getEventType() );
      }
      listOfSeenEvents.add( eSubject );
    }
    eSubject.onNext( event );
}

请注意,此代码利用了merge()按顺序订阅每个给定 observables 的事实。在 RxJava 文档中没有这样的保证。

如果事先不知道所有事件类型,那么这将不起作用。

于 2017-09-04T17:51:16.803 回答