From 5d73342b8a763f8fd4f247cea255c66a9fc9ff8e Mon Sep 17 00:00:00 2001 From: Gary Russell Date: Mon, 1 Jul 2019 14:10:34 -0400 Subject: [PATCH] GH-1038: RT: More evaluateFastReplyTo Fixes Fixes https://github.com/spring-projects/spring-amqp/issues/1038 The previous fix to catch `AmqpConnectException` was incorrect - the `ShutdownSignalException` for the passive queue declaration failure is wrapped in an `AmqpConnectException` so we would have failed to detect the failure. Also, it has been reported that sometimes an `AmqpIOException` is thrown. - Add `AmqpIOException` to the catch block - Search for, and explitly check for the queue declaration failed exception - For all other cases rethrow so we can test again **cherry-pick to all supported** * * Fix log messages --- .../amqp/rabbit/core/RabbitTemplate.java | 27 ++++++--- .../amqp/rabbit/core/RabbitTemplateTests.java | 58 +++++++++++++++++++ 2 files changed, 76 insertions(+), 9 deletions(-) diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/core/RabbitTemplate.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/core/RabbitTemplate.java index 38369eb21d..4fb79f9d77 100644 --- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/core/RabbitTemplate.java +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/core/RabbitTemplate.java @@ -38,6 +38,7 @@ import org.springframework.amqp.AmqpConnectException; import org.springframework.amqp.AmqpException; +import org.springframework.amqp.AmqpIOException; import org.springframework.amqp.AmqpIllegalStateException; import org.springframework.amqp.AmqpRejectAndDontRequeueException; import org.springframework.amqp.core.Address; @@ -103,6 +104,7 @@ import com.rabbitmq.client.Envelope; import com.rabbitmq.client.GetResponse; import com.rabbitmq.client.ShutdownListener; +import com.rabbitmq.client.ShutdownSignalException; /** *

