Skip to content

Commit

Permalink
Return transport addresses from UnicastHostsProvider
Browse files Browse the repository at this point in the history
Relates to elastic#20695
  • Loading branch information
ywelsch committed Jun 19, 2018
1 parent 02a4ef3 commit e647fa5
Show file tree
Hide file tree
Showing 12 changed files with 174 additions and 219 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,10 @@
import com.microsoft.windowsazure.management.compute.models.HostedServiceGetDetailedResponse;
import com.microsoft.windowsazure.management.compute.models.InstanceEndpoint;
import com.microsoft.windowsazure.management.compute.models.RoleInstance;
import org.elasticsearch.Version;
import org.elasticsearch.cloud.azure.classic.AzureServiceDisableException;
import org.elasticsearch.cloud.azure.classic.AzureServiceRemoteException;
import org.elasticsearch.cloud.azure.classic.management.AzureComputeService;
import org.elasticsearch.cloud.azure.classic.management.AzureComputeService.Discovery;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.network.InetAddresses;
Expand All @@ -47,9 +45,6 @@
import java.util.ArrayList;
import java.util.List;

import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;

public class AzureUnicastHostsProvider extends AbstractComponent implements UnicastHostsProvider {

public enum HostType {
Expand Down Expand Up @@ -104,7 +99,7 @@ public static Deployment fromString(String string) {

private final TimeValue refreshInterval;
private long lastRefresh;
private List<DiscoveryNode> cachedDiscoNodes;
private List<TransportAddress> dynamicHosts;
private final HostType hostType;
private final String publicEndpointName;
private final String deploymentName;
Expand Down Expand Up @@ -137,30 +132,30 @@ public AzureUnicastHostsProvider(Settings settings, AzureComputeService azureCom
* Setting `cloud.azure.refresh_interval` to `0` will disable caching (default).
*/
@Override
public List<DiscoveryNode> buildDynamicNodes() {
public List<TransportAddress> buildDynamicHosts() {
if (refreshInterval.millis() != 0) {
if (cachedDiscoNodes != null &&
if (dynamicHosts != null &&
(refreshInterval.millis() < 0 || (System.currentTimeMillis() - lastRefresh) < refreshInterval.millis())) {
logger.trace("using cache to retrieve node list");
return cachedDiscoNodes;
return dynamicHosts;
}
lastRefresh = System.currentTimeMillis();
}
logger.debug("start building nodes list using Azure API");

cachedDiscoNodes = new ArrayList<>();
dynamicHosts = new ArrayList<>();

HostedServiceGetDetailedResponse detailed;
try {
detailed = azureComputeService.getServiceDetails();
} catch (AzureServiceDisableException e) {
logger.debug("Azure discovery service has been disabled. Returning empty list of nodes.");
return cachedDiscoNodes;
return dynamicHosts;
} catch (AzureServiceRemoteException e) {
// We got a remote exception
logger.warn("can not get list of azure nodes: [{}]. Returning empty list of nodes.", e.getMessage());
logger.trace("AzureServiceRemoteException caught", e);
return cachedDiscoNodes;
return dynamicHosts;
}

InetAddress ipAddress = null;
Expand Down Expand Up @@ -212,18 +207,17 @@ public List<DiscoveryNode> buildDynamicNodes() {
TransportAddress[] addresses = transportService.addressesFromString(networkAddress, 1);
for (TransportAddress address : addresses) {
logger.trace("adding {}, transport_address {}", networkAddress, address);
cachedDiscoNodes.add(new DiscoveryNode("#cloud-" + instance.getInstanceName(), address, emptyMap(),
emptySet(), Version.CURRENT.minimumCompatibilityVersion()));
dynamicHosts.add(address);
}
} catch (Exception e) {
logger.warn("can not convert [{}] to transport address. skipping. [{}]", networkAddress, e.getMessage());
}
}
}

logger.debug("{} node(s) added", cachedDiscoNodes.size());
logger.debug("{} addresses added", dynamicHosts.size());

return cachedDiscoNodes;
return dynamicHosts;
}

protected String resolveInstanceAddress(final HostType hostType, final RoleInstance instance) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@
import com.amazonaws.services.ec2.model.Tag;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
Expand All @@ -46,8 +44,6 @@
import java.util.Set;

import static java.util.Collections.disjoint;
import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static org.elasticsearch.discovery.ec2.AwsEc2Service.HostType.TAG_PREFIX;
import static org.elasticsearch.discovery.ec2.AwsEc2Service.HostType.PRIVATE_DNS;
import static org.elasticsearch.discovery.ec2.AwsEc2Service.HostType.PRIVATE_IP;
Expand All @@ -70,15 +66,15 @@ class AwsEc2UnicastHostsProvider extends AbstractComponent implements UnicastHos

private final String hostType;

private final DiscoNodesCache discoNodes;
private final TransportAddressesCache dynamicHosts;

AwsEc2UnicastHostsProvider(Settings settings, TransportService transportService, AwsEc2Service awsEc2Service) {
super(settings);
this.transportService = transportService;
this.awsEc2Service = awsEc2Service;

this.hostType = AwsEc2Service.HOST_TYPE_SETTING.get(settings);
this.discoNodes = new DiscoNodesCache(AwsEc2Service.NODE_CACHE_TIME_SETTING.get(settings));
this.dynamicHosts = new TransportAddressesCache(AwsEc2Service.NODE_CACHE_TIME_SETTING.get(settings));

this.bindAnyGroup = AwsEc2Service.ANY_GROUP_SETTING.get(settings);
this.groups = new HashSet<>();
Expand All @@ -96,13 +92,13 @@ class AwsEc2UnicastHostsProvider extends AbstractComponent implements UnicastHos
}

@Override
public List<DiscoveryNode> buildDynamicNodes() {
return discoNodes.getOrRefresh();
public List<TransportAddress> buildDynamicHosts() {
return dynamicHosts.getOrRefresh();
}

protected List<DiscoveryNode> fetchDynamicNodes() {
protected List<TransportAddress> fetchDynamicNodes() {

final List<DiscoveryNode> discoNodes = new ArrayList<>();
final List<TransportAddress> dynamicHosts = new ArrayList<>();

final DescribeInstancesResult descInstances;
try (AmazonEc2Reference clientReference = awsEc2Service.client()) {
Expand All @@ -115,7 +111,7 @@ protected List<DiscoveryNode> fetchDynamicNodes() {
} catch (final AmazonClientException e) {
logger.info("Exception while retrieving instance list from AWS API: {}", e.getMessage());
logger.debug("Full exception:", e);
return discoNodes;
return dynamicHosts;
}

logger.trace("building dynamic unicast discovery nodes...");
Expand Down Expand Up @@ -179,8 +175,7 @@ && disjoint(securityGroupIds, groups)) {
final TransportAddress[] addresses = transportService.addressesFromString(address, 1);
for (int i = 0; i < addresses.length; i++) {
logger.trace("adding {}, address {}, transport_address {}", instance.getInstanceId(), address, addresses[i]);
discoNodes.add(new DiscoveryNode(instance.getInstanceId(), "#cloud-" + instance.getInstanceId() + "-" + i,
addresses[i], emptyMap(), emptySet(), Version.CURRENT.minimumCompatibilityVersion()));
dynamicHosts.add(addresses[i]);
}
} catch (final Exception e) {
final String finalAddress = address;
Expand All @@ -194,9 +189,9 @@ && disjoint(securityGroupIds, groups)) {
}
}

logger.debug("using dynamic discovery nodes {}", discoNodes);
logger.debug("using dynamic transport addresses {}", dynamicHosts);

return discoNodes;
return dynamicHosts;
}

private DescribeInstancesRequest buildDescribeInstancesRequest() {
Expand All @@ -222,11 +217,11 @@ private DescribeInstancesRequest buildDescribeInstancesRequest() {
return describeInstancesRequest;
}

private final class DiscoNodesCache extends SingleObjectCache<List<DiscoveryNode>> {
private final class TransportAddressesCache extends SingleObjectCache<List<TransportAddress>> {

private boolean empty = true;

protected DiscoNodesCache(TimeValue refreshInterval) {
protected TransportAddressesCache(TimeValue refreshInterval) {
super(refreshInterval, new ArrayList<>());
}

Expand All @@ -236,8 +231,8 @@ protected boolean needsRefresh() {
}

@Override
protected List<DiscoveryNode> refresh() {
final List<DiscoveryNode> nodes = fetchDynamicNodes();
protected List<TransportAddress> refresh() {
final List<TransportAddress> nodes = fetchDynamicNodes();
empty = nodes.isEmpty();
return nodes;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import com.amazonaws.services.ec2.model.Tag;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.Settings;
Expand Down Expand Up @@ -87,16 +86,16 @@ public TransportAddress[] addressesFromString(String address, int perAddressLimi
null);
}

protected List<DiscoveryNode> buildDynamicNodes(Settings nodeSettings, int nodes) {
return buildDynamicNodes(nodeSettings, nodes, null);
protected List<TransportAddress> buildDynamicHosts(Settings nodeSettings, int nodes) {
return buildDynamicHosts(nodeSettings, nodes, null);
}

protected List<DiscoveryNode> buildDynamicNodes(Settings nodeSettings, int nodes, List<List<Tag>> tagsList) {
protected List<TransportAddress> buildDynamicHosts(Settings nodeSettings, int nodes, List<List<Tag>> tagsList) {
try (Ec2DiscoveryPluginMock plugin = new Ec2DiscoveryPluginMock(Settings.EMPTY, nodes, tagsList)) {
AwsEc2UnicastHostsProvider provider = new AwsEc2UnicastHostsProvider(nodeSettings, transportService, plugin.ec2Service);
List<DiscoveryNode> discoveryNodes = provider.buildDynamicNodes();
logger.debug("--> nodes found: {}", discoveryNodes);
return discoveryNodes;
List<TransportAddress> dynamicHosts = provider.buildDynamicHosts();
logger.debug("--> addresses found: {}", dynamicHosts);
return dynamicHosts;
} catch (IOException e) {
fail("Unexpected IOException");
return null;
Expand All @@ -107,7 +106,7 @@ public void testDefaultSettings() throws InterruptedException {
int nodes = randomInt(10);
Settings nodeSettings = Settings.builder()
.build();
List<DiscoveryNode> discoveryNodes = buildDynamicNodes(nodeSettings, nodes);
List<TransportAddress> discoveryNodes = buildDynamicHosts(nodeSettings, nodes);
assertThat(discoveryNodes, hasSize(nodes));
}

Expand All @@ -119,12 +118,11 @@ public void testPrivateIp() throws InterruptedException {
Settings nodeSettings = Settings.builder()
.put(AwsEc2Service.HOST_TYPE_SETTING.getKey(), "private_ip")
.build();
List<DiscoveryNode> discoveryNodes = buildDynamicNodes(nodeSettings, nodes);
assertThat(discoveryNodes, hasSize(nodes));
List<TransportAddress> transportAddresses = buildDynamicHosts(nodeSettings, nodes);
assertThat(transportAddresses, hasSize(nodes));
// We check that we are using here expected address
int node = 1;
for (DiscoveryNode discoveryNode : discoveryNodes) {
TransportAddress address = discoveryNode.getAddress();
for (TransportAddress address : transportAddresses) {
TransportAddress expected = poorMansDNS.get(AmazonEC2Mock.PREFIX_PRIVATE_IP + node++);
assertEquals(address, expected);
}
Expand All @@ -138,12 +136,11 @@ public void testPublicIp() throws InterruptedException {
Settings nodeSettings = Settings.builder()
.put(AwsEc2Service.HOST_TYPE_SETTING.getKey(), "public_ip")
.build();
List<DiscoveryNode> discoveryNodes = buildDynamicNodes(nodeSettings, nodes);
assertThat(discoveryNodes, hasSize(nodes));
List<TransportAddress> dynamicHosts = buildDynamicHosts(nodeSettings, nodes);
assertThat(dynamicHosts, hasSize(nodes));
// We check that we are using here expected address
int node = 1;
for (DiscoveryNode discoveryNode : discoveryNodes) {
TransportAddress address = discoveryNode.getAddress();
for (TransportAddress address : dynamicHosts) {
TransportAddress expected = poorMansDNS.get(AmazonEC2Mock.PREFIX_PUBLIC_IP + node++);
assertEquals(address, expected);
}
Expand All @@ -159,13 +156,12 @@ public void testPrivateDns() throws InterruptedException {
Settings nodeSettings = Settings.builder()
.put(AwsEc2Service.HOST_TYPE_SETTING.getKey(), "private_dns")
.build();
List<DiscoveryNode> discoveryNodes = buildDynamicNodes(nodeSettings, nodes);
assertThat(discoveryNodes, hasSize(nodes));
List<TransportAddress> dynamicHosts = buildDynamicHosts(nodeSettings, nodes);
assertThat(dynamicHosts, hasSize(nodes));
// We check that we are using here expected address
int node = 1;
for (DiscoveryNode discoveryNode : discoveryNodes) {
for (TransportAddress address : dynamicHosts) {
String instanceId = "node" + node++;
TransportAddress address = discoveryNode.getAddress();
TransportAddress expected = poorMansDNS.get(
AmazonEC2Mock.PREFIX_PRIVATE_DNS + instanceId + AmazonEC2Mock.SUFFIX_PRIVATE_DNS);
assertEquals(address, expected);
Expand All @@ -182,13 +178,12 @@ public void testPublicDns() throws InterruptedException {
Settings nodeSettings = Settings.builder()
.put(AwsEc2Service.HOST_TYPE_SETTING.getKey(), "public_dns")
.build();
List<DiscoveryNode> discoveryNodes = buildDynamicNodes(nodeSettings, nodes);
assertThat(discoveryNodes, hasSize(nodes));
List<TransportAddress> dynamicHosts = buildDynamicHosts(nodeSettings, nodes);
assertThat(dynamicHosts, hasSize(nodes));
// We check that we are using here expected address
int node = 1;
for (DiscoveryNode discoveryNode : discoveryNodes) {
for (TransportAddress address : dynamicHosts) {
String instanceId = "node" + node++;
TransportAddress address = discoveryNode.getAddress();
TransportAddress expected = poorMansDNS.get(
AmazonEC2Mock.PREFIX_PUBLIC_DNS + instanceId + AmazonEC2Mock.SUFFIX_PUBLIC_DNS);
assertEquals(address, expected);
Expand All @@ -201,7 +196,7 @@ public void testInvalidHostType() throws InterruptedException {
.build();

IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, () -> {
buildDynamicNodes(nodeSettings, 1);
buildDynamicHosts(nodeSettings, 1);
});
assertThat(exception.getMessage(), containsString("does_not_exist is unknown for discovery.ec2.host_type"));
}
Expand All @@ -227,8 +222,8 @@ public void testFilterByTags() throws InterruptedException {
}

logger.info("started [{}] instances with [{}] stage=prod tag", nodes, prodInstances);
List<DiscoveryNode> discoveryNodes = buildDynamicNodes(nodeSettings, nodes, tagsList);
assertThat(discoveryNodes, hasSize(prodInstances));
List<TransportAddress> dynamicHosts = buildDynamicHosts(nodeSettings, nodes, tagsList);
assertThat(dynamicHosts, hasSize(prodInstances));
}

public void testFilterByMultipleTags() throws InterruptedException {
Expand Down Expand Up @@ -258,8 +253,8 @@ public void testFilterByMultipleTags() throws InterruptedException {
}

logger.info("started [{}] instances with [{}] stage=prod tag", nodes, prodInstances);
List<DiscoveryNode> discoveryNodes = buildDynamicNodes(nodeSettings, nodes, tagsList);
assertThat(discoveryNodes, hasSize(prodInstances));
List<TransportAddress> dynamicHosts = buildDynamicHosts(nodeSettings, nodes, tagsList);
assertThat(dynamicHosts, hasSize(prodInstances));
}

public void testReadHostFromTag() throws InterruptedException, UnknownHostException {
Expand All @@ -285,11 +280,11 @@ public void testReadHostFromTag() throws InterruptedException, UnknownHostExcept
}

logger.info("started [{}] instances", nodes);
List<DiscoveryNode> discoveryNodes = buildDynamicNodes(nodeSettings, nodes, tagsList);
assertThat(discoveryNodes, hasSize(nodes));
for (DiscoveryNode discoveryNode : discoveryNodes) {
TransportAddress address = discoveryNode.getAddress();
TransportAddress expected = poorMansDNS.get(discoveryNode.getName());
List<TransportAddress> dynamicHosts = buildDynamicHosts(nodeSettings, nodes, tagsList);
assertThat(dynamicHosts, hasSize(nodes));
int node = 1;
for (TransportAddress address : dynamicHosts) {
TransportAddress expected = poorMansDNS.get("node" + node++);
assertEquals(address, expected);
}
}
Expand All @@ -306,13 +301,13 @@ public void testGetNodeListEmptyCache() throws Exception {
AwsEc2Service awsEc2Service = new AwsEc2ServiceMock(Settings.EMPTY, 1, null);
DummyEc2HostProvider provider = new DummyEc2HostProvider(Settings.EMPTY, transportService, awsEc2Service) {
@Override
protected List<DiscoveryNode> fetchDynamicNodes() {
protected List<TransportAddress> fetchDynamicNodes() {
fetchCount++;
return new ArrayList<>();
}
};
for (int i=0; i<3; i++) {
provider.buildDynamicNodes();
provider.buildDynamicHosts();
}
assertThat(provider.fetchCount, is(3));
}
Expand All @@ -323,18 +318,18 @@ public void testGetNodeListCached() throws Exception {
try (Ec2DiscoveryPluginMock plugin = new Ec2DiscoveryPluginMock(Settings.EMPTY)) {
DummyEc2HostProvider provider = new DummyEc2HostProvider(builder.build(), transportService, plugin.ec2Service) {
@Override
protected List<DiscoveryNode> fetchDynamicNodes() {
protected List<TransportAddress> fetchDynamicNodes() {
fetchCount++;
return Ec2DiscoveryTests.this.buildDynamicNodes(Settings.EMPTY, 1);
return Ec2DiscoveryTests.this.buildDynamicHosts(Settings.EMPTY, 1);
}
};
for (int i=0; i<3; i++) {
provider.buildDynamicNodes();
provider.buildDynamicHosts();
}
assertThat(provider.fetchCount, is(1));
Thread.sleep(1_000L); // wait for cache to expire
for (int i=0; i<3; i++) {
provider.buildDynamicNodes();
provider.buildDynamicHosts();
}
assertThat(provider.fetchCount, is(2));
}
Expand Down
Loading

0 comments on commit e647fa5

Please sign in to comment.