Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] Offline calculation of total shards across all nodes and caching it for weight calculation inside LocalShardsBalancer #14689

Merged
merged 1 commit into from
Jul 10, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ public class LocalShardsBalancer extends ShardsBalancer {
private final float avgPrimaryShardsPerNode;
private final BalancedShardsAllocator.NodeSorter sorter;
private final Set<RoutingNode> inEligibleTargetNode;
private int totalShardCount = 0;

public LocalShardsBalancer(
Logger logger,
Expand Down Expand Up @@ -127,8 +128,7 @@ public float avgPrimaryShardsPerNode() {
*/
@Override
public float avgShardsPerNode() {
    // Use the cached totalShardCount (updated alongside the addShard/removeShard calls on the
    // model nodes) instead of re-summing numShards() over every node on each call.
    // Cast before dividing: both operands are ints, so plain division would truncate the
    // average and throw ArithmeticException for an empty node map instead of yielding NaN.
    return (float) totalShardCount / nodes.size();
}

/**
Expand Down Expand Up @@ -600,13 +600,15 @@ void moveShards() {
final BalancedShardsAllocator.ModelNode sourceNode = nodes.get(shardRouting.currentNodeId());
final BalancedShardsAllocator.ModelNode targetNode = nodes.get(moveDecision.getTargetNode().getId());
sourceNode.removeShard(shardRouting);
--totalShardCount;
Tuple<ShardRouting, ShardRouting> relocatingShards = routingNodes.relocateShard(
shardRouting,
targetNode.getNodeId(),
allocation.clusterInfo().getShardSize(shardRouting, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE),
allocation.changes()
);
targetNode.addShard(relocatingShards.v2());
++totalShardCount;
if (logger.isTraceEnabled()) {
logger.trace("Moved shard [{}] to node [{}]", shardRouting, targetNode.getRoutingNode());
}
Expand Down Expand Up @@ -726,6 +728,7 @@ private Map<String, BalancedShardsAllocator.ModelNode> buildModelFromAssigned()
/* we skip relocating shards here since we expect an initializing shard with the same id coming in */
if (RoutingPool.LOCAL_ONLY.equals(RoutingPool.getShardPool(shard, allocation)) && shard.state() != RELOCATING) {
node.addShard(shard);
++totalShardCount;
if (logger.isTraceEnabled()) {
logger.trace("Assigned shard [{}] to node [{}]", shard, node.getNodeId());
}
Expand Down Expand Up @@ -816,6 +819,7 @@ void allocateUnassigned() {
);
shard = routingNodes.initializeShard(shard, minNode.getNodeId(), null, shardSize, allocation.changes());
minNode.addShard(shard);
++totalShardCount;
if (!shard.primary()) {
// copy over the same replica shards to the secondary array so they will get allocated
// in a subsequent iteration, allowing replicas of other shards to be allocated first
Expand Down Expand Up @@ -845,6 +849,7 @@ void allocateUnassigned() {
allocation.routingTable()
);
minNode.addShard(shard.initialize(minNode.getNodeId(), null, shardSize));
++totalShardCount;
} else {
if (logger.isTraceEnabled()) {
logger.trace("No Node found to assign shard [{}]", shard);
Expand Down Expand Up @@ -1012,18 +1017,21 @@ private boolean tryRelocateShard(BalancedShardsAllocator.ModelNode minNode, Bala
}
final Decision decision = new Decision.Multi().add(allocationDecision).add(rebalanceDecision);
maxNode.removeShard(shard);
--totalShardCount;
long shardSize = allocation.clusterInfo().getShardSize(shard, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE);

if (decision.type() == Decision.Type.YES) {
/* only allocate on the cluster if we are not throttled */
logger.debug("Relocate [{}] from [{}] to [{}]", shard, maxNode.getNodeId(), minNode.getNodeId());
minNode.addShard(routingNodes.relocateShard(shard, minNode.getNodeId(), shardSize, allocation.changes()).v1());
++totalShardCount;
return true;
} else {
/* allocate on the model even if throttled */
logger.debug("Simulate relocation of [{}] from [{}] to [{}]", shard, maxNode.getNodeId(), minNode.getNodeId());
assert decision.type() == Decision.Type.THROTTLE;
minNode.addShard(shard.relocate(minNode.getNodeId(), shardSize));
++totalShardCount;
return false;
}
}
Expand Down
Loading