Skip to content

Commit

Permalink
GH-1038: RT: More evaluateFastReplyTo Fixes
Browse files Browse the repository at this point in the history
Fixes #1038

The previous fix to catch `AmqpConnectException` was incorrect - the
`ShutdownSignalException` for the passive queue declaration failure
is wrapped in an `AmqpConnectException` so we would have failed to
detect the failure.

Also, it has been reported that sometimes an `AmqpIOException` is thrown.

- Add `AmqpIOException` to the catch block
- Search for, and explitly check for the queue declaration failed exception
- For all other cases rethrow so we can test again

**cherry-pick to all supported**

* * Fix log messages
  • Loading branch information
garyrussell authored and artembilan committed Jul 1, 2019
1 parent 24c63e1 commit 5d73342
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@

import org.springframework.amqp.AmqpConnectException;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.AmqpIOException;
import org.springframework.amqp.AmqpIllegalStateException;
import org.springframework.amqp.AmqpRejectAndDontRequeueException;
import org.springframework.amqp.core.Address;
Expand Down Expand Up @@ -103,6 +104,7 @@
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.GetResponse;
import com.rabbitmq.client.ShutdownListener;
import com.rabbitmq.client.ShutdownSignalException;

/**
* <p>
Expand Down Expand Up @@ -951,18 +953,25 @@ protected boolean useDirectReplyTo() {
return true;
});
}
catch (AmqpConnectException ex) {
if (logger.isDebugEnabled()) {
logger.debug("Connection error, deferring directReplyTo detection");
catch (AmqpConnectException | AmqpIOException ex) {
Throwable cause = ex;
while (cause != null && !(cause instanceof ShutdownSignalException)) {
cause = cause.getCause();
}
if (cause instanceof ShutdownSignalException) {
if (RabbitUtils.isPassiveDeclarationChannelClose((ShutdownSignalException) cause)) {
if (logger.isWarnEnabled()) {
logger.warn("Broker does not support fast replies via 'amq.rabbitmq.reply-to', temporary "
+ "queues will be used: " + cause.getMessage() + ".");
}
this.replyAddress = null;
return false;
}
}
throw ex;
}
catch (Exception e) {
if (logger.isDebugEnabled()) {
logger.warn("Broker does not support fast replies via 'amq.rabbitmq.reply-to', temporary "
+ "queues will be used:" + e.getMessage() + ".");
logger.debug("IO error, deferring directReplyTo detection: " + ex.toString());
}
this.replyAddress = null;
throw ex;
}
}
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.springframework.amqp.AmqpAuthenticationException;
import org.springframework.amqp.AmqpConnectException;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.AmqpIOException;
import org.springframework.amqp.core.Address;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
Expand All @@ -61,6 +62,7 @@
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ChannelProxy;
import org.springframework.amqp.rabbit.connection.PublisherCallbackChannel;
import org.springframework.amqp.rabbit.connection.RabbitUtils;
import org.springframework.amqp.rabbit.connection.SimpleRoutingConnectionFactory;
import org.springframework.amqp.rabbit.connection.SingleConnectionFactory;
import org.springframework.amqp.support.converter.SimpleMessageConverter;
Expand Down Expand Up @@ -241,6 +243,62 @@ public void testEvaluateDirectReplyToWithConnectException() throws Exception {
assertThat(TestUtils.getPropertyValue(template, "evaluatedFastReplyTo", Boolean.class)).isFalse();
}

@Test
public void testEvaluateDirectReplyToWithIOException() throws Exception {
org.springframework.amqp.rabbit.connection.ConnectionFactory mockConnectionFactory =
mock(org.springframework.amqp.rabbit.connection.ConnectionFactory.class);
willThrow(new AmqpIOException(null)).given(mockConnectionFactory).createConnection();
RabbitTemplate template = new RabbitTemplate(mockConnectionFactory);
assertThatThrownBy(() -> template.convertSendAndReceive("foo")).isInstanceOf(AmqpIOException.class);
assertThat(TestUtils.getPropertyValue(template, "evaluatedFastReplyTo", Boolean.class)).isFalse();
}

@Test
public void testEvaluateDirectReplyToWithIOExceptionDeclareFailed() throws Exception {
ConnectionFactory mockConnectionFactory = mock(ConnectionFactory.class);
Connection mockConnection = mock(Connection.class);
Channel mockChannel = mock(Channel.class);

given(mockConnectionFactory.newConnection(any(ExecutorService.class), anyString())).willReturn(mockConnection);
given(mockConnection.isOpen()).willReturn(true);
given(mockConnection.createChannel()).willReturn(mockChannel);
AMQP.Channel.Close mockMethod = mock(AMQP.Channel.Close.class);
given(mockMethod.getReplyCode()).willReturn(AMQP.NOT_FOUND);
given(mockMethod.getClassId()).willReturn(RabbitUtils.QUEUE_CLASS_ID_50);
given(mockMethod.getMethodId()).willReturn(RabbitUtils.DECLARE_METHOD_ID_10);
willThrow(new ShutdownSignalException(true, false, mockMethod, null)).given(mockChannel)
.queueDeclarePassive(Address.AMQ_RABBITMQ_REPLY_TO);
given(mockChannel.queueDeclare()).willReturn(new AMQImpl.Queue.DeclareOk("foo", 0, 0));
SingleConnectionFactory connectionFactory = new SingleConnectionFactory(mockConnectionFactory);
connectionFactory.setExecutor(mock(ExecutorService.class));
RabbitTemplate template = new RabbitTemplate(connectionFactory);
template.setReplyTimeout(1);
template.convertSendAndReceive("foo");
assertThat(TestUtils.getPropertyValue(template, "evaluatedFastReplyTo", Boolean.class)).isTrue();
assertThat(TestUtils.getPropertyValue(template, "usingFastReplyTo", Boolean.class)).isFalse();
}

@Test
public void testEvaluateDirectReplyToOK() throws Exception {
ConnectionFactory mockConnectionFactory = mock(ConnectionFactory.class);
Connection mockConnection = mock(Connection.class);
Channel mockChannel = mock(Channel.class);
given(mockChannel.isOpen()).willReturn(true);

given(mockConnectionFactory.newConnection(any(ExecutorService.class), anyString())).willReturn(mockConnection);
given(mockConnection.isOpen()).willReturn(true);
given(mockConnection.createChannel()).willReturn(mockChannel);
given(mockChannel.queueDeclarePassive(Address.AMQ_RABBITMQ_REPLY_TO))
.willReturn(new AMQImpl.Queue.DeclareOk(Address.AMQ_RABBITMQ_REPLY_TO, 0, 0));
SingleConnectionFactory connectionFactory = new SingleConnectionFactory(mockConnectionFactory);
connectionFactory.setExecutor(mock(ExecutorService.class));
RabbitTemplate template = new RabbitTemplate(connectionFactory);
template.setReplyTimeout(1);
template.convertSendAndReceive("foo");
assertThat(TestUtils.getPropertyValue(template, "evaluatedFastReplyTo", Boolean.class)).isTrue();
assertThat(TestUtils.getPropertyValue(template, "usingFastReplyTo", Boolean.class)).isTrue();
}

@Test
public void testRecovery() throws Exception {
ConnectionFactory mockConnectionFactory = mock(ConnectionFactory.class);
Expand Down

0 comments on commit 5d73342

Please sign in to comment.