From 279f9bd069cdea7ac4a91b57a71fa5ef8206f587 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Wed, 18 Mar 2020 20:57:50 -0400 Subject: [PATCH] Improve performance of shards limits decider (#53577) On clusters with a large number of shards, the shards limits allocation decider can exhibit poor performance leading to timeouts applying cluster state updates. This occurs because for every shard, we do a loop to count the number of shards on the node, and the number of shards for the index of the shard. This is roughly quadratic in the number of shards. This loop is not necessary, since we already have a O(1) method to count the number of non-relocating shards on a node, and with this commit we add some infrastructure to RoutingNode to make counting the number of shards per index O(1). --- .../cluster/routing/RoutingNode.java | 32 +++++++++++++++++++ .../decider/ShardsLimitAllocationDecider.java | 24 +++++--------- .../cluster/routing/RoutingNodeTests.java | 14 ++++++++ 3 files changed, 54 insertions(+), 16 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/RoutingNode.java b/server/src/main/java/org/elasticsearch/cluster/routing/RoutingNode.java index 1494513dee94e..37a4fa2721c31 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/RoutingNode.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/RoutingNode.java @@ -21,15 +21,20 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.Nullable; +import org.elasticsearch.index.Index; import org.elasticsearch.index.shard.ShardId; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.LinkedHashSet; import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.function.Predicate; import java.util.stream.Collectors; /** @@ -48,6 +53,8 @@ public class RoutingNode implements Iterable { private final LinkedHashSet relocatingShards; + private final HashMap> shardsByIndex; + public RoutingNode(String nodeId, DiscoveryNode node, ShardRouting... shards) { this(nodeId, node, buildShardRoutingMap(shards)); } @@ -58,12 +65,14 @@ public RoutingNode(String nodeId, DiscoveryNode node, ShardRouting... shards) { this.shards = shards; this.relocatingShards = new LinkedHashSet<>(); this.initializingShards = new LinkedHashSet<>(); + this.shardsByIndex = new LinkedHashMap<>(); for (ShardRouting shardRouting : shards.values()) { if (shardRouting.initializing()) { initializingShards.add(shardRouting); } else if (shardRouting.relocating()) { relocatingShards.add(shardRouting); } + shardsByIndex.computeIfAbsent(shardRouting.index(), k -> new LinkedHashSet<>()).add(shardRouting); } assert invariant(); } @@ -128,6 +137,7 @@ void add(ShardRouting shard) { } else if (shard.relocating()) { relocatingShards.add(shard); } + shardsByIndex.computeIfAbsent(shard.index(), k -> new LinkedHashSet<>()).add(shard); assert invariant(); } @@ -148,11 +158,16 @@ void update(ShardRouting oldShard, ShardRouting newShard) { boolean exist = relocatingShards.remove(oldShard); assert exist : "expected shard " + oldShard + " to exist in relocatingShards"; } + shardsByIndex.get(oldShard.index()).remove(oldShard); + if (shardsByIndex.get(oldShard.index()).isEmpty()) { + shardsByIndex.remove(oldShard.index()); + } if (newShard.initializing()) { initializingShards.add(newShard); } else if (newShard.relocating()) { relocatingShards.add(newShard); } + shardsByIndex.computeIfAbsent(newShard.index(), k -> new LinkedHashSet<>()).add(newShard); assert invariant(); } @@ -167,6 +182,10 @@ void remove(ShardRouting shard) { boolean exist = relocatingShards.remove(shard); assert exist : "expected shard " + shard + " to exist in relocatingShards"; } + shardsByIndex.get(shard.index()).remove(shard); + if (shardsByIndex.get(shard.index()).isEmpty()) { + shardsByIndex.remove(shard.index()); + } assert invariant(); } @@ -269,6 +288,15 @@ public int numberOfOwningShards() { return shards.size() - relocatingShards.size(); } + public int numberOfOwningShardsForIndex(final Index index) { + final LinkedHashSet shardRoutings = shardsByIndex.get(index); + if (shardRoutings == null) { + return 0; + } else { + return Math.toIntExact(shardRoutings.stream().filter(Predicate.not(ShardRouting::relocating)).count()); + } + } + public String prettyPrint() { StringBuilder sb = new StringBuilder(); sb.append("-----node_id[").append(nodeId).append("][").append(node == null ? "X" : "V").append("]\n"); @@ -316,6 +344,10 @@ private boolean invariant() { assert relocatingShards.size() == shardRoutingsRelocating.size(); assert relocatingShards.containsAll(shardRoutingsRelocating); + final Map> shardRoutingsByIndex = + shards.values().stream().collect(Collectors.groupingBy(ShardRouting::index, Collectors.toSet())); + assert shardRoutingsByIndex.equals(shardsByIndex); + return true; } } diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ShardsLimitAllocationDecider.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ShardsLimitAllocationDecider.java index 1c0a0c0ef0a6b..e63db7d4570aa 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ShardsLimitAllocationDecider.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ShardsLimitAllocationDecider.java @@ -109,28 +109,20 @@ private Decision doDecide(ShardRouting shardRouting, RoutingNode node, RoutingAl indexShardLimit, clusterShardLimit); } - int indexShardCount = 0; - int nodeShardCount = 0; - for (ShardRouting nodeShard : node) { - // don't count relocating shards... - if (nodeShard.relocating()) { - continue; - } - nodeShardCount++; - if (nodeShard.index().equals(shardRouting.index())) { - indexShardCount++; - } - } + final int nodeShardCount = node.numberOfOwningShards(); if (clusterShardLimit > 0 && decider.test(nodeShardCount, clusterShardLimit)) { return allocation.decision(Decision.NO, NAME, "too many shards [%d] allocated to this node, cluster setting [%s=%d]", nodeShardCount, CLUSTER_TOTAL_SHARDS_PER_NODE_SETTING.getKey(), clusterShardLimit); } - if (indexShardLimit > 0 && decider.test(indexShardCount, indexShardLimit)) { - return allocation.decision(Decision.NO, NAME, - "too many shards [%d] allocated to this node for index [%s], index setting [%s=%d]", - indexShardCount, shardRouting.getIndexName(), INDEX_TOTAL_SHARDS_PER_NODE_SETTING.getKey(), indexShardLimit); + if (indexShardLimit > 0) { + final int indexShardCount = node.numberOfOwningShardsForIndex(shardRouting.index()); + if (decider.test(indexShardCount, indexShardLimit)) { + return allocation.decision(Decision.NO, NAME, + "too many shards [%d] allocated to this node for index [%s], index setting [%s=%d]", + indexShardCount, shardRouting.getIndexName(), INDEX_TOTAL_SHARDS_PER_NODE_SETTING.getKey(), indexShardLimit); + } } return allocation.decision(Decision.YES, NAME, "the shard count [%d] for this node is under the index limit [%d] and cluster level node limit [%d]", diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/RoutingNodeTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/RoutingNodeTests.java index c88708a33f457..5cde6459e9339 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/RoutingNodeTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/RoutingNodeTests.java @@ -23,6 +23,7 @@ import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.transport.TransportAddress; +import org.elasticsearch.index.Index; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.test.ESTestCase; @@ -115,4 +116,17 @@ public void testNumberOfOwningShards() { assertThat(routingNode.numberOfOwningShards(), equalTo(2)); } + public void testNumberOfOwningShardsForIndex() { + final ShardRouting test1Shard0 = + TestShardRouting.newShardRouting("test1", 0, "node-1", false, ShardRoutingState.STARTED); + final ShardRouting test2Shard0 = + TestShardRouting.newShardRouting("test2", 0, "node-1", "node-2", false, ShardRoutingState.RELOCATING); + routingNode.add(test1Shard0); + routingNode.add(test2Shard0); + assertThat(routingNode.numberOfOwningShardsForIndex(new Index("test", IndexMetaData.INDEX_UUID_NA_VALUE)), equalTo(2)); + assertThat(routingNode.numberOfOwningShardsForIndex(new Index("test1", IndexMetaData.INDEX_UUID_NA_VALUE)), equalTo(1)); + assertThat(routingNode.numberOfOwningShardsForIndex(new Index("test2", IndexMetaData.INDEX_UUID_NA_VALUE)), equalTo(0)); + assertThat(routingNode.numberOfOwningShardsForIndex(new Index("test3", IndexMetaData.INDEX_UUID_NA_VALUE)), equalTo(0)); + } + }