Project Reactor 通过定义Scheduler
. 它还为使用CompletableFuture
's 的库提供了一个桥梁Mono.fromFuture(..)
。
AWS 的DyanmoDB 异步客户端执行CompletableFuture
从 API 调用返回的java.util.concurrent.Executor
. 默认情况下,它会创建一个Executor
由它也创建的线程池支持的线程池。结果是,即使是定义为Scheduler
like的流也会Mono.fromFuture(..).subscribeOn(Schedulers.boundedElastic())
在库创建的池中的线程上执行,而不是来自Schedulers.boundedElastic()
. 所以我们看到线程名称像sdk-async-response-0-2
,而不是像boundedElastic-1
.
幸运的是,图书馆允许我们提供我们自己的Executor
,如此处所示,所以我的问题是:
您如何构建一个在运行时使用流的该部分定义
Executor
的线程的线程?Scheduler
用例
我们有一个存储库类,它有一个findById
方法,我们需要调用者能够控制在哪个Scheduler
上运行,因为它在这些截然不同的上下文中使用:
Schedulers.boundedElastic()
在调度程序上运行的 API 响应。- 处理从定义的调度程序按顺序在每个分区的线程上执行的 Kafka 消息,如Reactor Kafka 文档中所示。
尝试
我们已经尝试定义一个Executor
using bothSchedulers.immediate()
和Runnable::run
,如图所示,但两者都导致在 Netty 事件循环线程(示例名称:)上执行aws-java-sdk-NettyEventLoop-0-2
,而不是在定义的线程上执行Scheduler
。
DynamoDbAsyncClient.builder()
.asyncConfiguration(builder -> builder.advancedOption(
SdkAdvancedAsyncClientOption.FUTURE_COMPLETION_EXECUTOR,
runnable -> Schedulers.immediate().schedule(runnable)
))
.build();
DynamoDbAsyncClient.builder()
.asyncConfiguration(builder -> builder.advancedOption(
SdkAdvancedAsyncClientOption.FUTURE_COMPLETION_EXECUTOR,
Runnable::run
))
.build();