Skip to content

Commit

Permalink
Ensure that segments are upload to remote store in case of local and …
Browse files Browse the repository at this point in the history
…snapshot based recovery
  • Loading branch information
gbbafna committed Oct 26, 2023
1 parent fe8b2d5 commit ad73e1a
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,8 @@ public void testRestoreInSameRemoteStoreEnabledIndex() throws IOException {
assertEquals(restoreSnapshotResponse1.status(), RestStatus.ACCEPTED);
assertEquals(restoreSnapshotResponse2.status(), RestStatus.ACCEPTED);
ensureGreen(indexName1, restoredIndexName2);

assertRemoteSegmentsAndTranslogUploaded(restoredIndexName2);
assertDocsPresentInIndex(client, indexName1, numDocsInIndex1);
assertDocsPresentInIndex(client, restoredIndexName2, numDocsInIndex2);
// indexing some new docs and validating
Expand All @@ -355,6 +357,29 @@ public void testRestoreInSameRemoteStoreEnabledIndex() throws IOException {
assertDocsPresentInIndex(client, indexName1, numDocsInIndex1 + 4);
}

void assertRemoteSegmentsAndTranslogUploaded(String idx) throws IOException {
String indexUUID = client().admin().indices().prepareGetSettings(idx).get().getSetting(idx, IndexMetadata.SETTING_INDEX_UUID);

Path remoteTranslogMetadataPath = Path.of(String.valueOf(remoteRepoPath), indexUUID, "/0/translog/metadata");
Path remoteTranslogDataPath = Path.of(String.valueOf(remoteRepoPath), indexUUID, "/0/translog/data");
Path segmentMetadataPath = Path.of(String.valueOf(remoteRepoPath), indexUUID, "/0/segments/metadata");
Path segmentDataPath = Path.of(String.valueOf(remoteRepoPath), indexUUID, "/0/segments/data");

try (
Stream<Path> translogMetadata = Files.list(remoteTranslogMetadataPath);
Stream<Path> translogData = Files.list(remoteTranslogDataPath);
Stream<Path> segmentMetadata = Files.list(segmentMetadataPath);
Stream<Path> segmentData = Files.list(segmentDataPath);

) {
assertTrue(translogData.count() > 0);
assertTrue(translogMetadata.count() > 0);
assertTrue(segmentMetadata.count() > 0);
assertTrue(segmentData.count() > 0);
}

}

public void testRemoteRestoreIndexRestoredFromSnapshot() throws IOException, ExecutionException, InterruptedException {
internalCluster().startClusterManagerOnlyNode();
internalCluster().startDataOnlyNodes(2);
Expand Down Expand Up @@ -395,23 +420,7 @@ public void testRemoteRestoreIndexRestoredFromSnapshot() throws IOException, Exe
ensureGreen(indexName1);
assertDocsPresentInIndex(client(), indexName1, numDocsInIndex1);

// Make sure remote translog is empty
String indexUUID = client().admin()
.indices()
.prepareGetSettings(indexName1)
.get()
.getSetting(indexName1, IndexMetadata.SETTING_INDEX_UUID);

Path remoteTranslogMetadataPath = Path.of(String.valueOf(remoteRepoPath), indexUUID, "/0/translog/metadata");
Path remoteTranslogDataPath = Path.of(String.valueOf(remoteRepoPath), indexUUID, "/0/translog/data");

try (
Stream<Path> translogMetadata = Files.list(remoteTranslogMetadataPath);
Stream<Path> translogData = Files.list(remoteTranslogDataPath)
) {
assertTrue(translogData.count() > 0);
assertTrue(translogMetadata.count() > 0);
}
assertRemoteSegmentsAndTranslogUploaded(indexName1);

// Clear the local data before stopping the node. This will make sure that remote translog is empty.
IndexShard indexShard = getIndexShard(primaryNodeName(indexName1), indexName1);
Expand Down
17 changes: 17 additions & 0 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -1996,6 +1996,23 @@ private RemoteSegmentStoreDirectory getRemoteDirectory() {
return ((RemoteSegmentStoreDirectory) remoteDirectory);
}

/*
Returns true iff it is able to verify that there is at least one
remote segment metadata uploaded
*/
boolean atLeastOneRemoteSync() {
assert indexSettings.isRemoteStoreEnabled();
try {
RemoteSegmentStoreDirectory directory = getRemoteDirectory();
if (directory.readLatestMetadataFile() != null) {
return true;
}
} catch (IOException e) {
logger.error("Exception while reading latest metadata");
}
return false;
}

public void preRecovery() {
final IndexShardState currentState = this.state; // single volatile read
if (currentState == IndexShardState.CLOSED) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.opensearch.action.LatchedActionListener;
import org.opensearch.action.bulk.BackoffPolicy;
import org.opensearch.action.support.GroupedActionListener;
import org.opensearch.cluster.routing.RecoverySource;
import org.opensearch.common.concurrent.GatedCloseable;
import org.opensearch.common.logging.Loggers;
import org.opensearch.common.unit.TimeValue;
Expand Down Expand Up @@ -179,6 +180,9 @@ private boolean shouldSync(boolean didRefresh, boolean skipPrimaryTermCheck) {
return this.primaryTerm != indexShard.getOperationPrimaryTerm();
}

/*
@return true if retry is needed
*/
private boolean syncSegments() {
if (isReadyForUpload() == false) {
// Following check is required to enable retry and make sure that we do not lose this refresh event
Expand Down Expand Up @@ -485,7 +489,9 @@ private void initializeRemoteDirectoryOnTermUpdate() throws IOException {
* @return true iff primaryMode is true and index shard is not in closed state.
*/
private boolean isReadyForUpload() {
boolean isReady = indexShard.getReplicationTracker().isPrimaryMode() && indexShard.state() != IndexShardState.CLOSED;
boolean isReady = (indexShard.getReplicationTracker().isPrimaryMode() && indexShard.state() != IndexShardState.CLOSED)
|| isLocalOrSnapshotRecovery();

if (isReady == false) {
StringBuilder sb = new StringBuilder("Skipped syncing segments with");
if (indexShard.getReplicationTracker() != null) {
Expand All @@ -502,6 +508,16 @@ private boolean isReadyForUpload() {
return isReady;
}

private boolean isLocalOrSnapshotRecovery() {
// In this case when the primary mode is false, we need to upload segments to Remote Store
// This is required in case of snapshots/shrink/ split/clone where we need to durable persist
// all segments to remote before completing the recovery to ensure durability.

return (indexShard.state() == IndexShardState.RECOVERING && indexShard.shardRouting.primary())
&& (indexShard.recoveryState().getRecoverySource().getType() == RecoverySource.Type.LOCAL_SHARDS
|| indexShard.recoveryState().getRecoverySource().getType() == RecoverySource.Type.SNAPSHOT);
}

/**
* Creates an {@link UploadListener} containing the stats population logic which would be triggered before and after segment upload events
*/
Expand Down
21 changes: 21 additions & 0 deletions server/src/main/java/org/opensearch/index/shard/StoreRecovery.java
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,15 @@ void recoverFromLocalShards(
// just trigger a merge to do housekeeping on the
// copied segments - we will also see them in stats etc.
indexShard.getEngine().forceMerge(false, -1, false, false, false, UUIDs.randomBase64UUID());
if (indexShard.isRemoteTranslogEnabled()) {
if (indexShard.atLeastOneRemoteSync() == false) {
throw new IndexShardRecoveryException(
indexShard.shardId(),
"failed to upload to remote",
new IOException("Failed to upload to remote segment store")
);
}
}
return true;
} catch (IOException ex) {
throw new IndexShardRecoveryException(indexShard.shardId(), "failed to recover from local shards", ex);
Expand Down Expand Up @@ -418,6 +427,12 @@ void recoverFromSnapshotAndRemoteStore(
}
indexShard.getEngine().fillSeqNoGaps(indexShard.getPendingPrimaryTerm());
indexShard.finalizeRecovery();
if (indexShard.isRemoteTranslogEnabled()) {
if (indexShard.atLeastOneRemoteSync() == false) {
listener.onFailure(new IndexShardRestoreFailedException(shardId, "Failed to upload to remote segment store"));
return;
}
}
indexShard.postRecovery("restore done");

listener.onResponse(true);
Expand Down Expand Up @@ -697,6 +712,12 @@ private void restore(
}
indexShard.getEngine().fillSeqNoGaps(indexShard.getPendingPrimaryTerm());
indexShard.finalizeRecovery();
if (indexShard.isRemoteTranslogEnabled()) {
if (indexShard.atLeastOneRemoteSync() == false) {
listener.onFailure(new IndexShardRestoreFailedException(shardId, "Failed to upload to remote segment store"));
return;
}
}
indexShard.postRecovery("restore done");
listener.onResponse(true);
}, e -> listener.onFailure(new IndexShardRestoreFailedException(shardId, "restore failed", e)));
Expand Down

0 comments on commit ad73e1a

Please sign in to comment.