Skip to content

Commit

Permalink
Make indices stats requests cancellable
Browse files Browse the repository at this point in the history
  • Loading branch information
DaveCTurner committed Feb 18, 2021
1 parent 146f7be commit bbfa047
Show file tree
Hide file tree
Showing 6 changed files with 233 additions and 168 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.http;

import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.client.Cancellable;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseListener;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineConfig;
import org.elasticsearch.index.engine.EngineException;
import org.elasticsearch.index.engine.EngineFactory;
import org.elasticsearch.index.engine.ReadOnlyEngine;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardTestCase;
import org.elasticsearch.index.translog.TranslogStats;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.plugins.EnginePlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.TaskInfo;
import org.elasticsearch.transport.TransportService;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CancellationException;
import java.util.concurrent.Semaphore;
import java.util.function.Function;

import static java.util.Collections.singletonList;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.not;

/**
* Base class for testing that cancellation works at the REST layer for requests that need to acquire a searcher on one or more shards.
*
* It works by blocking searcher acquisition in order to catch the request mid-execution, and then to check that all the tasks are cancelled
* before they complete normally.
*/
public abstract class BlockedSearcherRestCancellationTestCase extends HttpSmokeTestCase {

private static final Setting<Boolean> BLOCK_SEARCHER_SETTING
= Setting.boolSetting("index.block_searcher", false, Setting.Property.IndexScope);

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return CollectionUtils.appendToCopy(super.nodePlugins(), SearcherBlockingPlugin.class);
}

@Override
protected boolean addMockInternalEngine() {
return false;
}

void runTest(Request request, String actionPrefix) throws Exception {

createIndex("test", Settings.builder().put(BLOCK_SEARCHER_SETTING.getKey(), true).build());
ensureGreen("test");

final List<Semaphore> searcherBlocks = new ArrayList<>();
for (final IndicesService indicesService : internalCluster().getInstances(IndicesService.class)) {
for (final IndexService indexService : indicesService) {
for (final IndexShard indexShard : indexService) {
final Engine engine = IndexShardTestCase.getEngine(indexShard);
if (engine instanceof SearcherBlockingEngine) {
searcherBlocks.add(((SearcherBlockingEngine) engine).searcherBlock);
}
}
}
}
assertThat(searcherBlocks, not(empty()));

final List<Releasable> releasables = new ArrayList<>();
try {
for (final Semaphore searcherBlock : searcherBlocks) {
searcherBlock.acquire();
releasables.add(searcherBlock::release);
}

final PlainActionFuture<Void> future = new PlainActionFuture<>();
logger.info("--> sending request");
final Cancellable cancellable = getRestClient().performRequestAsync(request, new ResponseListener() {
@Override
public void onSuccess(Response response) {
future.onResponse(null);
}

@Override
public void onFailure(Exception exception) {
future.onFailure(exception);
}
});

logger.info("--> waiting for task to start");
assertBusy(() -> {
final List<TaskInfo> tasks = client().admin().cluster().prepareListTasks().get().getTasks();
assertTrue(tasks.toString(), tasks.stream().anyMatch(t -> t.getAction().startsWith(actionPrefix)));
});

logger.info("--> waiting for at least one task to hit a block");
assertBusy(() -> assertTrue(searcherBlocks.stream().anyMatch(Semaphore::hasQueuedThreads)));

logger.info("--> cancelling request");
cancellable.cancel();
expectThrows(CancellationException.class, future::actionGet);

logger.info("--> checking that all tasks are marked as cancelled");
assertBusy(() -> {
boolean foundTask = false;
for (TransportService transportService : internalCluster().getInstances(TransportService.class)) {
for (CancellableTask cancellableTask : transportService.getTaskManager().getCancellableTasks().values()) {
if (cancellableTask.getAction().startsWith(actionPrefix)) {
foundTask = true;
assertTrue(
"task " + cancellableTask.getId() + "/" + cancellableTask.getAction() + " not cancelled",
cancellableTask.isCancelled());
}
}
}
assertTrue("found no cancellable tasks", foundTask);
});
} finally {
Releasables.close(releasables);
}

logger.info("--> checking that all tasks have finished");
assertBusy(() -> {
final List<TaskInfo> tasks = client().admin().cluster().prepareListTasks().get().getTasks();
assertTrue(tasks.toString(), tasks.stream().noneMatch(t -> t.getAction().startsWith(actionPrefix)));
});
}

