0

I am playing with Spring-cloud-stream and RabbitMQ.

I have a REST endpoint which produces the messages.

@SpringBootApplication
@EnableBinding(MyProcessor.class)
public class ProducerDemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(ProducerDemoApplication.class, args);
    }
}

@RestController
class ProducerController {

    @Autowired
    MyProcessor myProcessor;

    @RequestMapping(value = "sendmessage/{message}", method = RequestMethod.GET)
    public String sendMessage(@PathVariable("message") String message) {
        myProcessor.anOutput().send(MessageBuilder.withPayload(message).build());
        return "sent";  
    }

}

interface MyProcessor {
    String INPUT = "myInput";

    @Output("myOutput")
    MessageChannel anOutput();
}

Through another application I am consuming these messages.

@StreamListener(MyProcessor.INPUT)
public void eventHandler(String message) {
    System.out.println("**************  Message received => "+message);
}

When both applications are UP and running. I am able to publish the message and consume it at the consumer.

The problem I am facing in the following scenario:

I am purposefully making the consumer down and publishing the message through the producer. Now when the consumer starts it is receiving no message.

I suppose RabbitMQ guarantees message delivery.

Github links
https://github.com/govi20/producer-demo
https://github.com/govi20/consumer-demo

4

2 回答 2

1

正如我之前提到的,您已经在“myInput”中有错误配置,因为您没有配置会在消费者启动期间@Input导致错误。A component required a bean named 'myInput' that could not be found.因此,在消费者方面需要这样的东西

 interface MyProcessor {
    String INPUT = "myInput";

    @Input("myInput")
    MessageChannel myInput();
}

此外,如果您不定义 agroup它会导致 Rabbit 端的匿名队列(类似这样myInput.anonymous.pZg03h0zQ2-SHLh1_QL8DQ),这实际上会导致每次启动时队列的名称都不同,所以

spring.cloud.stream.bindings.myInput.destination=myInput
spring.cloud.stream.bindings.myInput.group=myGroup

将导致队列名称myInput.myGroup在启动之间持久且一致。

此外,在生产者端myOutput会创建一个没有路由到上述(或任何其他)队列的 Rabbit Exchange,因此 Rabbit 会丢弃消息,因此在创建路由之前,您可能无法接收来自生产者的消息在myOutput交换机和myInput.myGroup队列之间。但是,如果您按照我上面描述的方式配置输入,spring-cloud-stream 还将创建一个名为的交换器,该交换器myInput将自动路由到myInput.myGroup,因此,如果您将生产者更改为发送到该目的地,您将收到消费者的消息。

于 2018-10-14T18:55:38.903 回答
1

您需要一个关于消费者输入绑定的组。否则它是匿名的并绑定一个自动删除队列,该队列仅在消费者运行时存在。

于 2018-10-14T18:01:57.673 回答