Skip to content

Commit

Permalink
AMQP-796: Fix Admin Transaction
Browse files Browse the repository at this point in the history
JIRA: https://jira.spring.io/browse/AMQP-796

If an admin uses a transactional `RabbitTemplate` it will start a transaction.
If the connection was opened due to a `RabbitTemplate` operation it should participate
in the same transaction.
Previously, the template used a second channel and treated it as a local transaction.

Also fix the `RabbitAdmin` so it does no work if there is nothing to declare.
  • Loading branch information
garyrussell authored and artembilan committed Jan 23, 2018
1 parent 6761ab5 commit af7ee3f
Show file tree
Hide file tree
Showing 5 changed files with 113 additions and 4 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2017 the original author or authors.
* Copyright 2002-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -155,6 +155,18 @@ private static RabbitResourceHolder doGetTransactionalResourceHolder(ConnectionF
channel = ConsumerChannelRegistry.getConsumerChannel(connectionFactory);
if (channel == null && connection == null) {
connection = resourceFactory.createConnection();
if (resourceHolder == null) {
/*
* While creating a connection, a connection listener might have created a
* transactional channel and bound it to the transaction.
*/
resourceHolder = (RabbitResourceHolder) TransactionSynchronizationManager
.getResource(connectionFactory);
if (resourceHolder != null) {
channel = resourceHolder.getChannel();
resourceHolderToUse = resourceHolder;
}
}
resourceHolderToUse.addConnection(connection);
}
if (channel == null) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2017 the original author or authors.
* Copyright 2002-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -475,6 +475,10 @@ else if (declarable instanceof Binding) {
}
}

if (exchanges.size() == 0 && queues.size() == 0 && bindings.size() == 0) {
this.logger.debug("Nothing to declare");
return;
}
this.rabbitTemplate.execute(channel -> {
declareExchanges(channel, exchanges.toArray(new Exchange[exchanges.size()]));
declareQueues(channel, queues.toArray(new Queue[queues.size()]));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,11 @@
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyMap;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isNull;
import static org.mockito.BDDMockito.given;
import static org.mockito.BDDMockito.willReturn;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
Expand Down Expand Up @@ -55,6 +58,7 @@
import org.springframework.amqp.rabbit.connection.Connection;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionListener;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
Expand Down Expand Up @@ -313,6 +317,26 @@ public void testAddRemove() {
}
}

@Test
public void testNoOpWhenNothingToDeclare() throws Exception {
com.rabbitmq.client.ConnectionFactory cf = mock(com.rabbitmq.client.ConnectionFactory.class);
com.rabbitmq.client.Connection connection = mock(com.rabbitmq.client.Connection.class);
Channel channel = mock(Channel.class, "channel1");
given(channel.isOpen()).willReturn(true);
willReturn(connection).given(cf).newConnection(any(ExecutorService.class), anyString());
given(connection.isOpen()).willReturn(true);
given(connection.createChannel()).willReturn(channel);
CachingConnectionFactory ccf = new CachingConnectionFactory(cf);
ccf.setExecutor(mock(ExecutorService.class));
RabbitTemplate rabbitTemplate = new RabbitTemplate(ccf);
RabbitAdmin admin = new RabbitAdmin(rabbitTemplate);
ApplicationContext ac = mock(ApplicationContext.class);
admin.setApplicationContext(ac);
admin.afterPropertiesSet();
ccf.createConnection();
verify(connection, never()).createChannel();
}

@Configuration
public static class Config {

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2017 the original author or authors.
* Copyright 2002-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2017 the original author or authors.
* Copyright 2002-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -17,6 +17,7 @@
package org.springframework.amqp.rabbit.core;

import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertThat;
Expand All @@ -25,13 +26,16 @@
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.isNull;
import static org.mockito.BDDMockito.given;
import static org.mockito.BDDMockito.willReturn;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.withSettings;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
Expand All @@ -48,15 +52,18 @@
import org.springframework.amqp.core.Address;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.ReceiveAndReplyCallback;
import org.springframework.amqp.rabbit.connection.AbstractRoutingConnectionFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ChannelProxy;
import org.springframework.amqp.rabbit.connection.PublisherCallbackChannelConnectionFactory;
import org.springframework.amqp.rabbit.connection.SimpleRoutingConnectionFactory;
import org.springframework.amqp.rabbit.connection.SingleConnectionFactory;
import org.springframework.amqp.rabbit.support.PublisherCallbackChannel;
import org.springframework.amqp.support.converter.SimpleMessageConverter;
import org.springframework.amqp.utils.SerializationUtils;
import org.springframework.context.ApplicationContext;
import org.springframework.expression.Expression;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.retry.support.RetryTemplate;
Expand All @@ -74,6 +81,7 @@
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.impl.AMQImpl;
import com.rabbitmq.client.impl.AMQImpl.Queue.DeclareOk;

/**
* @author Gary Russell
Expand Down Expand Up @@ -327,4 +335,65 @@ public void testRoutingConnectionFactory() throws Exception {
Mockito.verify(connectionFactory2, Mockito.times(4)).createConnection();
}

@Test
public void testNestedTxBinding() throws Exception {
ConnectionFactory cf = mock(ConnectionFactory.class);
Connection connection = mock(Connection.class);
Channel channel1 = mock(Channel.class, "channel1");
given(channel1.isOpen()).willReturn(true);
Channel channel2 = mock(Channel.class, "channel2");
given(channel2.isOpen()).willReturn(true);
willReturn(connection).given(cf).newConnection(any(ExecutorService.class), anyString());
given(connection.isOpen()).willReturn(true);
given(connection.createChannel()).willReturn(channel1, channel2);
DeclareOk dok = new DeclareOk("foo", 0, 0);
willReturn(dok).given(channel1).queueDeclare(anyString(), anyBoolean(), anyBoolean(), anyBoolean(), isNull());
CachingConnectionFactory ccf = new CachingConnectionFactory(cf);
ccf.setExecutor(mock(ExecutorService.class));
RabbitTemplate rabbitTemplate = new RabbitTemplate(ccf);
rabbitTemplate.setChannelTransacted(true);
RabbitAdmin admin = new RabbitAdmin(rabbitTemplate);
ApplicationContext ac = mock(ApplicationContext.class);
willReturn(Collections.singletonMap("foo", new Queue("foo"))).given(ac).getBeansOfType(Queue.class);
admin.setApplicationContext(ac);
admin.afterPropertiesSet();
AtomicReference<Channel> templateChannel = new AtomicReference<>();
new TransactionTemplate(new TestTransactionManager()).execute(s -> {
return rabbitTemplate.execute(c -> {
templateChannel.set(c);
return true;
});
});
verify(channel1).txSelect();
verify(channel1).queueDeclare(anyString(), anyBoolean(), anyBoolean(), anyBoolean(), isNull());
assertThat(((ChannelProxy) templateChannel.get()).getTargetChannel(), equalTo(channel1));
verify(channel1).txCommit();
}

@SuppressWarnings("serial")
private class TestTransactionManager extends AbstractPlatformTransactionManager {

TestTransactionManager() {
super();
}

@Override
protected void doBegin(Object transaction, TransactionDefinition definition) throws TransactionException {
}

@Override
protected void doCommit(DefaultTransactionStatus status) throws TransactionException {
}

@Override
protected Object doGetTransaction() throws TransactionException {
return new Object();
}

@Override
protected void doRollback(DefaultTransactionStatus status) throws TransactionException {
}

}

}

0 comments on commit af7ee3f

Please sign in to comment.