1

我对每个请求所属的线程模型的一些反应器概念有点困惑。我阅读了https://projectreactor.io/docs/core/release/reference但仍不清楚。让我们看一个例子:

@Autowired
UserRepository userRepository;

@GetMapping
Flux<User> findAll() {
    log.info("findAll request arrived");
    final Flux<User> defer = Flux.defer(() -> {
          return Flux.fromIterable(userRepository.findAll());
    });
    return defer;
}

日志:[boundedElastic-4] - INFO - findAll 请求到达

在 Schedulers.boundedElastic 线程池中执行 GET 方法(根据文档用于 I/O 绑定工作)

@PostMapping
Mono<User> save(@RequestBody User user) {
    log.info("save request arrived");
    final Mono<User> newUser = Mono.fromCallable(() -> {
         final User userMono = userRepository.save(user);
          log.info("user saved!");
          return userMono;
    });
    return newUser.subscribeOn(Schedulers.boundedElastic());
}

日志:[reactor-http-nio-6] - INFO - 保存请求到达

POST 方法落在http-nio线程池上。

@PostMapping("/test")
Mono<User> test() {
    log.info("test");
    return Mono.just(new User());
}

没有正文的 POST 也落在 Schedulers.boundedElastic 上。

@Bean
public ReactiveWebServerFactory reactiveWebServerFactory() {
    NettyReactiveWebServerFactory factory = new NettyReactiveWebServerFactory();
    final ReactorResourceFactory reactorResourceFactory = new ReactorResourceFactory();
    reactorResourceFactory.setLoopResources(LoopResources.create("http-nio", 4, true));
    reactorResourceFactory.setUseGlobalResources(false);
    factory.setResourceFactory(reactorResourceFactory);
    factory.setPort(8080);
    return factory;
}

这就是我可以配置http-nio线程池的方式。

所以,我的问题是:

  1. 为什么带有 body 的 POST 方法会被http-nio线程池处理?
  2. 这个http-nio线程池应该是一个较小的线程池,那么为什么带有 body 的 POST 方法(我认为被认为是阻塞代码)落在了它们身上?
  3. 有意义返回 newUser.subscribeOn(Schedulers.boundedElastic()); 或者它应该保持在同一个线程上?
4

1 回答 1

0
  1. 因为 I/O 操作(例如读取/保存某些内容到数据库或其他服务)应该发生在不同的线程池上。如果您的存储库是反应式的,那么您可以看到它在不同的池上运行,将http-nio线程返回到池中。也是如此WebClient。如果您使用的是封装在 Reactor API 中的阻塞代码,那么您必须确保它将在不同的线程池上运行。
  2. 这取决于。从我可以看到您的存储库不是响应式的,因为它返回User而不是Mono<User. 然后使用不同的线程池是有意义的,这样你就不会阻塞http-nio线程。但是,为了确保执行线程将被切换,您必须使用 flatMap ,因此代码如下所示:
    @PostMapping
    Mono<User> save(@RequestBody User user) {
        log.info("save request arrived");
        return Mono.just(user)
                .flatMap(user -> saveUser(user));
    }
    
    private Mono<User> saveUser(User user) {
        return Mono.fromCallable(() -> {
            final User userMono = userRepository.save(user);
            log.info("user saved!");
            return userMono;
        }).subscribeOn(Schedulers.boundedElastic());
    }

此外,使用由您可以控制和监视的线程池支持的调度程序也是一个好主意。恕我直言,一个经验法则是为每个资源使用一个专用的线程池。因此,例如 1 个用于 Postgres DB 的线程池、1 个用于 google API(REST 调用)的线程池和 1 个用于 GitHub API 的线程池。为什么?如果这些资源中的任何一个出现问题(例如,它将在一定时间内不可用),那么在其他线程池上运行的代码路径将不会被阻塞,并且您的应用程序将运行,至少对于某些代码路径。

于 2021-10-03T11:55:35.493 回答