8

Project Reactor 通过定义Scheduler. 它还为使用CompletableFuture's 的库提供了一个桥梁Mono.fromFuture(..)

AWS 的DyanmoDB 异步客户端执行CompletableFuture从 API 调用返回的java.util.concurrent.Executor. 默认情况下,它会创建一个Executor由它也创建的线程池支持的线程池。结果是,即使是定义为Schedulerlike的流也会Mono.fromFuture(..).subscribeOn(Schedulers.boundedElastic())在库创建的池中的线程上执行,而不是来自Schedulers.boundedElastic(). 所以我们看到线程名称像sdk-async-response-0-2,而不是像boundedElastic-1.

幸运的是,图书馆允许我们提供我们自己的Executor,如此处所示,所以我的问题是:

您如何构建一个在运行时使用流的该部分定义Executor的线程的线程?Scheduler

用例

我们有一个存储库类,它有一个findById方法,我们需要调用者能够控制在哪个Scheduler上运行,因为它在这些截然不同的上下文中使用:

  1. Schedulers.boundedElastic()在调度程序上运行的 API 响应。
  2. 处理从定义的调度程序按顺序在每个分区的线程上执行的 Kafka 消息,如Reactor Kafka 文档中所示。

尝试

我们已经尝试定义一个Executorusing bothSchedulers.immediate()Runnable::run,如图所示,但两者都导致在 Netty 事件循环线程(示例名称:)上执行aws-java-sdk-NettyEventLoop-0-2,而不是在定义的线程上执行Scheduler

DynamoDbAsyncClient.builder()
    .asyncConfiguration(builder -> builder.advancedOption(
        SdkAdvancedAsyncClientOption.FUTURE_COMPLETION_EXECUTOR,
        runnable -> Schedulers.immediate().schedule(runnable)
    ))
    .build();
DynamoDbAsyncClient.builder()
    .asyncConfiguration(builder -> builder.advancedOption(
        SdkAdvancedAsyncClientOption.FUTURE_COMPLETION_EXECUTOR,
        Runnable::run
    ))
    .build();
4

1 回答 1

21

第 1 部分。观察与订阅

调查这个问题,我发现需要在特定线程上执行后观察元素。准确地说,在这种情况下观察意味着*能够在某个特定线程上处理流中的值。在 RxJava 中,我们有一个适当的操作符,就像这样调用,但在 Project Reactor 中,我们将相同的操作称为publishOn.

因此, * 如果你想处理数据 *Schedulers.boundedElastic()那么你应该使用下面的结构

Mono.fromFuture(..)
    .publishOn(Schedulers.boundedElastic())

但是等等,.subscribeOn也有效???

阅读前面的结构,您可能会开始担心,因为您 100% 确定

Mono.fromRunnable(..)
    .subscribeOn(Schedulers.boundedElastic())

onNext在线程上发送boundedElastic-1,所以有什么问题一样fromFuture

这里有一个技巧:

永远不要使用subscribeOnwith Futures/CompletableFuture或任何可以在下面使用自己的异步机制的东西

如果我们查看背后发生的事情subscribeOn,您会发现类似以下内容:

//  Simplified version of SubscribeOn operator
@Override
public void subscribe(CoreSubscriber<? super T> actual) {
    Scheduler scheduler;
    Publisher<T> parent;
    scheduler.schedule(() -> parent.subscribe(actual));
}

这基本上意味着subscribe将在单独的线程上调用父级的方法。

这种技术适用于fromRunnable, fromSupplierfromCallable因为它们的逻辑发生在subscribe方法中:

@Override
public void subscribe(CoreSubscriber<? super T> actual) {
    Operators.MonoSubscriber<T, T>
    sds = new Operators.MonoSubscriber<>(actual);

    actual.onSubscribe(sds);
    // skiped some parts 
    T t = supplier.get();
    if (t == null) {
        sds.onComplete();
    }
    else {
        sds.complete(t);
    }
}

这意味着它几乎等于

scheduler.schedule(() -> {
    T t = supplier.get();
    if (t == null) {
        sds.onComplete();
    }
    else {
        sds.complete(t);
    }
})

相比之下,fromFuture工作要复杂得多。一个简短的测验。

我们可以在哪个线程上观察到一个值?(假设在 Main 线程上执行,任务在 ForkJoinPool 上执行)

var future = CompletableFuture
.supplyAsync(() -> {
  return value;
})
... // some code here, does not metter just code

future.thenAccept(value -> {
  System.out.println(Thread.currentThread())
});

以及正确答案......

它可能是 Thread Main
或者它可能是 ForkJoinPool 中的 Thread
...
因为它是 racy... 而此时,我们消费值,该值可能已经交付,所以我们只是读取读取volatile器线程上的字段(线程 Main ),否则,线程 Main 只会设置一个acceptor,因此稍后将在ForkJoinPool线程上调用接受器。

对,这就是为什么当你使用fromFuturewith时subscribeOn,不能保证subscribeOn线程会观察给定的值CompletableFuture

这就是为什么publishOn确保值处理发生在所需线程上的唯一方法的原因。

好吧,我应该一直使用publishOn下去吗???

是和不是。这取决于。

如果您使用Mono- 在 99% 的情况下,publishOn如果您想确保数据处理发生在特定线程上,则可以使用 - 始终使用publishOn.

不用担心潜在的开销,即使您不小心使用了 Project Reactor,它也会照顾您。Project Reactor 有几个优化可以在运行时替换你publishOnsubscribeOn(如果它是安全的而不破坏行为),所以你会得到最好的。

第 2 部分。掉下Scheduelrs的兔子洞

从不使用Schedulers.immediate()

它几乎是无操作调度程序,基本上可以

Schedulers.immediate().scheduler(runnable) {
   runnable.run()
}

对,它对反应堆用户没有任何用处,我们仅将其用于内部需求。

好的,那么我如何使用调度程序在命令式世界中作为执行程序使用它

有两种选择:

快速路径:分步指南

1.a) 创建你的有界Executor. (例如Executors.fixed...
1.b)ScheduledExecutorService如果你想获得周期性任务和延迟任务的力量,请创建你的有界 2)使用API从你的执行程序
创建一个 3)在命令式世界中使用你的有界,使用你的这是围绕有界的反应世界SchedulerSchedulers.fromExecutorXXX
ExecutorScheduler

漫长的道路

快来了...

第 3 部分。如何序列化执行。

快来了

于 2020-03-22T10:41:34.727 回答