@@ -951,18 +953,25 @@ protected boolean useDirectReplyTo() { return true; }); } - catch (AmqpConnectException ex) { - if (logger.isDebugEnabled()) { - logger.debug("Connection error, deferring directReplyTo detection"); + catch (AmqpConnectException | AmqpIOException ex) { + Throwable cause = ex; + while (cause != null && !(cause instanceof ShutdownSignalException)) { + cause = cause.getCause(); + } + if (cause instanceof ShutdownSignalException) { + if (RabbitUtils.isPassiveDeclarationChannelClose((ShutdownSignalException) cause)) { + if (logger.isWarnEnabled()) { + logger.warn("Broker does not support fast replies via 'amq.rabbitmq.reply-to', temporary " + + "queues will be used: " + cause.getMessage() + "."); + } + this.replyAddress = null; + return false; + } } - throw ex; - } - catch (Exception e) { if (logger.isDebugEnabled()) { - logger.warn("Broker does not support fast replies via 'amq.rabbitmq.reply-to', temporary " - + "queues will be used:" + e.getMessage() + "."); + logger.debug("IO error, deferring directReplyTo detection: " + ex.toString()); } - this.replyAddress = null; + throw ex; } } return false; diff --git a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitTemplateTests.java b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitTemplateTests.java index ce2a9f7c9c..d45c135b9f 100644 --- a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitTemplateTests.java +++ b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitTemplateTests.java @@ -51,6 +51,7 @@ import org.springframework.amqp.AmqpAuthenticationException; import org.springframework.amqp.AmqpConnectException; import org.springframework.amqp.AmqpException; +import org.springframework.amqp.AmqpIOException; import org.springframework.amqp.core.Address; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessagePostProcessor; @@ -61,6 +62,7 @@ import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ChannelProxy; import org.springframework.amqp.rabbit.connection.PublisherCallbackChannel; +import org.springframework.amqp.rabbit.connection.RabbitUtils; import org.springframework.amqp.rabbit.connection.SimpleRoutingConnectionFactory; import org.springframework.amqp.rabbit.connection.SingleConnectionFactory; import org.springframework.amqp.support.converter.SimpleMessageConverter; @@ -241,6 +243,62 @@ public void testEvaluateDirectReplyToWithConnectException() throws Exception { assertThat(TestUtils.getPropertyValue(template, "evaluatedFastReplyTo", Boolean.class)).isFalse(); } + @Test + public void testEvaluateDirectReplyToWithIOException() throws Exception { + org.springframework.amqp.rabbit.connection.ConnectionFactory mockConnectionFactory = + mock(org.springframework.amqp.rabbit.connection.ConnectionFactory.class); + willThrow(new AmqpIOException(null)).given(mockConnectionFactory).createConnection(); + RabbitTemplate template = new RabbitTemplate(mockConnectionFactory); + assertThatThrownBy(() -> template.convertSendAndReceive("foo")).isInstanceOf(AmqpIOException.class); + assertThat(TestUtils.getPropertyValue(template, "evaluatedFastReplyTo", Boolean.class)).isFalse(); + } + + @Test + public void testEvaluateDirectReplyToWithIOExceptionDeclareFailed() throws Exception { + ConnectionFactory mockConnectionFactory = mock(ConnectionFactory.class); + Connection mockConnection = mock(Connection.class); + Channel mockChannel = mock(Channel.class); + + given(mockConnectionFactory.newConnection(any(ExecutorService.class), anyString())).willReturn(mockConnection); + given(mockConnection.isOpen()).willReturn(true); + given(mockConnection.createChannel()).willReturn(mockChannel); + AMQP.Channel.Close mockMethod = mock(AMQP.Channel.Close.class); + given(mockMethod.getReplyCode()).willReturn(AMQP.NOT_FOUND); + given(mockMethod.getClassId()).willReturn(RabbitUtils.QUEUE_CLASS_ID_50); + given(mockMethod.getMethodId()).willReturn(RabbitUtils.DECLARE_METHOD_ID_10); + willThrow(new ShutdownSignalException(true, false, mockMethod, null)).given(mockChannel) + .queueDeclarePassive(Address.AMQ_RABBITMQ_REPLY_TO); + given(mockChannel.queueDeclare()).willReturn(new AMQImpl.Queue.DeclareOk("foo", 0, 0)); + SingleConnectionFactory connectionFactory = new SingleConnectionFactory(mockConnectionFactory); + connectionFactory.setExecutor(mock(ExecutorService.class)); + RabbitTemplate template = new RabbitTemplate(connectionFactory); + template.setReplyTimeout(1); + template.convertSendAndReceive("foo"); + assertThat(TestUtils.getPropertyValue(template, "evaluatedFastReplyTo", Boolean.class)).isTrue(); + assertThat(TestUtils.getPropertyValue(template, "usingFastReplyTo", Boolean.class)).isFalse(); + } + + @Test + public void testEvaluateDirectReplyToOK() throws Exception { + ConnectionFactory mockConnectionFactory = mock(ConnectionFactory.class); + Connection mockConnection = mock(Connection.class); + Channel mockChannel = mock(Channel.class); + given(mockChannel.isOpen()).willReturn(true); + + given(mockConnectionFactory.newConnection(any(ExecutorService.class), anyString())).willReturn(mockConnection); + given(mockConnection.isOpen()).willReturn(true); + given(mockConnection.createChannel()).willReturn(mockChannel); + given(mockChannel.queueDeclarePassive(Address.AMQ_RABBITMQ_REPLY_TO)) + .willReturn(new AMQImpl.Queue.DeclareOk(Address.AMQ_RABBITMQ_REPLY_TO, 0, 0)); + SingleConnectionFactory connectionFactory = new SingleConnectionFactory(mockConnectionFactory); + connectionFactory.setExecutor(mock(ExecutorService.class)); + RabbitTemplate template = new RabbitTemplate(connectionFactory); + template.setReplyTimeout(1); + template.convertSendAndReceive("foo"); + assertThat(TestUtils.getPropertyValue(template, "evaluatedFastReplyTo", Boolean.class)).isTrue(); + assertThat(TestUtils.getPropertyValue(template, "usingFastReplyTo", Boolean.class)).isTrue(); + } + @Test public void testRecovery() throws Exception { ConnectionFactory mockConnectionFactory = mock(ConnectionFactory.class);