From beb208c50df78dc366a8b2c2128ecd7fac883261 Mon Sep 17 00:00:00 2001 From: Gary Russell Date: Fri, 28 Jun 2019 11:00:25 -0400 Subject: [PATCH] GH-1038: RT: Fix evaluatedFastReplyTo Fixes https://github.com/spring-projects/spring-amqp/issues/1038 Don't set `evaluatedFastReplyTo` if we didn't actually evaluate it because the broker is down on the first request. **cherry-pick to all 2.x; backport to 1.7.x** # Conflicts: # spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitTemplateTests.java # Conflicts: # spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitTemplateTests.java # Conflicts: # spring-rabbit/src/main/java/org/springframework/amqp/rabbit/core/RabbitTemplate.java # spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitTemplateTests.java --- .../amqp/rabbit/core/RabbitTemplate.java | 11 +++++---- .../amqp/rabbit/core/RabbitTemplateTests.java | 24 +++++++++++++++++++ 2 files changed, 31 insertions(+), 4 deletions(-) diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/core/RabbitTemplate.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/core/RabbitTemplate.java index d06ca27d68..ebedd8fdd6 100644 --- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/core/RabbitTemplate.java +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/core/RabbitTemplate.java @@ -33,6 +33,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import org.springframework.amqp.AmqpConnectException; import org.springframework.amqp.AmqpException; import org.springframework.amqp.AmqpIllegalStateException; import org.springframework.amqp.AmqpRejectAndDontRequeueException; @@ -665,15 +666,17 @@ protected boolean useDirectReplyTo() { } if (this.replyAddress == null || Address.AMQ_RABBITMQ_REPLY_TO.equals(this.replyAddress)) { try { - execute(new ChannelCallback() { + return execute(new ChannelCallback() { @Override - public Void doInRabbit(Channel channel) throws Exception { + public Boolean doInRabbit(Channel channel) throws Exception { channel.queueDeclarePassive(Address.AMQ_RABBITMQ_REPLY_TO); - return null; + return true; } }); - return true; + } + catch (AmqpConnectException ex) { + throw ex; } catch (Exception e) { if (this.replyAddress != null) { diff --git a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitTemplateTests.java b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitTemplateTests.java index 2e08bd1f5c..ace4b1d001 100644 --- a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitTemplateTests.java +++ b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitTemplateTests.java @@ -17,10 +17,13 @@ package org.springframework.amqp.rabbit.core; import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.instanceOf; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertSame; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; +import static org.mockito.BDDMockito.willThrow; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.doAnswer; @@ -44,6 +47,7 @@ import org.mockito.stubbing.Answer; import org.springframework.amqp.AmqpAuthenticationException; +import org.springframework.amqp.AmqpConnectException; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageProperties; import org.springframework.amqp.core.ReceiveAndReplyCallback; @@ -123,6 +127,7 @@ protected void doRollback(DefaultTransactionStatus status) throws TransactionExc template.setChannelTransacted(true); txTemplate.execute(new TransactionCallback() { + @Override public Object doInTransaction(TransactionStatus status) { template.convertAndSend("foo", "bar"); @@ -130,6 +135,7 @@ public Object doInTransaction(TransactionStatus status) { } }); txTemplate.execute(new TransactionCallback() { + @Override public Object doInTransaction(TransactionStatus status) { template.convertAndSend("baz", "qux"); @@ -188,6 +194,7 @@ public void dontHangConsumerThread() throws Exception { final AtomicReference consumer = new AtomicReference(); doAnswer(new Answer() { + @Override public Object answer(InvocationOnMock invocation) throws Throwable { consumer.set((Consumer) invocation.getArguments()[6]); @@ -224,6 +231,23 @@ public void testRetry() throws Exception { assertEquals(3, count.get()); } + @Test + public void testEvaluateDirectReplyToWithConnectException() { + org.springframework.amqp.rabbit.connection.ConnectionFactory mockConnectionFactory = + mock(org.springframework.amqp.rabbit.connection.ConnectionFactory.class); + willThrow(new AmqpConnectException(null)).given(mockConnectionFactory).createConnection(); + RabbitTemplate template = new RabbitTemplate(mockConnectionFactory); + + try { + template.convertSendAndReceive("foo"); + } + catch (Exception ex) { + assertThat(ex, instanceOf(AmqpConnectException.class)); + } + + assertFalse(TestUtils.getPropertyValue(template, "evaluatedFastReplyTo", Boolean.class)); + } + @Test public void testRecovery() throws Exception { ConnectionFactory mockConnectionFactory = mock(ConnectionFactory.class);