问题标签 [rx.net]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
54 浏览

c# - 如何通过某些逻辑的项目?

我正在寻找一种通过特定逻辑的项目的方法。最明显的答案可能是使用 .Select ,它适用于大多数情况,但我有一个特殊情况,这个问题实际上可以改写为在所有订阅者消费项目后如何调用某个方法?

我正在考虑一个看起来像这样的扩展PassThrough(this IObservable<TSource> obj, Action<TSource, IObserver<TResult>> selector),我会以下列方式使用它

其中最重要的部分是在对象传递给 OnNext 之后调用 .Dispose 对结果对象,换句话说,在它被订阅者消费之后。我没有找到这样的扩展方法。有人可以举例说明如何使用现有的 Rx.NET API 来实现它,或者假设它是可能的,如何创建一个可以做到这一点的扩展?

0 投票
1 回答
166 浏览

c# - 如何用可能需要重新运行的任务组合 observable?

假设我有一个运行查询并返回一组结果的异步方法:

而且我有一个可观察的,只要需要再次获取结果集就会触发:

我想要的是:

这将 (1) 运行查询并产生一个初始结果集,以及 (2) 每次NeedToRefetch触发时,再次运行查询并产生另一个结果集。

组成这个 observable 的最好方法是什么?

如果我不需要那个初始结果集,我可以这样做:

因此,为了确保查询至少运行一次,我可以这样做:

但后来我开始阅读有关冷和热可观察对象的信息,我想知道是否应该做这样的事情:

然后我想知道这是否是我应该使用的情况 Observable.Create

然后我停止了思考并写下了这个问题。

0 投票
1 回答
95 浏览

c# - 在 C# 中使用 RX 自动保存

我想实现一个自动保存功能。我有两个 Observable:

  • IObservable<Unit> changes:每次用户编辑文本时发出一个项目
  • IObservable<Unit> saves: 每次按下保存按钮时发出一个保存事件

现在我想将它们组合成第三个流writeBack。此流有一个订阅者,它将当前文本写入数据库。

如何创建 writeBack 流,以使其满足以下属性?

  • 如果 3 秒内没有发生任何更改(如油门) ,则未保存的更改将被写回
  • 保存事件立即写回最后未保存的更改

我想确保写回未保存的更改。如果出现以下情况,我不想保存文本:

  • 有一个保存事件,但没有变化
  • 连续两次保存事件,但两者之间没有变化
  • 在更改与其自动保存之间有一个保存事件
0 投票
1 回答
510 浏览

c# - Rx.NET 不会触发 onError 或 onComplete

我正在研究 Rx.NET,在下面的示例代码中,我试图执行一个异步任务,观察日期时间选择器值更改事件。onNext 运行完美,但 onError 和 onComplete 不能。我究竟做错了什么?

0 投票
1 回答
302 浏览

c# - 使用/不使用 Rx 重试 tpl 数据流

我正在使用带有 TPL 数据流的 .net 响应式扩展。这是我的管道:

我从某个外部源获取数据点作为流,然后使用数据流 TransformBlocks 转换数据点。在此之后,我使用 Rx 缓冲区缓冲流式点 1 秒,最后我使用数据流 Actionblock 将这些缓冲的数据点发布到 REST 端点。

我想对瞬态错误重试 REST post 操作。我应该在哪里重试:

  1. 缓冲后?
  2. 内部动作块?
  3. 重试时连续流式传输会发生什么?我不想错过任何数据。
0 投票
2 回答
498 浏览

c# - 如何使用 Reactive Extensions 轮询状态?

已经有一个关于使用 Reactive 进行数据库轮询的好问题(Database polling with Reactive Extensions

我有一个类似的问题,但有一个转折:我需要将上一个结果中的值提供给下一个请求。基本上,我想对此进行投票:

这个想法是返回自上次请求以来的所有新项目


  1. 每分钟我都想打电话GetNewResultsAsync
  2. 我想将CurrentAsOf上一次调用中的作为参数传递给previousRequest参数
  3. 下一个呼叫GetNewResultsAsync实际上应该在上一个呼叫之后一分钟发生

基本上,有没有比以下更好的方法:

另请注意,此版本允许messageResultSet在等待下一次迭代时收集 (例如,我想也许我可以使用Scan将先前的结果集对象传递给下一次迭代)

0 投票
3 回答
1670 浏览

c# - 将最后一项推送到 Observable (Sequence)

我有一个IObservable<Item>类内部,我想公开一个只读属性,该属性提供在给定时间推送到可观察对象的最后一项。所以它将提供一个单一的值Item

如果没有推送任何值,则它必须返回一个默认值。

我怎样才能做到这一点而不必订阅可观察对象并拥有“支持字段”?

0 投票
0 回答
105 浏览

system.reactive - Rx.NET 按条件、最小和最大计时器进行批处理

我正在对流中的消息进行批处理,希望通过 3 个条件来完成:

  1. 如果批处理程序无法添加传入消息(对于批处理程序的任何内部逻辑)
  2. 如果有消息然后在X秒内没有消息(基本上是油门的作用)
  3. 如果有连续的消息流,然后每X秒更频繁,那么在Y秒后仍然关闭批处理(油门上限)

我需要能够在运行时更改XY秒而不会丢失当前批次(无论是在配置更改时立即关闭还是通过关闭条件关闭)。条件函数和批处理函数不应在并行线程中运行。

我正在使用 Rx-Main 2.2.5。到目前为止,我想出了下面的解决方案,它似乎有效,但我认为可能有更简单的反应扩展解决方案?如果关闭条件是“批处理器无法添加此消息”,则使用此解决方案 capTimer 也不会重新启动。

扩大:

和用法:

我想工作的测试:

0 投票
1 回答
516 浏览

c# - 为什么这些 Rx 序列乱序?

假设我写

产生的序列1 1 2 1 2 3符合预期。但现在如果我写

现在产生的序列1 1 2 1 2 1 3 2 3 4不是人们所期望的1 1 2 1 2 3 1 2 3 4。这是为什么?是否SelectMany()进行某种多线程合并?

0 投票
1 回答
420 浏览

c# - Rx.Net:无限地观察异步事件

我有一个帮助类,可以将文本消息保存到本地文件系统。此方法返回一个Task对象,并且根据定义是异步的。

我希望能够观察何时调用此方法,以便我可以持续监控缓冲区的大小和长度并据此做出决定。

我正在尝试使用 Reactive Extension for .NET 来实现这一点。但是,我无法想出一种设计,让我可以连续收听添加到缓冲区的消息。以下是我当前的实现:

以下是我订阅 observable 的方式:

我希望每次调用该方法时Receive都调用订阅者。但是,AFAIK,一旦调用了这个方法,observable 完成并且序列终止,所以未来的调用Receive将不会被监听。

有人可以推荐一种使用 Rx.Net 库来实现我正在寻找的这种可观察模式的方法,即如何保持序列打开并为其提供异步方法的结果?