Skip to content

Commit

Permalink
AMQP-788 Add delegate publisher connection factory
Browse files Browse the repository at this point in the history
JIRA: https://jira.spring.io/browse/AMQP-788

To avoid deadlocks, it is best to use a different connection for producers and
consumers (unless the producer is partiticipating in a consumer transaction).

- Add a delegate `publisherConnectionFactory` to the `CachingConnecetionFactory`.
- Use the same underlying `com.rabbitmq.client.ConnectionFactory` in each.
- Propagate all properties to the delegate.
 - Except, enhance the connection name and bean name with `.publisher`.
- Add a boolean `usePublisherConnection` to the `RabbitTemplate`.
 - If true, use `createPublisherConnection()` when appropriate.

Polishing - PR Comments

Polishing - More PR Comments and Fix doSendAndReceiveWithDirect() to use the publishing CF.

More polishing; tests + reinstate overloaded execute().

Fix (old) race condition in testReceiveAndReplyNonBlocking.

https://travis-ci.org/spring-projects/spring-amqp/builds/317052188?utm_source=github_status&utm_medium=notification

Polishing

Docs
  • Loading branch information
garyrussell authored and artembilan committed Dec 18, 2017
1 parent c576b27 commit 4b78c20
Show file tree
Hide file tree
Showing 11 changed files with 410 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@
public abstract class AbstractConnectionFactory implements ConnectionFactory, DisposableBean, BeanNameAware,
ApplicationContextAware, ApplicationEventPublisherAware, ApplicationListener<ContextClosedEvent> {

private static final String PUBLISHER_SUFFIX = ".publisher";

public static final int DEFAULT_CLOSE_TIMEOUT = 30000;

private static final String BAD_URI = "setUri() was passed an invalid URI; it is ignored";
Expand All @@ -77,6 +79,8 @@ public abstract class AbstractConnectionFactory implements ConnectionFactory, Di

private final AtomicInteger defaultConnectionNameStrategyCounter = new AtomicInteger();

private AbstractConnectionFactory publisherConnectionFactory;

private RecoveryListener recoveryListener = new RecoveryListener() {

@Override
Expand Down Expand Up @@ -115,17 +119,26 @@ public void handleRecovery(Recoverable recoverable) {
private volatile boolean contextStopped;

/**
* Create a new AbstractConnectionFactory for the given target ConnectionFactory.
* Create a new AbstractConnectionFactory for the given target ConnectionFactory,
* with no publisher connection factory.
* @param rabbitConnectionFactory the target ConnectionFactory
*/
public AbstractConnectionFactory(com.rabbitmq.client.ConnectionFactory rabbitConnectionFactory) {
Assert.notNull(rabbitConnectionFactory, "Target ConnectionFactory must not be null");
this.rabbitConnectionFactory = rabbitConnectionFactory;
}

protected final void setPublisherConnectionFactory(
AbstractConnectionFactory publisherConnectionFactory) {
this.publisherConnectionFactory = publisherConnectionFactory;
}

@Override
public void setApplicationContext(ApplicationContext applicationContext) {
this.applicationContext = applicationContext;
if (this.publisherConnectionFactory != null) {
this.publisherConnectionFactory.setApplicationContext(applicationContext);
}
}

protected ApplicationContext getApplicationContext() {
Expand All @@ -135,6 +148,9 @@ protected ApplicationContext getApplicationContext() {
@Override
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
this.applicationEventPublisher = applicationEventPublisher;
if (this.publisherConnectionFactory != null) {
this.publisherConnectionFactory.setApplicationEventPublisher(applicationEventPublisher);
}
}

protected ApplicationEventPublisher getApplicationEventPublisher() {
Expand All @@ -146,6 +162,9 @@ public void onApplicationEvent(ContextClosedEvent event) {
if (getApplicationContext() == event.getApplicationContext()) {
this.contextStopped = true;
}
if (this.publisherConnectionFactory != null) {
this.publisherConnectionFactory.onApplicationEvent(event);
}
}

protected boolean getContextStopped() {
Expand Down Expand Up @@ -261,6 +280,9 @@ public void setAddresses(String addresses) {
Address[] addressArray = Address.parseAddresses(addresses);
if (addressArray.length > 0) {
this.addresses = addressArray;
if (this.publisherConnectionFactory != null) {
this.publisherConnectionFactory.setAddresses(addresses);
}
return;
}
}
Expand Down Expand Up @@ -288,21 +310,34 @@ protected ChannelListener getChannelListener() {

public void setConnectionListeners(List<? extends ConnectionListener> listeners) {
this.connectionListener.setDelegates(listeners);
if (this.publisherConnectionFactory != null) {
this.publisherConnectionFactory.setConnectionListeners(listeners);
}
}

@Override
public void addConnectionListener(ConnectionListener listener) {
this.connectionListener.addDelegate(listener);
if (this.publisherConnectionFactory != null) {
this.publisherConnectionFactory.addConnectionListener(listener);
}
}

@Override
public boolean removeConnectionListener(ConnectionListener listener) {
return this.connectionListener.removeDelegate(listener);
boolean result = this.connectionListener.removeDelegate(listener);
if (this.publisherConnectionFactory != null) {
this.publisherConnectionFactory.removeConnectionListener(listener); // NOSONAR
}
return result;
}

@Override
public void clearConnectionListeners() {
this.connectionListener.clearDelegates();
if (this.publisherConnectionFactory != null) {
this.publisherConnectionFactory.clearConnectionListeners();
}
}

public void setChannelListeners(List<? extends ChannelListener> listeners) {
Expand All @@ -316,10 +351,16 @@ public void setChannelListeners(List<? extends ChannelListener> listeners) {
*/
public void setRecoveryListener(RecoveryListener recoveryListener) {
this.recoveryListener = recoveryListener;
if (this.publisherConnectionFactory != null) {
this.publisherConnectionFactory.setRecoveryListener(recoveryListener);
}
}

public void addChannelListener(ChannelListener listener) {
this.channelListener.addDelegate(listener);
if (this.publisherConnectionFactory != null) {
this.publisherConnectionFactory.addChannelListener(listener);
}
}

/**
Expand All @@ -340,6 +381,9 @@ public void setExecutor(Executor executor) {
else {
this.executorService = ((ThreadPoolTaskExecutor) executor).getThreadPoolExecutor();
}
if (this.publisherConnectionFactory != null) {
this.publisherConnectionFactory.setExecutor(executor);
}
}

protected ExecutorService getExecutorService() {
Expand All @@ -353,6 +397,9 @@ protected ExecutorService getExecutorService() {
*/
public void setCloseTimeout(int closeTimeout) {
this.closeTimeout = closeTimeout;
if (this.publisherConnectionFactory != null) {
this.publisherConnectionFactory.setCloseTimeout(closeTimeout);
}
}

public int getCloseTimeout() {
Expand All @@ -367,11 +414,27 @@ public int getCloseTimeout() {
*/
public void setConnectionNameStrategy(ConnectionNameStrategy connectionNameStrategy) {
this.connectionNameStrategy = connectionNameStrategy;
if (this.publisherConnectionFactory != null) {
this.publisherConnectionFactory.setConnectionNameStrategy(
cf -> connectionNameStrategy.obtainNewConnectionName(cf) + PUBLISHER_SUFFIX);
}
}

@Override
public void setBeanName(String name) {
this.beanName = name;
if (this.publisherConnectionFactory != null) {
this.publisherConnectionFactory.setBeanName(name + PUBLISHER_SUFFIX);
}
}

public boolean hasPublisherConnectionFactory() {
return this.publisherConnectionFactory != null;
}

@Override
public ConnectionFactory getPublisherConnectionFactory() {
return this.publisherConnectionFactory;
}

protected final Connection createBareConnection() {
Expand Down Expand Up @@ -430,6 +493,9 @@ protected final String getDefaultHostName() {

@Override
public void destroy() {
if (this.publisherConnectionFactory != null) {
this.publisherConnectionFactory.destroy();
}
}

@Override
Expand Down
Loading

0 comments on commit 4b78c20

Please sign in to comment.