Improve performance of shards limits decider (#53577)
On clusters with a large number of shards, the shards limits allocation
decider can exhibit poor performance, leading to timeouts when applying
cluster state updates. This occurs because, for every shard, we loop over
all shards on the node to count both the shards on the node and the
shards belonging to that shard's index. This is roughly quadratic in the
number of shards. The loop is unnecessary: we already have an O(1) method
to count the number of non-relocating shards on a node, and with this
commit we add infrastructure to RoutingNode to make counting the number
of shards per index O(1) as well.
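
In essence, the change trades a per-decision scan for incremental bookkeeping on each shard mutation. Below is a minimal standalone sketch of that pattern, using simplified stand-in types rather than the actual Elasticsearch classes (Shard here stands in for ShardRouting, and a plain String for Index):

import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

final class ShardCountingSketch {

    static final class Shard {
        final String index;
        final boolean relocating;

        Shard(String index, boolean relocating) {
            this.index = index;
            this.relocating = relocating;
        }
    }

    private final Set<Shard> shards = new LinkedHashSet<>();
    private final Set<Shard> relocatingShards = new LinkedHashSet<>();
    private final Map<String, Set<Shard>> shardsByIndex = new HashMap<>();

    void add(Shard shard) {
        shards.add(shard);
        if (shard.relocating) {
            relocatingShards.add(shard);
        }
        // incremental bookkeeping: keep the per-index grouping current on every mutation
        shardsByIndex.computeIfAbsent(shard.index, k -> new LinkedHashSet<>()).add(shard);
    }

    void remove(Shard shard) {
        shards.remove(shard);
        relocatingShards.remove(shard);
        Set<Shard> byIndex = shardsByIndex.get(shard.index);
        byIndex.remove(shard);
        if (byIndex.isEmpty()) {
            shardsByIndex.remove(shard.index); // prune empty groups
        }
    }

    // O(1): total shards minus relocating shards, no iteration required
    int numberOfOwningShards() {
        return shards.size() - relocatingShards.size();
    }

    // touches only one index's shards instead of every shard on the node
    int numberOfOwningShardsForIndex(String index) {
        int count = 0;
        for (Shard shard : shardsByIndex.getOrDefault(index, Set.of())) {
            if (shard.relocating == false) {
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        ShardCountingSketch node = new ShardCountingSketch();
        node.add(new Shard("logs", false));
        node.add(new Shard("logs", true));     // relocating, so not "owned"
        node.add(new Shard("metrics", false));
        System.out.println(node.numberOfOwningShards());               // 2
        System.out.println(node.numberOfOwningShardsForIndex("logs")); // 1
    }
}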
jasontedor committed Mar 19, 2020
1 parent 011db5e commit 279f9bd
Showing 3 changed files with 54 additions and 16 deletions.
RoutingNode.java
@@ -21,15 +21,20 @@
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.common.Nullable;
+import org.elasticsearch.index.Index;
 import org.elasticsearch.index.shard.ShardId;
 
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
 
 /**
@@ -48,6 +53,8 @@ public class RoutingNode implements Iterable<ShardRouting> {
 
     private final LinkedHashSet<ShardRouting> relocatingShards;
 
+    private final HashMap<Index, LinkedHashSet<ShardRouting>> shardsByIndex;
+
     public RoutingNode(String nodeId, DiscoveryNode node, ShardRouting... shards) {
         this(nodeId, node, buildShardRoutingMap(shards));
     }
@@ -58,12 +65,14 @@ public RoutingNode(String nodeId, DiscoveryNode node, ShardRouting... shards) {
         this.shards = shards;
         this.relocatingShards = new LinkedHashSet<>();
         this.initializingShards = new LinkedHashSet<>();
+        this.shardsByIndex = new LinkedHashMap<>();
         for (ShardRouting shardRouting : shards.values()) {
             if (shardRouting.initializing()) {
                 initializingShards.add(shardRouting);
             } else if (shardRouting.relocating()) {
                 relocatingShards.add(shardRouting);
             }
+            shardsByIndex.computeIfAbsent(shardRouting.index(), k -> new LinkedHashSet<>()).add(shardRouting);
         }
         assert invariant();
     }
@@ -128,6 +137,7 @@ void add(ShardRouting shard) {
         } else if (shard.relocating()) {
             relocatingShards.add(shard);
         }
+        shardsByIndex.computeIfAbsent(shard.index(), k -> new LinkedHashSet<>()).add(shard);
         assert invariant();
     }
 
@@ -148,11 +158,16 @@ void update(ShardRouting oldShard, ShardRouting newShard) {
             boolean exist = relocatingShards.remove(oldShard);
             assert exist : "expected shard " + oldShard + " to exist in relocatingShards";
         }
+        shardsByIndex.get(oldShard.index()).remove(oldShard);
+        if (shardsByIndex.get(oldShard.index()).isEmpty()) {
+            shardsByIndex.remove(oldShard.index());
+        }
         if (newShard.initializing()) {
             initializingShards.add(newShard);
         } else if (newShard.relocating()) {
             relocatingShards.add(newShard);
         }
+        shardsByIndex.computeIfAbsent(newShard.index(), k -> new LinkedHashSet<>()).add(newShard);
         assert invariant();
     }
 
@@ -167,6 +182,10 @@ void remove(ShardRouting shard) {
             boolean exist = relocatingShards.remove(shard);
             assert exist : "expected shard " + shard + " to exist in relocatingShards";
         }
+        shardsByIndex.get(shard.index()).remove(shard);
+        if (shardsByIndex.get(shard.index()).isEmpty()) {
+            shardsByIndex.remove(shard.index());
+        }
         assert invariant();
     }
 
@@ -269,6 +288,15 @@ public int numberOfOwningShards() {
         return shards.size() - relocatingShards.size();
     }
 
+    public int numberOfOwningShardsForIndex(final Index index) {
+        final LinkedHashSet<ShardRouting> shardRoutings = shardsByIndex.get(index);
+        if (shardRoutings == null) {
+            return 0;
+        } else {
+            return Math.toIntExact(shardRoutings.stream().filter(Predicate.not(ShardRouting::relocating)).count());
+        }
+    }
+
     public String prettyPrint() {
         StringBuilder sb = new StringBuilder();
         sb.append("-----node_id[").append(nodeId).append("][").append(node == null ? "X" : "V").append("]\n");
@@ -316,6 +344,10 @@ private boolean invariant() {
         assert relocatingShards.size() == shardRoutingsRelocating.size();
         assert relocatingShards.containsAll(shardRoutingsRelocating);
 
+        final Map<Index, Set<ShardRouting>> shardRoutingsByIndex =
+            shards.values().stream().collect(Collectors.groupingBy(ShardRouting::index, Collectors.toSet()));
+        assert shardRoutingsByIndex.equals(shardsByIndex);
+
         return true;
     }
 }
ShardsLimitAllocationDecider.java
@@ -109,28 +109,20 @@ private Decision doDecide(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
                 indexShardLimit, clusterShardLimit);
         }
 
-        int indexShardCount = 0;
-        int nodeShardCount = 0;
-        for (ShardRouting nodeShard : node) {
-            // don't count relocating shards...
-            if (nodeShard.relocating()) {
-                continue;
-            }
-            nodeShardCount++;
-            if (nodeShard.index().equals(shardRouting.index())) {
-                indexShardCount++;
-            }
-        }
+        final int nodeShardCount = node.numberOfOwningShards();
 
         if (clusterShardLimit > 0 && decider.test(nodeShardCount, clusterShardLimit)) {
             return allocation.decision(Decision.NO, NAME,
                 "too many shards [%d] allocated to this node, cluster setting [%s=%d]",
                 nodeShardCount, CLUSTER_TOTAL_SHARDS_PER_NODE_SETTING.getKey(), clusterShardLimit);
         }
-        if (indexShardLimit > 0 && decider.test(indexShardCount, indexShardLimit)) {
-            return allocation.decision(Decision.NO, NAME,
-                "too many shards [%d] allocated to this node for index [%s], index setting [%s=%d]",
-                indexShardCount, shardRouting.getIndexName(), INDEX_TOTAL_SHARDS_PER_NODE_SETTING.getKey(), indexShardLimit);
+        if (indexShardLimit > 0) {
+            final int indexShardCount = node.numberOfOwningShardsForIndex(shardRouting.index());
+            if (decider.test(indexShardCount, indexShardLimit)) {
+                return allocation.decision(Decision.NO, NAME,
+                    "too many shards [%d] allocated to this node for index [%s], index setting [%s=%d]",
+                    indexShardCount, shardRouting.getIndexName(), INDEX_TOTAL_SHARDS_PER_NODE_SETTING.getKey(), indexShardLimit);
+            }
        }
         return allocation.decision(Decision.YES, NAME,
             "the shard count [%d] for this node is under the index limit [%d] and cluster level node limit [%d]",
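
To put illustrative (assumed, not measured) numbers on the "roughly quadratic" claim: on a reroute of a 50,000-shard cluster where a node holds about 1,000 shards, the old per-shard loop costs on the order of 50,000 × 1,000 = 5 × 10^7 iterations for this decider alone, whereas the new code performs one constant-time owning-shard lookup per decision, plus a scan limited to a single index's shards on the node when an index-level limit is configured.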
RoutingNodeTests.java
@@ -23,6 +23,7 @@
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.common.transport.TransportAddress;
+import org.elasticsearch.index.Index;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.test.ESTestCase;
 
@@ -115,4 +116,17 @@ public void testNumberOfOwningShards() {
         assertThat(routingNode.numberOfOwningShards(), equalTo(2));
     }
 
+    public void testNumberOfOwningShardsForIndex() {
+        final ShardRouting test1Shard0 =
+            TestShardRouting.newShardRouting("test1", 0, "node-1", false, ShardRoutingState.STARTED);
+        final ShardRouting test2Shard0 =
+            TestShardRouting.newShardRouting("test2", 0, "node-1", "node-2", false, ShardRoutingState.RELOCATING);
+        routingNode.add(test1Shard0);
+        routingNode.add(test2Shard0);
+        assertThat(routingNode.numberOfOwningShardsForIndex(new Index("test", IndexMetaData.INDEX_UUID_NA_VALUE)), equalTo(2));
+        assertThat(routingNode.numberOfOwningShardsForIndex(new Index("test1", IndexMetaData.INDEX_UUID_NA_VALUE)), equalTo(1));
+        assertThat(routingNode.numberOfOwningShardsForIndex(new Index("test2", IndexMetaData.INDEX_UUID_NA_VALUE)), equalTo(0));
+        assertThat(routingNode.numberOfOwningShardsForIndex(new Index("test3", IndexMetaData.INDEX_UUID_NA_VALUE)), equalTo(0));
+    }
+
 }
