编辑:
据我所知,您的解决方案是正确的。如果您想稍微提高性能,请考虑:
val observableOut: Observable[Out] = observableIn.flatMap { in =>
val p = Promise.successful(in)
Observable.from(futureInToFutureOut(p.future))
}
这有点快,因为它不会创建异步计算来映射Future.apply未来。
老的:
我将在下面留下我的旧建议,该建议仅在您将单个事件映射到Observable.
import scala.concurrent._
val promiseIn = Promise[Int]()
observableIn.foreach(x => promiseIn.trySuccess(x))
val observableOut = Observable.create { observer =>
promiseIn.future.map(futureInToFutureOut).foreach { y =>
observer.onNext(y)
observer.onCompleted()
}
}
解释
由于您是从一个Observable[In]对象(即事件流)开始的,因此您需要找到一种将事件从该对象转移Observable到未来的方法。创建新未来的典型方法是创建它的Promise第一个 -未来对象的输入端。然后,当第一个事件到达时,您可以使用foreachonObservable来调用未来:trySuccess
observableIn ---x---> foreach ---x---> promiseIn.trySuccess(x)
一旦事件Observable到达,promise 将异步完成。future我们现在可以通过调用它的方法来获取promise的读取端,即future ;然后呼唤map未来—— promiseIn.future.map(futureInToFutureOut)。图形化:
promiseIn.future ---x---> map ---f(x)---> futureOut
由此产生的未来是异步完成的futureInToFutureOut(x)。在这一点上,我们需要找到一种方法将这个值通过Observable[Out]. 创建新的典型方法Observable是调用Observable.create工厂方法。这个方法作为Observable' 的写作结束 - Observer,我们使用它来通过调用来发出事件onNext:
futureOut ---f(x)---> foreach ---f(x)---> observer.onNext(f(x))
由于我们知道未来最多发出一个事件,因此我们调用onCompleted观察者的 on 来关闭输出Observable。
编辑:如果您想掌握 Rx 和 Scala 期货,您可能需要考虑这本书,它涉及这些主题。免责声明:我是作者。