2

实际上,我什至不确定是否可以做我想做的事情:我正在Event.class使用 Spring Integration 使用来自代理 (MQTT) 的事件流 (),并希望将流转发到另一个微服务。这两种服务都应该是水平可扩展的。因此,在其他服务中,我有一个 POST 端点,除了Flux<Event>as 主体(使用 Spring Cloud Functions: spring-cloud-starter-function-webflux)。在转发服务中,我想使用 Ribbon 和WebClient.

到目前为止我所拥有的(简化):

@Configuration
public class EventFlowConfiguration{ 

@Autowired
private Webclient webClient;

@Bean
public MessageProducerSupport inboundAdapter() {
    final MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(MqttAsyncClient.generateClientId(), this.mqttClientFactory, "/topic");
    adapter.setConverter(new DefaultPahoMessageConverter());
    return adapter;
}

@Bean
Publisher<Message<Event>> eventFlow() {
    return IntegrationFlows
            .from(inboundAdapter())
            .handle((payload, header) -> this.eventHandler((String) payload))
            .toReactivePublisher();
}

private Event eventHandler(String payload) {
    // parsing, store in database ...
    return event;
}

@PostConstruct
public void setTrigger() {
    buildTrigger(webClient, "/receiveEventStream", eventFlow(), Event.class);
}

private <T, E extends Event> void buildTrigger(WebClient webClient, String uri, Publisher<Message<T>> publisher, Class<E> eventClass) {
    webClient
            .post()
            .uri(uri)
            .contentType(MediaType.APPLICATION_STREAM_JSON)
            .body(
                    Flux.from(publisher)
                            .retry()
                            .map(Message::getPayload)
                    ,
                    eventClass
            )
            .retrieve()
            .bodyToMono(Void.class)
            .retry()
            .subscribe();
}
}

我的问题:

(1)他们有机会测试这个吗?我该怎么做?关于 OkHttp,WireMock,一个测试中的内部弹簧启动应用程序,它有/eventReceiverEndpoint但只是断言。然而,并没有真正到达任何地方。实际上 atm 我什至没有收到错误。我本来预计会出现 4xx,因为在启动时没有内存服务器。

(2)这两个retry()调用真的重试双方的连接问题吗?如果断开连接,我会丢失消息吗?我该如何测试呢?

(3) Ribbon 与这个长寿的 Flux 结合是否仍然能够在建立连接后进行负载平衡(比如当最终消费者的负载变高时左右)。我想它必须自动将 Flux 重新连接到另一个实例?

4

0 回答 0