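I am consuming a Redis stream through a consumer group with Spring Data Redis, using the code below. Even though I have commented out the acknowledge call, my messages are not re-read after the server restarts.
I expected that a message I never acknowledged would be read again once the server is killed and restarted. What am I missing here?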
@Bean
public StreamMessageListenerContainer<String, MapRecord<String, String, String>> eventStreamPersistenceListenerContainerTwo(
        RedisConnectionFactory streamRedisConnectionFactory,
        RedisTemplate streamRedisTemplate) {

    // Block for up to 100 ms on each poll of the stream.
    StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> containerOptions =
            StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder()
                    .pollTimeout(Duration.ofMillis(100))
                    .build();

    StreamMessageListenerContainer<String, MapRecord<String, String, String>> container =
            StreamMessageListenerContainer.create(streamRedisConnectionFactory, containerOptions);

    // Read "event-stream" as consumer "my-consumer" in group "my-group".
    container.receive(Consumer.from("my-group", "my-consumer"),
            StreamOffset.create("event-stream", ReadOffset.latest()),
            message -> {
                System.out.println("MessageId: " + message.getId());
                System.out.println("Stream: " + message.getStream());
                System.out.println("Body: " + message.getValue());
                // Acknowledgement deliberately commented out to test redelivery:
                // streamRedisTemplate.opsForStream().acknowledge("my-group", message);
            });

    container.start();
    return container;
}
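
For context on the expectation above: in Redis itself, XREADGROUP with the special ID > returns only entries that were never delivered to any consumer in the group. Delivered but unacknowledged entries stay in the group's pending entries list, and a consumer gets them back only by reading with an explicit ID such as 0 (or by claiming them). The toy model below is a sketch of that rule using only the Java standard library. The class PendingEntriesSketch and all of its methods are hypothetical, the IDs are simplified to plain numbers, and nothing here calls Redis or Spring Data Redis. Whether the read offset in the snippet above ends up behaving like > inside the listener container is an assumption to verify, not something this sketch proves.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical in-memory model of one consumer group with a single consumer.
// The group state lives on the Redis server, so it survives a consumer restart.
class PendingEntriesSketch {
    private final TreeMap<Long, String> entries = new TreeMap<>(); // stream contents
    private final TreeSet<Long> pending = new TreeSet<>();         // delivered, not yet acknowledged
    private long lastDelivered = 0;                                // group's last-delivered ID

    void add(long id, String body) {
        entries.put(id, body);
    }

    // Models a group read with ID ">": only never-delivered entries are returned,
    // and each returned entry is recorded as pending.
    List<String> readNew() {
        List<String> out = new ArrayList<>();
        for (Map.Entry<Long, String> e : entries.tailMap(lastDelivered, false).entrySet()) {
            out.add(e.getValue());
            pending.add(e.getKey());
        }
        if (!entries.isEmpty()) {
            lastDelivered = entries.lastKey();
        }
        return out;
    }

    // Models a group read with ID "0": the consumer's pending entries are returned again.
    List<String> readPending() {
        List<String> out = new ArrayList<>();
        for (Long id : pending) {
            out.add(entries.get(id));
        }
        return out;
    }

    // Models an acknowledgement: the entry leaves the pending list.
    void ack(long id) {
        pending.remove(id);
    }

    public static void main(String[] args) {
        PendingEntriesSketch group = new PendingEntriesSketch();
        group.add(1L, "a");
        group.add(2L, "b");

        System.out.println(group.readNew());     // first delivery, nothing acknowledged
        // Simulated consumer restart: the same group state is still on the server.
        System.out.println(group.readNew());     // nothing new to deliver
        System.out.println(group.readPending()); // unacknowledged entries come back
        group.ack(1L);
        System.out.println(group.readPending()); // only the entry that is still pending
    }
}
```

In this model, a restarted consumer that only asks for new entries never sees the unacknowledged ones again. They become visible only through an explicit read of the pending list.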