问题标签 [rx-java]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
2 回答
2726 浏览

scala - Scala 流媒体库差异(Reactive Streams/Iteratee/RxScala/Scalaz...)

我正在关注 Coursera 上的 Scala 函数式反应式编程课程,我们处理 RxScala Observables(基于 RxJava)。

据我所知,Play Iteratee 的库看起来有点像 RxScala Observables,其中 Observables 有点像 Enumerators,Observers 有点像 Iteratees。

还有 Scalaz Stream 库,也许还有其他一些?


所以我想知道所有这些库之间的主要区别。在哪种情况下,一个可能比另一个更好?


PS:我想知道为什么 Martin Odersky 没有为他的课程选择 Play Iteratees 库,因为 Play 在 Typesafe 堆栈中。这是否意味着 Martin 更喜欢 RxScala 而不是 Play Iteratees?


编辑: Reactive Streams计划刚刚宣布,作为尝试standardize a common ground for achieving statically typed, high-performance, low latency, asynchronous streams of data with built-in non-blocking back pressure

0 投票
4 回答
8228 浏览

java - 从普通 Java 事件创建 Observable

Observable从经典 Java 事件模式创建 Rx-Java 的最佳方法是什么?也就是说,给定

我要实施

我想出的实现是:

但是,我真的不喜欢它:

  1. 它非常冗长;

  2. 每个都需要一个监听Observer器(理想情况下,如果没有观察者,则应该没有监听器,否则应该有一个监听器)。这可以通过将观察者计数保持为 中的字段,OnSubscribeFunc在订阅时增加它并在取消订阅时减少来改进。

有更好的解决方案吗?

要求:

  1. 使用现有的事件模式实现而不更改它们(如果我正在控制该代码,我已经可以编写它以返回Observable我需要的)。

  2. 如果/当源 API 更改时出现编译器错误。不能使用Object实际事件参数类型或属性名称字符串来代替。

0 投票
2 回答
3469 浏览

java - 如何在时间间隔内将自定义类用作 Observable 和触发方法

我有一个类Producer,简化它有public Object readData() 我想让这个类成为ObservableRxJava)的方法。

如何指示应该调用哪个方法?我需要将我的Producer课程转换为FutureorIterable吗?

下一个问题是readData应该每 n 秒调用一次。某些方法,例如from,具有调度程序参数,但我找不到任何如何应用它的示例。我找到了间隔方法,但它会发出一个整数序列。到目前为止,没有 Observable 我使用Executors.newSingleThreadScheduledExecutor().scheduleWithFixedDelay(....)

0 投票
1 回答
84 浏览

scala - 在 rxjava 的 Observable 中找不到 def apply[T](items : T*) : rx.lang.scala.Observable[T]

例如,Observable(3, 2, 1)应该使用方法,但这里这里def apply[T](items : T*) : rx.lang.scala.Observable[T]不存在

有人可以帮忙解释一下吗?

0 投票
1 回答
3776 浏览

java - RxJava——终止无限流

我正在探索反应式编程和 RxJava。这很有趣,但我被困在一个我找不到答案的问题上。我的基本问题是:终止原本无限运行的 Observable 的响应式方法是什么?我也欢迎对我的代码提出批评和反应式最佳实践。

作为练习,我正在编写一个日志文件尾部实用程序。日志文件中的行流由Observable<String>. 为了BufferedReader继续阅读添加到文件中的文本,我忽略了通常的reader.readLine() == null终止检查,而是将其解释为我的线程应该休眠并等待更多记录器文本。

但是,虽然我可以使用 终止观察者takeUntil,但我需要找到一种干净的方法来终止原本无限运行的文件观察者。我可以编写自己的terminateWatcher方法/字段,但这会破坏 Observable/Observer 封装——我希望尽可能严格地遵守反应式范式。

这是Observable<String>代码:

这是在新行出现时打印新行的 Observer 代码:

