Skip to content

Commit

Permalink
Add remote allocator for searchable snapshots
Browse files Browse the repository at this point in the history
Signed-off-by: Kunal Kotwani <[email protected]>
  • Loading branch information
kotwanikunal committed Oct 22, 2022
1 parent 515f84b commit 716c9f9
Show file tree
Hide file tree
Showing 18 changed files with 1,645 additions and 7 deletions.
4 changes: 1 addition & 3 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
- Apply reproducible builds configuration for OpenSearch plugins through gradle plugin ([#4746](https://github.com/opensearch-project/OpenSearch/pull/4746))
- Add groupId value propagation tests for ZIP publication task ([#4772](https://github.com/opensearch-project/OpenSearch/pull/4772))
- Add support for GeoJson Point type in GeoPoint field ([#4597](https://github.com/opensearch-project/OpenSearch/pull/4597))

- Remote shard balancer support for searchable snapshots ([#4870](https://github.com/opensearch-project/OpenSearch/pull/4870))
### Dependencies
- Bumps `log4j-core` from 2.18.0 to 2.19.0
- Bumps `reactor-netty-http` from 1.0.18 to 1.0.23
Expand Down Expand Up @@ -63,7 +63,6 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
- Bumps `tika` from 2.4.0 to 2.5.0 ([#4791](https://github.com/opensearch-project/OpenSearch/pull/4791))
- Exclude jettison version brought in with hadoop-minicluster. ([#4787](https://github.com/opensearch-project/OpenSearch/pull/4787))
- Bump protobuf-java to 3.21.7 in repository-gcs and repository-hdfs ([#]())

### Changed
- Dependency updates (httpcore, mockito, slf4j, httpasyncclient, commons-codec) ([#4308](https://github.com/opensearch-project/OpenSearch/pull/4308))
- Use RemoteSegmentStoreDirectory instead of RemoteDirectory ([#4240](https://github.com/opensearch-project/OpenSearch/pull/4240))
Expand Down Expand Up @@ -103,7 +102,6 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
- Remove LegacyESVersion.V_7_4_ and V_7_5_ Constants ([#4704](https://github.com/opensearch-project/OpenSearch/pull/4704))
- Remove Legacy Version support from Snapshot/Restore Service ([#4728](https://github.com/opensearch-project/OpenSearch/pull/4728))
- Remove deprecated serialization logic from pipeline aggs ([#4847](https://github.com/opensearch-project/OpenSearch/pull/4847))

### Fixed
- `opensearch-service.bat start` and `opensearch-service.bat manager` failing to run ([#4289](https://github.com/opensearch-project/OpenSearch/pull/4289))
- PR reference to checkout code for changelog verifier ([#4296](https://github.com/opensearch-project/OpenSearch/pull/4296))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
import org.opensearch.cluster.routing.allocation.decider.SameShardAllocationDecider;
import org.opensearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider;
import org.opensearch.cluster.routing.allocation.decider.SnapshotInProgressAllocationDecider;
import org.opensearch.cluster.routing.allocation.decider.TargetPoolAllocationDecider;
import org.opensearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.ParseField;
Expand All @@ -85,6 +86,7 @@
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Setting.Property;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.common.util.set.Sets;
import org.opensearch.common.xcontent.NamedXContentRegistry;
Expand Down Expand Up @@ -359,6 +361,9 @@ public static Collection<AllocationDecider> createAllocationDeciders(
addAllocationDecider(deciders, new ShardsLimitAllocationDecider(settings, clusterSettings));
addAllocationDecider(deciders, new AwarenessAllocationDecider(settings, clusterSettings));
addAllocationDecider(deciders, new NodeLoadAwareAllocationDecider(settings, clusterSettings));
if (FeatureFlags.isEnabled(FeatureFlags.SEARCHABLE_SNAPSHOT)) {
addAllocationDecider(deciders, new TargetPoolAllocationDecider());
}

clusterPlugins.stream()
.flatMap(p -> p.createAllocationDeciders(settings, clusterSettings).stream())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -702,6 +702,23 @@ assert node(failedShard.currentNodeId()).getByShardId(failedShard.shardId()) ==
+ " was matched but wasn't removed";
}

public void swapPrimaryWithReplica(
Logger logger,
ShardRouting primaryShard,
ShardRouting replicaShard,
RoutingChangesObserver changes
) {
assert primaryShard.primary() : "Invalid primary shard provided";
assert !replicaShard.primary() : "Invalid Replica shard provided";

ShardRouting newPrimary = primaryShard.moveActivePrimaryToReplica();
ShardRouting newReplica = replicaShard.moveActiveReplicaToPrimary();
updateAssigned(primaryShard, newPrimary);
updateAssigned(replicaShard, newReplica);
logger.info("Swap relocation performed for shard [{}]", newPrimary.shortSummary());
changes.replicaPromoted(newPrimary);
}

private void unassignPrimaryAndPromoteActiveReplicaIfExists(
ShardRouting failedShard,
UnassignedInfo unassignedInfo,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cluster.routing;

import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.routing.allocation.RoutingAllocation;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.IndexModule;

/**
* {@link RoutingPool} defines the different node types based on the assigned capabilities. The methods
* help decide the capabilities of a specific node as well as an index or shard based on the index configuration.
* These methods help with allocation decisions and determining shard classification with the allocation process.
*
* @opensearch.internal
*/
public enum RoutingPool {
LOCAL_ONLY,
REMOTE_CAPABLE;

/**
* Helps to determine the appropriate {@link RoutingPool} for a given node from the {@link RoutingNode}
*/
public static RoutingPool getNodePool(RoutingNode node) {
return getNodePool(node.node());
}

/**
* Helps to determine the appropriate {@link RoutingPool} for a given node from the {@link DiscoveryNode}
*/
public static RoutingPool getNodePool(DiscoveryNode node) {
if (node.isSearchNode()) {
return REMOTE_CAPABLE;
}
return LOCAL_ONLY;
}

/**
* Can determine the appropriate {@link RoutingPool} for a given shard using the {@link IndexMetadata} for the
* index using the {@link RoutingAllocation}.
* @param shard the shard routing for which {@link RoutingPool} has to be determined.
* @param allocation the current allocation of the cluster
* @return {@link RoutingPool} for the given shard.
*/
public static RoutingPool getShardPool(ShardRouting shard, RoutingAllocation allocation) {
IndexMetadata indexMetadata = allocation.metadata().getIndexSafe(shard.index());
return getIndexPool(indexMetadata);
}

/**
* Can determine the appropriate {@link RoutingPool} for a given index using the {@link IndexMetadata}.
* @param indexMetadata the index metadata object for which {@link RoutingPool} has to be determined.
* @return {@link RoutingPool} for the given index.
*/
public static RoutingPool getIndexPool(IndexMetadata indexMetadata) {
Settings indexSettings = indexMetadata.getSettings();
if (IndexModule.Type.REMOTE_SNAPSHOT.getSettingsKey().equals(indexSettings.get(IndexModule.INDEX_STORE_TYPE_SETTING.getKey()))) {
return REMOTE_CAPABLE;
}
return LOCAL_ONLY;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -534,6 +534,29 @@ public ShardRouting moveToStarted() {
);
}

/**
* Make the active primary shard as replica
*
* @throws IllegalShardRoutingStateException if shard is already a replica
*/
public ShardRouting moveActivePrimaryToReplica() {
assert active() : "expected an active shard " + this;
if (!primary) {
throw new IllegalShardRoutingStateException(this, "Not a primary shard, can't move to replica");
}
return new ShardRouting(
shardId,
currentNodeId,
relocatingNodeId,
false,
state,
recoverySource,
unassignedInfo,
allocationId,
expectedShardSize
);
}

/**
* Make the active shard primary unless it's not primary
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Setting.Property;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;

import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -145,6 +146,13 @@ public void allocate(RoutingAllocation allocation) {
localShardsBalancer.allocateUnassigned();
localShardsBalancer.moveShards();
localShardsBalancer.balance();

if (FeatureFlags.isEnabled(FeatureFlags.SEARCHABLE_SNAPSHOT)) {
final ShardsBalancer remoteShardsBalancer = new RemoteShardsBalancer(logger, allocation);
remoteShardsBalancer.allocateUnassigned();
remoteShardsBalancer.moveShards();
remoteShardsBalancer.balance();
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.routing.RoutingNode;
import org.opensearch.cluster.routing.RoutingNodes;
import org.opensearch.cluster.routing.RoutingPool;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.ShardRoutingState;
import org.opensearch.cluster.routing.UnassignedInfo;
Expand All @@ -27,9 +28,11 @@
import org.opensearch.cluster.routing.allocation.decider.Decision;
import org.opensearch.cluster.routing.allocation.decider.DiskThresholdDecider;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.gateway.PriorityComparator;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
Expand All @@ -38,6 +41,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

import static org.opensearch.cluster.routing.ShardRoutingState.RELOCATING;
Expand Down Expand Up @@ -102,6 +106,10 @@ public float avgShardsPerNode(String index) {
*/
@Override
public float avgShardsPerNode() {
if (FeatureFlags.isEnabled(FeatureFlags.SEARCHABLE_SNAPSHOT)) {
float totalShards = nodes.values().stream().map(BalancedShardsAllocator.ModelNode::numShards).reduce(0, Integer::sum);
return totalShards / nodes.size();
}
return avgShardsPerNode;
}

Expand Down Expand Up @@ -172,6 +180,11 @@ void balance() {
*/
@Override
MoveDecision decideRebalance(final ShardRouting shard) {
if (FeatureFlags.isEnabled(FeatureFlags.SEARCHABLE_SNAPSHOT)
&& RoutingPool.REMOTE_CAPABLE.equals(RoutingPool.getShardPool(shard, allocation))) {
return MoveDecision.NOT_TAKEN;
}

if (shard.started() == false) {
// we can only rebalance started shards
return MoveDecision.NOT_TAKEN;
Expand Down Expand Up @@ -441,7 +454,19 @@ private void balanceByWeights() {
* to the nodes we relocated them from.
*/
private String[] buildWeightOrderedIndices() {
final String[] indices = allocation.routingTable().indicesRouting().keys().toArray(String.class);
final String[] indices;
if (FeatureFlags.isEnabled(FeatureFlags.SEARCHABLE_SNAPSHOT)) {
final List<String> localIndices = new ArrayList<>();
for (String index : allocation.routingTable().indicesRouting().keys().toArray(String.class)) {
if (RoutingPool.LOCAL_ONLY.equals(RoutingPool.getIndexPool(metadata.index(index)))) {
localIndices.add(index);
}
}
indices = localIndices.toArray(new String[0]);
} else {
indices = allocation.routingTable().indicesRouting().keys().toArray(String.class);
}

final float[] deltas = new float[indices.length];
for (int i = 0; i < deltas.length; i++) {
sorter.reset(indices[i]);
Expand Down Expand Up @@ -507,7 +532,7 @@ void moveShards() {
// shard movements, the goal of this iteration order is to achieve a fairer movement of shards from the nodes that are
// offloading the shards.

// Trying to eliminate target nodes so that we donot unnecessarily iterate over source nodes
// Trying to eliminate target nodes so that we do not unnecessarily iterate over source nodes
// when no target is eligible
for (BalancedShardsAllocator.ModelNode currentNode : sorter.modelNodes) {
checkAndAddInEligibleTargetNode(currentNode.getRoutingNode());
Expand All @@ -533,6 +558,11 @@ void moveShards() {

ShardRouting shardRouting = it.next();

if (FeatureFlags.isEnabled(FeatureFlags.SEARCHABLE_SNAPSHOT)
&& RoutingPool.REMOTE_CAPABLE.equals(RoutingPool.getShardPool(shardRouting, allocation))) {
continue;
}

// Ensure that replicas don't relocate if primaries are being throttled and primary first is enabled
if (movePrimaryFirst && primariesThrottled && !shardRouting.primary()) {
logger.info(
Expand Down Expand Up @@ -593,6 +623,11 @@ void moveShards() {
*/
@Override
MoveDecision decideMove(final ShardRouting shardRouting) {
if (FeatureFlags.isEnabled(FeatureFlags.SEARCHABLE_SNAPSHOT)
&& RoutingPool.REMOTE_CAPABLE.equals(RoutingPool.getShardPool(shardRouting, allocation))) {
return MoveDecision.NOT_TAKEN;
}

if (shardRouting.started() == false) {
// we can only move started shards
return MoveDecision.NOT_TAKEN;
Expand Down Expand Up @@ -680,7 +715,7 @@ private Map<String, BalancedShardsAllocator.ModelNode> buildModelFromAssigned()
for (ShardRouting shard : rn) {
assert rn.nodeId().equals(shard.currentNodeId());
/* we skip relocating shards here since we expect an initializing shard with the same id coming in */
if (shard.state() != RELOCATING) {
if (RoutingPool.LOCAL_ONLY.equals(RoutingPool.getShardPool(shard, allocation)) && shard.state() != RELOCATING) {
node.addShard(shard);
if (logger.isTraceEnabled()) {
logger.trace("Assigned shard [{}] to node [{}]", shard, node.getNodeId());
Expand Down Expand Up @@ -735,7 +770,17 @@ void allocateUnassigned() {
* if we allocate for instance (0, R, IDX1) we move the second replica to the secondary array and proceed with
* the next replica. If we could not find a node to allocate (0,R,IDX1) we move all it's replicas to ignoreUnassigned.
*/
ShardRouting[] primary = unassigned.drain();
ShardRouting[] unassignedShards = unassigned.drain();
if (FeatureFlags.isEnabled(FeatureFlags.SEARCHABLE_SNAPSHOT)) {
List<ShardRouting> allUnassignedShards = Arrays.stream(unassignedShards).collect(Collectors.toList());
List<ShardRouting> localUnassignedShards = allUnassignedShards.stream()
.filter(shard -> RoutingPool.LOCAL_ONLY.equals(RoutingPool.getShardPool(shard, allocation)))
.collect(Collectors.toList());
allUnassignedShards.removeAll(localUnassignedShards);
allUnassignedShards.forEach(shard -> routingNodes.unassigned().add(shard));
unassignedShards = localUnassignedShards.toArray(new ShardRouting[localUnassignedShards.size()]);
}
ShardRouting[] primary = unassignedShards;
ShardRouting[] secondary = new ShardRouting[primary.length];
int secondaryLength = 0;
int primaryLength = primary.length;
Expand Down
Loading

0 comments on commit 716c9f9

Please sign in to comment.