Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Retry download of RemoteFSTranslog to fix transient race conditions #9565

Merged
merged 3 commits into from
Sep 6, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ protected Settings featureFlagSettings() {
.build();
}

@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/9191")
public void testPrimaryRelocationWhileIndexing() throws Exception {
super.testPrimaryRelocationWhileIndexing();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,10 @@
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.threadpool.ThreadPool;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.util.HashSet;
import java.util.Locale;
Expand Down Expand Up @@ -68,6 +70,7 @@ public class RemoteFsTranslog extends Translog {
private final SetOnce<Boolean> olderPrimaryCleaned = new SetOnce<>();

private static final int REMOTE_DELETION_PERMITS = 2;
private static final int DOWNLOAD_RETRIES = 2;
public static final String TRANSLOG = "translog";

// Semaphore used to allow only single remote generation to happen at a time
Expand Down Expand Up @@ -161,7 +164,28 @@ public static void download(Repository repository, ShardId shardId, ThreadPool t
RemoteFsTranslog.download(translogTransferManager, location, logger);
}

public static void download(TranslogTransferManager translogTransferManager, Path location, Logger logger) throws IOException {
static void download(TranslogTransferManager translogTransferManager, Path location, Logger logger) throws IOException {
/*
In Primary to Primary relocation , there can be concurrent upload and download of translog.
While translog files are getting downloaded by new primary, it might hence be deleted by the primary
Hence we retry if tlog/ckp files are not found .

This doesn't happen in last download , where it is ensured that older primary has stopped modifying tlog data.
*/
IOException ex = null;
for (int i = 0; i <= DOWNLOAD_RETRIES; i++) {
try {
downloadOnce(translogTransferManager, location, logger);
return;
} catch (FileNotFoundException | NoSuchFileException e) {
// continue till download retries
ex = e;
}
}
throw ex;
gbbafna marked this conversation as resolved.
Show resolved Hide resolved
}

static private void downloadOnce(TranslogTransferManager translogTransferManager, Path location, Logger logger) throws IOException {
logger.trace("Downloading translog files from remote");
RemoteTranslogTransferTracker statsTracker = translogTransferManager.getRemoteTranslogTransferTracker();
long prevDownloadBytesSucceeded = statsTracker.getDownloadBytesSucceeded();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@
import org.opensearch.index.seqno.LocalCheckpointTrackerTests;
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.index.translog.transfer.BlobStoreTransferService;
import org.opensearch.index.translog.transfer.TranslogTransferManager;
import org.opensearch.index.translog.transfer.TranslogTransferMetadata;
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
Expand All @@ -59,12 +61,14 @@

import java.io.Closeable;
import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
Expand Down Expand Up @@ -98,6 +102,10 @@
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

@LuceneTestCase.SuppressFileSystems("ExtrasFS")

Expand Down Expand Up @@ -1466,6 +1474,42 @@ public void testCloseIntoReader() throws IOException {
}
}

public void testDownloadWithRetries() throws IOException {
long generation = 1, primaryTerm = 1;
Path location = createTempDir();
TranslogTransferMetadata translogTransferMetadata = new TranslogTransferMetadata(primaryTerm, generation, generation, 1);
Map<String, String> generationToPrimaryTermMapper = new HashMap<>();
generationToPrimaryTermMapper.put(String.valueOf(generation), String.valueOf(primaryTerm));
translogTransferMetadata.setGenerationToPrimaryTermMapper(generationToPrimaryTermMapper);

TranslogTransferManager mockTransfer = mock(TranslogTransferManager.class);
when(mockTransfer.readMetadata()).thenReturn(translogTransferMetadata);

// Always File not found
when(mockTransfer.downloadTranslog(any(), any(), any())).thenThrow(new NoSuchFileException("File not found"));
TranslogTransferManager finalMockTransfer = mockTransfer;
assertThrows(NoSuchFileException.class, () -> RemoteFsTranslog.download(finalMockTransfer, location, logger));

// File not found in first attempt . File found in second attempt.
mockTransfer = mock(TranslogTransferManager.class);
String msg = "File not found";
Exception toThrow = randomBoolean() ? new NoSuchFileException(msg) : new FileNotFoundException(msg);
when(mockTransfer.downloadTranslog(any(), any(), any())).thenThrow(toThrow).thenReturn(true);

AtomicLong downloadCounter = new AtomicLong();
doAnswer(invocation -> {
if (downloadCounter.incrementAndGet() <= 1) {
throw new NoSuchFileException("File not found");
} else if (downloadCounter.get() == 2) {
Files.createFile(location.resolve(Translog.getCommitCheckpointFileName(generation)));
}
return true;
}).when(mockTransfer).downloadTranslog(any(), any(), any());

// no exception thrown
RemoteFsTranslog.download(mockTransfer, location, logger);
}

public class ThrowingBlobRepository extends FsRepository {
private final Environment environment;

Expand Down