public static class SearcherBlockingPlugin extends Plugin implements EnginePlugin {

@Override
public Optional<EngineFactory> getEngineFactory(IndexSettings indexSettings) {
if (BLOCK_SEARCHER_SETTING.get(indexSettings.getSettings())) {
return Optional.of(SearcherBlockingEngine::new);
}
return Optional.empty();
}

@Override
public List<Setting<?>> getSettings() {
return singletonList(BLOCK_SEARCHER_SETTING);
}
}

private static class SearcherBlockingEngine extends ReadOnlyEngine {

final Semaphore searcherBlock = new Semaphore(1);

SearcherBlockingEngine(EngineConfig config) {
super(config, null, new TranslogStats(), true, Function.identity(), true);
}

@Override
public Searcher acquireSearcher(String source, SearcherScope scope, Function<Searcher, Searcher> wrapper) throws EngineException {
try {
searcherBlock.acquire();
} catch (InterruptedException e) {
throw new AssertionError(e);
}
searcherBlock.release();
return super.acquireSearcher(source, scope, wrapper);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -10,178 +10,14 @@

import org.apache.http.client.methods.HttpGet;
import org.elasticsearch.action.admin.indices.segments.IndicesSegmentsAction;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.client.Cancellable;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseListener;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineConfig;
import org.elasticsearch.index.engine.EngineException;
import org.elasticsearch.index.engine.EngineFactory;
import org.elasticsearch.index.engine.ReadOnlyEngine;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardTestCase;
import org.elasticsearch.index.translog.TranslogStats;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.plugins.EnginePlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.TaskInfo;
import org.elasticsearch.transport.TransportService;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CancellationException;
import java.util.concurrent.Semaphore;
import java.util.function.Function;

import static java.util.Collections.singletonList;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.not;

public class IndicesSegmentsRestCancellationIT extends HttpSmokeTestCase {

public static final Setting<Boolean> BLOCK_SEARCHER_SETTING
= Setting.boolSetting("index.block_searcher", false, Setting.Property.IndexScope);

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return CollectionUtils.appendToCopy(super.nodePlugins(), SearcherBlockingPlugin.class);
}

@Override
protected boolean addMockInternalEngine() {
return false;
}

public class IndicesSegmentsRestCancellationIT extends BlockedSearcherRestCancellationTestCase {
public void testIndicesSegmentsRestCancellation() throws Exception {
runTest(new Request(HttpGet.METHOD_NAME, "/_segments"));
runTest(new Request(HttpGet.METHOD_NAME, "/_segments"), IndicesSegmentsAction.NAME);
}

public void testCatSegmentsRestCancellation() throws Exception {
runTest(new Request(HttpGet.METHOD_NAME, "/_cat/segments"));
}

private void runTest(Request request) throws Exception {

createIndex("test", Settings.builder().put(BLOCK_SEARCHER_SETTING.getKey(), true).build());
ensureGreen("test");

final List<Semaphore> searcherBlocks = new ArrayList<>();
for (final IndicesService indicesService : internalCluster().getInstances(IndicesService.class)) {
for (final IndexService indexService : indicesService) {
for (final IndexShard indexShard : indexService) {
final Engine engine = IndexShardTestCase.getEngine(indexShard);
if (engine instanceof SearcherBlockingEngine) {
searcherBlocks.add(((SearcherBlockingEngine) engine).searcherBlock);
}
}
}
}
assertThat(searcherBlocks, not(empty()));

final List<Releasable> releasables = new ArrayList<>();
try {
for (final Semaphore searcherBlock : searcherBlocks) {
searcherBlock.acquire();
releasables.add(searcherBlock::release);
}

final PlainActionFuture<Void> future = new PlainActionFuture<>();
logger.info("--> sending indices segments request");
final Cancellable cancellable = getRestClient().performRequestAsync(request, new ResponseListener() {
@Override
public void onSuccess(Response response) {
future.onResponse(null);
}

@Override
public void onFailure(Exception exception) {
future.onFailure(exception);
}
});

logger.info("--> waiting for task to start");
assertBusy(() -> {
final List<TaskInfo> tasks = client().admin().cluster().prepareListTasks().get().getTasks();
assertTrue(tasks.toString(), tasks.stream().anyMatch(t -> t.getAction().startsWith(IndicesSegmentsAction.NAME)));
});

logger.info("--> waiting for at least one task to hit a block");
assertBusy(() -> assertTrue(searcherBlocks.stream().anyMatch(Semaphore::hasQueuedThreads)));

logger.info("--> cancelling request");
cancellable.cancel();
expectThrows(CancellationException.class, future::actionGet);

logger.info("--> checking that all indices segments tasks are marked as cancelled");
assertBusy(() -> {
boolean foundTask = false;
for (TransportService transportService : internalCluster().getInstances(TransportService.class)) {
for (CancellableTask cancellableTask : transportService.getTaskManager().getCancellableTasks().values()) {
if (cancellableTask.getAction().startsWith(IndicesSegmentsAction.NAME)) {
foundTask = true;
assertTrue("task " + cancellableTask.getId() + " not cancelled", cancellableTask.isCancelled());
}
}
}
assertTrue("found no cancellable tasks", foundTask);
});
} finally {
Releasables.close(releasables);
}

logger.info("--> checking that all indices segments tasks have finished");
assertBusy(() -> {
final List<TaskInfo> tasks = client().admin().cluster().prepareListTasks().get().getTasks();
assertTrue(tasks.toString(), tasks.stream().noneMatch(t -> t.getAction().startsWith(IndicesSegmentsAction.NAME)));
});
runTest(new Request(HttpGet.METHOD_NAME, "/_cat/segments"), IndicesSegmentsAction.NAME);
}

public static class SearcherBlockingPlugin extends Plugin implements EnginePlugin {

@Override
public Optional<EngineFactory> getEngineFactory(IndexSettings indexSettings) {
if (BLOCK_SEARCHER_SETTING.get(indexSettings.getSettings())) {
return Optional.of(SearcherBlockingEngine::new);
}
return Optional.empty();
}

@Override
public List<Setting<?>> getSettings() {
return singletonList(BLOCK_SEARCHER_SETTING);
}
}

private static class SearcherBlockingEngine extends ReadOnlyEngine {

final Semaphore searcherBlock = new Semaphore(1);

SearcherBlockingEngine(EngineConfig config) {
super(config, null, new TranslogStats(), true, Function.identity(), true);
}

@Override
public Searcher acquireSearcher(String source, SearcherScope scope, Function<Searcher, Searcher> wrapper) throws EngineException {
try {
searcherBlock.acquire();
} catch (InterruptedException e) {
throw new AssertionError(e);
}
searcherBlock.release();
return super.acquireSearcher(source, scope, wrapper);
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.http;

import org.apache.http.client.methods.HttpGet;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsAction;
import org.elasticsearch.client.Request;
import org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings;
import org.elasticsearch.common.settings.Settings;

public class IndicesStatsRestCancellationIT extends BlockedSearcherRestCancellationTestCase {

@Override
protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
// disable internal cluster info service to avoid internal indices stats calls
.put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING.getKey(), false)
.build();
}

public void testIndicesStatsRestCancellation() throws Exception {
runTest(new Request(HttpGet.METHOD_NAME, "/_stats"), IndicesStatsAction.NAME);
}
}
Loading

0 comments on commit bbfa047

Please sign in to comment.