Skip to content

Commit

Permalink
AMQP-796: Fix Admin Transaction
Browse files Browse the repository at this point in the history
JIRA: https://jira.spring.io/browse/AMQP-796

If an admin uses a transactional `RabbitTemplate` it will start a transaction.
If the connection was opened due to a `RabbitTemplate` operation it should participate
in the same transaction.
Previously, the template used a second channel and treated it as a local transaction.

Also fix the `RabbitAdmin` so it does no work if there is nothing to declare.

# Conflicts:
#	spring-rabbit/src/main/java/org/springframework/amqp/rabbit/core/RabbitAdmin.java
#	spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitAdminDeclarationTests.java
#	spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitAdminTests.java
#	spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitTemplateTests.java

* Remove `RabbitTemplateTests` changes since they are not related to
the current state of the `RabbitAdmin`: the `RabbitTemplate`-based
constructor has been introduced since version `2.0`
  • Loading branch information
garyrussell authored and artembilan committed Jan 23, 2018
1 parent f78d916 commit f786c5b
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 13 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2017 the original author or authors.
* Copyright 2002-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -138,6 +138,18 @@ private static RabbitResourceHolder doGetTransactionalResourceHolder(ConnectionF
channel = ConsumerChannelRegistry.getConsumerChannel(connectionFactory);
if (channel == null && connection == null) {
connection = resourceFactory.createConnection();
if (resourceHolder == null) {
/*
* While creating a connection, a connection listener might have created a
* transactional channel and bound it to the transaction.
*/
resourceHolder = (RabbitResourceHolder) TransactionSynchronizationManager
.getResource(connectionFactory);
if (resourceHolder != null) {
channel = resourceHolder.getChannel();
resourceHolderToUse = resourceHolder;
}
}
resourceHolderToUse.addConnection(connection);
}
if (channel == null) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2017 the original author or authors.
* Copyright 2002-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -497,6 +497,10 @@ else if (declarable instanceof Binding) {
}
}

