Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update ec2 secure settings #29134

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.discovery.ec2;

import com.amazonaws.services.ec2.AmazonEC2;

import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.util.concurrent.AbstractRefCounted;

/**
* Handles the shutdown of the wrapped {@link AmazonEC2Client} using reference
* counting.
*/
public class AmazonEc2Reference extends AbstractRefCounted implements Releasable {

private final AmazonEC2 client;

AmazonEc2Reference(AmazonEC2 client) {
super("AWS_EC2_CLIENT");
this.client = client;
}

/**
* Call when the client is not needed anymore.
*/
@Override
public void close() {
decRef();
}

/**
* Returns the underlying `AmazonEC2` client. All method calls are permitted BUT
* NOT shutdown. Shutdown is called when reference count reaches 0.
*/
public AmazonEC2 client() {
return client;
}

@Override
protected void closeInternal() {
client.shutdown();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,12 @@

package org.elasticsearch.discovery.ec2;

import com.amazonaws.ClientConfiguration;
import com.amazonaws.Protocol;
import com.amazonaws.services.ec2.AmazonEC2;
import org.elasticsearch.common.settings.SecureSetting;
import org.elasticsearch.common.settings.SecureString;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.function.Function;

interface AwsEc2Service {
Expand All @@ -46,36 +38,6 @@ class HostType {
public static final String TAG_PREFIX = "tag:";
}

/** The access key (ie login id) for connecting to ec2. */
Setting<SecureString> ACCESS_KEY_SETTING = SecureSetting.secureString("discovery.ec2.access_key", null);

/** The secret key (ie password) for connecting to ec2. */
Setting<SecureString> SECRET_KEY_SETTING = SecureSetting.secureString("discovery.ec2.secret_key", null);

/** An override for the ec2 endpoint to connect to. */
Setting<String> ENDPOINT_SETTING = new Setting<>("discovery.ec2.endpoint", "",
s -> s.toLowerCase(Locale.ROOT), Property.NodeScope);

/** The protocol to use to connect to to ec2. */
Setting<Protocol> PROTOCOL_SETTING = new Setting<>("discovery.ec2.protocol", "https",
s -> Protocol.valueOf(s.toUpperCase(Locale.ROOT)), Property.NodeScope);

/** The host name of a proxy to connect to ec2 through. */
Setting<String> PROXY_HOST_SETTING = Setting.simpleString("discovery.ec2.proxy.host", Property.NodeScope);

/** The port of a proxy to connect to ec2 through. */
Setting<Integer> PROXY_PORT_SETTING = Setting.intSetting("discovery.ec2.proxy.port", 80, 0, 1<<16, Property.NodeScope);

/** The username of a proxy to connect to s3 through. */
Setting<SecureString> PROXY_USERNAME_SETTING = SecureSetting.secureString("discovery.ec2.proxy.username", null);

/** The password of a proxy to connect to s3 through. */
Setting<SecureString> PROXY_PASSWORD_SETTING = SecureSetting.secureString("discovery.ec2.proxy.password", null);

/** The socket timeout for connecting to s3. */
Setting<TimeValue> READ_TIMEOUT_SETTING = Setting.timeSetting("discovery.ec2.read_timeout",
TimeValue.timeValueMillis(ClientConfiguration.DEFAULT_SOCKET_TIMEOUT), Property.NodeScope);

/**
* discovery.ec2.host_type: The type of host type to use to communicate with other instances.
* Can be one of private_ip, public_ip, private_dns, public_dns or tag:XXXX where
Expand All @@ -88,26 +50,24 @@ class HostType {
* discovery.ec2.any_group: If set to false, will require all security groups to be present for the instance to be used for the
* discovery. Defaults to true.
*/
Setting<Boolean> ANY_GROUP_SETTING =
Setting.boolSetting("discovery.ec2.any_group", true, Property.NodeScope);
Setting<Boolean> ANY_GROUP_SETTING = Setting.boolSetting("discovery.ec2.any_group", true, Property.NodeScope);
/**
* discovery.ec2.groups: Either a comma separated list or array based list of (security) groups. Only instances with the provided
* security groups will be used in the cluster discovery. (NOTE: You could provide either group NAME or group ID.)
*/
Setting<List<String>> GROUPS_SETTING =
Setting.listSetting("discovery.ec2.groups", new ArrayList<>(), s -> s.toString(), Property.NodeScope);
Setting<List<String>> GROUPS_SETTING = Setting.listSetting("discovery.ec2.groups", new ArrayList<>(), s -> s.toString(),
Property.NodeScope);
/**
* discovery.ec2.availability_zones: Either a comma separated list or array based list of availability zones. Only instances within
* the provided availability zones will be used in the cluster discovery.
*/
Setting<List<String>> AVAILABILITY_ZONES_SETTING =
Setting.listSetting("discovery.ec2.availability_zones", Collections.emptyList(), s -> s.toString(),
Property.NodeScope);
Setting<List<String>> AVAILABILITY_ZONES_SETTING = Setting.listSetting("discovery.ec2.availability_zones", Collections.emptyList(),
s -> s.toString(), Property.NodeScope);
/**
* discovery.ec2.node_cache_time: How long the list of hosts is cached to prevent further requests to the AWS API. Defaults to 10s.
*/
Setting<TimeValue> NODE_CACHE_TIME_SETTING =
Setting.timeSetting("discovery.ec2.node_cache_time", TimeValue.timeValueSeconds(10), Property.NodeScope);
Setting<TimeValue> NODE_CACHE_TIME_SETTING = Setting.timeSetting("discovery.ec2.node_cache_time", TimeValue.timeValueSeconds(10),
Property.NodeScope);

/**
* discovery.ec2.tag.*: The ec2 discovery can filter machines to include in the cluster based on tags (and not just groups).
Expand All @@ -116,7 +76,28 @@ class HostType {
* instance to be included.
*/
Setting.AffixSetting<List<String>> TAG_SETTING = Setting.prefixKeySetting("discovery.ec2.tag.",
key -> Setting.listSetting(key, Collections.emptyList(), Function.identity(), Property.NodeScope));
key -> Setting.listSetting(key, Collections.emptyList(), Function.identity(), Property.NodeScope));

/**
* Creates then caches an {@code AmazonEC2} client using the current client
* settings.
*/
AmazonEc2Reference client();

AmazonEC2 client();
/**
* Updates settings for building the client. Future client requests will use the
* new settings. Implementations SHOULD drop the client cache to prevent reusing
* the client with old settings from cache.
*
* @param clientSettings
* the new settings
* @return the old settings
*/
Ec2ClientSettings updateClientSettings(Ec2ClientSettings clientSettings);

/**
* Releases the cached client. Subsequent client requests will recreate the
* client instance. Does not touch the client settings.
*/
void releaseCachedClient();
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,8 @@

package org.elasticsearch.discovery.ec2;

import java.io.Closeable;
import java.io.IOException;
import java.util.Random;

import com.amazonaws.AmazonClientException;
import com.amazonaws.AmazonWebServiceRequest;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
Expand All @@ -38,110 +34,124 @@
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.SecureString;
import org.elasticsearch.common.settings.Settings;

class AwsEc2ServiceImpl extends AbstractComponent implements AwsEc2Service, Closeable {
class AwsEc2ServiceImpl extends AbstractComponent implements AwsEc2Service {

public static final String EC2_METADATA_URL = "http://169.254.169.254/latest/meta-data/";

private AmazonEC2Client client;
private volatile AmazonEc2Reference clientReference;
private volatile Ec2ClientSettings clientSettings;

AwsEc2ServiceImpl(Settings settings) {
super(settings);
}

@Override
public synchronized AmazonEC2 client() {
if (client != null) {
return client;
}

this.client = new AmazonEC2Client(buildCredentials(logger, settings), buildConfiguration(logger, settings));
String endpoint = findEndpoint(logger, settings);
if (endpoint != null) {
client.setEndpoint(endpoint);
private AmazonEC2 buildClient(Ec2ClientSettings clientSettings) {
final AWSCredentialsProvider credentials = buildCredentials(logger, clientSettings);
final ClientConfiguration configuration = buildConfiguration(logger, clientSettings);
final AmazonEC2 client = buildClient(credentials, configuration);
if (Strings.hasText(clientSettings.endpoint)) {
logger.debug("using explicit ec2 endpoint [{}]", clientSettings.endpoint);
client.setEndpoint(clientSettings.endpoint);
}

return this.client;
return client;
}

protected static AWSCredentialsProvider buildCredentials(Logger logger, Settings settings) {
AWSCredentialsProvider credentials;

try (SecureString key = ACCESS_KEY_SETTING.get(settings);
SecureString secret = SECRET_KEY_SETTING.get(settings)) {
if (key.length() == 0 && secret.length() == 0) {
logger.debug("Using either environment variables, system properties or instance profile credentials");
credentials = new DefaultAWSCredentialsProviderChain();
} else {
logger.debug("Using basic key/secret credentials");
credentials = new StaticCredentialsProvider(new BasicAWSCredentials(key.toString(), secret.toString()));
}
}

return credentials;
// proxy for testing
AmazonEC2 buildClient(AWSCredentialsProvider credentials, ClientConfiguration configuration) {
final AmazonEC2 client = new AmazonEC2Client(credentials, configuration);
return client;
}

protected static ClientConfiguration buildConfiguration(Logger logger, Settings settings) {
ClientConfiguration clientConfiguration = new ClientConfiguration();
// pkg private for tests
static ClientConfiguration buildConfiguration(Logger logger, Ec2ClientSettings clientSettings) {
final ClientConfiguration clientConfiguration = new ClientConfiguration();
// the response metadata cache is only there for diagnostics purposes,
// but can force objects from every response to the old generation.
clientConfiguration.setResponseMetadataCacheSize(0);
clientConfiguration.setProtocol(PROTOCOL_SETTING.get(settings));

if (PROXY_HOST_SETTING.exists(settings)) {
String proxyHost = PROXY_HOST_SETTING.get(settings);
Integer proxyPort = PROXY_PORT_SETTING.get(settings);
try (SecureString proxyUsername = PROXY_USERNAME_SETTING.get(settings);
SecureString proxyPassword = PROXY_PASSWORD_SETTING.get(settings)) {

clientConfiguration
.withProxyHost(proxyHost)
.withProxyPort(proxyPort)
.withProxyUsername(proxyUsername.toString())
.withProxyPassword(proxyPassword.toString());
}
clientConfiguration.setProtocol(clientSettings.protocol);
if (Strings.hasText(clientSettings.proxyHost)) {
// TODO: remove this leniency, these settings should exist together and be validated
clientConfiguration.setProxyHost(clientSettings.proxyHost);
clientConfiguration.setProxyPort(clientSettings.proxyPort);
clientConfiguration.setProxyUsername(clientSettings.proxyUsername);
clientConfiguration.setProxyPassword(clientSettings.proxyPassword);
}

// Increase the number of retries in case of 5xx API responses
final Random rand = Randomness.get();
RetryPolicy retryPolicy = new RetryPolicy(
final RetryPolicy retryPolicy = new RetryPolicy(
RetryPolicy.RetryCondition.NO_RETRY_CONDITION,
new RetryPolicy.BackoffStrategy() {
@Override
public long delayBeforeNextRetry(AmazonWebServiceRequest originalRequest,
AmazonClientException exception,
int retriesAttempted) {
// with 10 retries the max delay time is 320s/320000ms (10 * 2^5 * 1 * 1000)
logger.warn("EC2 API request failed, retry again. Reason was:", exception);
return 1000L * (long) (10d * Math.pow(2, retriesAttempted / 2.0d) * (1.0d + rand.nextDouble()));
}
(originalRequest, exception, retriesAttempted) -> {
// with 10 retries the max delay time is 320s/320000ms (10 * 2^5 * 1 * 1000)
logger.warn("EC2 API request failed, retry again. Reason was:", exception);
return 1000L * (long) (10d * Math.pow(2, retriesAttempted / 2.0d) * (1.0d + rand.nextDouble()));
},
10,
false);
clientConfiguration.setRetryPolicy(retryPolicy);
clientConfiguration.setSocketTimeout((int) READ_TIMEOUT_SETTING.get(settings).millis());

clientConfiguration.setSocketTimeout(clientSettings.readTimeoutMillis);
return clientConfiguration;
}

protected static String findEndpoint(Logger logger, Settings settings) {
String endpoint = null;
if (ENDPOINT_SETTING.exists(settings)) {
endpoint = ENDPOINT_SETTING.get(settings);
logger.debug("using explicit ec2 endpoint [{}]", endpoint);
// pkg private for tests
static AWSCredentialsProvider buildCredentials(Logger logger, Ec2ClientSettings clientSettings) {
final BasicAWSCredentials credentials = clientSettings.credentials;
if (credentials == null) {
logger.debug("Using either environment variables, system properties or instance profile credentials");
return new DefaultAWSCredentialsProviderChain();
} else {
logger.debug("Using basic key/secret credentials");
return new StaticCredentialsProvider(credentials);
}
return endpoint;
}

@Override
public void close() throws IOException {
if (client != null) {
client.shutdown();
public AmazonEc2Reference client() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we can generalizer this and maybe have a utils class in core that does what we do here. Operate on refcounted and maintain a map. Then we can also have a factory method and that way we share most of the logic. Maybe a followup.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

++ This is becoming a pattern and the code is duplicated. I will do the refactoring as the last PR to this branch, to make sure I account for all distinctions of all use cases.

if ((clientReference != null) && clientReference.tryIncRef()) {
return clientReference;
}
synchronized (this) {
if ((clientReference != null) && clientReference.tryIncRef()) {
return clientReference;
}
if (clientSettings == null) {
throw new IllegalArgumentException("Missing ec2 client configs.");
}
final AmazonEc2Reference clientReference = new AmazonEc2Reference(buildClient(clientSettings));
clientReference.incRef();
this.clientReference = clientReference;
return clientReference;
}
}


// Ensure that IdleConnectionReaper is shutdown
/**
* Reloads the settings for the AmazonEC2 client. New clients will be build
* using these. Old client is usable until released. On release it will be
* destroyed instead of being returned to the cache.
*/
@Override
public synchronized Ec2ClientSettings updateClientSettings(Ec2ClientSettings clientSettings) {
// shutdown all unused clients
// others will shutdown on their respective release
releaseCachedClient();
final Ec2ClientSettings prevSettings = this.clientSettings;
this.clientSettings = clientSettings;
return prevSettings;
}

@Override
public synchronized void releaseCachedClient() {
if (this.clientReference == null) {
return;
}
// the client will shutdown when it will not be used anymore
this.clientReference.decRef();
// clear the cached client, it will be build lazily
this.clientReference = null;
// shutdown IdleConnectionReaper background thread
// it will be restarted on new client usage
IdleConnectionReaper.shutdown();
}
}
Loading