我有一个简单的弹簧驱动服务,它通过 amqp 发布事件。该配置基于bootiful-axon。
现在我希望服务保持一些私有状态。这是一个简单的用例,可以通过 3 个额外的事件来实现。这些事件在服务范围之外没有任何意义,所以我不希望它们“离开”。
如何指定哪些事件应该通过 amqp 发布,哪些不应该发布?
我有一个简单的弹簧驱动服务,它通过 amqp 发布事件。该配置基于bootiful-axon。
现在我希望服务保持一些私有状态。这是一个简单的用例,可以通过 3 个额外的事件来实现。这些事件在服务范围之外没有任何意义,所以我不希望它们“离开”。
如何指定哪些事件应该通过 amqp 发布,哪些不应该发布?
这就是我解决它的方法:
SpringAMQPPublisher
拦截send
方法的自定义:
public class SelectiveAmqpPublisher extends SpringAMQPPublisher {
static boolean shouldSend (Class<?> pt) {
return PublicEvent.class.isAssignableFrom(pt);
}
public SelectiveAmqpPublisher (
SubscribableMessageSource<EventMessage<?>> messageSource) {
super(messageSource);
}
@Override
protected void send (List<? extends EventMessage<?>> events) {
super.send(events.stream()
.filter(e -> shouldSend(e.getPayloadType()))
.collect(Collectors.toList()));
}
}
配置:
@Autowired
private AMQPProperties amqpProperties;
@Autowired
private RoutingKeyResolver routingKeyResolver;
@Autowired
private AMQPMessageConverter aMQPMessageConverter;
@Bean(initMethod = "start", destroyMethod = "shutDown")
public SpringAMQPPublisher amqpBridge(
EventBus eventBus,
ConnectionFactory connectionFactory,
AMQPMessageConverter amqpMessageConverter) {
SpringAMQPPublisher publisher = new SelectiveAmqpPublisher(eventBus);
// The rest is from axon-spring-autoconfigure...
publisher.setExchangeName(amqpProperties.getExchange());
publisher.setConnectionFactory(connectionFactory);
publisher.setMessageConverter(amqpMessageConverter);
switch (amqpProperties.getTransactionMode()) {
case TRANSACTIONAL:
publisher.setTransactional(true);
break;
case PUBLISHER_ACK:
publisher.setWaitForPublisherAck(true);
break;
case NONE:
break;
default:
throw new IllegalStateException("....");
}
return publisher;
}