3

我正在为返回承诺的方法编写方面。考虑以下方法:

public Mono<Stream> publishToKafka(Stream s) {
    //publishToKafka is asynchronous
    return Mono.just(s).flatMap(worker::publishToKafka);
}

我想缓存发布是否成功。由于这是一个横切关注点,因此 Aspect 看起来是最好的设计。这是我的观点。

@Around("@annotation....")
public Object cache() {
    //get the data to cache from the annotation
    Object result = pjp.proceed();
    cache.cache("key","data");
    return result;
}

现在由于publishToKafka是异步的,一旦线程切换发生并被cache.cache()调用,目标方法就会返回。这不是我想要的。我想要的是,如果事件成功发布到 Kafka,结果应该被缓存。以下建议有效。

@Around("@annotation....")
public <T extends Stream<T>> Mono<T> cache() {
    //get the data to cache from the annotation
    return ((Mono<T>)pjp.proceed()).doOnNext(a -> cache.cache(key, data));
}

我想了解这里发生了什么。这是否发生在管道的组装期间?或者在我的建议添加操作员的执行期间(pjp.proceed()返回一个承诺)doOnNext

我需要在此示例的上下文中了解汇编与执行时间。

4

1 回答 1

3

Spring AOP 和 AspectJ 方面总是在与截获的连接点相同的线程中同步执行。因此,如果您截获的方法立即返回,并且返回值类似于一个承诺、未来或无(void)与回调相结合,那么您不能期望在方面的建议中神奇地获得异步结果。您确实需要让方面了解异步情况。

说到这里,我还想提一下,我之前从未使用过响应式编程,我只知道这个概念。根据我在您的建议中看到的,该解决方案应该有效,但有一点不太好:您让建议返回一个由您的call返回的 Mono实例。也许在注册了缓存回调之后返回原始文件会更干净,以避免任何副作用。doOnNext(..) Monoproceed()

我不知道还能解释什么,情况已经很清楚了。如果我的解释不够,请随时提出直接相关的后续问题。

于 2020-03-11T08:42:42.133 回答