我正在为返回承诺的方法编写方面。考虑以下方法:
public Mono<Stream> publishToKafka(Stream s) {
//publishToKafka is asynchronous
return Mono.just(s).flatMap(worker::publishToKafka);
}
我想缓存发布是否成功。由于这是一个横切关注点,因此 Aspect 看起来是最好的设计。这是我的观点。
@Around("@annotation....")
public Object cache() {
//get the data to cache from the annotation
Object result = pjp.proceed();
cache.cache("key","data");
return result;
}
现在由于publishToKafka
是异步的,一旦线程切换发生并被cache.cache()
调用,目标方法就会返回。这不是我想要的。我想要的是,如果事件成功发布到 Kafka,结果应该被缓存。以下建议有效。
@Around("@annotation....")
public <T extends Stream<T>> Mono<T> cache() {
//get the data to cache from the annotation
return ((Mono<T>)pjp.proceed()).doOnNext(a -> cache.cache(key, data));
}
我想了解这里发生了什么。这是否发生在管道的组装期间?或者在我的建议添加操作员的执行期间(pjp.proceed()
返回一个承诺)doOnNext
?
我需要在此示例的上下文中了解汇编与执行时间。