Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Async Snapshot Repository Deletes #40144

Merged
Merged
Changes from 1 commit
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
878d731
add threadpool to blobstore repository
original-brownbear Mar 17, 2019
18cd785
async delete api
original-brownbear Mar 17, 2019
05ffbad
still fails
original-brownbear Mar 17, 2019
8f5081e
tests pass
original-brownbear Mar 18, 2019
a1b3a8a
tests pass
original-brownbear Mar 18, 2019
cdcb7e8
Merge remote-tracking branch 'elastic/master' into async-shard-writes
original-brownbear Mar 18, 2019
305b500
shorter
original-brownbear Mar 18, 2019
d27e6c6
add javadoc
original-brownbear Mar 18, 2019
dced6df
CR: Comments
original-brownbear Mar 22, 2019
4f8d93d
CR comments
original-brownbear Mar 22, 2019
e55226d
Merge remote-tracking branch 'elastic/master' into async-repository-d…
original-brownbear Mar 22, 2019
ae6fb85
add threadpool to internal repo call
original-brownbear Mar 22, 2019
26635c5
Merge remote-tracking branch 'elastic/master' into async-repository-d…
original-brownbear Mar 22, 2019
c24bb71
use generic pool for snapshot restore
original-brownbear Apr 1, 2019
b4c21c9
Merge remote-tracking branch 'elastic/master' into async-repository-d…
original-brownbear Apr 1, 2019
585bcbb
CR: add a little doc
original-brownbear Apr 1, 2019
1fd0f83
Merge remote-tracking branch 'elastic/master' into async-repository-d…
original-brownbear Apr 1, 2019
2895f19
CR: chain actions to dry things up a little
original-brownbear Apr 1, 2019
486a31c
Merge remote-tracking branch 'elastic/master' into async-repository-d…
original-brownbear Apr 1, 2019
d045920
dry up logic using callback chain
original-brownbear Apr 1, 2019
6bac7c0
add comment
original-brownbear Apr 1, 2019
e1c8162
dry logic up some more
original-brownbear Apr 1, 2019
def77dc
dry logic up some more
original-brownbear Apr 1, 2019
b0dd8c9
Merge remote-tracking branch 'elastic/master' into async-repository-d…
original-brownbear Apr 2, 2019
4d7c00d
Merge remote-tracking branch 'elastic/master' into async-repository-d…
original-brownbear Apr 3, 2019
0c06525
much simpler
original-brownbear Apr 3, 2019
1f921ee
much simpler
original-brownbear Apr 3, 2019
d78b7f0
Merge remote-tracking branch 'elastic/master' into async-repository-d…
original-brownbear Apr 3, 2019
36b2e38
remove comment
original-brownbear Apr 3, 2019
a156b70
Merge remote-tracking branch 'elastic/master' into async-repository-d…
original-brownbear Apr 4, 2019
59bdc97
CR comments
original-brownbear Apr 4, 2019
c311311
CR comments
original-brownbear Apr 4, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
add threadpool to blobstore repository
  • Loading branch information
original-brownbear committed Mar 17, 2019
commit 878d7317205a073deac56455adfb5d349b23cab6
Original file line number Diff line number Diff line change
@@ -26,6 +26,7 @@
import org.elasticsearch.plugins.RepositoryPlugin;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.repositories.url.URLRepository;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.Arrays;
import java.util.Collections;
@@ -44,7 +45,9 @@ public List<Setting<?>> getSettings() {
}

@Override
public Map<String, Repository.Factory> getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry) {
return Collections.singletonMap(URLRepository.TYPE, metadata -> new URLRepository(metadata, env, namedXContentRegistry));
public Map<String, Repository.Factory> getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry,
ThreadPool threadPool) {
return Collections.singletonMap(URLRepository.TYPE,
metadata -> new URLRepository(metadata, env, namedXContentRegistry, threadPool));
}
}
Original file line number Diff line number Diff line change
@@ -33,6 +33,7 @@
import org.elasticsearch.env.Environment;
import org.elasticsearch.repositories.RepositoryException;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
import org.elasticsearch.threadpool.ThreadPool;

