问题标签 [rxjs]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
3 回答
2382 浏览

system.reactive - 消息处理节流/背压

我有消息的来源,它是一个Observable. 对于每条消息,我想进行一个 HTTP 调用,该调用将产生另一个Observable,所以我将它们与 结合在一起,flatMap然后将它们发送给某个订阅者。这里是这个场景的代码:

这个例子是用咖啡脚本编写的,但我认为问题陈述对于任何其他 Rx 实现都是有效的。

我对这种方法的问题是loadMessages会很快产生大量消息。这意味着,我在很短的时间内发出了很多 HTTP 请求。这在我的情况下是不可接受的,所以我想将并行 HTTP 请求的数量限制在 10 个左右。换句话说,当我发出 HTTP 请求时,我想限制管道或应用某种背压。

Rx 是否有任何标准方法或最佳实践来处理这种情况?

目前,我实现了非常简单(并且非常次优)的背压机制,如果系统处理的按摩过多,则会忽略滴答声。它看起来像这样(简化版):

不过,我不确定这是否可以做得更好,或者 Rx 可能已经有一些机制来处理这种需求。

0 投票
1 回答
8434 浏览

reactive-extensions-js - 将 RxJS Observable 收集到数组

我想使用 RxJS 将异步事件世界与同步世界“桥接”起来。具体来说,我想创建一个函数,该函数返回在某个时间间隔内收集的事件数组。

我可以创建 Observable 来做我想做的事

我可以打印正确的值就好了

这打印

但我想要的是将此数组分配给变量。从概念上讲,我想要类似的东西

var collectedDuringSecond = source.toPromise.getValue()

这个想法是 getValue 会阻塞,所以在上面的行完成之后,collectedDuringSecond 将包含 [0,1,2,3,4,5,6,7,8]

0 投票
2 回答
1396 浏览

rxjs - 带有 zip 的 Rx.Observable.repeat(无限期地)导致浏览器停止响应

以下代码导致浏览器停止响应,例如使用像“100”这样的值来重复解决问题,但在我的情况下,我没有具体的值。请您提出一个解决方案:

0 投票
1 回答
325 浏览

reactive-extensions-js - RxJS 可变长度窗口

我想知道如何创建 windowWithMaxCount 的效果,它可以像 windowWithCount 一样工作,但窗口大小会从 1 变为 maxCount。

我正在做的是基于 c 事件流绘制折线图。折线图需要 50 个点的数组。当新点到达时,我需要在右边推出一个点并将这个新点放在左边。

所以一般 observable.windowWithCount(50,1) 就是这样做的。对于第一个窗口,唯一的问题是我必须等到所有 50 个元素都可用。在此期间,用户在屏幕上什么也看不到。

相反,我想要发生的是,一旦第一个点到达,我想获得大小为 1 的窗口,然后是大小为 2 的窗口等,直到我获得大小为 50(maxCount)的窗口。此时所有后续窗口的大小将是 50。

屏幕上的效果是线条从左到右填充屏幕,直到感觉整个屏幕。

0 投票
1 回答
119 浏览

javascript - RxJS 和不可访问的元素

我有一个场景,在给定一个 observable 的情况下,我想知道任何从未到达任何订阅者的元素(例如过滤掉)并据此采取行动。实现这样的目标的最佳方法是什么?

0 投票
2 回答
1355 浏览

observable - 创建一个延迟下一个值的 Observable

我正在尝试使用 RxJS 创建一个可观察的对象,它可以执行如图所示的操作。

预期的可观察映射

  • 获取一个值并在获取下一个值之前等待一段固定的时间。
  • 下一个将是等待期间发出的最后一个值,跳过其余的。
  • 如果在没有发出任何值的情况下经过等待间隔,则应立即抓取下一个,如图像的最后一个示例所示。
0 投票
2 回答
12241 浏览

rxjs - 如何使 RxJS 中的事件超时?

我试图检测mousedown事件是否在mouseup.

我在创建的 Observable 上使用 timeout()fromEvent()这样做,但超时返回两个 Observables。

下面,如果在 1 秒内触发,订阅流返回事件mousedown,但它也返回 1。

但是,这可以按预期工作:

我希望这段代码可以工作:

0 投票
1 回答
2207 浏览

javascript - 当一次足够时,RxJs 反应映射被调用两次

在下面的示例中,每次 onNext 调用都会调用 map 两次,这是不必要的,因为 ds 的相同值可以被两个观察者重用。如何编写代码以使每次 onNext 调用只调用一次地图?还是我应该使用承诺?

0 投票
1 回答
153 浏览

javascript - 是什么开始了反应式表达式中的热可观察序列

因此,在下面来自GitHub 上 RxJS的示例中,鼠标移动可观察到的“激活”是什么时候,我的意思是,是什么触发它开始对 mousemove 事件进行采样?

我认为订阅将标记 mousedrag 中所有可观察对象的序列的开始,但似乎并非如此。显然,在 mousedown 之前有 mousemove 事件,但这些事件从未使用过。

任何见解将不胜感激。

0 投票
3 回答
1905 浏览

system.reactive - 如何用 observables 定义循环

我正在尝试设置一个简单游戏的更新循环,并考虑到 observables。顶层组件是一个模型,它接受输入命令并产生更新;和一个view,它显示接收到的更新,并产生输入。单独来看,两者都可以正常工作,有问题的部分是将两者放在一起,因为两者都依赖于另一个。

将组件简化为以下内容:

我把事情联系在一起的方式是这样的:

也就是说,我添加一个主题作为输入流的占位符,并将模型附加到该主题。然后,在构建视图之后,我将实际输入传递给占位符主题,从而关闭循环。

然而,我不禁觉得这不是正确的做事方式。为此使用主题似乎是矫枉过正。有没有办法用 publish() 或 defer() 或类似的东西做同样的事情?

更新:这是一个不太抽象的例子来说明我遇到的问题。下面是一个简单“游戏”的代码,玩家需要点击一个目标才能击中它。目标可以出现在左侧或右侧,每当它被击中时,它就会切换到另一侧。看起来很简单,但我仍然觉得我错过了一些东西......