实际上,我什至不确定是否可以做我想做的事情:我正在Event.class
使用 Spring Integration 使用来自代理 (MQTT) 的事件流 (),并希望将流转发到另一个微服务。这两种服务都应该是水平可扩展的。因此,在其他服务中,我有一个 POST 端点,除了Flux<Event>
as 主体(使用 Spring Cloud Functions: spring-cloud-starter-function-webflux
)。在转发服务中,我想使用 Ribbon 和WebClient
.
到目前为止我所拥有的(简化):
@Configuration
public class EventFlowConfiguration{
@Autowired
private Webclient webClient;
@Bean
public MessageProducerSupport inboundAdapter() {
final MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(MqttAsyncClient.generateClientId(), this.mqttClientFactory, "/topic");
adapter.setConverter(new DefaultPahoMessageConverter());
return adapter;
}
@Bean
Publisher<Message<Event>> eventFlow() {
return IntegrationFlows
.from(inboundAdapter())
.handle((payload, header) -> this.eventHandler((String) payload))
.toReactivePublisher();
}
private Event eventHandler(String payload) {
// parsing, store in database ...
return event;
}
@PostConstruct
public void setTrigger() {
buildTrigger(webClient, "/receiveEventStream", eventFlow(), Event.class);
}
private <T, E extends Event> void buildTrigger(WebClient webClient, String uri, Publisher<Message<T>> publisher, Class<E> eventClass) {
webClient
.post()
.uri(uri)
.contentType(MediaType.APPLICATION_STREAM_JSON)
.body(
Flux.from(publisher)
.retry()
.map(Message::getPayload)
,
eventClass
)
.retrieve()
.bodyToMono(Void.class)
.retry()
.subscribe();
}
}
我的问题:
(1)他们有机会测试这个吗?我该怎么做?关于 OkHttp,WireMock,一个测试中的内部弹簧启动应用程序,它有/eventReceiverEndpoint
但只是断言。然而,并没有真正到达任何地方。实际上 atm 我什至没有收到错误。我本来预计会出现 4xx,因为在启动时没有内存服务器。
(2)这两个retry()
调用真的重试双方的连接问题吗?如果断开连接,我会丢失消息吗?我该如何测试呢?
(3) Ribbon 与这个长寿的 Flux 结合是否仍然能够在建立连接后进行负载平衡(比如当最终消费者的负载变高时左右)。我想它必须自动将 Flux 重新连接到另一个实例?