import java.net.MalformedURLException;
import java.net.URISyntaxException;
@@ -82,8 +83,8 @@ public class URLRepository extends BlobStoreRepository {
* Constructs a read-only URL-based repository
*/
public URLRepository(RepositoryMetaData metadata, Environment environment,
NamedXContentRegistry namedXContentRegistry) {
super(metadata, environment.settings(), namedXContentRegistry);
NamedXContentRegistry namedXContentRegistry, ThreadPool threadPool) {
super(metadata, environment.settings(), namedXContentRegistry, threadPool);

if (URL_SETTING.exists(metadata.settings()) == false && REPOSITORIES_URL_SETTING.exists(environment.settings()) == false) {
throw new RepositoryException(metadata.name(), "missing url");
Original file line number Diff line number Diff line change
@@ -26,20 +26,37 @@
import org.elasticsearch.env.TestEnvironment;
import org.elasticsearch.repositories.RepositoryException;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;

import java.io.IOException;
import java.nio.file.Path;
import java.util.Collections;
import java.util.concurrent.TimeUnit;

import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.CoreMatchers.nullValue;

public class URLRepositoryTests extends ESTestCase {

private ThreadPool threadPool;

@Override
public void setUp() throws Exception {
super.setUp();
threadPool = new TestThreadPool("URLRepositoryTests");
}

@Override
public void tearDown() throws Exception {
super.tearDown();
ThreadPool.terminate(threadPool, 1L, TimeUnit.MINUTES);
}

private URLRepository createRepository(Settings baseSettings, RepositoryMetaData repositoryMetaData) {
return new URLRepository(repositoryMetaData, TestEnvironment.newEnvironment(baseSettings),
new NamedXContentRegistry(Collections.emptyList())) {
new NamedXContentRegistry(Collections.emptyList()), threadPool) {
@Override
protected void assertSnapshotOrGenericThread() {
// eliminate thread name check as we create repo manually on test/main threads
Original file line number Diff line number Diff line change
@@ -38,6 +38,7 @@
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
import org.elasticsearch.snapshots.SnapshotCreationException;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.threadpool.ThreadPool;

import java.net.URISyntaxException;
import java.util.List;
@@ -84,8 +85,8 @@ public static final class Repository {
private final boolean readonly;

public AzureRepository(RepositoryMetaData metadata, Environment environment, NamedXContentRegistry namedXContentRegistry,
AzureStorageService storageService) {
super(metadata, environment.settings(), namedXContentRegistry);
AzureStorageService storageService, ThreadPool threadPool) {
super(metadata, environment.settings(), namedXContentRegistry, threadPool);
this.chunkSize = Repository.CHUNK_SIZE_SETTING.get(metadata.settings());
this.storageService = storageService;

Original file line number Diff line number Diff line change
@@ -28,6 +28,8 @@
import org.elasticsearch.plugins.ReloadablePlugin;
import org.elasticsearch.plugins.RepositoryPlugin;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
@@ -47,9 +49,10 @@ public AzureRepositoryPlugin(Settings settings) {
}

@Override
public Map<String, Repository.Factory> getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry) {
public Map<String, Repository.Factory> getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry,
ThreadPool threadPool) {
return Collections.singletonMap(AzureRepository.TYPE,
(metadata) -> new AzureRepository(metadata, env, namedXContentRegistry, azureStoreService));
(metadata) -> new AzureRepository(metadata, env, namedXContentRegistry, azureStoreService, threadPool));
}

@Override
Original file line number Diff line number Diff line change
@@ -28,6 +28,7 @@
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.TestEnvironment;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;

import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
@@ -42,7 +43,8 @@ private AzureRepository azureRepository(Settings settings) {
.put(settings)
.build();
final AzureRepository azureRepository = new AzureRepository(new RepositoryMetaData("foo", "azure", internalSettings),
TestEnvironment.newEnvironment(internalSettings), NamedXContentRegistry.EMPTY, mock(AzureStorageService.class));
TestEnvironment.newEnvironment(internalSettings), NamedXContentRegistry.EMPTY, mock(AzureStorageService.class),
mock(ThreadPool.class));
assertThat(azureRepository.getBlobStore(), is(nullValue()));
return azureRepository;
}
Original file line number Diff line number Diff line change
@@ -27,6 +27,8 @@
import org.elasticsearch.plugins.ReloadablePlugin;
import org.elasticsearch.plugins.RepositoryPlugin;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
@@ -49,9 +51,10 @@ protected GoogleCloudStorageService createStorageService() {
}

@Override
public Map<String, Repository.Factory> getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry) {
public Map<String, Repository.Factory> getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry,
ThreadPool threadPool) {
return Collections.singletonMap(GoogleCloudStorageRepository.TYPE,
(metadata) -> new GoogleCloudStorageRepository(metadata, env, namedXContentRegistry, this.storageService));
metadata -> new GoogleCloudStorageRepository(metadata, env, namedXContentRegistry, this.storageService, threadPool));
}

@Override
Original file line number Diff line number Diff line change
@@ -31,6 +31,7 @@
import org.elasticsearch.env.Environment;
import org.elasticsearch.repositories.RepositoryException;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.function.Function;

@@ -63,8 +64,8 @@ class GoogleCloudStorageRepository extends BlobStoreRepository {

GoogleCloudStorageRepository(RepositoryMetaData metadata, Environment environment,
NamedXContentRegistry namedXContentRegistry,
GoogleCloudStorageService storageService) {
super(metadata, environment.settings(), namedXContentRegistry);
GoogleCloudStorageService storageService, ThreadPool threadPool) {
super(metadata, environment.settings(), namedXContentRegistry, threadPool);
this.storageService = storageService;

String basePath = BASE_PATH.get(metadata.settings());
Original file line number Diff line number Diff line change
@@ -36,6 +36,7 @@
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.RepositoryPlugin;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.threadpool.ThreadPool;

public final class HdfsPlugin extends Plugin implements RepositoryPlugin {

@@ -110,7 +111,8 @@ private static Void eagerInit() {
}

@Override
public Map<String, Repository.Factory> getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry) {
return Collections.singletonMap("hdfs", (metadata) -> new HdfsRepository(metadata, env, namedXContentRegistry));
public Map<String, Repository.Factory> getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry,
ThreadPool threadPool) {
return Collections.singletonMap("hdfs", (metadata) -> new HdfsRepository(metadata, env, namedXContentRegistry, threadPool));
}
}
Original file line number Diff line number Diff line change
@@ -40,6 +40,7 @@
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.env.Environment;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
import org.elasticsearch.threadpool.ThreadPool;

import java.io.IOException;
import java.io.UncheckedIOException;
@@ -67,8 +68,8 @@ public final class HdfsRepository extends BlobStoreRepository {
private static final ByteSizeValue DEFAULT_BUFFER_SIZE = new ByteSizeValue(100, ByteSizeUnit.KB);

public HdfsRepository(RepositoryMetaData metadata, Environment environment,
NamedXContentRegistry namedXContentRegistry) {
super(metadata, environment.settings(), namedXContentRegistry);
NamedXContentRegistry namedXContentRegistry, ThreadPool threadPool) {
super(metadata, environment.settings(), namedXContentRegistry, threadPool);

this.environment = environment;
this.chunkSize = metadata.settings().getAsBytesSize("chunk_size", null);
Original file line number Diff line number Diff line change
@@ -36,6 +36,7 @@
import org.elasticsearch.monitor.jvm.JvmInfo;
import org.elasticsearch.repositories.RepositoryException;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.function.Function;

@@ -165,8 +166,8 @@ class S3Repository extends BlobStoreRepository {
S3Repository(final RepositoryMetaData metadata,
final Settings settings,
final NamedXContentRegistry namedXContentRegistry,
final S3Service service) {
super(metadata, settings, namedXContentRegistry);
final S3Service service, final ThreadPool threadPool) {
super(metadata, settings, namedXContentRegistry, threadPool);
this.service = service;

this.repositoryMetaData = metadata;
Original file line number Diff line number Diff line change
@@ -30,6 +30,7 @@
import org.elasticsearch.plugins.ReloadablePlugin;
import org.elasticsearch.plugins.RepositoryPlugin;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.threadpool.ThreadPool;

import java.io.IOException;
import java.security.AccessController;
@@ -77,13 +78,15 @@ public S3RepositoryPlugin(final Settings settings) {
// proxy method for testing
protected S3Repository createRepository(final RepositoryMetaData metadata,
final Settings settings,
final NamedXContentRegistry registry) {
return new S3Repository(metadata, settings, registry, service);
final NamedXContentRegistry registry, final ThreadPool threadPool) {
return new S3Repository(metadata, settings, registry, service, threadPool);
}

@Override
public Map<String, Repository.Factory> getRepositories(final Environment env, final NamedXContentRegistry registry) {
return Collections.singletonMap(S3Repository.TYPE, (metadata) -> createRepository(metadata, env.settings(), registry));
public Map<String, Repository.Factory> getRepositories(final Environment env, final NamedXContentRegistry registry,
final ThreadPool threadPool) {
return Collections.singletonMap(S3Repository.TYPE,
metadata -> createRepository(metadata, env.settings(), registry, threadPool));
}

@Override
Original file line number Diff line number Diff line change
@@ -30,12 +30,14 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;

import java.io.IOException;
import java.security.AccessController;
import java.security.PrivilegedAction;

import static org.hamcrest.Matchers.is;
import static org.mockito.Mockito.mock;

@SuppressForbidden(reason = "test fixture requires System.setProperty")
public class RepositoryCredentialsTests extends ESTestCase {
@@ -61,9 +63,9 @@ static final class ClientAndCredentials extends AmazonS3Wrapper {
}

static final class ProxyS3Service extends S3Service {

private static final Logger logger = LogManager.getLogger(ProxyS3Service.class);

@Override
AmazonS3 buildClient(final S3ClientSettings clientSettings) {
final AmazonS3 client = super.buildClient(clientSettings);
@@ -77,8 +79,9 @@ AmazonS3 buildClient(final S3ClientSettings clientSettings) {
}

@Override
protected S3Repository createRepository(RepositoryMetaData metadata, Settings settings, NamedXContentRegistry registry) {
return new S3Repository(metadata, settings, registry, service){
protected S3Repository createRepository(RepositoryMetaData metadata, Settings settings, NamedXContentRegistry registry,
ThreadPool threadPool) {
return new S3Repository(metadata, settings, registry, service, threadPool){
@Override
protected void assertSnapshotOrGenericThread() {
// eliminate thread name check as we create repo manually on test/main threads
@@ -106,7 +109,7 @@ public void testRepositoryCredentialsOverrideSecureCredentials() throws IOExcept
.put(S3Repository.ACCESS_KEY_SETTING.getKey(), "insecure_aws_key")
.put(S3Repository.SECRET_KEY_SETTING.getKey(), "insecure_aws_secret").build());
try (S3RepositoryPlugin s3Plugin = new ProxyS3RepositoryPlugin(settings);
S3Repository s3repo = createAndStartRepository(metadata, s3Plugin);
S3Repository s3repo = createAndStartRepository(metadata, s3Plugin, mock(ThreadPool.class));
AmazonS3Reference s3Ref = ((S3BlobStore) s3repo.blobStore()).clientReference()) {
final AWSCredentials credentials = ((ProxyS3RepositoryPlugin.ClientAndCredentials) s3Ref.client()).credentials.getCredentials();
assertThat(credentials.getAWSAccessKeyId(), is("insecure_aws_key"));
@@ -129,7 +132,7 @@ public void testRepositoryCredentialsOnly() throws IOException {
.put(S3Repository.SECRET_KEY_SETTING.getKey(), "insecure_aws_secret")
.build());
try (S3RepositoryPlugin s3Plugin = new ProxyS3RepositoryPlugin(Settings.EMPTY);
S3Repository s3repo = createAndStartRepository(metadata, s3Plugin);
S3Repository s3repo = createAndStartRepository(metadata, s3Plugin, mock(ThreadPool.class));
AmazonS3Reference s3Ref = ((S3BlobStore) s3repo.blobStore()).clientReference()) {
final AWSCredentials credentials = ((ProxyS3RepositoryPlugin.ClientAndCredentials) s3Ref.client()).credentials.getCredentials();
assertThat(credentials.getAWSAccessKeyId(), is("insecure_aws_key"));
@@ -144,8 +147,8 @@ public void testRepositoryCredentialsOnly() throws IOException {
+ " See the breaking changes documentation for the next major version.");
}

private S3Repository createAndStartRepository(RepositoryMetaData metadata, S3RepositoryPlugin s3Plugin) {
final S3Repository repository = s3Plugin.createRepository(metadata, Settings.EMPTY, NamedXContentRegistry.EMPTY);
private S3Repository createAndStartRepository(RepositoryMetaData metadata, S3RepositoryPlugin s3Plugin, ThreadPool threadPool) {
final S3Repository repository = s3Plugin.createRepository(metadata, Settings.EMPTY, NamedXContentRegistry.EMPTY, threadPool);
repository.start();
return repository;
}
@@ -168,7 +171,7 @@ public void testReinitSecureCredentials() throws IOException {
}
final RepositoryMetaData metadata = new RepositoryMetaData("dummy-repo", "mock", builder.build());
try (S3RepositoryPlugin s3Plugin = new ProxyS3RepositoryPlugin(settings);
S3Repository s3repo = createAndStartRepository(metadata, s3Plugin)) {
S3Repository s3repo = createAndStartRepository(metadata, s3Plugin, mock(ThreadPool.class))) {
try (AmazonS3Reference s3Ref = ((S3BlobStore) s3repo.blobStore()).clientReference()) {
final AWSCredentials credentials = ((ProxyS3RepositoryPlugin.ClientAndCredentials) s3Ref.client()).credentials
.getCredentials();
Original file line number Diff line number Diff line change
@@ -37,6 +37,7 @@
import org.elasticsearch.rest.RestResponse;
import org.elasticsearch.rest.action.admin.cluster.RestGetRepositoriesAction;
import org.elasticsearch.test.rest.FakeRestRequest;
import org.elasticsearch.threadpool.ThreadPool;
import org.junit.AfterClass;
import org.junit.BeforeClass;

@@ -114,14 +115,15 @@ public TestS3RepositoryPlugin(final Settings settings) {
}

@Override
public Map<String, Repository.Factory> getRepositories(final Environment env, final NamedXContentRegistry registry) {
public Map<String, Repository.Factory> getRepositories(final Environment env, final NamedXContentRegistry registry,
final ThreadPool threadPool) {
return Collections.singletonMap(S3Repository.TYPE,
(metadata) -> new S3Repository(metadata, env.settings(), registry, new S3Service() {
metadata -> new S3Repository(metadata, env.settings(), registry, new S3Service() {
@Override
AmazonS3 buildClient(S3ClientSettings clientSettings) {
return new MockAmazonS3(blobs, bucket, serverSideEncryption, cannedACL, storageClass);
}
}));
}, threadPool));
}
}

Loading