Skip to content

Commit

Permalink
Make remote seeding a low priority upload (#14096) (#14110)
Browse files Browse the repository at this point in the history
(cherry picked from commit b0513dd)

Signed-off-by: Gaurav Bafna <[email protected]>
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
1 parent 13266ac commit ea72ad3
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -459,8 +459,8 @@ private void uploadNewSegments(
}
}

private boolean isLowPriorityUpload() {
return isLocalOrSnapshotRecovery();
boolean isLowPriorityUpload() {
return isLocalOrSnapshotRecoveryOrSeeding();
}

/**
Expand Down Expand Up @@ -550,7 +550,7 @@ private void initializeRemoteDirectoryOnTermUpdate() throws IOException {
* @return true iff the shard is a started with primary mode true or it is local or snapshot recovery.
*/
private boolean isReadyForUpload() {
boolean isReady = indexShard.isStartedPrimary() || isLocalOrSnapshotRecovery() || indexShard.shouldSeedRemoteStore();
boolean isReady = indexShard.isStartedPrimary() || isLocalOrSnapshotRecoveryOrSeeding();

if (isReady == false) {
StringBuilder sb = new StringBuilder("Skipped syncing segments with");
Expand All @@ -572,14 +572,15 @@ private boolean isReadyForUpload() {
return isReady;
}

private boolean isLocalOrSnapshotRecovery() {
boolean isLocalOrSnapshotRecoveryOrSeeding() {
// In this case when the primary mode is false, we need to upload segments to Remote Store
// This is required in case of snapshots/shrink/ split/clone where we need to durable persist
// This is required in case of remote migration seeding/snapshots/shrink/ split/clone where we need to durable persist
// all segments to remote before completing the recovery to ensure durability.
return (indexShard.state() == IndexShardState.RECOVERING && indexShard.shardRouting.primary())
&& indexShard.recoveryState() != null
&& (indexShard.recoveryState().getRecoverySource().getType() == RecoverySource.Type.LOCAL_SHARDS
|| indexShard.recoveryState().getRecoverySource().getType() == RecoverySource.Type.SNAPSHOT);
|| indexShard.recoveryState().getRecoverySource().getType() == RecoverySource.Type.SNAPSHOT
|| indexShard.shouldSeedRemoteStore());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.apache.lucene.tests.store.BaseDirectoryWrapper;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.routing.IndexShardRoutingTable;
import org.opensearch.cluster.routing.RecoverySource;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.collect.Tuple;
Expand All @@ -37,6 +38,7 @@
import org.opensearch.index.store.lockmanager.RemoteStoreLockManager;
import org.opensearch.indices.DefaultRemoteStoreSettings;
import org.opensearch.indices.RemoteStoreSettings;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -125,6 +127,52 @@ public void tearDown() throws Exception {
super.tearDown();
}

public void testIsLowPriorityUpload() throws IOException {
setup(true, 3);

// Mocking the IndexShard methods and dependent classes.
IndexShard shard = mock(IndexShard.class);
Store store = mock(Store.class);
ShardId shardId = new ShardId("index1", "_na_", 1);
ShardRouting shardRouting = mock(ShardRouting.class);
shard.shardRouting = shardRouting;
when(shard.shouldSeedRemoteStore()).thenReturn(true);
when(shard.state()).thenReturn(IndexShardState.RECOVERING);
when(shardRouting.primary()).thenReturn(true);
when(shard.shardId()).thenReturn(shardId);
when(shard.store()).thenReturn(store);
when(shard.routingEntry()).thenReturn(shardRouting);
when(shard.getThreadPool()).thenReturn(mock(ThreadPool.class));
RecoveryState recoveryState = mock(RecoveryState.class);
when(recoveryState.getRecoverySource()).thenReturn(RecoverySource.PeerRecoverySource.INSTANCE);
when(shard.recoveryState()).thenReturn(recoveryState);

// Mock the Store, Directory and RemoteSegmentStoreDirectory classes
Store remoteStore = mock(Store.class);
when(shard.remoteStore()).thenReturn(remoteStore);
RemoteDirectory remoteMetadataDirectory = mock(RemoteDirectory.class);
RemoteSegmentStoreDirectory remoteSegmentStoreDirectory = new RemoteSegmentStoreDirectory(
mock(RemoteDirectory.class),
remoteMetadataDirectory,
mock(RemoteStoreLockManager.class),
mock(ThreadPool.class),
shardId
);
FilterDirectory remoteStoreFilterDirectory = new RemoteStoreRefreshListenerTests.TestFilterDirectory(
new RemoteStoreRefreshListenerTests.TestFilterDirectory(remoteSegmentStoreDirectory)
);
when(remoteStore.directory()).thenReturn(remoteStoreFilterDirectory);

RemoteStoreRefreshListener remoteStoreRefreshListener = new RemoteStoreRefreshListener(
shard,
SegmentReplicationCheckpointPublisher.EMPTY,
mock(RemoteSegmentTransferTracker.class),
DefaultRemoteStoreSettings.INSTANCE
);
assertTrue(remoteStoreRefreshListener.isLocalOrSnapshotRecoveryOrSeeding());
assertTrue(remoteStoreRefreshListener.isLowPriorityUpload());
}

public void testRemoteDirectoryInitThrowsException() throws IOException {
// Methods used in the constructor of RemoteSegmentTrackerListener have been mocked to reproduce specific exceptions
// to test the failure modes possible during construction of RemoteSegmentTrackerListener object.
Expand Down

0 comments on commit ea72ad3

Please sign in to comment.