Skip to content

Commit

Permalink
refactoring
Browse files Browse the repository at this point in the history
Signed-off-by: Poojita Raj <[email protected]>
  • Loading branch information
Poojita-Raj committed Aug 2, 2023
1 parent cf4b3cc commit f2eb1fa
Show file tree
Hide file tree
Showing 6 changed files with 151 additions and 173 deletions.
175 changes: 73 additions & 102 deletions server/src/main/java/org/opensearch/cluster/routing/RoutingNodes.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@

import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.CollectionUtil;
import org.opensearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
import org.opensearch.core.Assertions;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.IndexMetadata;
Expand Down Expand Up @@ -1311,131 +1310,103 @@ private void ensureMutable() {
}

/**
* Creates an iterator over shards interleaving between nodes: The iterator returns the first shard from
* the first node, then the first shard of the second node, etc. until one shard from each node has been returned.
* The iterator then resumes on the first node by returning the second shard and continues until all shards from
* all the nodes have been returned.
* @param shardMovementStrategy if ShardMovementStrategy.PRIMARY_FIRST, all primary shards are iterated over before iterating replica for any node
* if ShardMovementStrategy.REPLICA_FIRST, all replica shards are iterated over before iterating primary for any node
* if ShardMovementStrategy.NO_PREFERENCE, order of replica and primary shards doesn't matter in iteration
* @return iterator of shard routings
* Returns iterator of shard routings used by {@link #nodeInterleavedShardIterator(ShardMovementStrategy)}
* @param primaryFirst true when ShardMovementStrategy = ShardMovementStrategy.PRIMARY_FIRST, false when it is ShardMovementStrategy.REPLICA_FIRST
*/
public Iterator<ShardRouting> nodeInterleavedShardIterator(BalancedShardsAllocator.ShardMovementStrategy shardMovementStrategy) {
private Iterator<ShardRouting> buildIteratorForMovementStrategy(boolean primaryFirst) {
final Queue<Iterator<ShardRouting>> queue = new ArrayDeque<>();
for (Map.Entry<String, RoutingNode> entry : nodesToShards.entrySet()) {
queue.add(entry.getValue().copyShards().iterator());
}
if (shardMovementStrategy == BalancedShardsAllocator.ShardMovementStrategy.PRIMARY_FIRST) {
return new Iterator<ShardRouting>() {
private Queue<ShardRouting> replicaShards = new ArrayDeque<>();
private Queue<Iterator<ShardRouting>> replicaIterators = new ArrayDeque<>();

public boolean hasNext() {
while (queue.isEmpty() == false) {
if (queue.peek().hasNext()) {
return true;
}
queue.poll();
}
if (!replicaShards.isEmpty()) {
return new Iterator<ShardRouting>() {
private Queue<ShardRouting> shardRoutings = new ArrayDeque<>();
private Queue<Iterator<ShardRouting>> shardIterators = new ArrayDeque<>();

public boolean hasNext() {
while (queue.isEmpty() == false) {
if (queue.peek().hasNext()) {
return true;
}
while (!replicaIterators.isEmpty()) {
if (replicaIterators.peek().hasNext()) {
return true;
}
replicaIterators.poll();
queue.poll();
}
if (!shardRoutings.isEmpty()) {
return true;
}
while (!shardIterators.isEmpty()) {
if (shardIterators.peek().hasNext()) {
return true;
}
return false;
shardIterators.poll();
}
return false;
}

public ShardRouting next() {
if (hasNext() == false) {
throw new NoSuchElementException();
}
while (!queue.isEmpty()) {
Iterator<ShardRouting> iter = queue.poll();
public ShardRouting next() {
if (hasNext() == false) {
throw new NoSuchElementException();
}
while (!queue.isEmpty()) {
Iterator<ShardRouting> iter = queue.poll();
if (primaryFirst) {
if (iter.hasNext()) {
ShardRouting result = iter.next();
if (result.primary()) {
queue.offer(iter);
return result;
}
replicaShards.offer(result);
replicaIterators.offer(iter);
shardRoutings.offer(result);
shardIterators.offer(iter);
}
} else {
while (iter.hasNext()) {
ShardRouting result = iter.next();
if (result.primary() == false) {
queue.offer(iter);
return result;
}
shardRoutings.offer(result);
shardIterators.offer(iter);
}
}
if (!replicaShards.isEmpty()) {
return replicaShards.poll();
}
Iterator<ShardRouting> replicaIterator = replicaIterators.poll();
ShardRouting replicaShard = replicaIterator.next();
replicaIterators.offer(replicaIterator);

assert !replicaShard.primary();
return replicaShard;
}

public void remove() {
throw new UnsupportedOperationException();
if (!shardRoutings.isEmpty()) {
return shardRoutings.poll();
}
};
} else {
if (shardMovementStrategy == BalancedShardsAllocator.ShardMovementStrategy.REPLICA_FIRST) {
return new Iterator<ShardRouting>() {
private Queue<ShardRouting> primaryShards = new ArrayDeque<>();
private Queue<Iterator<ShardRouting>> primaryIterators = new ArrayDeque<>();
Iterator<ShardRouting> replicaIterator = shardIterators.poll();
ShardRouting replicaShard = replicaIterator.next();
shardIterators.offer(replicaIterator);

public boolean hasNext() {
while (!queue.isEmpty()) {
if (queue.peek().hasNext()) {
return true;
}
queue.poll();
}
if (!primaryShards.isEmpty()) {
return true;
}
while (!primaryIterators.isEmpty()) {
if (primaryIterators.peek().hasNext()) {
return true;
}
primaryIterators.poll();
}
return false;
}
assert replicaShard.primary() != primaryFirst;
return replicaShard;
}

public ShardRouting next() {
if (hasNext() == false) {
throw new NoSuchElementException();
}
while (!queue.isEmpty()) {
Iterator<ShardRouting> iter = queue.poll();
while (iter.hasNext()) {
ShardRouting result = iter.next();
if (result.primary() == false) {
queue.offer(iter);
return result;
}
primaryShards.offer(result);
primaryIterators.offer(iter);
}
}
if (primaryShards.isEmpty() == false) {
return primaryShards.poll();
}
Iterator<ShardRouting> primaryIterator = primaryIterators.poll();
ShardRouting primaryShard = primaryIterator.next();
primaryIterators.offer(primaryIterator);
public void remove() {
throw new UnsupportedOperationException();
}

assert primaryShard.primary();
return primaryShard;
}
};
}

public void remove() {
throw new UnsupportedOperationException();
}
};
/**
* Creates an iterator over shards interleaving between nodes: The iterator returns the first shard from
* the first node, then the first shard of the second node, etc. until one shard from each node has been returned.
* The iterator then resumes on the first node by returning the second shard and continues until all shards from
* all the nodes have been returned.
* @param shardMovementStrategy if ShardMovementStrategy.PRIMARY_FIRST, all primary shards are iterated over before iterating replica for any node
* if ShardMovementStrategy.REPLICA_FIRST, all replica shards are iterated over before iterating primary for any node
* if ShardMovementStrategy.NO_PREFERENCE, order of replica and primary shards doesn't matter in iteration
* @return iterator of shard routings
*/
public Iterator<ShardRouting> nodeInterleavedShardIterator(ShardMovementStrategy shardMovementStrategy) {
final Queue<Iterator<ShardRouting>> queue = new ArrayDeque<>();
for (Map.Entry<String, RoutingNode> entry : nodesToShards.entrySet()) {
queue.add(entry.getValue().copyShards().iterator());
}
if (shardMovementStrategy == ShardMovementStrategy.PRIMARY_FIRST) {
return buildIteratorForMovementStrategy(true);
} else {
if (shardMovementStrategy == ShardMovementStrategy.REPLICA_FIRST) {
return buildIteratorForMovementStrategy(false);
} else {
return new Iterator<ShardRouting>() {
@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cluster.routing;

import org.opensearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;

import java.util.Locale;

/**
* ShardMovementStrategy defines the order in which shard movement occurs.
*
* ShardMovementStrategy values or rather their string representation to be used with
* {@link BalancedShardsAllocator#SHARD_MOVEMENT_STRATEGY_SETTING} via cluster settings.
*/
public enum ShardMovementStrategy {
/**
* default behavior in which order of shard movement doesn't matter.
*/
NO_PREFERENCE,

/**
* primary shards are moved first
*/
PRIMARY_FIRST,

/**
* replica shards are moved first
*/
REPLICA_FIRST;

public static ShardMovementStrategy parse(String strValue) {
if (strValue == null) {
return null;
} else {
strValue = strValue.toUpperCase(Locale.ROOT);
try {
return ShardMovementStrategy.valueOf(strValue);
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException("Illegal allocation.shard_movement_strategy value [" + strValue + "]");
}
}
}

@Override
public String toString() {
return name().toLowerCase(Locale.ROOT);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.lucene.util.IntroSorter;
import org.opensearch.cluster.routing.RoutingNode;
import org.opensearch.cluster.routing.RoutingNodes;
import org.opensearch.cluster.routing.ShardMovementStrategy;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.UnassignedInfo;
import org.opensearch.cluster.routing.UnassignedInfo.AllocationStatus;
Expand All @@ -56,7 +57,6 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Locale;
import java.util.Map;
import java.util.Set;

Expand Down Expand Up @@ -112,43 +112,6 @@ public class BalancedShardsAllocator implements ShardsAllocator {
Property.Deprecated
);

/**
* ShardMovementStrategy defines the order in which shard movement occurs.
* <p>
* Allocation settings can have the following values (non-casesensitive):
* <ul>
* <li> <code>NO_PREFERENCE</code> - default behavior in which order of shard movement doesn't matter.
* <li> <code>PRIMARY_FIRST</code> - primary shards are moved first.
* <li> <code>REPLICA_FIRST</code> - replica shards are moved first.
* </ul>
* ShardMovementStrategy values or rather their string representation to be used with
* {@link BalancedShardsAllocator#SHARD_MOVEMENT_STRATEGY_SETTING} via cluster settings.
*/
public enum ShardMovementStrategy {
NO_PREFERENCE,
PRIMARY_FIRST,
REPLICA_FIRST;

public static ShardMovementStrategy parse(String strValue) {
if (strValue == null) {
return null;
} else {
strValue = strValue.toUpperCase(Locale.ROOT);
try {
return ShardMovementStrategy.valueOf(strValue);
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException("Illegal allocation.shard_movement_strategy value [" + strValue + "]");
}
}
}

@Override
public String toString() {
return name().toLowerCase(Locale.ROOT);
}

}

/**
* Decides order in which to move shards from node when shards can not stay on node anymore. {@link LocalShardsBalancer#moveShards()}
* Encapsulates behavior of above SHARD_MOVE_PRIMARY_FIRST_SETTING.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.opensearch.cluster.routing.RoutingNode;
import org.opensearch.cluster.routing.RoutingNodes;
import org.opensearch.cluster.routing.RoutingPool;
import org.opensearch.cluster.routing.ShardMovementStrategy;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.ShardRoutingState;
import org.opensearch.cluster.routing.UnassignedInfo;
Expand Down Expand Up @@ -58,7 +59,7 @@ public class LocalShardsBalancer extends ShardsBalancer {
private final RoutingAllocation allocation;
private final RoutingNodes routingNodes;
private final boolean movePrimaryFirst;
private final BalancedShardsAllocator.ShardMovementStrategy shardMovementStrategy;
private final ShardMovementStrategy shardMovementStrategy;

private final boolean preferPrimaryBalance;
private final BalancedShardsAllocator.WeightFunction weight;
Expand All @@ -75,7 +76,7 @@ public LocalShardsBalancer(
Logger logger,
RoutingAllocation allocation,
boolean movePrimaryFirst,
BalancedShardsAllocator.ShardMovementStrategy shardMovementStrategy,
ShardMovementStrategy shardMovementStrategy,
BalancedShardsAllocator.WeightFunction weight,
float threshold,
boolean preferPrimaryBalance
Expand Down Expand Up @@ -536,14 +537,14 @@ private void checkAndAddInEligibleTargetNode(RoutingNode targetNode) {
* In the event of changing ShardMovementStrategy setting from default setting NO_PREFERENCE to either PRIMARY_FIRST or REPLICA_FIRST, we want that
* to have priority over values set in move_primary_first setting.
*/
private BalancedShardsAllocator.ShardMovementStrategy getShardMovementStrategy() {
if (shardMovementStrategy != BalancedShardsAllocator.ShardMovementStrategy.NO_PREFERENCE) {
private ShardMovementStrategy getShardMovementStrategy() {
if (shardMovementStrategy != ShardMovementStrategy.NO_PREFERENCE) {
return shardMovementStrategy;
}
if (movePrimaryFirst) {
return BalancedShardsAllocator.ShardMovementStrategy.PRIMARY_FIRST;
return ShardMovementStrategy.PRIMARY_FIRST;
}
return BalancedShardsAllocator.ShardMovementStrategy.NO_PREFERENCE;
return ShardMovementStrategy.NO_PREFERENCE;
}

/**
Expand Down Expand Up @@ -594,9 +595,7 @@ void moveShards() {
}

// Ensure that replicas don't relocate if primaries are being throttled and primary first shard movement strategy is enabled
if ((shardMovementStrategy == BalancedShardsAllocator.ShardMovementStrategy.PRIMARY_FIRST)
&& primariesThrottled
&& !shardRouting.primary()) {
if ((shardMovementStrategy == ShardMovementStrategy.PRIMARY_FIRST) && primariesThrottled && !shardRouting.primary()) {
logger.info(
"Cannot move any replica shard in the cluster as movePrimaryFirst is enabled and primary shards"
+ "are being throttled. Skipping shard iteration"
Expand Down
Loading

0 comments on commit f2eb1fa

Please sign in to comment.