
Make prepare engine step of recovery source non-blocking #37573

Merged · 6 commits · Jan 22, 2019
Changes from 4 commits
@@ -431,13 +431,13 @@ public interface RecoveryListener {
     class PrepareForTranslogOperationsRequestHandler implements TransportRequestHandler<RecoveryPrepareForTranslogOperationsRequest> {
 
         @Override
-        public void messageReceived(RecoveryPrepareForTranslogOperationsRequest request, TransportChannel channel,
-                                    Task task) throws Exception {
-            try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId()
-                )) {
-                recoveryRef.target().prepareForTranslogOperations(request.isFileBasedRecovery(), request.totalTranslogOps());
+        public void messageReceived(RecoveryPrepareForTranslogOperationsRequest request, TransportChannel channel, Task task) {
+            try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId())) {
+                final ActionListener<TransportResponse> listener =
+                    new HandledTransportAction.ChannelActionListener<>(channel, Actions.PREPARE_TRANSLOG, request);
Review comment (Contributor): unrelated, but I think we can move ChannelActionListener into its own class.

+                recoveryRef.target().prepareForTranslogOperations(request.isFileBasedRecovery(), request.totalTranslogOps(),
+                    ActionListener.wrap(nullVal -> listener.onResponse(TransportResponse.Empty.INSTANCE), listener::onFailure));
             }
-            channel.sendResponse(TransportResponse.Empty.INSTANCE);
         }
     }

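Note on the review comment above: a standalone ChannelActionListener could look roughly like the sketch below. This is only an illustration of the suggestion; the class name mirrors the nested HandledTransportAction.ChannelActionListener used in this hunk, while the package placement, field set, and error handling are assumptions, not the refactoring that was actually done.

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportResponse;

// Sketch only: a possible standalone ChannelActionListener, kept deliberately close to the
// nested HandledTransportAction.ChannelActionListener used above.
public final class ChannelActionListener<Response extends TransportResponse> implements ActionListener<Response> {

    private final TransportChannel channel;
    private final String actionName;
    private final TransportRequest request;

    public ChannelActionListener(TransportChannel channel, String actionName, TransportRequest request) {
        this.channel = channel;
        this.actionName = actionName;
        this.request = request;
    }

    @Override
    public void onResponse(Response response) {
        try {
            channel.sendResponse(response);
        } catch (Exception e) {
            onFailure(e);
        }
    }

    @Override
    public void onFailure(Exception e) {
        try {
            channel.sendResponse(e);
        } catch (Exception sendException) {
            sendException.addSuppressed(e);
            // nothing more can be done here except reporting the failure to send the error
            // for actionName/request, e.g. via a logger (omitted in this sketch)
        }
    }
}

With such a class in place, the handler above could build the listener as new ChannelActionListener<>(channel, Actions.PREPARE_TRANSLOG, request) without reaching into HandledTransportAction.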
@@ -197,51 +197,48 @@ public void recoverToTarget(ActionListener<RecoveryResponse> listener) {
             assert requiredSeqNoRangeStart >= startingSeqNo : "requiredSeqNoRangeStart [" + requiredSeqNoRangeStart + "] is lower than ["
                 + startingSeqNo + "]";
 
-            final TimeValue prepareEngineTime;
-            try {
-                // For a sequence based recovery, the target can keep its local translog
-                prepareEngineTime = prepareTargetForTranslog(isSequenceNumberBasedRecovery == false,
-                    shard.estimateNumberOfHistoryOperations("peer-recovery", startingSeqNo));
-            } catch (final Exception e) {
-                throw new RecoveryEngineException(shard.shardId(), 1, "prepare target for translog failed", e);
-            }
-
-            /*
-             * add shard to replication group (shard will receive replication requests from this point on) now that engine is open.
-             * This means that any document indexed into the primary after this will be replicated to this replica as well
-             * make sure to do this before sampling the max sequence number in the next step, to ensure that we send
-             * all documents up to maxSeqNo in phase2.
-             */
-            runUnderPrimaryPermit(() -> shard.initiateTracking(request.targetAllocationId()),
-                shardId + " initiating tracking of " + request.targetAllocationId(), shard, cancellableThreads, logger);
-
-            final long endingSeqNo = shard.seqNoStats().getMaxSeqNo();
-            /*
-             * We need to wait for all operations up to the current max to complete, otherwise we can not guarantee that all
-             * operations in the required range will be available for replaying from the translog of the source.
-             */
-            cancellableThreads.execute(() -> shard.waitForOpsToComplete(endingSeqNo));
-
-            if (logger.isTraceEnabled()) {
-                logger.trace("all operations up to [{}] completed, which will be used as an ending sequence number", endingSeqNo);
-                logger.trace("snapshot translog for recovery; current size is [{}]",
-                    shard.estimateNumberOfHistoryOperations("peer-recovery", startingSeqNo));
-            }
-
-            final Translog.Snapshot phase2Snapshot = shard.getHistoryOperations("peer-recovery", startingSeqNo);
-            resources.add(phase2Snapshot);
-            // we can release the retention lock here because the snapshot itself will retain the required operations.
-            IOUtils.close(retentionLock);
-            // we have to capture the max_seen_auto_id_timestamp and the max_seq_no_of_updates to make sure that these values
-            // are at least as high as the corresponding values on the primary when any of these operations were executed on it.
-            final long maxSeenAutoIdTimestamp = shard.getMaxSeenAutoIdTimestamp();
-            final long maxSeqNoOfUpdatesOrDeletes = shard.getMaxSeqNoOfUpdatesOrDeletes();
-            final StepListener<SendSnapshotResult> sendSnapshotStep = new StepListener<>();
-            phase2(startingSeqNo, requiredSeqNoRangeStart, endingSeqNo, phase2Snapshot, maxSeenAutoIdTimestamp,
-                maxSeqNoOfUpdatesOrDeletes, sendSnapshotStep);
-            sendSnapshotStep.whenComplete(
-                r -> IOUtils.close(phase2Snapshot),
-                e -> onFailure.accept(new RecoveryEngineException(shard.shardId(), 2, "phase2 failed", e)));
+            final StepListener<TimeValue> prepareEngineStep = new StepListener<>();
+            // For a sequence based recovery, the target can keep its local translog
+            prepareTargetForTranslog(isSequenceNumberBasedRecovery == false,
+                shard.estimateNumberOfHistoryOperations("peer-recovery", startingSeqNo), prepareEngineStep);
+            final StepListener<SendSnapshotResult> sendSnapshotStep = new StepListener<>();
+            prepareEngineStep.whenComplete(prepareEngineTime -> {
+                /*
+                 * add shard to replication group (shard will receive replication requests from this point on) now that engine is open.
+                 * This means that any document indexed into the primary after this will be replicated to this replica as well
+                 * make sure to do this before sampling the max sequence number in the next step, to ensure that we send
+                 * all documents up to maxSeqNo in phase2.
+                 */
+                runUnderPrimaryPermit(() -> shard.initiateTracking(request.targetAllocationId()),
+                    shardId + " initiating tracking of " + request.targetAllocationId(), shard, cancellableThreads, logger);
+
+                final long endingSeqNo = shard.seqNoStats().getMaxSeqNo();
+                /*
+                 * We need to wait for all operations up to the current max to complete, otherwise we can not guarantee that all
+                 * operations in the required range will be available for replaying from the translog of the source.
+                 */
+                cancellableThreads.execute(() -> shard.waitForOpsToComplete(endingSeqNo));
+                if (logger.isTraceEnabled()) {
+                    logger.trace("all operations up to [{}] completed, which will be used as an ending sequence number", endingSeqNo);
+                    logger.trace("snapshot translog for recovery; current size is [{}]",
+                        shard.estimateNumberOfHistoryOperations("peer-recovery", startingSeqNo));
+                }
+                final Translog.Snapshot phase2Snapshot = shard.getHistoryOperations("peer-recovery", startingSeqNo);
+                resources.add(phase2Snapshot);
+                // we can release the retention lock here because the snapshot itself will retain the required operations.
+                IOUtils.close(retentionLock);
+                // we have to capture the max_seen_auto_id_timestamp and the max_seq_no_of_updates to make sure that these values
+                // are at least as high as the corresponding values on the primary when any of these operations were executed on it.
+                final long maxSeenAutoIdTimestamp = shard.getMaxSeenAutoIdTimestamp();
+                final long maxSeqNoOfUpdatesOrDeletes = shard.getMaxSeqNoOfUpdatesOrDeletes();
+                phase2(startingSeqNo, requiredSeqNoRangeStart, endingSeqNo, phase2Snapshot, maxSeenAutoIdTimestamp,
+                    maxSeqNoOfUpdatesOrDeletes, sendSnapshotStep);
+                sendSnapshotStep.whenComplete(
+                    r -> IOUtils.close(phase2Snapshot),
+                    e -> onFailure.accept(new RecoveryEngineException(shard.shardId(), 2, "phase2 failed", e)));
+            }, onFailure);
 
             final StepListener<Void> finalizeStep = new StepListener<>();
             sendSnapshotStep.whenComplete(r -> finalizeRecovery(r.targetLocalCheckpoint, finalizeStep), onFailure);
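Note on the control flow above: the recovery steps are now chained through StepListener rather than executed as blocking calls, so each step starts only when the previous one has completed and any failure is routed to the shared onFailure handler. The toy example below illustrates the pattern; the step names and payload types are invented for illustration, and it assumes the org.elasticsearch.action.StepListener class used in this change.

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.StepListener;

// Toy illustration of chaining asynchronous steps with StepListener; not code from this PR.
public class StepListenerChainExample {

    // A pretend asynchronous step that eventually reports a String through its listener.
    static void firstStep(ActionListener<String> listener) {
        listener.onResponse("engine-prepared");
    }

    // A second pretend step that consumes the first result and reports a Long.
    static void secondStep(String previousResult, ActionListener<Long> listener) {
        listener.onResponse((long) previousResult.length());
    }

    public static void main(String[] args) {
        final StepListener<String> firstStepListener = new StepListener<>();
        final StepListener<Long> secondStepListener = new StepListener<>();

        firstStep(firstStepListener);

        // secondStep runs only after firstStep completed successfully; a failure in either
        // step reaches the failure handler instead of blocking the calling thread.
        firstStepListener.whenComplete(result -> secondStep(result, secondStepListener), Throwable::printStackTrace);
        secondStepListener.whenComplete(value -> System.out.println("chained result: " + value), Throwable::printStackTrace);
    }
}

In recoverToTarget the same pattern links prepareEngineStep, sendSnapshotStep, and finalizeStep, with onFailure as the shared failure handler.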

@@ -251,7 +248,7 @@ public void recoverToTarget(ActionListener<RecoveryResponse> listener) {
             final RecoveryResponse response = new RecoveryResponse(sendFileResult.phase1FileNames, sendFileResult.phase1FileSizes,
                 sendFileResult.phase1ExistingFileNames, sendFileResult.phase1ExistingFileSizes, sendFileResult.totalSize,
                 sendFileResult.existingTotalSize, sendFileResult.took.millis(), phase1ThrottlingWaitTime,
-                prepareEngineTime.millis(), sendSnapshotResult.totalOperations, sendSnapshotResult.tookTime.millis());
+                prepareEngineStep.result().millis(), sendSnapshotResult.totalOperations, sendSnapshotResult.tookTime.millis());
             try {
                 wrappedListener.onResponse(response);
             } finally {
@@ -484,16 +481,23 @@ public SendFileResult phase1(final IndexCommit snapshot, final Supplier<Integer>
         }
     }
 
-    TimeValue prepareTargetForTranslog(final boolean fileBasedRecovery, final int totalTranslogOps) throws IOException {
+    void prepareTargetForTranslog(boolean fileBasedRecovery, int totalTranslogOps, ActionListener<TimeValue> listener) {
         StopWatch stopWatch = new StopWatch().start();
-        logger.trace("recovery [phase1]: prepare remote engine for translog");
+        final ActionListener<Void> wrappedListener = ActionListener.wrap(
+            nullVal -> {
+                stopWatch.stop();
+                final TimeValue tookTime = stopWatch.totalTime();
+                logger.trace("recovery [phase1]: remote engine start took [{}]", tookTime);
+                listener.onResponse(tookTime);
+            },
+            e -> {
+                listener.onFailure(new RecoveryEngineException(shard.shardId(), 1, "prepare target for translog failed", e));
+            });
         // Send a request preparing the new shard's translog to receive operations. This ensures the shard engine is started and disables
         // garbage collection (not the JVM's GC!) of tombstone deletes.
-        cancellableThreads.executeIO(() -> recoveryTarget.prepareForTranslogOperations(fileBasedRecovery, totalTranslogOps));
-        stopWatch.stop();
-        final TimeValue tookTime = stopWatch.totalTime();
-        logger.trace("recovery [phase1]: remote engine start took [{}]", tookTime);
-        return tookTime;
+        logger.trace("recovery [phase1]: prepare remote engine for translog");
+        cancellableThreads.execute(() ->
+            recoveryTarget.prepareForTranslogOperations(fileBasedRecovery, totalTranslogOps, wrappedListener));
     }
 
     /**
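Note on ActionListener.wrap as used for wrappedListener above: it builds a listener from a success lambda and a failure lambda, and, as I understand the helper, an exception thrown by the success lambda is also routed to the failure lambda. A rough sketch of that behavior, not the library implementation:

import java.util.function.Consumer;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.CheckedConsumer;

// Rough sketch of the semantics of ActionListener.wrap(onResponse, onFailure); not the library code.
final class WrapSketch {

    static <Response> ActionListener<Response> wrap(CheckedConsumer<Response, ? extends Exception> onResponse,
                                                    Consumer<Exception> onFailure) {
        return new ActionListener<Response>() {
            @Override
            public void onResponse(Response response) {
                try {
                    onResponse.accept(response);
                } catch (Exception e) {
                    onFailure(e);   // an exception from the success handler is treated as a failure
                }
            }

            @Override
            public void onFailure(Exception e) {
                onFailure.accept(e);
            }
        };
    }

    private WrapSketch() {}
}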
@@ -366,9 +366,12 @@ private void ensureRefCount() {
     /*** Implementation of {@link RecoveryTargetHandler } */
 
     @Override
-    public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps) throws IOException {
-        state().getTranslog().totalOperations(totalTranslogOps);
-        indexShard().openEngineAndSkipTranslogRecovery();
+    public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps, ActionListener<Void> listener) {
+        ActionListener.completeWith(listener, () -> {
+            state().getTranslog().totalOperations(totalTranslogOps);
+            indexShard().openEngineAndSkipTranslogRecovery();
+            return null;
+        });
     }
 
     @Override
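Note on ActionListener.completeWith as used above: it runs the supplied block and routes the return value to listener.onResponse and a thrown exception to listener.onFailure, keeping the try/catch boilerplate out of the handler. A rough hand-written equivalent (a sketch, not the library code; ThrowingSupplier stands in for the real checked-supplier parameter type):

import org.elasticsearch.action.ActionListener;

// Rough equivalent of ActionListener.completeWith(listener, supplier): run the block, then
// route its result or exception to the listener. A sketch, not the library implementation.
final class CompleteWithSketch {

    @FunctionalInterface
    interface ThrowingSupplier<T> {
        T get() throws Exception;
    }

    static <Response> void completeWith(ActionListener<Response> listener, ThrowingSupplier<Response> supplier) {
        final Response response;
        try {
            response = supplier.get();
        } catch (Exception e) {
            listener.onFailure(e);
            return;
        }
        listener.onResponse(response);
    }

    private CompleteWithSketch() {}
}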
@@ -35,7 +35,7 @@ public interface RecoveryTargetHandler {
      * @param fileBasedRecovery whether or not this call is part of an file based recovery
      * @param totalTranslogOps  total translog operations expected to be sent
      */
-    void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps) throws IOException;
+    void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps, ActionListener<Void> listener);
 
     /**
      * The finalize request refreshes the engine now that new segments are available, enables garbage collection of tombstone files, and
@@ -77,11 +77,12 @@ public RemoteRecoveryTargetHandler(long recoveryId, ShardId shardId, TransportSe
     }
 
     @Override
-    public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps) throws IOException {
+    public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps, ActionListener<Void> listener) {
         transportService.submitRequest(targetNode, PeerRecoveryTargetService.Actions.PREPARE_TRANSLOG,
-            new RecoveryPrepareForTranslogOperationsRequest(recoveryId, shardId, totalTranslogOps, fileBasedRecovery),
-            TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionTimeout()).build(),
-            EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
+                new RecoveryPrepareForTranslogOperationsRequest(recoveryId, shardId, totalTranslogOps, fileBasedRecovery),
+                TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionTimeout()).build(),
+                new ActionListenerResponseHandler<>(ActionListener.wrap(r -> listener.onResponse(null), listener::onFailure),
+                    in -> TransportResponse.Empty.INSTANCE, ThreadPool.Names.GENERIC));
     }
 
     @Override
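Note on the change above: the old code blocked the sending thread on txGet() until the PREPARE_TRANSLOG response arrived, whereas the new code registers a response handler that completes the listener asynchronously. Conceptually such a handler adapts transport callbacks onto an ActionListener, roughly as in the simplified sketch below (method names follow my understanding of TransportResponseHandler at the time of this change and are not guaranteed to match the real ActionListenerResponseHandler):

import java.io.IOException;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportResponseHandler;

// Simplified sketch of an ActionListenerResponseHandler-style adapter: deserialize the
// response, then forward success or failure to an ActionListener. Not the real class.
class ListenerResponseHandlerSketch<T extends TransportResponse> implements TransportResponseHandler<T> {

    private final ActionListener<? super T> listener;
    private final Writeable.Reader<T> reader;
    private final String executor;

    ListenerResponseHandlerSketch(ActionListener<? super T> listener, Writeable.Reader<T> reader, String executor) {
        this.listener = listener;
        this.reader = reader;
        this.executor = executor;
    }

    @Override
    public T read(StreamInput in) throws IOException {
        return reader.read(in);          // e.g. in -> TransportResponse.Empty.INSTANCE above
    }

    @Override
    public void handleResponse(T response) {
        listener.onResponse(response);   // success path: complete the listener
    }

    @Override
    public void handleException(TransportException exp) {
        listener.onFailure(exp);         // failure path: fail the listener
    }

    @Override
    public String executor() {
        return executor;                 // e.g. ThreadPool.Names.GENERIC above
    }
}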
@@ -25,6 +25,7 @@
 import org.apache.lucene.search.TermQuery;
 import org.apache.lucene.search.TopDocs;
 import org.elasticsearch.Version;
+import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.DocWriteResponse;
 import org.elasticsearch.action.bulk.BulkItemResponse;
 import org.elasticsearch.action.bulk.BulkShardRequest;
@@ -198,13 +199,14 @@ public IndexResult index(Index op) throws IOException {
         Future<Void> fut = shards.asyncRecoverReplica(replica,
             (shard, node) -> new RecoveryTarget(shard, node, recoveryListener, v -> {}){
                 @Override
-                public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps) throws IOException {
+                public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps,
+                                                         ActionListener<Void> listener) {
                     try {
                         indexedOnPrimary.await();
                     } catch (InterruptedException e) {
                         throw new AssertionError(e);
                     }
-                    super.prepareForTranslogOperations(fileBasedRecovery, totalTranslogOps);
+                    super.prepareForTranslogOperations(fileBasedRecovery, totalTranslogOps, listener);
                 }
             });
         fut.get();
@@ -2573,8 +2573,8 @@ public void testRefreshListenersDuringPeerRecovery() throws IOException {
         }) {
             // we're only checking that listeners are called when the engine is open, before there is no point
             @Override
-            public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps) throws IOException {
-                super.prepareForTranslogOperations(fileBasedRecovery, totalTranslogOps);
+            public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps, ActionListener<Void> listener) {
+                super.prepareForTranslogOperations(fileBasedRecovery, totalTranslogOps, listener);
                 assertListenerCalled.accept(replica);
             }
 
@@ -491,9 +491,9 @@ public SendFileResult phase1(final IndexCommit snapshot, final Supplier<Integer>
             }
 
             @Override
-            TimeValue prepareTargetForTranslog(final boolean fileBasedRecovery, final int totalTranslogOps) throws IOException {
+            void prepareTargetForTranslog(boolean fileBasedRecovery, int totalTranslogOps, ActionListener<TimeValue> listener) {
                 prepareTargetForTranslogCalled.set(true);
-                return super.prepareTargetForTranslog(fileBasedRecovery, totalTranslogOps);
+                super.prepareTargetForTranslog(fileBasedRecovery, totalTranslogOps, listener);
             }
 
             @Override
@@ -700,7 +700,7 @@ private List<StoreFileMetaData> generateFiles(Store store, int numFiles, IntSupp
 
     class TestRecoveryTargetHandler implements RecoveryTargetHandler {
         @Override
-        public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps) {
+        public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps, ActionListener<Void> listener) {
         }
 
         @Override