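I need to send messages to a RabbitMQ instance after a JPA entity has been persisted/flushed, which led me to configure the rabbitTemplate as channelTransacted.
The consumer is external, but to create an integration test I added an embedded broker (Apache Qpid) and a listener so that I can check that the messages are sent.
As indicated by the documentation, I seem to have run into a deadlock: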
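If we have producers and consumers in the same application, we may end up with a deadlock when producers are blocking the connection (because there are no resources on the Broker any more) and consumers cannot free them (because the connection is blocked). [...]
A separate CachingConnectionFactory is not possible for transactional producers that execute on a consumer thread, since they should reuse the Channel associated with the consumer transactions.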
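If I set rabbitTemplate.channelTransacted = false, the listener is invoked fine; otherwise harness.getNextInvocationDataFor just waits until it times out.
I'm hoping there is still a way to do this kind of integration test, and that perhaps I have simply configured something wrong.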
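To make the symptom concrete, here is a minimal plain-Java sketch of what I understand AMQP transaction semantics to be: a message published on a transacted channel only becomes visible to consumers once the transaction commits. TransactedChannelSketch and its methods are hypothetical names of mine, not Spring AMQP or RabbitMQ API. I am also only assuming that this mechanism, rather than the connection-blocking deadlock quoted above, might be relevant to my test, where the test method itself is @Transactional.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for a transacted AMQP channel; not a Spring AMQP or RabbitMQ class. */
public class TransactedChannelSketch {

    // Stands in for the broker queue that the listener consumes from.
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    // Messages published inside the open transaction, invisible to consumers.
    private final List<String> pending = new ArrayList<>();
    private final boolean transacted;

    public TransactedChannelSketch(boolean transacted) {
        this.transacted = transacted;
    }

    /** Publishes immediately when not transacted, otherwise buffers until commit(). */
    public void publish(String message) {
        if (transacted) {
            pending.add(message);
        } else {
            queue.add(message);
        }
    }

    /** Makes every buffered message visible to the consumer. */
    public void commit() {
        queue.addAll(pending);
        pending.clear();
    }

    /** Discards every buffered message, as a rolled-back transaction would. */
    public void rollback() {
        pending.clear();
    }

    /** Waits for the next message like the test harness does; returns null on timeout. */
    public String receive(long timeout, TimeUnit unit) throws InterruptedException {
        return queue.poll(timeout, unit);
    }

    public static void main(String[] args) throws InterruptedException {
        TransactedChannelSketch plain = new TransactedChannelSketch(false);
        plain.publish("tx-1");
        System.out.println("non-transacted: " + plain.receive(100, TimeUnit.MILLISECONDS));

        TransactedChannelSketch transacted = new TransactedChannelSketch(true);
        transacted.publish("tx-2");
        // Still inside the transaction: the consumer sees nothing and the wait times out.
        System.out.println("before commit:  " + transacted.receive(100, TimeUnit.MILLISECONDS));
        transacted.commit();
        System.out.println("after commit:   " + transacted.receive(100, TimeUnit.MILLISECONDS));
    }
}
```

If that assumption applies to my setup, the harness would be stuck in the "before commit" case for the whole test method, which would look the same as what I observe with channelTransacted = true.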
I've tried both the simple and the direct listener types; it makes no difference:
queues:
  transactions: 'transactions'

spring:
  rabbitmq:
    host: rabbitmq
    username: guest
    password: guest
    dynamic: true # automatically create queues and exchanges on the RabbitMQ server
    template:
      routing-key: ${queues.transactions}
      retry.enabled: true
      # mandatory: true # interesting only for cases where a reply is expected
    # publisher-confirms: true # does not work in transacted mode
    publisher-returns: true # required to get notifications in case of send problems
    # used for integration tests
    listener:
      type: direct
      direct:
        retry:
          enabled: true
          stateless: false # needed when transacted mode is enabled
          max-attempts: 1 # since this is used just for integration tests, we don't want more
I'm using Spring Boot 2.1.3 with spring-boot-starter-amqp (spring-rabbit 2.1.4) and Apache Qpid 7.1.1 as the embedded broker for the tests:
@RunWith(SpringRunner.class)
@SpringBootTest(properties = "spring.main.allow-bean-definition-overriding=true")
@AutoConfigureTestDatabase
@Transactional
@ActiveProfiles("test")
public class SalesTransactionGatewayTest {

    private static final String TRANSACTIONS_LISTENER = "transactions";

    @TestConfiguration
    @RabbitListenerTest(spy = false, capture = true)
    public static class Config {

        @Bean
        public SystemLauncher broker() throws Exception {
            SystemLauncher broker = new SystemLauncher();
            Map<String, Object> attributes = new HashMap<>();
            attributes.put(SystemConfig.TYPE, "Memory");
            attributes.put(SystemConfig.INITIAL_CONFIGURATION_LOCATION, "classpath:qpid-config.json");
            attributes.put(SystemConfig.STARTUP_LOGGED_TO_SYSTEM_OUT, false);
            broker.startup(attributes);
            return broker;
        }

        @Bean
        public Listener listener() {
            return new Listener();
        }
    }

    public static class Listener {

        @RabbitListener(id = TRANSACTIONS_LISTENER, queues = "${queues.transactions}")
        public void receive(SalesTransaction transaction) {
            Logger.getLogger(Listener.class.getName()).log(Level.INFO, "Received tx: {0}", transaction);
        }
    }

    @Before
    public void setUp() {
        // this makes the test work, setting it to `true` makes it deadlock
        rabbitTemplate.setChannelTransacted(false);
    }

    @Test
    public void shouldBeSentToGateway() throws Exception {
        SalesTransaction savedTransaction = service.saveTransaction(salesTransaction);
        InvocationData invocationData = this.harness.getNextInvocationDataFor(TRANSACTIONS_LISTENER, 10, TimeUnit.SECONDS);
        assertNotNull(invocationData);
        assertEquals(salesTransaction, invocationData.getArguments()[0]);
    }
}
11:02:56.844 [SimpleAsyncTaskExecutor-1] INFO org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer - SimpleConsumer [queue=transactions, consumerTag=sgen_1 identity=16ef3497] started
Mar 25, 2019 11:02:57 AM AmqpSalesTransactionGateway send
INFO: Sending transaction: 01a92a56-c93b-4d02-af66-66ef007c2817 w/ status COMPLETED
11:02:57.961 [main] INFO org.springframework.amqp.rabbit.connection.CachingConnectionFactory - Attempting to connect to: [localhost:5672]
11:02:57.972 [main] INFO org.springframework.amqp.rabbit.connection.CachingConnectionFactory - Created new connection: rabbitConnectionFactory.publisher#6d543ec2:0/SimpleConnection@79dd79eb [delegate=amqp://guest@127.0.0.1:5672/, localPort= 56501]
java.lang.AssertionError
    at org.junit.Assert.fail(Assert.java:86)
    at org.junit.Assert.assertTrue(Assert.java:41)
    at org.junit.Assert.assertNotNull(Assert.java:712)
    at org.junit.Assert.assertNotNull(Assert.java:722)