Skip to content

Commit

Permalink
Make GET _cluster/stats cancellable (elastic#68676)
Browse files Browse the repository at this point in the history
Today `GET _cluster/stats` can be quite expensive, and is typically
retrieved periodically by monitoring systems (e.g. Metricbeat) that
implement a client-side timeout. When the client times out it closes the
HTTP connection in use. With this commit we react to the close of the
HTTP connection by cancelling the ongoing stats request, avoiding
unnecessary duplicated work.

Relates elastic#55550
  • Loading branch information
DaveCTurner committed Feb 10, 2021
1 parent b97a00e commit 9cc482b
Show file tree
Hide file tree
Showing 13 changed files with 946 additions and 75 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.http;

import org.apache.http.client.methods.HttpGet;
import org.elasticsearch.action.admin.cluster.stats.ClusterStatsAction;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.client.Cancellable;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseListener;
import org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineConfig;
import org.elasticsearch.index.engine.EngineFactory;
import org.elasticsearch.index.engine.ReadOnlyEngine;
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardTestCase;
import org.elasticsearch.index.translog.TranslogStats;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.plugins.EnginePlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.TaskInfo;
import org.elasticsearch.transport.TransportService;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CancellationException;
import java.util.concurrent.Semaphore;
import java.util.function.Function;

import static java.util.Collections.singletonList;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.not;

public class ClusterStatsRestCancellationIT extends HttpSmokeTestCase {

public static final Setting<Boolean> BLOCK_STATS_SETTING = Setting.boolSetting("index.block_stats", false, Setting.Property.IndexScope);

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return CollectionUtils.appendToCopy(super.nodePlugins(), ClusterStatsRestCancellationIT.StatsBlockingPlugin.class);
}

@Override
protected boolean addMockInternalEngine() {
return false;
}

@Override
protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
// disable internal cluster info service to avoid internal cluster stats calls
.put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING.getKey(), false)
.build();
}

public void testClusterStateRestCancellation() throws Exception {

createIndex("test", Settings.builder().put(BLOCK_STATS_SETTING.getKey(), true).build());
ensureGreen("test");

final List<Semaphore> statsBlocks = new ArrayList<>();
for (final IndicesService indicesService : internalCluster().getInstances(IndicesService.class)) {
for (final IndexService indexService : indicesService) {
for (final IndexShard indexShard : indexService) {
final Engine engine = IndexShardTestCase.getEngine(indexShard);
if (engine instanceof StatsBlockingEngine) {
statsBlocks.add(((StatsBlockingEngine) engine).statsBlock);
}
}
}
}
assertThat(statsBlocks, not(empty()));

final List<Releasable> releasables = new ArrayList<>();
try {
for (final Semaphore statsBlock : statsBlocks) {
statsBlock.acquire();
releasables.add(statsBlock::release);
}

final Request clusterStatsRequest = new Request(HttpGet.METHOD_NAME, "/_cluster/stats");

final PlainActionFuture<Void> future = new PlainActionFuture<>();
logger.info("--> sending cluster state request");
final Cancellable cancellable = getRestClient().performRequestAsync(clusterStatsRequest, new ResponseListener() {
@Override
public void onSuccess(Response response) {
future.onResponse(null);
}

@Override
public void onFailure(Exception exception) {
future.onFailure(exception);
}
});

logger.info("--> waiting for task to start");
assertBusy(() -> {
final List<TaskInfo> tasks = client().admin().cluster().prepareListTasks().get().getTasks();
assertTrue(tasks.toString(), tasks.stream().anyMatch(t -> t.getAction().startsWith(ClusterStatsAction.NAME)));
});

logger.info("--> waiting for at least one task to hit a block");
assertBusy(() -> assertTrue(statsBlocks.stream().anyMatch(Semaphore::hasQueuedThreads)));

logger.info("--> cancelling cluster stats request");
cancellable.cancel();
expectThrows(CancellationException.class, future::actionGet);

logger.info("--> checking that all cluster stats tasks are marked as cancelled");
assertBusy(() -> {
boolean foundTask = false;
for (TransportService transportService : internalCluster().getInstances(TransportService.class)) {
for (CancellableTask cancellableTask : transportService.getTaskManager().getCancellableTasks().values()) {
if (cancellableTask.getAction().startsWith(ClusterStatsAction.NAME)) {
foundTask = true;
assertTrue(cancellableTask.isCancelled());
}
}
}
assertTrue(foundTask);
});
} finally {
Releasables.close(releasables);
}

logger.info("--> checking that all cluster stats tasks have finished");
assertBusy(() -> {
final List<TaskInfo> tasks = client().admin().cluster().prepareListTasks().get().getTasks();
assertTrue(tasks.toString(), tasks.stream().noneMatch(t -> t.getAction().startsWith(ClusterStatsAction.NAME)));
});
}

