Skip to content

Commit

Permalink
spring-projectsGH-1198: Support AddressResolver
Browse files Browse the repository at this point in the history
  • Loading branch information
garyrussell committed May 15, 2020
1 parent da413d6 commit 62f465e
Show file tree
Hide file tree
Showing 7 changed files with 117 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ class ConnectionFactoryParser extends AbstractSingleBeanDefinitionParser {

private static final String SHUFFLE_ADDRESSES = "shuffle-addresses";

private static final String ADDRESS_RESOLVER = "address-resolver";

private static final String VIRTUAL_HOST_ATTRIBUTE = "virtual-host";

private static final String USER_ATTRIBUTE = "username";
Expand Down Expand Up @@ -101,6 +103,7 @@ protected void doParse(Element element, ParserContext parserContext, BeanDefinit
NamespaceUtils.setReferenceIfAttributeDefined(builder, element, EXECUTOR_ATTRIBUTE);
NamespaceUtils.setValueIfAttributeDefined(builder, element, ADDRESSES);
NamespaceUtils.setValueIfAttributeDefined(builder, element, SHUFFLE_ADDRESSES);
NamespaceUtils.setReferenceIfAttributeDefined(builder, element, ADDRESS_RESOLVER);
NamespaceUtils.setValueIfAttributeDefined(builder, element, PUBLISHER_RETURNS);
NamespaceUtils.setValueIfAttributeDefined(builder, element, REQUESTED_HEARTBEAT, "requestedHeartBeat");
NamespaceUtils.setValueIfAttributeDefined(builder, element, CONNECTION_TIMEOUT);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -51,6 +51,7 @@
import org.springframework.util.StringUtils;

import com.rabbitmq.client.Address;
import com.rabbitmq.client.AddressResolver;
import com.rabbitmq.client.BlockedListener;
import com.rabbitmq.client.Recoverable;
import com.rabbitmq.client.RecoveryListener;
Expand Down Expand Up @@ -122,6 +123,8 @@ public void handleRecovery(Recoverable recoverable) {

private ApplicationEventPublisher applicationEventPublisher;

private AddressResolver addressResolver;

private volatile boolean contextStopped;

/**
Expand Down Expand Up @@ -217,6 +220,16 @@ public void setConnectionThreadFactory(ThreadFactory threadFactory) {
this.rabbitConnectionFactory.setThreadFactory(threadFactory);
}

/**
* Set an {@link AddressResolver} to use when creating connections; overrides
* {@link #setAddresses(String)}, {@link #setHost(String)}, and {@link #setPort(int)}.
* @param addressResolver the resolver.
* @since 2.1.15
*/
public void setAddressResolver(AddressResolver addressResolver) {
this.addressResolver = addressResolver;
}

/**
* @param uri the URI
* @since 1.5
Expand Down Expand Up @@ -292,7 +305,8 @@ public void setAddresses(String addresses) {
return;
}
}
this.logger.info("setAddresses() called with an empty value, will be using the host+port properties for connections");
this.logger.info("setAddresses() called with an empty value, will be using the host+port "
+ " or addressResolver properties for connections");
this.addresses = null;
}

Expand Down Expand Up @@ -512,27 +526,52 @@ public void handleRecovery(Recoverable recoverable) {
}

private com.rabbitmq.client.Connection connect(String connectionName) throws IOException, TimeoutException {
com.rabbitmq.client.Connection rabbitConnection;
if (this.addressResolver != null) {
return connectResolver(connectionName);
}
if (this.addresses != null) {
List<Address> addressesToConnect = this.addresses;
if (this.shuffleAddresses && addressesToConnect.size() > 1) {
List<Address> list = new ArrayList<>(addressesToConnect);
Collections.shuffle(list);
addressesToConnect = list;
}
if (this.logger.isInfoEnabled()) {
this.logger.info("Attempting to connect to: " + addressesToConnect);
}
rabbitConnection = this.rabbitConnectionFactory.newConnection(this.executorService, addressesToConnect,
connectionName);
return connectAddresses(connectionName);
}
else {
if (this.logger.isInfoEnabled()) {
this.logger.info("Attempting to connect to: " + this.rabbitConnectionFactory.getHost()
+ ":" + this.rabbitConnectionFactory.getPort());
}
rabbitConnection = this.rabbitConnectionFactory.newConnection(this.executorService, connectionName);
return connectHostPort(connectionName);
}
}

private com.rabbitmq.client.Connection connectResolver(String connectionName) throws IOException, TimeoutException {
com.rabbitmq.client.Connection rabbitConnection;
if (this.logger.isInfoEnabled()) {
this.logger.info("Attempting to connect with: " + this.addressResolver);
}
rabbitConnection = this.rabbitConnectionFactory.newConnection(this.executorService, this.addressResolver,
connectionName);
return rabbitConnection;
}

private com.rabbitmq.client.Connection connectAddresses(String connectionName)
throws IOException, TimeoutException {

com.rabbitmq.client.Connection rabbitConnection;
List<Address> addressesToConnect = this.addresses;
if (this.shuffleAddresses && addressesToConnect.size() > 1) {
List<Address> list = new ArrayList<>(addressesToConnect);
Collections.shuffle(list);
addressesToConnect = list;
}
if (this.logger.isInfoEnabled()) {
this.logger.info("Attempting to connect to: " + addressesToConnect);
}
rabbitConnection = this.rabbitConnectionFactory.newConnection(this.executorService, addressesToConnect,
connectionName);
return rabbitConnection;
}

private com.rabbitmq.client.Connection connectHostPort(String connectionName) throws IOException, TimeoutException {
com.rabbitmq.client.Connection rabbitConnection;
if (this.logger.isInfoEnabled()) {
this.logger.info("Attempting to connect to: " + this.rabbitConnectionFactory.getHost()
+ ":" + this.rabbitConnectionFactory.getPort());
}
rabbitConnection = this.rabbitConnectionFactory.newConnection(this.executorService, connectionName);
return rabbitConnection;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1428,6 +1428,18 @@
]]></xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="address-resolver" type="xsd:string" use="optional">
<xsd:annotation>
<xsd:documentation><![CDATA[
An address resolver bean; overrides 'addresses' and 'host/port'.
]]></xsd:documentation>
<xsd:appinfo>
<tool:annotation kind="ref">
<tool:expected-type type="com.rabbitmq.client.AddressResolver" />
</tool:annotation>
</xsd:appinfo>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="username" type="xsd:string" use="optional">
<xsd:annotation>
<xsd:documentation><![CDATA[
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -128,4 +128,10 @@ public void testMultiHost() throws Exception {
"rabbitConnectionFactory.threadFactory")).isSameAs(beanFactory.getBean("tf"));
}

@Test
void testResolver() {
CachingConnectionFactory connectionFactory = beanFactory.getBean("resolved", CachingConnectionFactory.class);
assertThat(TestUtils.getPropertyValue(connectionFactory, "addressResolver"))
.isSameAs(this.beanFactory.getBean("resolver"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@
import org.springframework.test.util.ReflectionTestUtils;

import com.rabbitmq.client.Address;
import com.rabbitmq.client.AddressResolver;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.ConnectionFactory;
Expand Down Expand Up @@ -1846,4 +1847,27 @@ public void testShuffle() throws IOException, TimeoutException {
assertThat(firstAddress).containsExactly("host1", "host2", "host3");
}

@Test
void testResolver() throws Exception {
com.rabbitmq.client.ConnectionFactory mockConnectionFactory = mock(com.rabbitmq.client.ConnectionFactory.class);
com.rabbitmq.client.Connection mockConnection = mock(com.rabbitmq.client.Connection.class);
Channel mockChannel = mock(Channel.class);

AddressResolver resolver = () -> Collections.singletonList(Address.parseAddress("foo:5672"));
when(mockConnectionFactory.newConnection(any(ExecutorService.class), eq(resolver), anyString()))
.thenReturn(mockConnection);
when(mockConnection.createChannel()).thenReturn(mockChannel);
when(mockChannel.isOpen()).thenReturn(true);
when(mockConnection.isOpen()).thenReturn(true);

CachingConnectionFactory ccf = new CachingConnectionFactory(mockConnectionFactory);
ccf.setExecutor(mock(ExecutorService.class));
ccf.setAddressResolver(resolver);
Connection con = ccf.createConnection();
assertThat(con).isNotNull();
assertThat(TestUtils.getPropertyValue(con, "target", SimpleConnection.class).getDelegate())
.isEqualTo(mockConnection);
verify(mockConnectionFactory).newConnection(any(ExecutorService.class), eq(resolver), anyString());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,14 @@

<rabbit:connection-factory id="native" connection-factory="connectionFactory" channel-cache-size="10" />


<rabbit:connection-factory id="resolved" connection-factory="connectionFactory"
address-resolver="resolver"/>

<bean id="resolver" class="com.rabbitmq.client.ListAddressResolver">
<constructor-arg value="null"/>
</bean>

<bean id="connectionFactory" class="com.rabbitmq.client.ConnectionFactory"/>

<rabbit:connection-factory id="withExecutor" host="foo" virtual-host="/bar"
Expand Down
5 changes: 5 additions & 0 deletions src/reference/asciidoc/amqp.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,11 @@ The following example with a custom thread factory that prefixes thread names wi
----
====

===== AddressResolver

Starting with version 2.1.15, you can now use an `AddressResover` to resolve the connection address(es).
This will override any settings of the `addresses` and `host/port` properties.

===== Naming Connections

Starting with version 1.7, a `ConnectionNameStrategy` is provided for the injection into the `AbstractionConnectionFactory`.
Expand Down

0 comments on commit 62f465e

Please sign in to comment.