Skip to content

Commit

Permalink
GH-2805 Add a checkAfterCompletion to SimpleMessageListenerContainer.
Browse files Browse the repository at this point in the history
  • Loading branch information
tbadie authored and Thomas Badie committed Sep 2, 2024
1 parent 6841624 commit ee51280
Show file tree
Hide file tree
Showing 3 changed files with 142 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@
* @author Tim Bourquin
* @author Jeonggi Kim
* @author Java4ye
* @author Thomas Badie
*
* @since 1.0
*/
Expand Down Expand Up @@ -1013,6 +1014,9 @@ private boolean receiveAndExecute(final BlockingQueueConsumer consumer) throws E
catch (WrappedTransactionException e) { // NOSONAR exception flow control
throw (Exception) e.getCause();
}
finally {
ConnectionFactoryUtils.checkAfterCompletion();
}
}

return doReceiveAndExecute(consumer);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2019 the original author or authors.
* Copyright 2016-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,10 +16,40 @@

package org.springframework.amqp.rabbit.listener;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.Envelope;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.springframework.amqp.rabbit.connection.AbstractConnectionFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.ApplicationEventPublisher;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyMap;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.BDDMockito.given;
import static org.mockito.BDDMockito.willAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

/**
* @author Gary Russell
* @author Thomas Badie
* @since 2.0
*
*/
Expand All @@ -32,4 +62,83 @@ protected AbstractMessageListenerContainer createContainer(AbstractConnectionFac
return container;
}


@Test
public void testMessageListenerTxFail() throws Exception {
ConnectionFactoryUtils.enableAfterCompletionFailureCapture(true);
ConnectionFactory mockConnectionFactory = mock(ConnectionFactory.class);
Connection mockConnection = mock(Connection.class);
final Channel mockChannel = mock(Channel.class);
given(mockChannel.isOpen()).willReturn(true);
given(mockChannel.txSelect()).willReturn(mock(AMQP.Tx.SelectOk.class));
final AtomicReference<CountDownLatch> commitLatch = new AtomicReference<>(new CountDownLatch(1));
String exceptionMessage = "Failed to commit.";
willAnswer(invocation -> {
commitLatch.get().countDown();
throw new IllegalStateException(exceptionMessage);
}).given(mockChannel).txCommit();

final CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(mockConnectionFactory);
cachingConnectionFactory.setExecutor(mock(ExecutorService.class));
given(mockConnectionFactory.newConnection(any(ExecutorService.class), anyString())).willReturn(mockConnection);
given(mockConnection.isOpen()).willReturn(true);

willAnswer(invocation -> mockChannel).given(mockConnection).createChannel();

final AtomicReference<Consumer> consumer = new AtomicReference<Consumer>();
final CountDownLatch consumerLatch = new CountDownLatch(1);

willAnswer(invocation -> {
consumer.set(invocation.getArgument(6));
consumerLatch.countDown();
return "consumerTag";
}).given(mockChannel)
.basicConsume(anyString(), anyBoolean(), anyString(), anyBoolean(), anyBoolean(), anyMap(),
any(Consumer.class));


final CountDownLatch latch = new CountDownLatch(1);
AbstractMessageListenerContainer container = createContainer(cachingConnectionFactory);
container.setMessageListener(message -> {
RabbitTemplate rabbitTemplate = new RabbitTemplate(cachingConnectionFactory);
rabbitTemplate.setChannelTransacted(true);
// should use same channel as container
rabbitTemplate.convertAndSend("foo", "bar", "baz");
latch.countDown();
});
container.setQueueNames("queue");
container.setChannelTransacted(true);
container.setShutdownTimeout(100);
DummyTxManager transactionManager = new DummyTxManager();
container.setTransactionManager(transactionManager);
ApplicationEventPublisher applicationEventPublisher = mock(ApplicationEventPublisher.class);
container.setApplicationEventPublisher(applicationEventPublisher);
container.afterPropertiesSet();
container.start();
assertThat(consumerLatch.await(10, TimeUnit.SECONDS)).isTrue();

consumer.get().handleDelivery("qux",
new Envelope(1, false, "foo", "bar"), new AMQP.BasicProperties(),
new byte[] { 0 });

assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();

verify(mockConnection, times(1)).createChannel();
assertThat(commitLatch.get().await(10, TimeUnit.SECONDS)).isTrue();
verify(mockChannel).basicAck(anyLong(), anyBoolean());
verify(mockChannel).txCommit();

verify(applicationEventPublisher).publishEvent(any(ListenerContainerConsumerFailedEvent.class));

ArgumentCaptor<ListenerContainerConsumerFailedEvent> argumentCaptor
= ArgumentCaptor.forClass(ListenerContainerConsumerFailedEvent.class);
verify(applicationEventPublisher).publishEvent(argumentCaptor.capture());
assertThat(argumentCaptor.getValue().getThrowable()).hasCauseInstanceOf(IllegalStateException.class);
assertThat(argumentCaptor.getValue().getThrowable())
.isNotNull().extracting(Throwable::getCause)
.isNotNull().extracting(Throwable::getMessage).isEqualTo(exceptionMessage);
container.stop();
}


}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -69,6 +69,7 @@

/**
* @author Gary Russell
* @author Thomas Badie
* @since 1.1.2
*
*/
Expand Down Expand Up @@ -758,7 +759,7 @@ public void testMessageListenerWithRabbitTxManager() throws Exception {
container.stop();
}

private Answer<Channel> ensureOneChannelAnswer(final Channel onlyChannel,
protected Answer<Channel> ensureOneChannelAnswer(final Channel onlyChannel,
final AtomicReference<Exception> tooManyChannels) {
final AtomicBoolean done = new AtomicBoolean();
return invocation -> {
Expand All @@ -776,7 +777,7 @@ private Answer<Channel> ensureOneChannelAnswer(final Channel onlyChannel,
protected abstract AbstractMessageListenerContainer createContainer(AbstractConnectionFactory connectionFactory);

@SuppressWarnings("serial")
private static class DummyTxManager extends AbstractPlatformTransactionManager {
protected static class DummyTxManager extends AbstractPlatformTransactionManager {

private volatile boolean committed;

Expand Down Expand Up @@ -804,6 +805,30 @@ protected void doRollback(DefaultTransactionStatus status) throws TransactionExc
this.rolledBack = true;
this.latch.countDown();
}

public boolean isCommitted() {
return committed;
}

public void setCommitted(boolean committed) {
this.committed = committed;
}

public boolean isRolledBack() {
return rolledBack;
}

public void setRolledBack(boolean rolledBack) {
this.rolledBack = rolledBack;
}

public CountDownLatch getLatch() {
return latch;
}

public void setLatch(CountDownLatch latch) {
this.latch = latch;
}
}

}

0 comments on commit ee51280

Please sign in to comment.