if (exchanges.size() == 0 && queues.size() == 0 && bindings.size() == 0) {
this.logger.debug("Nothing to declare");
return;
}
this.rabbitTemplate.execute(new ChannelCallback<Object>() {
@Override
public Object doInRabbit(Channel channel) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,12 @@
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
import static org.mockito.BDDMockito.given;
import static org.mockito.BDDMockito.willReturn;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyMap;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.isNull;
import static org.mockito.Mockito.doAnswer;
Expand Down Expand Up @@ -55,6 +58,7 @@
import org.springframework.amqp.rabbit.connection.Connection;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionListener;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
Expand Down Expand Up @@ -115,11 +119,14 @@ public void testNoDeclareWithCachedConnections() throws Exception {
final List<Channel> mockChannels = new ArrayList<Channel>();

doAnswer(new Answer<com.rabbitmq.client.Connection>() {

private int connectionNumber;

@Override
public com.rabbitmq.client.Connection answer(InvocationOnMock invocation) throws Throwable {
com.rabbitmq.client.Connection connection = mock(com.rabbitmq.client.Connection.class);
doAnswer(new Answer<Channel>() {

private int channelNumber;

@Override
Expand Down Expand Up @@ -154,7 +161,7 @@ public Channel answer(InvocationOnMock invocation) throws Throwable {
ccf.createConnection().close();
ccf.destroy();

assertEquals("Admin should not have created a channel", 0, mockChannels.size());
assertEquals("Admin should not have created a channel", 0, mockChannels.size());
}

@Test
Expand Down Expand Up @@ -234,7 +241,7 @@ public Object answer(InvocationOnMock invocation) throws Throwable {

verify(channel, never()).queueDeclare(eq("foo"), anyBoolean(), anyBoolean(), anyBoolean(), any(Map.class));
verify(channel, never())
.exchangeDeclare(eq("bar"), eq("direct"), anyBoolean(), anyBoolean(), anyBoolean(), any(Map.class));
.exchangeDeclare(eq("bar"), eq("direct"), anyBoolean(), anyBoolean(), anyBoolean(), any(Map.class));
verify(channel, never()).queueBind(eq("foo"), eq("bar"), eq("foo"), any(Map.class));
}

Expand Down Expand Up @@ -275,7 +282,7 @@ public Object answer(InvocationOnMock invocation) throws Throwable {

verify(channel, never()).queueDeclare(eq("foo"), anyBoolean(), anyBoolean(), anyBoolean(), any(Map.class));
verify(channel, never())
.exchangeDeclare(eq("bar"), eq("direct"), anyBoolean(), anyBoolean(), anyBoolean(), any(Map.class));
.exchangeDeclare(eq("bar"), eq("direct"), anyBoolean(), anyBoolean(), anyBoolean(), any(Map.class));
verify(channel, never()).queueBind(eq("foo"), eq("bar"), eq("foo"), any(Map.class));
}

Expand All @@ -293,7 +300,7 @@ public void testJavaConfig() throws Exception {
.queueDeclare(eq("foo"), anyBoolean(), anyBoolean(), anyBoolean(), isNull(Map.class));
verify(Config.channel2, never())
.exchangeDeclare(eq("bar"), eq("direct"), anyBoolean(), anyBoolean(),
anyBoolean(), anyMap());
anyBoolean(), anyMap());
verify(Config.channel2, never()).queueBind(eq("foo"), eq("bar"), eq("foo"), anyMap());
context.close();
}
Expand All @@ -308,7 +315,7 @@ public void testAddRemove() {
assertEquals(2, queue.getDeclaringAdmins().size());
queue.setAdminsThatShouldDeclare(admin1);
assertEquals(1, queue.getDeclaringAdmins().size());
queue.setAdminsThatShouldDeclare(new Object[] {null});
queue.setAdminsThatShouldDeclare(new Object[] { null });
assertEquals(0, queue.getDeclaringAdmins().size());
queue.setAdminsThatShouldDeclare(admin1, admin2);
assertEquals(2, queue.getDeclaringAdmins().size());
Expand All @@ -331,6 +338,26 @@ public void testAddRemove() {
}
}

@Test
public void testNoOpWhenNothingToDeclare() throws Exception {
com.rabbitmq.client.ConnectionFactory cf = mock(com.rabbitmq.client.ConnectionFactory.class);
com.rabbitmq.client.Connection connection = mock(com.rabbitmq.client.Connection.class);
Channel channel = mock(Channel.class, "channel1");
given(channel.isOpen()).willReturn(true);
willReturn(connection).given(cf).newConnection(any(ExecutorService.class), anyString());
given(connection.isOpen()).willReturn(true);
given(connection.createChannel()).willReturn(channel);
CachingConnectionFactory ccf = new CachingConnectionFactory(cf);
ccf.setExecutor(mock(ExecutorService.class));
RabbitTemplate rabbitTemplate = new RabbitTemplate(ccf);
RabbitAdmin admin = new RabbitAdmin(rabbitTemplate.getConnectionFactory());
ApplicationContext ac = mock(ApplicationContext.class);
admin.setApplicationContext(ac);
admin.afterPropertiesSet();
ccf.createConnection();
verify(connection, never()).createChannel();
}

@Configuration
public static class Config {

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2016 the original author or authors.
* Copyright 2002-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,7 @@ public void confirm(CorrelationData correlationData, boolean ack, String cause)
@Override
public void run() {
templateWithConfirmsEnabled.execute(new ChannelCallback<Object>() {

@Override
public Object doInRabbit(Channel channel) throws Exception {
try {
Expand Down Expand Up @@ -325,6 +326,7 @@ public void testPublisherReturns() throws Exception {
final CountDownLatch latch = new CountDownLatch(1);
final List<Message> returns = new ArrayList<Message>();
templateWithReturnsEnabled.setReturnCallback(new ReturnCallback() {

@Override
public void returnedMessage(Message message, int replyCode,
String replyText, String exchange, String routingKey) {
Expand All @@ -345,6 +347,7 @@ public void testPublisherReturnsWithMandatoryExpression() throws Exception {
final CountDownLatch latch = new CountDownLatch(1);
final List<Message> returns = new ArrayList<Message>();
templateWithReturnsEnabled.setReturnCallback(new ReturnCallback() {

@Override
public void returnedMessage(Message message, int replyCode,
String replyText, String exchange, String routingKey) {
Expand Down Expand Up @@ -434,6 +437,7 @@ public void confirm(CorrelationData correlationData, boolean ack, String cause)
@Override
public void run() {
template.execute(new ChannelCallback<Object>() {

@Override
public Object doInRabbit(Channel channel) throws Exception {
try {
Expand All @@ -443,9 +447,9 @@ public Object doInRabbit(Channel channel) throws Exception {
Thread.currentThread().interrupt();
}
template.doSend(channel, "", ROUTE,
new SimpleMessageConverter().toMessage("message", new MessageProperties()),
false,
new CorrelationData("def"));
new SimpleMessageConverter().toMessage("message", new MessageProperties()),
false,
new CorrelationData("def"));
threadSentLatch.countDown();
return null;
}
Expand Down Expand Up @@ -488,10 +492,12 @@ public void testPublisherConfirmNotReceivedAged() throws Exception {

final AtomicInteger count = new AtomicInteger();
doAnswer(new Answer<Object>() {

@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
return count.incrementAndGet();
} }).when(mockChannel).getNextPublishSeqNo();
}
}).when(mockChannel).getNextPublishSeqNo();

CachingConnectionFactory ccf = new CachingConnectionFactory(mockConnectionFactory);
ccf.setPublisherConfirms(true);
Expand Down Expand Up @@ -533,6 +539,7 @@ public void testPublisherConfirmMultiple() throws Exception {

final AtomicInteger count = new AtomicInteger();
doAnswer(new Answer<Object>() {

@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
return count.incrementAndGet();
Expand Down Expand Up @@ -580,6 +587,7 @@ public void testPublisherConfirmMultipleWithTwoListeners() throws Exception {

final AtomicInteger count = new AtomicInteger();
doAnswer(new Answer<Object>() {

@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
return count.incrementAndGet();
Expand Down Expand Up @@ -681,6 +689,7 @@ public void confirm(CorrelationData correlationData, boolean ack, String cause)
}
});
Executors.newSingleThreadExecutor().execute(new Runnable() {

@Override
public void run() {
template.convertAndSend(ROUTE, (Object) "message", new CorrelationData("abc"));
Expand All @@ -689,6 +698,7 @@ public void run() {
}
});
Executors.newSingleThreadExecutor().execute(new Runnable() {

@Override
public void run() {
try {
Expand Down Expand Up @@ -921,7 +931,8 @@ public void run() {
try {
template.convertAndSend(ROUTE, (Object) "message", new CorrelationData("abc"));
}
catch (AmqpException e) { }
catch (AmqpException e) {
}
}
sentAll.countDown();
}
Expand Down

0 comments on commit f786c5b

Please sign in to comment.