[Backport 2.x] Make RemoteStoreReplicationSource#getSegmentFiles asynchronous #10771

Merged 1 commit on Oct 20, 2023
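This backport converts the replica-side segment download path from a blocking call into a listener-driven asynchronous one. RemoteStoreFileDownloader gains a downloadAsync variant that reports completion through an ActionListener, RemoteStoreReplicationSource routes every blocking remote-store call through a CancellableThreads handle so an in-flight download can be interrupted when replication is cancelled, and a new integration test exercises cancellation while downloads are blocked. As rough orientation, the plain-Java sketch below shows the shape of the change; it uses CompletableFuture and invented names (DownloaderShape, copyOneFile) rather than OpenSearch's actual ActionListener-based API.

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Illustrative only: the blocking method ties up the replication thread for the
// whole download, while the async variant frees it and signals completion later.
class DownloaderShape {
    private final ExecutorService pool = Executors.newFixedThreadPool(4);

    // Before this PR: the caller blocks until every segment file is copied.
    void download(List<String> files) {
        files.forEach(this::copyOneFile);
    }

    // After this PR: the copy work runs on a download pool; the future (standing
    // in for OpenSearch's ActionListener<Void>) completes when all files are done.
    CompletableFuture<Void> downloadAsync(List<String> files) {
        return CompletableFuture.runAsync(() -> files.forEach(this::copyOneFile), pool);
    }

    private void copyOneFile(String file) { /* fetch one segment file from the remote store */ }
}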
File: AbstractRemoteStoreMockRepositoryIntegTestCase.java
@@ -114,6 +114,10 @@ protected void cleanupRepo() {
}

protected String setup(Path repoLocation, double ioFailureRate, String skipExceptionBlobList, long maxFailure) {
+ return setup(repoLocation, ioFailureRate, skipExceptionBlobList, maxFailure, 0);
+ }
+
+ protected String setup(Path repoLocation, double ioFailureRate, String skipExceptionBlobList, long maxFailure, int replicaCount) {
// The random_control_io_exception_rate setting ensures that 10-25% of all operations to remote store results in
// IOException. skip_exception_on_verification_file & skip_exception_on_list_blobs settings ensures that the
// repository creation can happen without failure.
@@ -128,6 +132,7 @@ protected String setup(Path repoLocation, double ioFailureRate, String skipExceptionBlobList, long maxFailure, int replicaCount) {

internalCluster().startClusterManagerOnlyNode(settings.build());
String dataNodeName = internalCluster().startDataOnlyNode(settings.build());
+ internalCluster().startDataOnlyNodes(replicaCount, settings.build());
createIndex(INDEX_NAME);
logger.info("--> Created index={}", INDEX_NAME);
ensureYellowAndNoInitializingShards(INDEX_NAME);
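The new five-argument setup overload lets disruption tests start extra data-only nodes to host replicas, while the existing four-argument form delegates with replicaCount = 0 so current callers are unchanged. The new test below invokes it like this (no injected IO failures, failure injection skipped for "metadata" blobs, no failure cap, one replica node):

// From SegmentReplicationUsingRemoteStoreDisruptionIT below:
setup(randomRepoPath().toAbsolutePath(), 0d, "metadata", Long.MAX_VALUE, 1);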
File: SegmentReplicationUsingRemoteStoreDisruptionIT.java (new file)
@@ -0,0 +1,133 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.remotestore;

import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.index.Index;
import org.opensearch.index.IndexService;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.replication.SegmentReplicationState;
import org.opensearch.indices.replication.SegmentReplicationTarget;
import org.opensearch.indices.replication.SegmentReplicationTargetService;
import org.opensearch.indices.replication.common.ReplicationCollection;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.nio.file.Path;
import java.util.Optional;
import java.util.Set;

/**
* This class runs tests with remote store + segRep while blocking file downloads
*/
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class SegmentReplicationUsingRemoteStoreDisruptionIT extends AbstractRemoteStoreMockRepositoryIntegTestCase {

@Override
public Settings indexSettings() {
return remoteStoreIndexSettings(1);
}

@Override
protected boolean addMockInternalEngine() {
return false;
}

public void testCancelReplicationWhileSyncingSegments() throws Exception {
Path location = randomRepoPath().toAbsolutePath();
setup(location, 0d, "metadata", Long.MAX_VALUE, 1);

final Set<String> dataNodeNames = internalCluster().getDataNodeNames();
final String replicaNode = getNode(dataNodeNames, false);
final String primaryNode = getNode(dataNodeNames, true);

SegmentReplicationTargetService targetService = internalCluster().getInstance(SegmentReplicationTargetService.class, replicaNode);
ensureGreen(INDEX_NAME);
blockNodeOnAnySegmentFile(REPOSITORY_NAME, replicaNode);
final IndexShard indexShard = getIndexShard(replicaNode, INDEX_NAME);
indexSingleDoc();
refresh(INDEX_NAME);
waitForBlock(replicaNode, REPOSITORY_NAME, TimeValue.timeValueSeconds(10));
final SegmentReplicationState state = targetService.getOngoingEventSegmentReplicationState(indexShard.shardId());
assertEquals(SegmentReplicationState.Stage.GET_FILES, state.getStage());
ReplicationCollection.ReplicationRef<SegmentReplicationTarget> segmentReplicationTargetReplicationRef = targetService.get(
state.getReplicationId()
);
final SegmentReplicationTarget segmentReplicationTarget = segmentReplicationTargetReplicationRef.get();
// close the target ref here otherwise it will hold a refcount
segmentReplicationTargetReplicationRef.close();
assertNotNull(segmentReplicationTarget);
assertTrue(segmentReplicationTarget.refCount() > 0);
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primaryNode));
assertBusy(() -> {
assertTrue(indexShard.routingEntry().primary());
assertNull(targetService.getOngoingEventSegmentReplicationState(indexShard.shardId()));
assertEquals("Target should be closed", 0, segmentReplicationTarget.refCount());
});
unblockNode(REPOSITORY_NAME, replicaNode);
cleanupRepo();
}

public void testCancelReplicationWhileFetchingMetadata() throws Exception {
Path location = randomRepoPath().toAbsolutePath();
setup(location, 0d, "metadata", Long.MAX_VALUE, 1);

final Set<String> dataNodeNames = internalCluster().getDataNodeNames();
final String replicaNode = getNode(dataNodeNames, false);
final String primaryNode = getNode(dataNodeNames, true);

SegmentReplicationTargetService targetService = internalCluster().getInstance(SegmentReplicationTargetService.class, replicaNode);
ensureGreen(INDEX_NAME);
blockNodeOnAnyFiles(REPOSITORY_NAME, replicaNode);
final IndexShard indexShard = getIndexShard(replicaNode, INDEX_NAME);
indexSingleDoc();
refresh(INDEX_NAME);
waitForBlock(replicaNode, REPOSITORY_NAME, TimeValue.timeValueSeconds(10));
final SegmentReplicationState state = targetService.getOngoingEventSegmentReplicationState(indexShard.shardId());
assertEquals(SegmentReplicationState.Stage.GET_CHECKPOINT_INFO, state.getStage());
ReplicationCollection.ReplicationRef<SegmentReplicationTarget> segmentReplicationTargetReplicationRef = targetService.get(
state.getReplicationId()
);
final SegmentReplicationTarget segmentReplicationTarget = segmentReplicationTargetReplicationRef.get();
// close the target ref here otherwise it will hold a refcount
segmentReplicationTargetReplicationRef.close();
assertNotNull(segmentReplicationTarget);
assertTrue(segmentReplicationTarget.refCount() > 0);
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primaryNode));
assertBusy(() -> {
assertTrue(indexShard.routingEntry().primary());
assertNull(targetService.getOngoingEventSegmentReplicationState(indexShard.shardId()));
assertEquals("Target should be closed", 0, segmentReplicationTarget.refCount());
});
unblockNode(REPOSITORY_NAME, replicaNode);
cleanupRepo();
}

private String getNode(Set<String> dataNodeNames, boolean primary) {
assertEquals(2, dataNodeNames.size());
for (String name : dataNodeNames) {
final IndexShard indexShard = getIndexShard(name, INDEX_NAME);
if (indexShard.routingEntry().primary() == primary) {
return name;
}
}
return null;
}

private IndexShard getIndexShard(String node, String indexName) {
final Index index = resolveIndex(indexName);
IndicesService indicesService = internalCluster().getInstance(IndicesService.class, node);
IndexService indexService = indicesService.indexService(index);
assertNotNull(indexService);
final Optional<Integer> shardId = indexService.shardIds().stream().findFirst();
return shardId.map(indexService::getShard).orElse(null);
}
}
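Both tests follow the same script: block the mock repository on the replica, index a document to kick off a replication event, assert the target is parked at the expected stage (GET_FILES when segment files are blocked, GET_CHECKPOINT_INFO when metadata is blocked), then stop the primary and verify the cancelled target's refcount drains to zero. The sketch below is a minimal plain-Java model of that refcount contract, with an invented RefCountedTarget class standing in for OpenSearch's AbstractRefCounted-based replication target:

import java.util.concurrent.atomic.AtomicInteger;

// Minimal model of the refcount lifecycle the tests assert on: the replication
// collection holds one reference, short-lived ReplicationRefs add more, and the
// target only closes once the count reaches zero.
class RefCountedTarget {
    private final AtomicInteger refs = new AtomicInteger(1); // the collection's reference

    void incRef() {
        refs.incrementAndGet();
    }

    void decRef() {
        if (refs.decrementAndGet() == 0) {
            close(); // last reference released: free files, fail pending listeners, etc.
        }
    }

    int refCount() {
        return refs.get();
    }

    private void close() { /* release resources */ }
}

This is also why the tests close segmentReplicationTargetReplicationRef right after dereferencing it: a ref left open would keep the count above zero, and the assertBusy would time out even if cancellation worked correctly.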
File: RemoteStoreFileDownloader.java
@@ -16,7 +16,7 @@
import org.opensearch.common.Nullable;
import org.opensearch.common.annotation.InternalApi;
import org.opensearch.common.logging.Loggers;
- import org.opensearch.common.util.concurrent.UncategorizedExecutionException;
+ import org.opensearch.common.util.CancellableThreads;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.indices.recovery.RecoverySettings;
@@ -51,9 +51,16 @@ public RemoteStoreFileDownloader(ShardId shardId, ThreadPool threadPool, RecoverySettings recoverySettings) {
* @param source The remote directory to copy segment files from
* @param destination The local directory to copy segment files to
* @param toDownloadSegments The list of segment files to download
+ * @param listener Callback listener to be notified upon completion
*/
- public void download(Directory source, Directory destination, Collection<String> toDownloadSegments) throws IOException {
- downloadInternal(source, destination, null, toDownloadSegments, () -> {});
+ public void downloadAsync(
+ CancellableThreads cancellableThreads,
+ Directory source,
+ Directory destination,
+ Collection<String> toDownloadSegments,
+ ActionListener<Void> listener
+ ) {
+ downloadInternal(cancellableThreads, source, destination, null, toDownloadSegments, () -> {}, listener);
}

/**
@@ -74,17 +81,37 @@ public void download(
Directory secondDestination,
Collection<String> toDownloadSegments,
Runnable onFileCompletion
- ) throws IOException {
- downloadInternal(source, destination, secondDestination, toDownloadSegments, onFileCompletion);
+ ) throws InterruptedException, IOException {
+ final CancellableThreads cancellableThreads = new CancellableThreads();
+ final PlainActionFuture<Void> listener = PlainActionFuture.newFuture();
+ downloadInternal(cancellableThreads, source, destination, secondDestination, toDownloadSegments, onFileCompletion, listener);
+ try {
+ listener.get();
+ } catch (ExecutionException e) {
+ if (e.getCause() instanceof RuntimeException) {
+ throw (RuntimeException) e.getCause();
+ } else if (e.getCause() instanceof IOException) {
+ throw (IOException) e.getCause();
+ }
+ throw new RuntimeException(e);
+ } catch (InterruptedException e) {
+ // If the blocking call on the PlainActionFuture itself is interrupted, then we must
+ // cancel the asynchronous work we were waiting on
+ cancellableThreads.cancel(e.getMessage());
+ Thread.currentThread().interrupt();
+ throw e;
+ }
}

private void downloadInternal(
+ CancellableThreads cancellableThreads,
Directory source,
Directory destination,
@Nullable Directory secondDestination,
Collection<String> toDownloadSegments,
- Runnable onFileCompletion
- ) throws IOException {
+ Runnable onFileCompletion,
+ ActionListener<Void> listener
+ ) {
final Queue<String> queue = new ConcurrentLinkedQueue<>(toDownloadSegments);
// Choose the minimum of:
// - number of files to download
@@ -95,25 +122,14 @@ private void downloadInternal(
Math.min(threadPool.info(ThreadPool.Names.REMOTE_RECOVERY).getMax(), recoverySettings.getMaxConcurrentRemoteStoreStreams())
);
logger.trace("Starting download of {} files with {} threads", queue.size(), threads);
- final PlainActionFuture<Collection<Void>> listener = PlainActionFuture.newFuture();
- final ActionListener<Void> allFilesListener = new GroupedActionListener<>(listener, threads);
+ final ActionListener<Void> allFilesListener = new GroupedActionListener<>(ActionListener.map(listener, r -> null), threads);
for (int i = 0; i < threads; i++) {
- copyOneFile(source, destination, secondDestination, queue, onFileCompletion, allFilesListener);
- }
- try {
- listener.actionGet();
- } catch (UncategorizedExecutionException e) {
- // Any IOException will be double-wrapped so dig it out and throw it
- if (e.getCause() instanceof ExecutionException) {
- if (e.getCause().getCause() instanceof IOException) {
- throw (IOException) e.getCause().getCause();
- }
- }
- throw e;
+ copyOneFile(cancellableThreads, source, destination, secondDestination, queue, onFileCompletion, allFilesListener);
}
}

private void copyOneFile(
+ CancellableThreads cancellableThreads,
Directory source,
Directory destination,
@Nullable Directory secondDestination,
@@ -129,18 +145,20 @@ private void copyOneFile(
threadPool.executor(ThreadPool.Names.REMOTE_RECOVERY).submit(() -> {
logger.trace("Downloading file {}", file);
try {
- destination.copyFrom(source, file, file, IOContext.DEFAULT);
- onFileCompletion.run();
- if (secondDestination != null) {
- secondDestination.copyFrom(destination, file, file, IOContext.DEFAULT);
- }
+ cancellableThreads.executeIO(() -> {
+ destination.copyFrom(source, file, file, IOContext.DEFAULT);
+ onFileCompletion.run();
+ if (secondDestination != null) {
+ secondDestination.copyFrom(destination, file, file, IOContext.DEFAULT);
+ }
+ });
} catch (Exception e) {
// Clear the queue to stop any future processing, report the failure, then return
queue.clear();
listener.onFailure(e);
return;
}
- copyOneFile(source, destination, secondDestination, queue, onFileCompletion, listener);
+ copyOneFile(cancellableThreads, source, destination, secondDestination, queue, onFileCompletion, listener);
});
}
}
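In the new downloadInternal, the file queue is fanned out to up to `threads` chains of copyOneFile calls, each of which signals a shared GroupedActionListener; that listener fires the caller's listener exactly once when every chain has finished, and ActionListener.map adapts the grouped Collection<Void> result back to Void. The legacy blocking download() now just bridges onto this with a PlainActionFuture, unwrapping the ExecutionException and restoring interrupt status. The following plain-Java sketch shows the grouped-completion idea; it is simplified from OpenSearch's GroupedActionListener (no result collection, and it ignores some completion races the real class handles):

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Runs onAllSuccess once `groupSize` successes arrive, or reports the first
// failure exactly once; later signals are ignored.
class GroupedCompletion {
    private final AtomicInteger remaining;
    private final AtomicBoolean failed = new AtomicBoolean(false);
    private final Runnable onAllSuccess;
    private final Consumer<Exception> onFirstFailure;

    GroupedCompletion(int groupSize, Runnable onAllSuccess, Consumer<Exception> onFirstFailure) {
        this.remaining = new AtomicInteger(groupSize);
        this.onAllSuccess = onAllSuccess;
        this.onFirstFailure = onFirstFailure;
    }

    void onResponse() {
        if (remaining.decrementAndGet() == 0 && failed.get() == false) {
            onAllSuccess.run();
        }
    }

    void onFailure(Exception e) {
        if (failed.compareAndSet(false, true)) {
            onFirstFailure.accept(e);
        }
    }
}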
File: RemoteStoreReplicationSource.java
@@ -15,6 +15,7 @@
import org.apache.lucene.store.FilterDirectory;
import org.apache.lucene.util.Version;
import org.opensearch.common.concurrent.GatedCloseable;
+ import org.opensearch.common.util.CancellableThreads;
import org.opensearch.core.action.ActionListener;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.IndexShardState;
@@ -24,11 +25,14 @@
import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;

+ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+ import java.util.concurrent.atomic.AtomicBoolean;
+ import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;

@@ -43,6 +47,7 @@ public class RemoteStoreReplicationSource implements SegmentReplicationSource {

private final IndexShard indexShard;
private final RemoteSegmentStoreDirectory remoteDirectory;
+ private final CancellableThreads cancellableThreads = new CancellableThreads();

public RemoteStoreReplicationSource(IndexShard indexShard) {
this.indexShard = indexShard;
Expand All @@ -61,7 +66,7 @@ public void getCheckpointMetadata(
// TODO: Need to figure out a way to pass this information for segment metadata via remote store.
try (final GatedCloseable<SegmentInfos> segmentInfosSnapshot = indexShard.getSegmentInfosSnapshot()) {
final Version version = segmentInfosSnapshot.get().getCommitLuceneVersion();
- RemoteSegmentMetadata mdFile = remoteDirectory.init();
+ final RemoteSegmentMetadata mdFile = getRemoteSegmentMetadata();
// During initial recovery flow, the remote store might not
// have metadata as primary hasn't uploaded anything yet.
if (mdFile == null && indexShard.state().equals(IndexShardState.STARTED) == false) {
@@ -106,39 +111,50 @@ public void getSegmentFiles(
}
logger.debug("Downloading segment files from remote store {}", filesToFetch);

- RemoteSegmentMetadata remoteSegmentMetadata = remoteDirectory.readLatestMetadataFile();
- Collection<String> directoryFiles = List.of(indexShard.store().directory().listAll());
- if (remoteSegmentMetadata != null) {
- try {
- indexShard.store().incRef();
- indexShard.remoteStore().incRef();
- final Directory storeDirectory = indexShard.store().directory();
- final List<String> toDownloadSegmentNames = new ArrayList<>();
- for (StoreFileMetadata fileMetadata : filesToFetch) {
- String file = fileMetadata.name();
- assert directoryFiles.contains(file) == false : "Local store already contains the file " + file;
- toDownloadSegmentNames.add(file);
- }
- indexShard.getFileDownloader()
- .download(
- remoteDirectory,
- new ReplicationStatsDirectoryWrapper(storeDirectory, fileProgressTracker),
- toDownloadSegmentNames
- );
- logger.debug("Downloaded segment files from remote store {}", filesToFetch);
- } finally {
- indexShard.store().decRef();
- indexShard.remoteStore().decRef();
+ if (remoteMetadataExists()) {
+ final Directory storeDirectory = indexShard.store().directory();
+ final Collection<String> directoryFiles = List.of(storeDirectory.listAll());
+ final List<String> toDownloadSegmentNames = new ArrayList<>();
+ for (StoreFileMetadata fileMetadata : filesToFetch) {
+ String file = fileMetadata.name();
+ assert directoryFiles.contains(file) == false : "Local store already contains the file " + file;
+ toDownloadSegmentNames.add(file);
+ }
+ indexShard.getFileDownloader()
+ .downloadAsync(
+ cancellableThreads,
+ remoteDirectory,
+ new ReplicationStatsDirectoryWrapper(storeDirectory, fileProgressTracker),
+ toDownloadSegmentNames,
+ ActionListener.map(listener, r -> new GetSegmentFilesResponse(filesToFetch))
+ );
+ } else {
+ listener.onResponse(new GetSegmentFilesResponse(filesToFetch));
+ }
- listener.onResponse(new GetSegmentFilesResponse(filesToFetch));
- } catch (Exception e) {
+ } catch (IOException | RuntimeException e) {
listener.onFailure(e);
}
}

@Override
+ public void cancel() {
+ this.cancellableThreads.cancel("Canceled by target");
+ }
+
+ @Override
public String getDescription() {
return "RemoteStoreReplicationSource";
}

+ private boolean remoteMetadataExists() throws IOException {
+ final AtomicBoolean metadataExists = new AtomicBoolean(false);
+ cancellableThreads.executeIO(() -> metadataExists.set(remoteDirectory.readLatestMetadataFile() != null));
+ return metadataExists.get();
+ }
+
+ private RemoteSegmentMetadata getRemoteSegmentMetadata() throws IOException {
+ AtomicReference<RemoteSegmentMetadata> mdFile = new AtomicReference<>();
+ cancellableThreads.executeIO(() -> mdFile.set(remoteDirectory.init()));
+ return mdFile.get();
+ }
}
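Every blocking remote-store call in this class now runs inside cancellableThreads.executeIO(...), so the new cancel() hook can interrupt a thread parked in repository IO; the AtomicBoolean and AtomicReference in the two private helpers exist only to carry a result out of the void-returning lambda. The sketch below is a minimal plain-Java rendering of that interrupt-on-cancel pattern with an invented CancellableIO class; OpenSearch's real CancellableThreads additionally tracks multiple registered threads and surfaces cancellation as its own ExecutionCancelledException:

import java.io.IOException;

// Sketch: remember the thread running a cancellable IO section and interrupt
// it when cancel() is called, surfacing cancellation as an exception.
class CancellableIO {
    interface IORunnable { void run() throws IOException; }

    private volatile Thread current;
    private volatile boolean cancelled;

    void executeIO(IORunnable op) throws IOException {
        if (cancelled) {
            throw new IllegalStateException("operation was cancelled");
        }
        current = Thread.currentThread();
        try {
            op.run();
        } finally {
            current = null;
            // Clear a pending interrupt delivered by cancel() and report it.
            if (Thread.interrupted() && cancelled) {
                throw new IllegalStateException("operation was cancelled during IO");
            }
        }
    }

    void cancel() {
        cancelled = true;
        final Thread t = current;
        if (t != null) {
            t.interrupt(); // unblock IO that supports interruption
        }
    }
}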