Skip to content

Commit

Permalink
Hide orphaned tasks from follower stats (#48901)
Browse files Browse the repository at this point in the history
CCR follower stats can return information for persistent tasks that are in the process of being cleaned up. This is problematic for tests where CCR follower indices have been deleted, but their persistent follower task is only cleaned up asynchronously afterwards. If one of the following tests then accesses the follower stats, it might still get the stats for that follower task.

In addition, some tests were not cleaning up their auto-follow patterns, leaving orphaned patterns behind. Other tests cleaned up their auto-follow patterns. As always the same name was used, it just depended on the test execution order whether this led to a failure or not. This commit fixes the offensive tests, and will also automatically remove auto-follow-patterns at the end of tests, like we do for many other features.

Closes #48700
ywelsch authored Nov 8, 2019

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
1 parent 3b48454 commit be849b2
Showing 6 changed files with 93 additions and 13 deletions.
Original file line number Diff line number Diff line change
@@ -63,6 +63,13 @@ PUT /_ccr/auto_follow/my_auto_follow_pattern
// TEST[setup:remote_cluster]
// TESTSETUP
[source,console]
--------------------------------------------------
DELETE /_ccr/auto_follow/my_auto_follow_pattern
--------------------------------------------------
// TEST
// TEARDOWN
//////////////////////////

[source,console]
Original file line number Diff line number Diff line change
@@ -59,6 +59,13 @@ PUT /_ccr/auto_follow/my_auto_follow_pattern
// TEST[setup:remote_cluster]
// TESTSETUP
[source,console]
--------------------------------------------------
DELETE /_ccr/auto_follow/my_auto_follow_pattern
--------------------------------------------------
// TEST
// TEARDOWN
[source,console]
--------------------------------------------------
POST /_ccr/auto_follow/my_auto_follow_pattern/pause
Original file line number Diff line number Diff line change
@@ -469,6 +469,15 @@ protected boolean preserveILMPoliciesUponCompletion() {
return false;
}

/**
* Returns whether to preserve auto-follow patterns. Defaults to not
* preserving them. Only runs at all if xpack is installed on the cluster
* being tested.
*/
protected boolean preserveAutoFollowPatternsUponCompletion() {
return false;
}

/**
* Returns whether to preserve SLM Policies of this test. Defaults to not
* preserving them. Only runs at all if xpack is installed on the cluster
@@ -560,6 +569,10 @@ private void wipeCluster() throws Exception {
deleteAllILMPolicies();
}

if (hasXPack && false == preserveAutoFollowPatternsUponCompletion()) {
deleteAllAutoFollowPatterns();
}

assertThat("Found in progress snapshots [" + inProgressSnapshots.get() + "].", inProgressSnapshots.get(), anEmptyMap());
}

@@ -736,6 +749,31 @@ private static void deleteAllSLMPolicies() throws IOException {
}
}

private static void deleteAllAutoFollowPatterns() throws IOException {
final List<Map<?, ?>> patterns;

try {
Response response = adminClient().performRequest(new Request("GET", "/_ccr/auto_follow"));
patterns = (List<Map<?, ?>>) entityAsMap(response).get("patterns");
} catch (ResponseException e) {
if (RestStatus.METHOD_NOT_ALLOWED.getStatus() == e.getResponse().getStatusLine().getStatusCode() ||
RestStatus.BAD_REQUEST.getStatus() == e.getResponse().getStatusLine().getStatusCode()) {
// If bad request returned, CCR is not enabled.
return;
}
throw e;
}

if (patterns == null || patterns.isEmpty()) {
return;
}

for (Map<?, ?> pattern : patterns) {
String patternName = (String) pattern.get("name");
adminClient().performRequest(new Request("DELETE", "/_ccr/auto_follow/" + patternName));
}
}

/**
* Logs a message if there are still running tasks. The reasoning is that any tasks still running are state the is trying to bleed into
* other tests.
Original file line number Diff line number Diff line change
@@ -13,9 +13,11 @@
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.tasks.TransportTasksAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.index.Index;
import org.elasticsearch.license.LicenseUtils;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.tasks.Task;
@@ -116,15 +118,17 @@ static Set<String> findFollowerIndicesFromShardFollowTasks(ClusterState state, S
if (persistentTasksMetaData == null) {
return Collections.emptySet();
}

final MetaData metaData = state.metaData();
final Set<String> requestedFollowerIndices = indices != null ?
new HashSet<>(Arrays.asList(indices)) : Collections.emptySet();
return persistentTasksMetaData.tasks().stream()
.filter(persistentTask -> persistentTask.getTaskName().equals(ShardFollowTask.NAME))
.map(persistentTask -> {
ShardFollowTask shardFollowTask = (ShardFollowTask) persistentTask.getParams();
return shardFollowTask.getFollowShardId().getIndexName();
return shardFollowTask.getFollowShardId().getIndex();
})
.filter(followerIndex -> metaData.index(followerIndex) != null) // hide tasks that are orphaned (see ShardFollowTaskCleaner)
.map(Index::getName)
.filter(followerIndex -> Strings.isAllOrWildcard(indices) || requestedFollowerIndices.contains(followerIndex))
.collect(Collectors.toSet());
}
Original file line number Diff line number Diff line change
@@ -11,6 +11,7 @@
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.index.Index;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.ccr.Ccr;
@@ -58,7 +59,8 @@ private static ClusterState createCS(String[] indices, boolean[] followerIndices
if (isFollowIndex) {
imdBuilder.putCustom(Ccr.CCR_CUSTOM_METADATA_KEY, new HashMap<>());
if (active) {
persistentTasks.addTask(Integer.toString(i), ShardFollowTask.NAME, createShardFollowTask(index), null);
persistentTasks.addTask(Integer.toString(i), ShardFollowTask.NAME,
createShardFollowTask(new Index(index, IndexMetaData.INDEX_UUID_NA_VALUE)), null);
}
}
mdBuilder.put(imdBuilder);
Original file line number Diff line number Diff line change
@@ -5,12 +5,16 @@
*/
package org.elasticsearch.xpack.ccr.action;

import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.test.ESTestCase;
@@ -24,30 +28,48 @@
public class TransportFollowStatsActionTests extends ESTestCase {

public void testFindFollowerIndicesFromShardFollowTasks() {
Settings indexSettings = Settings.builder()
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
.build();

IndexMetaData index1 = IndexMetaData.builder("index1").settings(indexSettings).build();
IndexMetaData index2 = IndexMetaData.builder("index2").settings(indexSettings).build();
IndexMetaData index3 = IndexMetaData.builder("index3").settings(indexSettings).build();

PersistentTasksCustomMetaData.Builder persistentTasks = PersistentTasksCustomMetaData.builder()
.addTask("1", ShardFollowTask.NAME, createShardFollowTask("abc"), null)
.addTask("2", ShardFollowTask.NAME, createShardFollowTask("def"), null);
.addTask("1", ShardFollowTask.NAME, createShardFollowTask(index1.getIndex()), null)
.addTask("2", ShardFollowTask.NAME, createShardFollowTask(index2.getIndex()), null)
.addTask("3", ShardFollowTask.NAME, createShardFollowTask(index3.getIndex()), null);

ClusterState clusterState = ClusterState.builder(new ClusterName("_cluster"))
.metaData(MetaData.builder().putCustom(PersistentTasksCustomMetaData.TYPE, persistentTasks.build()).build())
.metaData(MetaData.builder()
.putCustom(PersistentTasksCustomMetaData.TYPE, persistentTasks.build())
// only add index1 and index2
.put(index1, false)
.put(index2, false)
.build())
.build();
Set<String> result = TransportFollowStatsAction.findFollowerIndicesFromShardFollowTasks(clusterState, null);
assertThat(result.size(), equalTo(2));
assertThat(result.contains("abc"), is(true));
assertThat(result.contains("def"), is(true));
assertThat(result.contains(index1.getIndex().getName()), is(true));
assertThat(result.contains(index2.getIndex().getName()), is(true));

result = TransportFollowStatsAction.findFollowerIndicesFromShardFollowTasks(clusterState, new String[]{"def"});
result = TransportFollowStatsAction.findFollowerIndicesFromShardFollowTasks(clusterState,
new String[]{index2.getIndex().getName()});
assertThat(result.size(), equalTo(1));
assertThat(result.contains("def"), is(true));
assertThat(result.contains(index2.getIndex().getName()), is(true));

result = TransportFollowStatsAction.findFollowerIndicesFromShardFollowTasks(clusterState, new String[]{"ghi"});
result = TransportFollowStatsAction.findFollowerIndicesFromShardFollowTasks(clusterState,
new String[]{index3.getIndex().getName()});
assertThat(result.size(), equalTo(0));
}

static ShardFollowTask createShardFollowTask(String followerIndex) {
static ShardFollowTask createShardFollowTask(Index followerIndex) {
return new ShardFollowTask(
null,
new ShardId(followerIndex, "", 0),
new ShardId(followerIndex, 0),
new ShardId("leader_index", "", 0),
1024,
1024,

0 comments on commit be849b2

Please sign in to comment.