Skip to content

Commit

Permalink
GH-1038: RT: Fix evaluatedFastReplyTo
Browse files Browse the repository at this point in the history
Fixes #1038

Don't set `evaluatedFastReplyTo` if we didn't actually evaluate it because
the broker is down on the first request.

**cherry-pick to all 2.x; backport to 1.7.x**

# Conflicts:
#	spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitTemplateTests.java

# Conflicts:
#	spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitTemplateTests.java

# Conflicts:
#	spring-rabbit/src/main/java/org/springframework/amqp/rabbit/core/RabbitTemplate.java
#	spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitTemplateTests.java
  • Loading branch information
garyrussell authored and artembilan committed Jun 28, 2019
1 parent fee8a97 commit beb208c
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.springframework.amqp.AmqpConnectException;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.AmqpIllegalStateException;
import org.springframework.amqp.AmqpRejectAndDontRequeueException;
Expand Down Expand Up @@ -665,15 +666,17 @@ protected boolean useDirectReplyTo() {
}
if (this.replyAddress == null || Address.AMQ_RABBITMQ_REPLY_TO.equals(this.replyAddress)) {
try {
execute(new ChannelCallback<Void>() {
return execute(new ChannelCallback<Boolean>() {

@Override
public Void doInRabbit(Channel channel) throws Exception {
public Boolean doInRabbit(Channel channel) throws Exception {
channel.queueDeclarePassive(Address.AMQ_RABBITMQ_REPLY_TO);
return null;
return true;
}
});
return true;
}
catch (AmqpConnectException ex) {
throw ex;
}
catch (Exception e) {
if (this.replyAddress != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,13 @@
package org.springframework.amqp.rabbit.core;

import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.instanceOf;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.mockito.BDDMockito.willThrow;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.doAnswer;
Expand All @@ -44,6 +47,7 @@
import org.mockito.stubbing.Answer;

import org.springframework.amqp.AmqpAuthenticationException;
import org.springframework.amqp.AmqpConnectException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.core.ReceiveAndReplyCallback;
Expand Down Expand Up @@ -123,13 +127,15 @@ protected void doRollback(DefaultTransactionStatus status) throws TransactionExc
template.setChannelTransacted(true);

txTemplate.execute(new TransactionCallback<Object>() {

@Override
public Object doInTransaction(TransactionStatus status) {
template.convertAndSend("foo", "bar");
return null;
}
});
txTemplate.execute(new TransactionCallback<Object>() {

@Override
public Object doInTransaction(TransactionStatus status) {
template.convertAndSend("baz", "qux");
Expand Down Expand Up @@ -188,6 +194,7 @@ public void dontHangConsumerThread() throws Exception {

final AtomicReference<Consumer> consumer = new AtomicReference<Consumer>();
doAnswer(new Answer<Object>() {

@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
consumer.set((Consumer) invocation.getArguments()[6]);
Expand Down Expand Up @@ -224,6 +231,23 @@ public void testRetry() throws Exception {
assertEquals(3, count.get());
}

@Test
public void testEvaluateDirectReplyToWithConnectException() {
org.springframework.amqp.rabbit.connection.ConnectionFactory mockConnectionFactory =
mock(org.springframework.amqp.rabbit.connection.ConnectionFactory.class);
willThrow(new AmqpConnectException(null)).given(mockConnectionFactory).createConnection();
RabbitTemplate template = new RabbitTemplate(mockConnectionFactory);

try {
template.convertSendAndReceive("foo");
}
catch (Exception ex) {
assertThat(ex, instanceOf(AmqpConnectException.class));
}

assertFalse(TestUtils.getPropertyValue(template, "evaluatedFastReplyTo", Boolean.class));
}

@Test
public void testRecovery() throws Exception {
ConnectionFactory mockConnectionFactory = mock(ConnectionFactory.class);
Expand Down

0 comments on commit beb208c

Please sign in to comment.