Skip to content

Commit

Permalink
Retry download of RemoteFSTranslog to fix transient race conditions
Browse files Browse the repository at this point in the history
Signed-off-by: Gaurav Bafna <[email protected]>
  • Loading branch information
gbbafna committed Aug 28, 2023
1 parent a1d8beb commit 009843f
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ protected Settings featureFlagSettings() {
.build();
}

@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/9191")
public void testPrimaryRelocationWhileIndexing() throws Exception {
super.testPrimaryRelocationWhileIndexing();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,10 @@
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.threadpool.ThreadPool;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.util.HashSet;
import java.util.Locale;
Expand Down Expand Up @@ -66,6 +68,7 @@ public class RemoteFsTranslog extends Translog {
private final SetOnce<Boolean> olderPrimaryCleaned = new SetOnce<>();

private static final int REMOTE_DELETION_PERMITS = 2;
private static final int DOWNLOAD_RETRIES = 2;
public static final String TRANSLOG = "translog";

// Semaphore used to allow only single remote generation to happen at a time
Expand Down Expand Up @@ -142,7 +145,28 @@ public static void download(Repository repository, ShardId shardId, ThreadPool t
RemoteFsTranslog.download(translogTransferManager, location, logger);
}

public static void download(TranslogTransferManager translogTransferManager, Path location, Logger logger) throws IOException {
static void download(TranslogTransferManager translogTransferManager, Path location, Logger logger) throws IOException {
/*
In Primary to Primary relocation , there can be concurrent upload and download of translog.
While translog files are getting downloaded by new primary, it might hence be deleted by the primary
Hence we retry if tlog/ckp files are not found .
This doesn't happen in last download , where it is ensured that older primary has stopped modifying tlog data.
*/
IOException ex = null;
for (int i = 0; i <= DOWNLOAD_RETRIES; i++) {
try {
downloadOnce(translogTransferManager, location, logger);
return;
} catch (FileNotFoundException | NoSuchFileException e) {
// continue till download retries
ex = e;
}
}
throw ex;
}

static void downloadOnce(TranslogTransferManager translogTransferManager, Path location, Logger logger) throws IOException {
logger.trace("Downloading translog files from remote");
TranslogTransferMetadata translogMetadata = translogTransferManager.readMetadata();
if (translogMetadata != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@
import org.opensearch.index.seqno.LocalCheckpointTrackerTests;
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.index.translog.transfer.BlobStoreTransferService;
import org.opensearch.index.translog.transfer.TranslogTransferManager;
import org.opensearch.index.translog.transfer.TranslogTransferMetadata;
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
Expand All @@ -64,6 +66,7 @@
import java.nio.channels.FileChannel;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
Expand Down Expand Up @@ -97,6 +100,10 @@
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

@LuceneTestCase.SuppressFileSystems("ExtrasFS")

Expand Down Expand Up @@ -1384,6 +1391,40 @@ public void testCloseIntoReader() throws IOException {
}
}

public void testDownloadWithRetries() throws IOException {
long generation = 1, primaryTerm = 1;
Path location = createTempDir();
TranslogTransferMetadata translogTransferMetadata = new TranslogTransferMetadata(primaryTerm, generation, generation, 1);
Map<String, String> generationToPrimaryTermMapper = new HashMap<>();
generationToPrimaryTermMapper.put(String.valueOf(generation), String.valueOf(primaryTerm));
translogTransferMetadata.setGenerationToPrimaryTermMapper(generationToPrimaryTermMapper);

TranslogTransferManager mockTransfer = mock(TranslogTransferManager.class);
when(mockTransfer.readMetadata()).thenReturn(translogTransferMetadata);

// Always File not found
when(mockTransfer.downloadTranslog(any(), any(), any())).thenThrow(new NoSuchFileException("File not found"));
TranslogTransferManager finalMockTransfer = mockTransfer;
assertThrows(NoSuchFileException.class, () -> RemoteFsTranslog.download(finalMockTransfer, location, logger));

// File not found in first attempt . File found in second attempt.
mockTransfer = mock(TranslogTransferManager.class);
when(mockTransfer.downloadTranslog(any(), any(), any())).thenThrow(new NoSuchFileException("File not found")).thenReturn(true);

AtomicLong downloadCounter = new AtomicLong();
doAnswer(invocation -> {
if (downloadCounter.incrementAndGet() <= 1) {
throw new NoSuchFileException("File not found");
} else if (downloadCounter.get() == 2) {
Files.createFile(location.resolve(Translog.getCommitCheckpointFileName(generation)));
}
return true;
}).when(mockTransfer).downloadTranslog(any(), any(), any());

// no exception thrown
RemoteFsTranslog.download(mockTransfer, location, logger);
}

public class ThrowingBlobRepository extends FsRepository {
private final Environment environment;

Expand Down

0 comments on commit 009843f

Please sign in to comment.