Skip to content

Commit

Permalink
Update s3 secure settings (#28517)
Browse files Browse the repository at this point in the history
Cache of clients by name which can be cleared when
secure settings get updated.
  • Loading branch information
albertzaharovits authored Mar 13, 2018
1 parent 54eb461 commit 3db5ae0
Show file tree
Hide file tree
Showing 20 changed files with 821 additions and 439 deletions.
2 changes: 1 addition & 1 deletion plugins/repository-s3/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ bundlePlugin {
}

additionalTest('testRepositoryCreds'){
include '**/RepositorySettingsCredentialsTests.class'
include '**/RepositoryCredentialsTests.class'
systemProperty 'es.allow_insecure_settings', 'true'
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.repositories.s3;

import org.elasticsearch.common.util.concurrent.AbstractRefCounted;

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3Client;

import org.elasticsearch.common.lease.Releasable;

/**
* Handles the shutdown of the wrapped {@link AmazonS3Client} using reference
* counting.
*/
public class AmazonS3Reference extends AbstractRefCounted implements Releasable {

private final AmazonS3 client;

AmazonS3Reference(AmazonS3 client) {
super("AWS_S3_CLIENT");
this.client = client;
}

/**
* Call when the client is not needed anymore.
*/
@Override
public void close() {
decRef();
}

/**
* Returns the underlying `AmazonS3` client. All method calls are permitted BUT
* NOT shutdown. Shutdown is called when reference count reaches 0.
*/
public AmazonS3 client() {
return client;
}

@Override
protected void closeInternal() {
client.shutdown();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,30 @@

package org.elasticsearch.repositories.s3;

import com.amazonaws.services.s3.AmazonS3;
import org.elasticsearch.common.component.LifecycleComponent;
import org.elasticsearch.common.settings.Settings;
import java.util.Map;

interface AwsS3Service extends LifecycleComponent {
interface AwsS3Service {

/**
* Creates an {@code AmazonS3} client from the given repository metadata and node settings.
* Creates then caches an {@code AmazonS3} client using the current client
* settings.
*/
AmazonS3 client(Settings repositorySettings);
AmazonS3Reference client(String clientName);

/**
* Updates settings for building clients. Future client requests will use the
* new settings. Implementations SHOULD drop the client cache to prevent reusing
* clients with old settings from cache.
*
* @param clientsSettings
* the new settings
* @return the old settings
*/
Map<String, S3ClientSettings> updateClientsSettings(Map<String, S3ClientSettings> clientsSettings);

/**
* Releases cached clients. Subsequent client requests will recreate client
* instances. Does not touch the client settings.
*/
void releaseCachedClients();
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,66 +28,88 @@
import com.amazonaws.internal.StaticCredentialsProvider;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3Client;

import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.logging.DeprecationLogger;
import org.elasticsearch.common.settings.SecureString;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;

import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;


class InternalAwsS3Service extends AbstractLifecycleComponent implements AwsS3Service {
import static java.util.Collections.emptyMap;

// pkg private for tests
static final Setting<String> CLIENT_NAME = new Setting<>("client", "default", Function.identity());

private final Map<String, S3ClientSettings> clientsSettings;
class InternalAwsS3Service extends AbstractComponent implements AwsS3Service {

private final Map<String, AmazonS3Client> clientsCache = new HashMap<>();
private volatile Map<String, AmazonS3Reference> clientsCache = emptyMap();
private volatile Map<String, S3ClientSettings> clientsSettings = emptyMap();

InternalAwsS3Service(Settings settings, Map<String, S3ClientSettings> clientsSettings) {
InternalAwsS3Service(Settings settings) {
super(settings);
this.clientsSettings = clientsSettings;
}

/**
* Reloads the settings for the AmazonS3 client. New clients will be build using
* these. Old clients are usable until released. On release they will be
* destroyed contrary to being returned to the cache.
*/
@Override
public synchronized AmazonS3 client(Settings repositorySettings) {
String clientName = CLIENT_NAME.get(repositorySettings);
AmazonS3Client client = clientsCache.get(clientName);
if (client != null) {
return client;
}
public synchronized Map<String, S3ClientSettings> updateClientsSettings(Map<String, S3ClientSettings> clientsSettings) {
// shutdown all unused clients
// others will shutdown on their respective release
releaseCachedClients();
final Map<String, S3ClientSettings> prevSettings = this.clientsSettings;
this.clientsSettings = MapBuilder.newMapBuilder(clientsSettings).immutableMap();
assert this.clientsSettings.containsKey("default") : "always at least have 'default'";
// clients are built lazily by {@link client(String)}
return prevSettings;
}

S3ClientSettings clientSettings = clientsSettings.get(clientName);
if (clientSettings == null) {
throw new IllegalArgumentException("Unknown s3 client name [" + clientName + "]. Existing client configs: " +
Strings.collectionToDelimitedString(clientsSettings.keySet(), ","));
/**
* Attempts to retrieve a client by name from the cache. If the client does not
* exist it will be created.
*/
@Override
public AmazonS3Reference client(String clientName) {
AmazonS3Reference clientReference = clientsCache.get(clientName);
if ((clientReference != null) && clientReference.tryIncRef()) {
return clientReference;
}
synchronized (this) {
clientReference = clientsCache.get(clientName);
if ((clientReference != null) && clientReference.tryIncRef()) {
return clientReference;
}
final S3ClientSettings clientSettings = clientsSettings.get(clientName);
if (clientSettings == null) {
throw new IllegalArgumentException("Unknown s3 client name [" + clientName + "]. Existing client configs: "
+ Strings.collectionToDelimitedString(clientsSettings.keySet(), ","));
}
logger.debug("creating S3 client with client_name [{}], endpoint [{}]", clientName, clientSettings.endpoint);
clientReference = new AmazonS3Reference(buildClient(clientSettings));
clientReference.incRef();
clientsCache = MapBuilder.newMapBuilder(clientsCache).put(clientName, clientReference).immutableMap();
return clientReference;
}
}

logger.debug("creating S3 client with client_name [{}], endpoint [{}]", clientName, clientSettings.endpoint);

AWSCredentialsProvider credentials = buildCredentials(logger, deprecationLogger, clientSettings, repositorySettings);
ClientConfiguration configuration = buildConfiguration(clientSettings);

client = new AmazonS3Client(credentials, configuration);

private AmazonS3 buildClient(S3ClientSettings clientSettings) {
final AWSCredentialsProvider credentials = buildCredentials(logger, clientSettings);
final ClientConfiguration configuration = buildConfiguration(clientSettings);
final AmazonS3 client = buildClient(credentials, configuration);
if (Strings.hasText(clientSettings.endpoint)) {
client.setEndpoint(clientSettings.endpoint);
}

clientsCache.put(clientName, client);
return client;
}

// proxy for testing
AmazonS3 buildClient(AWSCredentialsProvider credentials, ClientConfiguration configuration) {
return new AmazonS3Client(credentials, configuration);
}

// pkg private for tests
static ClientConfiguration buildConfiguration(S3ClientSettings clientSettings) {
ClientConfiguration clientConfiguration = new ClientConfiguration();
final ClientConfiguration clientConfiguration = new ClientConfiguration();
// the response metadata cache is only there for diagnostics purposes,
// but can force objects from every response to the old generation.
clientConfiguration.setResponseMetadataCacheSize(0);
Expand All @@ -109,27 +131,8 @@ static ClientConfiguration buildConfiguration(S3ClientSettings clientSettings) {
}

// pkg private for tests
static AWSCredentialsProvider buildCredentials(Logger logger, DeprecationLogger deprecationLogger,
S3ClientSettings clientSettings, Settings repositorySettings) {


BasicAWSCredentials credentials = clientSettings.credentials;
if (S3Repository.ACCESS_KEY_SETTING.exists(repositorySettings)) {
if (S3Repository.SECRET_KEY_SETTING.exists(repositorySettings) == false) {
throw new IllegalArgumentException("Repository setting [" + S3Repository.ACCESS_KEY_SETTING.getKey() +
" must be accompanied by setting [" + S3Repository.SECRET_KEY_SETTING.getKey() + "]");
}
try (SecureString key = S3Repository.ACCESS_KEY_SETTING.get(repositorySettings);
SecureString secret = S3Repository.SECRET_KEY_SETTING.get(repositorySettings)) {
credentials = new BasicAWSCredentials(key.toString(), secret.toString());
}
// backcompat for reading keys out of repository settings
deprecationLogger.deprecated("Using s3 access/secret key from repository settings. Instead " +
"store these in named clients and the elasticsearch keystore for secure settings.");
} else if (S3Repository.SECRET_KEY_SETTING.exists(repositorySettings)) {
throw new IllegalArgumentException("Repository setting [" + S3Repository.SECRET_KEY_SETTING.getKey() +
" must be accompanied by setting [" + S3Repository.ACCESS_KEY_SETTING.getKey() + "]");
}
static AWSCredentialsProvider buildCredentials(Logger logger, S3ClientSettings clientSettings) {
final BasicAWSCredentials credentials = clientSettings.credentials;
if (credentials == null) {
logger.debug("Using instance profile credentials");
return new PrivilegedInstanceProfileCredentialsProvider();
Expand All @@ -140,20 +143,15 @@ static AWSCredentialsProvider buildCredentials(Logger logger, DeprecationLogger
}

@Override
protected void doStart() throws ElasticsearchException {
}

@Override
protected void doStop() throws ElasticsearchException {
}

@Override
protected void doClose() throws ElasticsearchException {
for (AmazonS3Client client : clientsCache.values()) {
client.shutdown();
public synchronized void releaseCachedClients() {
// the clients will shutdown when they will not be used anymore
for (final AmazonS3Reference clientReference : clientsCache.values()) {
clientReference.decRef();
}

// Ensure that IdleConnectionReaper is shutdown
// clear previously cached clients, they will be build lazily
clientsCache = emptyMap();
// shutdown IdleConnectionReaper background thread
// it will be restarted on new client usage
IdleConnectionReaper.shutdown();
}

Expand All @@ -174,4 +172,5 @@ public void refresh() {
SocketAccess.doPrivilegedVoid(credentials::refresh);
}
}

}
Loading

0 comments on commit 3db5ae0

Please sign in to comment.