Skip to content

Commit

Permalink
AMQP-793: Exception on ack for closed Channel
Browse files Browse the repository at this point in the history
JIRA: https://jira.spring.io/browse/AMQP-793

Throw an exception to the caller when attempting to ack/nack a
message on a closed channel.
  • Loading branch information
garyrussell authored and artembilan committed Dec 19, 2017
1 parent 2626930 commit 221e3a9
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,11 @@ public class CachingConnectionFactory extends AbstractConnectionFactory

private static final int DEFAULT_CHANNEL_CACHE_SIZE = 25;

private static final Set<String> txStarts = new HashSet<>(Arrays.asList("basicPublish", "basicAck", "basicNack",
"basicReject"));
private static final Set<String> txStarts = new HashSet<>(Arrays.asList("basicPublish", "basicAck",
"basicNack", "basicReject"));

private static final Set<String> ackMethods = new HashSet<>(Arrays.asList("basicAck",
"basicNack", "basicReject"));

private static final Set<String> txEnds = new HashSet<>(Arrays.asList("txCommit", "txRollback"));

Expand Down Expand Up @@ -957,11 +960,17 @@ else if (methodName.equals("isTransactional")) {
if (this.target == null || !this.target.isOpen()) {
if (this.target instanceof PublisherCallbackChannel) {
this.target.close();
throw new InvocationTargetException(new AmqpException("PublisherCallbackChannel is closed"));
throw new InvocationTargetException(
new AmqpException("PublisherCallbackChannel is closed"));
}
else if (this.txStarted) {
this.txStarted = false;
throw new IllegalStateException("Channel closed during transaction");
throw new InvocationTargetException(
new IllegalStateException("Channel closed during transaction"));
}
else if (ackMethods.contains(methodName)) {
throw new InvocationTargetException(
new IllegalStateException("Channel closed; cannot ack/nack"));
}
this.target = null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static org.hamcrest.Matchers.instanceOf;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertThat;
Expand Down Expand Up @@ -55,6 +56,7 @@
import org.mockito.ArgumentCaptor;

import org.springframework.amqp.AmqpIOException;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.AnonymousQueue;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
Expand All @@ -64,6 +66,7 @@
import org.springframework.amqp.rabbit.connection.Connection;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.SingleConnectionFactory;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.junit.BrokerRunning;
Expand Down Expand Up @@ -610,6 +613,32 @@ public void testErrorStopsContainer() throws Exception {
assertFalse(this.container.isRunning());
}

@Test
public void testManualAckWithClosedChannel() throws Exception {
final AtomicReference<IllegalStateException> exc = new AtomicReference<>();
final CountDownLatch latch = new CountDownLatch(1);
this.container = createContainer((ChannelAwareMessageListener) (m, c) -> {
if (exc.get() == null) {
((CachingConnectionFactory) this.template.getConnectionFactory()).resetConnection();
}
try {
c.basicAck(m.getMessageProperties().getDeliveryTag(), false);
}
catch (IllegalStateException e) {
exc.set(e);
}
latch.countDown();
}, false, this.queue.getName());
this.container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
this.container.afterPropertiesSet();
this.container.start();
this.template.convertAndSend(this.queue.getName(), "foo");
assertTrue(latch.await(10, TimeUnit.SECONDS));
this.container.stop();
assertNotNull(exc.get());
assertThat(exc.get().getMessage(), equalTo("Channel closed; cannot ack/nack"));
}

private boolean containerStoppedForAbortWithBadListener() throws InterruptedException {
Log logger = spy(TestUtils.getPropertyValue(container, "logger", Log.class));
new DirectFieldAccessor(container).setPropertyValue("logger", logger);
Expand Down

0 comments on commit 221e3a9

Please sign in to comment.