我的两个问题是:

  1. 什么是终止原本无限运行的流的反应一致的方法?
  2. 我的代码中还有哪些其他错误让您哭泣?:)
0 投票
1 回答
1517 浏览

scala - scala 无限 rx 可观察创建 - 如何正确执行此操作?

我最近开始玩rxjava-scala,我想创建一个(可能)无限流可观察。查看 github 上的代码和打开的问题,我发现“开箱即用”的解决方案尚未实现(问题中的 usecase06 说它甚至没有为 java 实现)

所以,我试图提出我自己的实现。考虑以下:

和一个辅助方法:

和示例核心:

这似乎工作正常,但我对此不满意。首先,我正在创建一个新的Thread,这可能很糟糕。但即使我使用某种线程池,它仍然会感觉不对。所以我想我应该使用调度程序,这听起来像是一个合适的解决方案,只是我不知道如何在这种情况下使用它。我尝试rx.lang.scala.concurrency.Schedulers.threadPoolForIO使用该observeOn方法进行补充,但似乎我做错了。observable 的代码不会用它编译。任何帮助将不胜感激。谢谢!

0 投票
2 回答
3721 浏览

reactive-programming - 如何在 RxJava 中计算移动平均线

在金融领域,我们通常需要从一个时间序列数据流中计算出移动窗口聚合值,以移动平均为例,假设我们有以下数据流(T是时间戳,V是实际值):

从我们得到的流中计算移动平均值 3:

要计算移动平均线,我们似乎可以这样做:

  1. 从原始流构建一个 Observable
  2. 通过将值聚合到组中,从原始流构建一个 Observable
  3. 在步骤 2 中使用聚合运算符计算 Observable 的最终结果。

步骤 1 和 3 实现起来很简单,但是,对于步骤 2,当前的 RxJava 似乎没有内置运算符来生成移动窗口组,window/groupBy 运算符似乎不适合这种情况,我没有找到一种从现有运营商组成解决方案的简单方法,有人可以建议如何在 RxJava 中以“优雅”的方式做到这一点吗?

0 投票
1 回答
3707 浏览

scala - 什么是检查可观察对象是否完成的好方法

我想知道是否有一种方便的方法来检查 observable 是否已完成。例如我有一个测试

recovered方法返回一个 Observable[Try[T]] 并且是标准 Observable 的扩展。我想在源 Observable 完成时检查 Observable[Try[T]] 是否完成。

因此,我编写了一个带有主题的测试,我向其中发布了一些值,然后最终完成。有没有一种简单的方法可以检查 newOb 是否也已完成?Observable 中没有类似 isCompleted 的方法。

0 投票
3 回答
15025 浏览

java - Android Studio 添加 rxjava 库

考虑以下项目结构:

我从http://search.maven.org/#search%7Cga%7C1%7Cg%3A%22com.netflix.rxjava%22%20AND%20a%3A%22rxjava-core%22 (sources.jar ) - 但我也尝试过其他人

然后我在 SubProject 中创建了 lib 文件夹,然后将 .jar 放入其中。

在 Android Studio 中,我右键单击库并选择“添加为库...”,级别:“项目库”和模块:“子项目”。

rxjava 使用包名“rx”。我实现了一些导入这个包的代码:

构建项目时发生以下错误:

我发现需要在 SubProject/build.grandle 中添加一行:

但随后它抛出:

我试图在项目结构中移动 .jar,但到目前为止还没有运气。

如何正确地将 3rd 方库添加到项目中?我自己创建“libs”文件夹可以吗?

0 投票
0 回答
315 浏览

reactive-programming - Vert.x - RxJava - zip without returning a new observable

I am looking for a method in RxJava that behaves like the zip method, but without returning any new observable.

I have two observables, one emitting RxMessage objects and the other one RxHttpClientResponse objects. I want some kind of method that receives both objects and executes a function. That's all, without returning a new observable that emits new elements.

Any suggestion?

Thanks in advance