Add restore level safeguards to prevent file cache oversubscription
Signed-off-by: Kunal Kotwani <[email protected]>
kotwanikunal committed Jul 21, 2023
1 parent 78537a2 commit f092a04
Showing 12 changed files with 282 additions and 19 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -76,6 +76,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
### Added
- Add server version as REST response header [#6583](https://github.com/opensearch-project/OpenSearch/issues/6583)
- Start replication checkpointTimers on primary before segments upload to remote store ([#8221](https://github.com/opensearch-project/OpenSearch/pull/8221))
- Add configuration for file cache size to max remote data ratio to prevent oversubscription of file cache ([#8606](https://github.com/opensearch-project/OpenSearch/pull/8606))

### Dependencies
- Bump `org.apache.logging.log4j:log4j-core` from 2.17.1 to 2.20.0 ([#8307](https://github.com/opensearch-project/OpenSearch/pull/8307))
@@ -517,7 +517,7 @@ public String snapshotUuid() {
/**
* Sets the storage type for this request.
*/
RestoreSnapshotRequest storageType(StorageType storageType) {
public RestoreSnapshotRequest storageType(StorageType storageType) {
this.storageType = storageType;
return this;
}
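Making `storageType` public lets callers outside the request's package opt into remote-snapshot storage when building a restore request. A minimal sketch, assuming the two-argument request constructor; the repository and snapshot names are illustrative:

```java
RestoreSnapshotRequest request = new RestoreSnapshotRequest("my-repo", "my-snapshot")
    .storageType(RestoreSnapshotRequest.StorageType.REMOTE_SNAPSHOT); // names are hypothetical
```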
@@ -295,6 +295,16 @@ public ShardsIterator allShardsIncludingRelocationTargets(String[] indices) {
return allShardsSatisfyingPredicate(indices, shardRouting -> true, true);
}

/**
* All the shards in the routing table that match the predicate
* @param predicate condition to match
* @return iterator over shards matching the predicate
*/
public ShardsIterator allShardsSatisfyingPredicate(Predicate<ShardRouting> predicate) {
String[] indices = indicesRouting.keySet().toArray(new String[0]);
return allShardsSatisfyingPredicate(indices, predicate, false);
}

private ShardsIterator allShardsSatisfyingPredicate(
String[] indices,
Predicate<ShardRouting> predicate,
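A hedged usage sketch of the new overload, in the spirit of the restore-time check added below; `clusterState` is assumed to be in scope:

```java
// Collect every primary shard in the routing table, across all indices.
ShardsIterator primaries = clusterState.routingTable()
    .allShardsSatisfyingPredicate(ShardRouting::primary);
for (ShardRouting shard : primaries.getShardRoutings()) {
    // e.g. look up each shard's size via ClusterInfo, as RestoreService does
}
```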
@@ -68,7 +68,7 @@
import static org.opensearch.cluster.routing.RoutingPool.getShardPool;
import static org.opensearch.cluster.routing.allocation.DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING;
import static org.opensearch.cluster.routing.allocation.DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING;
import static org.opensearch.index.store.remote.filecache.FileCache.DATA_TO_FILE_CACHE_SIZE_RATIO;
import static org.opensearch.index.store.remote.filecache.FileCache.DATA_TO_FILE_CACHE_SIZE_RATIO_SETTING;

/**
* The {@link DiskThresholdDecider} checks that the node a shard is potentially
@@ -199,8 +199,8 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing
final FileCacheStats fileCacheStats = clusterInfo.getNodeFileCacheStats().getOrDefault(node.nodeId(), null);
final long nodeCacheSize = fileCacheStats != null ? fileCacheStats.getTotal().getBytes() : 0;
final long totalNodeRemoteShardSize = currentNodeRemoteShardSize + shardSize;

if (totalNodeRemoteShardSize > DATA_TO_FILE_CACHE_SIZE_RATIO * nodeCacheSize) {
final int dataToFileCacheSizeRatio = DATA_TO_FILE_CACHE_SIZE_RATIO_SETTING.get(allocation.metadata().settings());
if (dataToFileCacheSizeRatio > 0 && totalNodeRemoteShardSize > dataToFileCacheSizeRatio * nodeCacheSize) {
return allocation.decision(
Decision.NO,
NAME,
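The decider now reads the ratio from cluster settings instead of the old hard-coded constant, and skips the check entirely when the ratio is zero or unset. A worked sketch with assumed numbers (100 GiB node file cache, ratio 5):

```java
long nodeCacheSize = 100L << 30;            // 100 GiB, from FileCacheStats.getTotal()
int dataToFileCacheSizeRatio = 5;           // cluster.filecache.remote_data_ratio
long totalNodeRemoteShardSize = 480L << 30; // existing remote shards plus the incoming shard

// Mirrors the guard above: disabled at ratio <= 0, vetoes past 500 GiB here.
boolean veto = dataToFileCacheSizeRatio > 0
    && totalNodeRemoteShardSize > (long) dataToFileCacheSizeRatio * nodeCacheSize;
// veto == false at 480 GiB; a shard pushing the total past 500 GiB yields Decision.NO
```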
@@ -11,6 +11,7 @@
import org.apache.lucene.store.IndexInput;
import org.opensearch.common.breaker.CircuitBreaker;
import org.opensearch.common.breaker.CircuitBreakingException;
import org.opensearch.common.settings.Setting;
import org.opensearch.index.store.remote.utils.cache.CacheUsage;
import org.opensearch.index.store.remote.utils.cache.RefCountedCache;
import org.opensearch.index.store.remote.utils.cache.SegmentedCache;
@@ -49,8 +50,12 @@ public class FileCache implements RefCountedCache<Path, CachedIndexInput> {

private final CircuitBreaker circuitBreaker;

// TODO: Convert the constant into an integer setting
public static final int DATA_TO_FILE_CACHE_SIZE_RATIO = 5;
public static final Setting<Integer> DATA_TO_FILE_CACHE_SIZE_RATIO_SETTING = Setting.intSetting(
"cluster.filecache.remote_data_ratio",
0,
0,
Setting.Property.NodeScope
);

public FileCache(SegmentedCache<Path, CachedIndexInput> cache, CircuitBreaker circuitBreaker) {
this.theCache = cache;
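The former constant (5) becomes `cluster.filecache.remote_data_ratio`, an integer node setting with default 0 and minimum 0, so the safeguard stays off unless an operator opts in. Since it is node-scoped and not flagged dynamic here, it would be supplied as a static setting, e.g. `cluster.filecache.remote_data_ratio: 5` in `opensearch.yml`. A minimal sketch of reading it, as the decider and `RestoreService` do:

```java
Settings nodeSettings = Settings.builder()
    .put("cluster.filecache.remote_data_ratio", 5) // 5 restores the old hard-coded behavior
    .build();
int ratio = FileCache.DATA_TO_FILE_CACHE_SIZE_RATIO_SETTING.get(nodeSettings); // 5
```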
5 changes: 3 additions & 2 deletions server/src/main/java/org/opensearch/node/Node.java
@@ -941,8 +941,9 @@ protected Node(
clusterModule.getAllocationService(),
metadataCreateIndexService,
metadataIndexUpgradeService,
clusterService.getClusterSettings(),
shardLimitValidator
shardLimitValidator,
indicesService,
clusterInfoService
);

final DiskThresholdMonitor diskThresholdMonitor = new DiskThresholdMonitor(
77 changes: 68 additions & 9 deletions server/src/main/java/org/opensearch/snapshots/RestoreService.java
@@ -41,6 +41,8 @@
import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequest;
import org.opensearch.action.support.IndicesOptions;
import org.opensearch.cluster.ClusterChangedEvent;
import org.opensearch.cluster.ClusterInfo;
import org.opensearch.cluster.ClusterInfoService;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.ClusterStateApplier;
import org.opensearch.cluster.ClusterStateTaskConfig;
@@ -68,6 +70,7 @@
import org.opensearch.cluster.routing.RoutingChangesObserver;
import org.opensearch.cluster.routing.RoutingTable;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.ShardsIterator;
import org.opensearch.cluster.routing.UnassignedInfo;
import org.opensearch.cluster.routing.allocation.AllocationService;
import org.opensearch.cluster.service.ClusterManagerTaskKeys;
@@ -86,6 +89,9 @@
import org.opensearch.index.IndexSettings;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.index.snapshots.IndexShardSnapshotStatus;
import org.opensearch.index.store.remote.filecache.FileCacheStats;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.ShardLimitValidator;
import org.opensearch.repositories.IndexId;
import org.opensearch.repositories.RepositoriesService;
@@ -118,6 +124,7 @@
import static org.opensearch.common.util.FeatureFlags.SEARCHABLE_SNAPSHOT_EXTENDED_COMPATIBILITY;
import static org.opensearch.common.util.set.Sets.newHashSet;
import static org.opensearch.index.store.remote.directory.RemoteSnapshotDirectory.SEARCHABLE_SNAPSHOT_EXTENDED_COMPATIBILITY_MINIMUM_VERSION;
import static org.opensearch.index.store.remote.filecache.FileCache.DATA_TO_FILE_CACHE_SIZE_RATIO_SETTING;
import static org.opensearch.snapshots.SnapshotUtils.filterIndices;

/**
@@ -176,6 +183,10 @@ public class RestoreService implements ClusterStateApplier {

private final ClusterSettings clusterSettings;

private final IndicesService indicesService;

private final ClusterInfoService clusterInfoService;

private final ClusterManagerTaskThrottler.ThrottlingKey restoreSnapshotTaskKey;

private static final CleanRestoreStateTaskExecutor cleanRestoreStateTaskExecutor = new CleanRestoreStateTaskExecutor();
@@ -186,8 +197,9 @@ public RestoreService(
AllocationService allocationService,
MetadataCreateIndexService createIndexService,
MetadataIndexUpgradeService metadataIndexUpgradeService,
ClusterSettings clusterSettings,
ShardLimitValidator shardLimitValidator
ShardLimitValidator shardLimitValidator,
IndicesService indicesService,
ClusterInfoService clusterInfoService
) {
this.clusterService = clusterService;
this.repositoriesService = repositoriesService;
@@ -199,6 +211,8 @@ }
}
this.clusterSettings = clusterService.getClusterSettings();
this.shardLimitValidator = shardLimitValidator;
this.indicesService = indicesService;
this.clusterInfoService = clusterInfoService;

// Task is onboarded for throttling, it will get retried from associated TransportClusterManagerNodeAction.
restoreSnapshotTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.RESTORE_SNAPSHOT_KEY, true);
@@ -401,7 +415,6 @@ public ClusterManagerTaskThrottler.ThrottlingKey getClusterManagerThrottlingKey(

@Override
public ClusterState execute(ClusterState currentState) {
RestoreInProgress restoreInProgress = currentState.custom(RestoreInProgress.TYPE, RestoreInProgress.EMPTY);
// Check if the snapshot to restore is currently being deleted
SnapshotDeletionsInProgress deletionsInProgress = currentState.custom(
SnapshotDeletionsInProgress.TYPE,
@@ -423,6 +436,7 @@ public ClusterState execute(ClusterState currentState) {
RoutingTable.Builder rtBuilder = RoutingTable.builder(currentState.routingTable());
final Map<ShardId, RestoreInProgress.ShardRestoreStatus> shards;
Set<String> aliases = new HashSet<>();
long totalRestorableRemoteIndexesSize = 0;

if (indices.isEmpty() == false) {
// We have some indices to restore
@@ -433,17 +447,14 @@ public ClusterState execute(ClusterState currentState) {
String index = indexEntry.getValue();
boolean partial = checkPartial(index);

IndexId snapshotIndexId = repositoryData.resolveIndexId(index);
IndexMetadata snapshotIndexMetadata = updateIndexSettings(
metadata.index(index),
request.indexSettings(),
request.ignoreIndexSettings()
);
if (IndexModule.Type.REMOTE_SNAPSHOT.match(request.storageType().toString())) {
snapshotIndexMetadata = addSnapshotToIndexSettings(
snapshotIndexMetadata,
snapshot,
repositoryData.resolveIndexId(index)
);
snapshotIndexMetadata = addSnapshotToIndexSettings(snapshotIndexMetadata, snapshot, snapshotIndexId);
}
final boolean isSearchableSnapshot = IndexModule.Type.REMOTE_SNAPSHOT.match(
snapshotIndexMetadata.getSettings().get(IndexModule.INDEX_STORE_TYPE_SETTING.getKey())
@@ -469,7 +480,7 @@ public ClusterState execute(ClusterState currentState) {
restoreUUID,
snapshot,
snapshotInfo.version(),
repositoryData.resolveIndexId(index),
snapshotIndexId,
isSearchableSnapshot,
isRemoteStoreShallowCopy,
request.getSourceRemoteStoreRepository()
@@ -588,6 +599,14 @@ public ClusterState execute(ClusterState currentState) {
}

for (int shard = 0; shard < snapshotIndexMetadata.getNumberOfShards(); shard++) {
if (IndexModule.Type.REMOTE_SNAPSHOT.match(request.storageType().toString())) {
IndexShardSnapshotStatus.Copy shardStatus = repository.getShardSnapshotStatus(
snapshotInfo.snapshotId(),
snapshotIndexId,
new ShardId(metadata.index(index).getIndex(), shard)
).asCopy();
totalRestorableRemoteIndexesSize += shardStatus.getTotalSize();
}
if (!ignoreShards.contains(shard)) {
shardsBuilder.put(
new ShardId(renamedIndex, shard),
@@ -624,6 +643,9 @@ public ClusterState execute(ClusterState currentState) {
}

checkAliasNameConflicts(indices, aliases);
if (IndexModule.Type.REMOTE_SNAPSHOT.match(request.storageType().toString())) {
validateSearchableSnapshotRestorable(totalRestorableRemoteIndexesSize);
}

Map<String, DataStream> updatedDataStreams = new HashMap<>(currentState.metadata().dataStreams());
updatedDataStreams.putAll(
@@ -823,6 +845,43 @@ private IndexMetadata updateIndexSettings(
return builder.settings(settingsBuilder).build();
}

private void validateSearchableSnapshotRestorable(long totalRestorableRemoteIndexesSize) {
ClusterInfo clusterInfo = clusterInfoService.getClusterInfo();
int remoteDataToFileCacheRatio = DATA_TO_FILE_CACHE_SIZE_RATIO_SETTING.get(clusterService.getSettings());
Map<String, FileCacheStats> nodeFileCacheStats = clusterInfo.getNodeFileCacheStats();
if (nodeFileCacheStats.isEmpty() || remoteDataToFileCacheRatio <= 0) {
return;
}

long totalNodeFileCacheSize = clusterInfo.getNodeFileCacheStats()
.values()
.stream()
.map(fileCacheStats -> fileCacheStats.getTotal().getBytes())
.mapToLong(Long::longValue)
.sum();

Predicate<ShardRouting> shardRoutingPredicate = shardRouting -> shardRouting.primary()
&& indicesService.indexService(shardRouting.index()).getIndexSettings().isRemoteSnapshot();

ShardsIterator shardsIterator = clusterService.state()
.routingTable()
.allShardsSatisfyingPredicate(shardRoutingPredicate);

long totalRestoredRemoteIndexesSize = shardsIterator.getShardRoutings()
.stream()
.map(clusterInfo::getShardSize)
.mapToLong(Long::longValue)
.sum();

if (totalRestoredRemoteIndexesSize + totalRestorableRemoteIndexesSize > remoteDataToFileCacheRatio
* totalNodeFileCacheSize) {
throw new SnapshotRestoreException(
snapshot,
"Size of the indexes to be restored exceed the file cache bounds. Increase the file cache capacity on the cluster."
);
}
}
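Unlike the per-node decider check, this validation is cluster-wide: it sums the file cache over every node reporting stats, adds the remote-snapshot primaries already in the routing table to the size of the incoming restore, and rejects the request up front instead of leaving shards unassignable later. A worked sketch with assumed figures:

```java
long totalNodeFileCacheSize = 20L << 30;            // 20 GiB of file cache across the cluster
long totalRestoredRemoteIndexesSize = 70L << 30;    // remote-snapshot primaries already assigned
long totalRestorableRemoteIndexesSize = 40L << 30;  // shards in this restore request
int remoteDataToFileCacheRatio = 5;                 // cluster.filecache.remote_data_ratio

// 110 GiB > 5 * 20 GiB, so this request would fail with SnapshotRestoreException.
boolean rejected = totalRestoredRemoteIndexesSize + totalRestorableRemoteIndexesSize
    > (long) remoteDataToFileCacheRatio * totalNodeFileCacheSize;
```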

@Override
public void onFailure(String source, Exception e) {
logger.warn(() -> new ParameterizedMessage("[{}] failed to restore snapshot", snapshotId), e);
Expand Up @@ -112,6 +112,10 @@ private RestoreSnapshotRequest randomState(RestoreSnapshotRequest instance) {
instance.snapshotUuid(randomBoolean() ? null : randomAlphaOfLength(10));
}

instance.storageType(
randomBoolean() ? RestoreSnapshotRequest.StorageType.LOCAL : RestoreSnapshotRequest.StorageType.REMOTE_SNAPSHOT
);

if (randomBoolean()) {
instance.setSourceRemoteStoreRepository(randomAlphaOfLengthBetween(5, 10));
}
Expand Up @@ -228,6 +228,49 @@ public void testShardsMatchingPredicateCount() {
assertThat(clusterState.routingTable().shardsMatchingPredicateCount(predicate), is(2));
}

public void testAllShardsMatchingPredicate() {
MockAllocationService allocation = createAllocationService(Settings.EMPTY, new DelayedShardsMockGatewayAllocator());
Metadata metadata = Metadata.builder()
.put(IndexMetadata.builder("test1").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1))
.put(IndexMetadata.builder("test2").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1))
.build();
ClusterState clusterState = ClusterState.builder(org.opensearch.cluster.ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
.metadata(metadata)
.routingTable(RoutingTable.builder().addAsNew(metadata.index("test1")).addAsNew(metadata.index("test2")).build())
.build();
clusterState = ClusterState.builder(clusterState)
.nodes(DiscoveryNodes.builder().add(newNode("node1")).add(newNode("node2")))
.build();
clusterState = allocation.reroute(clusterState, "reroute");

Predicate<ShardRouting> predicate = s -> s.state() == ShardRoutingState.UNASSIGNED && s.unassignedInfo().isDelayed();
assertThat(clusterState.routingTable().allShardsSatisfyingPredicate(predicate).size(), is(0));

// starting primaries
clusterState = startInitializingShardsAndReroute(allocation, clusterState);
// starting replicas
clusterState = startInitializingShardsAndReroute(allocation, clusterState);
// remove node2 and reroute
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes()).remove("node2")).build();
// make sure both replicas are marked as delayed (i.e. not reallocated)
clusterState = allocation.disassociateDeadNodes(clusterState, true, "reroute");
assertThat(clusterState.routingTable().allShardsSatisfyingPredicate(predicate).size(), is(2));

// A predicate that always returns true matches every shard in the routing table (active and inactive)
assertThat(clusterState.routingTable().allShardsSatisfyingPredicate(shard -> true).size(), is(4));
// A predicate that always returns false matches no shards
assertThat(clusterState.routingTable().allShardsSatisfyingPredicate(shard -> false).size(), is(0));
// Matches all primary shards in the routing table
assertThat(clusterState.routingTable().allShardsSatisfyingPredicate(ShardRouting::primary).size(), is(2));
// Verifies a predicate which tests for inactive replicas
assertThat(
clusterState.routingTable()
.allShardsSatisfyingPredicate(shardRouting -> !shardRouting.primary() && !shardRouting.active())
.size(),
is(2)
);
}

public void testActivePrimaryShardsGrouped() {
assertThat(this.emptyRoutingTable.activePrimaryShardsGrouped(new String[0], true).size(), is(0));
assertThat(this.emptyRoutingTable.activePrimaryShardsGrouped(new String[0], false).size(), is(0));