问题标签 [rx-java]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
2 回答
1174 浏览

scala - 不要在错误时停止 Observable

这是我目前拥有的 Scala 代码:

val b = Observable.interval(1 second).map(n => if (n % 2 == 1) throw new Exception else n*n)

b.subscribe(n => println(n), e => println("error"), () => println("done"))

这是我的输出:

0
错误

如何修改我的 Observable 以便在每个错误之后继续运行,并且我将得到如下输出:

0 投票
1 回答
630 浏览

reactive-programming - RxJava:你能给我一个真实的场景来使用 flatMap 而不是 map

我无法真正理解何时使用 flatmap 而不是 map,也找不到一个很好的例子。
你能想出一个好的场景来选择平面地图而不是地图吗?
谢谢。

0 投票
2 回答
495 浏览

scala - 如何为“observeOn”方法创建一个“调度程序”?

我在我的 Scala 项目中使用 RxJava,我需要Observable在一个单独的线程中执行我的。我知道为了实现这一点,我需要observeOn在它上面调用方法并传递一个实例rx.lang.scala.Scheduler作为参数。

但是我怎样才能创建那个实例呢?我没有找到任何明显的rx.lang.scala.Scheduler特征实例化方法。例如,我有这个代码:

Observable.from(List(1,2,3)).observeOn(scheduler)

有人可以提供一个可以解决问题的工作变量示例scheduler吗?

0 投票
1 回答
847 浏览

multithreading - 如何在单独的线程中运行“Observable”?

我在 Scala 项目中使用 RxJava 并说我有这个简单的Observable

我永远不会收到“你好”消息,因为while阻塞了一个线程。如何Observable在单独的线程中运行以避免阻塞?

====================================

我在想observeOn可能会有所帮助,但事实并非如此。运行这个:

...仍然不打印“你好”。我猜想添加observeOnmakeOnNext在单独的线程中调用而不是while块本身?

====================================

我当然可以换成while一个Future

但也许存在更多的 rx 惯用方式来做到这一点?

0 投票
3 回答
5863 浏览

reactive-programming - 从 Observable 创建 BehaviorSubject

假设我有一个observableA从网络获取数据并将其发出的数据,如果我每次尝试从网络请求数据时都订阅这个 observable,那么它会很重。

我想创建 BehaviorSubject 并将其连接到 observableA,以便任何其他线程/对象都将订阅 BehaviorSubject 以获得最新发出的数据。

到目前为止,我无法管理它来编码。我无法创建空的 BehaviorSubject 并在 observableA 中调用,因为它们彼此不相关。我无法订阅 observableA 并获得 BehaviorSubject 作为观察者,知道如何完成它吗?或者甚至更好?

0 投票
2 回答
873 浏览

algorithm - 异步、提前退出、串联的 Observable

假设我们有 3 个 observables ,ABC。我需要同时运行所有 3 个(异步,对于外行),但是:

  1. 如果我从A得到任何东西,发出它......不要发出任何其他东西
  2. 如果A完成而没有发出任何东西,则将规则 1 应用于B
  3. 如果B完成而没有发出任何东西,则从C发出项目。
  4. 如果C完成时没有发出任何东西,则发出一个默认项。

昨天我花了几个小时试图解决这个问题,而且 RxJava 中似乎没有任何操作组合可以让我做到这一点。

您可以考虑从左到右级联的值:

A --> B --> C

而且,级联被阻塞,而每个级联运行异步并缓存它们的值。

A(无)--> B(无)--> C(无)--> 默认项

需要明确的是,A必须在任何其他观察者发出任何内容之前完成。B 和 C 的逻辑相同,如果 A、B、C 未能发出任何内容,则默认为默认值。

显然这涉及到缓存,我绝对不想重播 observable。我将需要重播缓存的值。每个门都挂着。

该行为与concat()极为相似,只是如果在其之前有排放,则不会释放链的下一部分。

0 投票
1 回答
114 浏览

scala - 为什么head不取消订阅

假设您有以下Observable内容rxjava-scala-0.18.4

第二个断言失败,这意味着head没有取消订阅。我对 Observables 的理解是错误的还是应该head在第一个元素发出后取消订阅?

0 投票
1 回答
309 浏览

java - RxJava 中的测试主题

我正在查看 Netflix 的一个名为 RxJava 的响应式扩展的 Java 实现。

他们似乎选择将 .NET 实现Subject<T>TestSubject<T>. 有谁知道为什么它被称为TestSubject<T>which 对我来说会推断它可能不应该被使用?

我看不出如何从类中的方法创建 Observable。例如

0 投票
2 回答
7467 浏览

android - 在 RxJava 中处理 API 异常

我目前正在尝试围绕 RxJava 进行研究,但是在以优雅的方式处理服务调用异常时遇到了一些麻烦。

基本上,我有一个(改造)服务,它返回一个Observable<ServiceResponse>. ServiceResponse定义如下:

现在我想要的是将该通用响应映射到List<Account>包含在数据 JsonElement 字段中的 a (我假设您不关心Account对象的外观,所以我不会用它污染帖子)。以下代码非常适合成功案例,但我找不到处理 API 异常的好方法:

有一个更好的方法吗?这确实有效,onError 会向我的观察者触发,我会收到我抛出的错误,但看起来我做的绝对不对。

提前致谢!

编辑:

让我澄清一下我想要实现的目标:

我想要一个可以从 UI 调用的类(例如,Activity、Fragment 或其他)。该类将采用 aObserver<List<Account>>作为参数,如下所示:

该方法将返回一个订阅,当 UI 被分离/销毁/等时可以取消订阅。

参数化的观察者将处理 onNext 以处理传入帐户列表的成功响应。OnError 将处理任何异常,但也会传递任何 API 异常(例如,如果响应状态!= 200,我们将创建一个 Throwable 并将其传递给 onError)。理想情况下,我不想只是“抛出”异常,我想将它直接传递给观察者。这就是我看到的所有例子。

复杂之处在于我的改造服务返回一个ServiceResponse对象,所以我的观察者无法订阅它。我想出的最好的方法是在我的观察者周围创建一个观察者包装器,如下所示:

我仍然觉得我没有正确使用它。我之前绝对没有见过其他人使用 ObserverWrapper。也许我不应该使用 RxJava,尽管 SoundCloud 和 Netflix 的人在他们的演示文稿中真的向我推销了它,而且我非常渴望学习它。

0 投票
3 回答
44930 浏览

java - 如何等待异步 Observable 完成

我正在尝试使用 rxjava 构建示例。该示例应协调 ReactiveWareService 和 ReactiveReviewService 重新运行 WareAndReview 组合。

鉴于我不想返回 Observable,我该如何等待异步 Observable (findReviewsByItem) 完成?