我想用 RxJava 实现一个 EventBus,我需要粘性事件。我知道我可以使用 BehaviorSubject,但它只缓存最后一个发出的项目,而我想缓存所有按其类型(类名)不同的事件。还有另一种选择 - ReplaySubject 但是它有一个开销 - 它包含所有发出的元素。有没有办法创建某种类型的 ReplaySubject 只包含唯一的类型元素?
2 回答
1
我只用一个流无法成功解决这个问题。
所以我有一个按事件类型的流,并且只公开一个合并的流。
open class Event(val name: String)
class Event1(name: String) : Event(name)
class Event2(name: String) : Event(name)
val event1Subject = BehaviorSubject.create<Event1>()
val event2Subject = BehaviorSubject.create<Event2>()
event1Subject.onNext(Event1("event1 a"))
event1Subject.onNext(Event1("event1 b"))
event2Subject.onNext(Event2("event2 a"))
event2Subject.onNext(Event2("event2 b"))
Observable.merge(event1Subject, event2Subject)
.subscribe { println(it.name) }
控制台输出:
event1 b
event2 b
于 2020-09-11T09:20:31.683 回答
1
我不相信有任何干净的解决方案。但是,您也许可以完成这项工作。
BehaviorSubject<>
为每个事件类型创建一个。使用 aConcurrentMap
将每个传入事件分派到正确的Subject
.- 按照事件到达的顺序维护这些主题的列表。
- 当接收到新订阅时,将创建一个可观察的对象,它是所有主题的合并,第一个主题列表已经接收到事件,然后是那些没有按任何顺序排列的主题。
这是一些可能澄清上述内容的代码。未经测试。
// The subscription operation will perform a merge of the two lists
Map<EventType, BehaviorSubject<Event>> map = new ConcurrentHashMap<>();
List<BehaviorSubject<Event>> listOfUnseenEvents = new ArrayList<>();
List<BehaviorSubject<Event>> listOfSeenEvents = new ArrayList<>();
// ...
listOfUnseenEvents = map.values().asList();
public Observable<Event> busSub() {
List<BehaviorSubject<Event>> allEvents = new ArrayList<>();
synchronized ( map ) {
allEvents.addAll( listOfSeenEvents );
allEvents.addAll( listOfUnseenEvents );
}
return Observable.merge( allEvents );
}
// receive an event and dispatch it
eventSource
.subscribe( event -> processEvent( event ) );
public void processEvent( Event event ) {
BehaviorSubject<Event> eSubject = map.get( event.getEventType() );
synchronized ( map ) {
if ( containsEventType( listOfSeenEvents, event.getEventType() ) ) {
removeEventType( listOfSeenEvents, event.getEventType() );
} else {
removeEventType( listOfUnseenEvents, event.getEventType() );
}
listOfSeenEvents.add( eSubject );
}
eSubject.onNext( event );
}
请注意,此代码利用了merge()
按顺序订阅每个给定 observables 的事实。在 RxJava 文档中没有这样的保证。
如果事先不知道所有事件类型,那么这将不起作用。
于 2017-09-04T17:51:16.803 回答