public static class StatsBlockingPlugin extends Plugin implements EnginePlugin {

@Override
public Optional<EngineFactory> getEngineFactory(IndexSettings indexSettings) {
if (BLOCK_STATS_SETTING.get(indexSettings.getSettings())) {
return Optional.of(StatsBlockingEngine::new);
}
return Optional.empty();
}

@Override
public List<Setting<?>> getSettings() {
return singletonList(BLOCK_STATS_SETTING);
}
}

private static class StatsBlockingEngine extends ReadOnlyEngine {

final Semaphore statsBlock = new Semaphore(1);

StatsBlockingEngine(EngineConfig config) {
super(config, null, new TranslogStats(), true, Function.identity(), true);
}

@Override
public SeqNoStats getSeqNoStats(long globalCheckpoint) {
try {
statsBlock.acquire();
} catch (InterruptedException e) {
throw new AssertionError(e);
}
statsBlock.release();
return super.getSeqNoStats(globalCheckpoint);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public final class AnalysisStats implements ToXContentFragment, Writeable {
/**
* Create {@link AnalysisStats} from the given cluster state.
*/
public static AnalysisStats of(Metadata metadata) {
public static AnalysisStats of(Metadata metadata, Runnable ensureNotCancelled) {
final Map<String, IndexFeatureStats> usedCharFilterTypes = new HashMap<>();
final Map<String, IndexFeatureStats> usedTokenizerTypes = new HashMap<>();
final Map<String, IndexFeatureStats> usedTokenFilterTypes = new HashMap<>();
Expand All @@ -52,6 +52,7 @@ public static AnalysisStats of(Metadata metadata) {
final Map<String, IndexFeatureStats> usedBuiltInAnalyzers = new HashMap<>();

for (IndexMetadata indexMetadata : metadata) {
ensureNotCancelled.run();
if (indexMetadata.isSystem()) {
// Don't include system indices in statistics about analysis,
// we care about the user's indices.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,12 @@
import org.elasticsearch.action.support.nodes.BaseNodesRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;

import java.io.IOException;
import java.util.Map;

/**
* A request to get cluster level stats.
Expand All @@ -31,6 +35,11 @@ public ClusterStatsRequest(String... nodesIds) {
super(nodesIds);
}

@Override
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
return new CancellableTask(id, type, action, "", parentTaskId, headers);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,10 @@ public final class MappingStats implements ToXContentFragment, Writeable {
/**
* Create {@link MappingStats} from the given cluster state.
*/
public static MappingStats of(Metadata metadata) {
public static MappingStats of(Metadata metadata, Runnable ensureNotCancelled) {
Map<String, IndexFeatureStats> fieldTypes = new HashMap<>();
for (IndexMetadata indexMetadata : metadata) {
ensureNotCancelled.run();
if (indexMetadata.isSystem()) {
// Don't include system indices in statistics about mappings,
// we care about the user's indices.
Expand Down
Loading

0 comments on commit 9cc482b

Please sign in to comment.