From 72c779456452fa8f683e5bdcadf2e5dd07a801b7 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Thu, 17 Jan 2019 10:15:59 -0500 Subject: [PATCH 1/2] Make prepare engine step of recovery source non-blocking Relates #37174 --- .../recovery/PeerRecoveryTargetService.java | 12 +- .../recovery/RecoverySourceHandler.java | 106 +++++++++--------- .../indices/recovery/RecoveryTarget.java | 9 +- .../recovery/RecoveryTargetHandler.java | 2 +- .../recovery/RemoteRecoveryTargetHandler.java | 9 +- .../IndexLevelReplicationTests.java | 6 +- .../index/shard/IndexShardTests.java | 4 +- .../recovery/RecoverySourceHandlerTests.java | 6 +- 8 files changed, 82 insertions(+), 72 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java index 3a48702fd552b..bc2196501fb8e 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java @@ -431,13 +431,13 @@ public interface RecoveryListener { class PrepareForTranslogOperationsRequestHandler implements TransportRequestHandler { @Override - public void messageReceived(RecoveryPrepareForTranslogOperationsRequest request, TransportChannel channel, - Task task) throws Exception { - try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId() - )) { - recoveryRef.target().prepareForTranslogOperations(request.isFileBasedRecovery(), request.totalTranslogOps()); + public void messageReceived(RecoveryPrepareForTranslogOperationsRequest request, TransportChannel channel, Task task) { + try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId())) { + final ActionListener listener = + new HandledTransportAction.ChannelActionListener<>(channel, Actions.PREPARE_TRANSLOG, request); + recoveryRef.target().prepareForTranslogOperations(request.isFileBasedRecovery(), request.totalTranslogOps(), + ActionListener.wrap(nullVal -> listener.onResponse(TransportResponse.Empty.INSTANCE), listener::onFailure)); } - channel.sendResponse(TransportResponse.Empty.INSTANCE); } } diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index 34434f50b456f..c096d152e5bfa 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -197,51 +197,48 @@ public void recoverToTarget(ActionListener listener) { assert requiredSeqNoRangeStart >= startingSeqNo : "requiredSeqNoRangeStart [" + requiredSeqNoRangeStart + "] is lower than [" + startingSeqNo + "]"; - final TimeValue prepareEngineTime; - try { - // For a sequence based recovery, the target can keep its local translog - prepareEngineTime = prepareTargetForTranslog(isSequenceNumberBasedRecovery == false, - shard.estimateNumberOfHistoryOperations("peer-recovery", startingSeqNo)); - } catch (final Exception e) { - throw new RecoveryEngineException(shard.shardId(), 1, "prepare target for translog failed", e); - } + final StepListener prepareEngineStep = new StepListener<>(); + // For a sequence based recovery, the target can keep its local translog + prepareTargetForTranslog(isSequenceNumberBasedRecovery == false, + shard.estimateNumberOfHistoryOperations("peer-recovery", startingSeqNo), prepareEngineStep); + final StepListener sendSnapshotStep = new StepListener<>(); + prepareEngineStep.whenComplete(prepareEngineTime -> { + /* + * add shard to replication group (shard will receive replication requests from this point on) now that engine is open. + * This means that any document indexed into the primary after this will be replicated to this replica as well + * make sure to do this before sampling the max sequence number in the next step, to ensure that we send + * all documents up to maxSeqNo in phase2. + */ + runUnderPrimaryPermit(() -> shard.initiateTracking(request.targetAllocationId()), + shardId + " initiating tracking of " + request.targetAllocationId(), shard, cancellableThreads, logger); - /* - * add shard to replication group (shard will receive replication requests from this point on) now that engine is open. - * This means that any document indexed into the primary after this will be replicated to this replica as well - * make sure to do this before sampling the max sequence number in the next step, to ensure that we send - * all documents up to maxSeqNo in phase2. - */ - runUnderPrimaryPermit(() -> shard.initiateTracking(request.targetAllocationId()), - shardId + " initiating tracking of " + request.targetAllocationId(), shard, cancellableThreads, logger); - - final long endingSeqNo = shard.seqNoStats().getMaxSeqNo(); - /* - * We need to wait for all operations up to the current max to complete, otherwise we can not guarantee that all - * operations in the required range will be available for replaying from the translog of the source. - */ - cancellableThreads.execute(() -> shard.waitForOpsToComplete(endingSeqNo)); - - if (logger.isTraceEnabled()) { - logger.trace("all operations up to [{}] completed, which will be used as an ending sequence number", endingSeqNo); - logger.trace("snapshot translog for recovery; current size is [{}]", - shard.estimateNumberOfHistoryOperations("peer-recovery", startingSeqNo)); - } + final long endingSeqNo = shard.seqNoStats().getMaxSeqNo(); + /* + * We need to wait for all operations up to the current max to complete, otherwise we can not guarantee that all + * operations in the required range will be available for replaying from the translog of the source. + */ + cancellableThreads.execute(() -> shard.waitForOpsToComplete(endingSeqNo)); + if (logger.isTraceEnabled()) { + logger.trace("all operations up to [{}] completed, which will be used as an ending sequence number", endingSeqNo); + logger.trace("snapshot translog for recovery; current size is [{}]", + shard.estimateNumberOfHistoryOperations("peer-recovery", startingSeqNo)); + } + final Translog.Snapshot phase2Snapshot = shard.getHistoryOperations("peer-recovery", startingSeqNo); + resources.add(phase2Snapshot); + // we can release the retention lock here because the snapshot itself will retain the required operations. + IOUtils.close(retentionLock); + // we have to capture the max_seen_auto_id_timestamp and the max_seq_no_of_updates to make sure that these values + // are at least as high as the corresponding values on the primary when any of these operations were executed on it. + final long maxSeenAutoIdTimestamp = shard.getMaxSeenAutoIdTimestamp(); + final long maxSeqNoOfUpdatesOrDeletes = shard.getMaxSeqNoOfUpdatesOrDeletes(); + phase2(startingSeqNo, requiredSeqNoRangeStart, endingSeqNo, phase2Snapshot, maxSeenAutoIdTimestamp, + maxSeqNoOfUpdatesOrDeletes, sendSnapshotStep); + sendSnapshotStep.whenComplete( + r -> IOUtils.close(phase2Snapshot), + e -> onFailure.accept(new RecoveryEngineException(shard.shardId(), 2, "phase2 failed", e))); + + }, onFailure); - final Translog.Snapshot phase2Snapshot = shard.getHistoryOperations("peer-recovery", startingSeqNo); - resources.add(phase2Snapshot); - // we can release the retention lock here because the snapshot itself will retain the required operations. - IOUtils.close(retentionLock); - // we have to capture the max_seen_auto_id_timestamp and the max_seq_no_of_updates to make sure that these values - // are at least as high as the corresponding values on the primary when any of these operations were executed on it. - final long maxSeenAutoIdTimestamp = shard.getMaxSeenAutoIdTimestamp(); - final long maxSeqNoOfUpdatesOrDeletes = shard.getMaxSeqNoOfUpdatesOrDeletes(); - final StepListener sendSnapshotStep = new StepListener<>(); - phase2(startingSeqNo, requiredSeqNoRangeStart, endingSeqNo, phase2Snapshot, maxSeenAutoIdTimestamp, - maxSeqNoOfUpdatesOrDeletes, sendSnapshotStep); - sendSnapshotStep.whenComplete( - r -> IOUtils.close(phase2Snapshot), - e -> onFailure.accept(new RecoveryEngineException(shard.shardId(), 2, "phase2 failed", e))); final StepListener finalizeStep = new StepListener<>(); sendSnapshotStep.whenComplete(r -> finalizeRecovery(r.targetLocalCheckpoint, finalizeStep), onFailure); @@ -251,7 +248,7 @@ public void recoverToTarget(ActionListener listener) { final RecoveryResponse response = new RecoveryResponse(sendFileResult.phase1FileNames, sendFileResult.phase1FileSizes, sendFileResult.phase1ExistingFileNames, sendFileResult.phase1ExistingFileSizes, sendFileResult.totalSize, sendFileResult.existingTotalSize, sendFileResult.took.millis(), phase1ThrottlingWaitTime, - prepareEngineTime.millis(), sendSnapshotResult.totalOperations, sendSnapshotResult.tookTime.millis()); + prepareEngineStep.result().millis(), sendSnapshotResult.totalOperations, sendSnapshotResult.tookTime.millis()); try { wrappedListener.onResponse(response); } finally { @@ -484,16 +481,23 @@ public SendFileResult phase1(final IndexCommit snapshot, final Supplier } } - TimeValue prepareTargetForTranslog(final boolean fileBasedRecovery, final int totalTranslogOps) throws IOException { + void prepareTargetForTranslog(boolean fileBasedRecovery, int totalTranslogOps, ActionListener listener) { StopWatch stopWatch = new StopWatch().start(); - logger.trace("recovery [phase1]: prepare remote engine for translog"); + final ActionListener wrappedListener = ActionListener.wrap( + nullVal -> { + stopWatch.stop(); + final TimeValue tookTime = stopWatch.totalTime(); + logger.trace("recovery [phase1]: remote engine start took [{}]", tookTime); + listener.onResponse(tookTime); + }, + e -> { + listener.onFailure(new RecoveryEngineException(shard.shardId(), 1, "prepare target for translog failed", e)); + }); // Send a request preparing the new shard's translog to receive operations. This ensures the shard engine is started and disables // garbage collection (not the JVM's GC!) of tombstone deletes. - cancellableThreads.executeIO(() -> recoveryTarget.prepareForTranslogOperations(fileBasedRecovery, totalTranslogOps)); - stopWatch.stop(); - final TimeValue tookTime = stopWatch.totalTime(); - logger.trace("recovery [phase1]: remote engine start took [{}]", tookTime); - return tookTime; + logger.trace("recovery [phase1]: prepare remote engine for translog"); + cancellableThreads.execute(() -> + recoveryTarget.prepareForTranslogOperations(fileBasedRecovery, totalTranslogOps, wrappedListener)); } /** diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java index 5950f71006a85..b300490542968 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java @@ -366,9 +366,12 @@ private void ensureRefCount() { /*** Implementation of {@link RecoveryTargetHandler } */ @Override - public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps) throws IOException { - state().getTranslog().totalOperations(totalTranslogOps); - indexShard().openEngineAndSkipTranslogRecovery(); + public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps, ActionListener listener) { + ActionListener.completeWith(listener, () -> { + state().getTranslog().totalOperations(totalTranslogOps); + indexShard().openEngineAndSkipTranslogRecovery(); + return null; + }); } @Override diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java index 3372c933559c6..0fa5e94c63cd2 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java @@ -35,7 +35,7 @@ public interface RecoveryTargetHandler { * @param fileBasedRecovery whether or not this call is part of an file based recovery * @param totalTranslogOps total translog operations expected to be sent */ - void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps) throws IOException; + void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps, ActionListener listener); /** * The finalize request refreshes the engine now that new segments are available, enables garbage collection of tombstone files, and diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java index ba703aeee2850..0799cc6595189 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java @@ -77,11 +77,12 @@ public RemoteRecoveryTargetHandler(long recoveryId, ShardId shardId, TransportSe } @Override - public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps) throws IOException { + public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps, ActionListener listener) { transportService.submitRequest(targetNode, PeerRecoveryTargetService.Actions.PREPARE_TRANSLOG, - new RecoveryPrepareForTranslogOperationsRequest(recoveryId, shardId, totalTranslogOps, fileBasedRecovery), - TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionTimeout()).build(), - EmptyTransportResponseHandler.INSTANCE_SAME).txGet(); + new RecoveryPrepareForTranslogOperationsRequest(recoveryId, shardId, totalTranslogOps, fileBasedRecovery), + TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionTimeout()).build(), + new ActionListenerResponseHandler<>(ActionListener.wrap(r -> listener.onResponse(null), listener::onFailure), + in -> TransportResponse.Empty.INSTANCE, ThreadPool.Names.GENERIC)); } @Override diff --git a/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java b/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java index 5fca37a71886d..f67ce8de1422a 100644 --- a/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java +++ b/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java @@ -25,6 +25,7 @@ import org.apache.lucene.search.TermQuery; import org.apache.lucene.search.TopDocs; import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.DocWriteResponse; import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkShardRequest; @@ -198,13 +199,14 @@ public IndexResult index(Index op) throws IOException { Future fut = shards.asyncRecoverReplica(replica, (shard, node) -> new RecoveryTarget(shard, node, recoveryListener, v -> {}){ @Override - public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps) throws IOException { + public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps, + ActionListener listener) { try { indexedOnPrimary.await(); } catch (InterruptedException e) { throw new AssertionError(e); } - super.prepareForTranslogOperations(fileBasedRecovery, totalTranslogOps); + super.prepareForTranslogOperations(fileBasedRecovery, totalTranslogOps, listener); } }); fut.get(); diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 0d58239fd4a56..1ea90ec6e8fbc 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -2573,8 +2573,8 @@ public void testRefreshListenersDuringPeerRecovery() throws IOException { }) { // we're only checking that listeners are called when the engine is open, before there is no point @Override - public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps) throws IOException { - super.prepareForTranslogOperations(fileBasedRecovery, totalTranslogOps); + public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps, ActionListener listener) { + super.prepareForTranslogOperations(fileBasedRecovery, totalTranslogOps, listener); assertListenerCalled.accept(replica); } diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java index 0cecc925b2488..cd495b7e17bee 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java @@ -491,9 +491,9 @@ public SendFileResult phase1(final IndexCommit snapshot, final Supplier } @Override - TimeValue prepareTargetForTranslog(final boolean fileBasedRecovery, final int totalTranslogOps) throws IOException { + void prepareTargetForTranslog(boolean fileBasedRecovery, int totalTranslogOps, ActionListener listener) { prepareTargetForTranslogCalled.set(true); - return super.prepareTargetForTranslog(fileBasedRecovery, totalTranslogOps); + super.prepareTargetForTranslog(fileBasedRecovery, totalTranslogOps, listener); } @Override @@ -700,7 +700,7 @@ private List generateFiles(Store store, int numFiles, IntSupp class TestRecoveryTargetHandler implements RecoveryTargetHandler { @Override - public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps) { + public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps, ActionListener listener) { } @Override From 4d53c735d39f6ab8aa6643b7ec819f425897dfb1 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Mon, 21 Jan 2019 17:20:18 -0500 Subject: [PATCH 2/2] feedback --- .../indices/recovery/RecoverySourceHandler.java | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index c096d152e5bfa..f360a68b7a83c 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -226,7 +226,7 @@ public void recoverToTarget(ActionListener listener) { final Translog.Snapshot phase2Snapshot = shard.getHistoryOperations("peer-recovery", startingSeqNo); resources.add(phase2Snapshot); // we can release the retention lock here because the snapshot itself will retain the required operations. - IOUtils.close(retentionLock); + retentionLock.close(); // we have to capture the max_seen_auto_id_timestamp and the max_seq_no_of_updates to make sure that these values // are at least as high as the corresponding values on the primary when any of these operations were executed on it. final long maxSeenAutoIdTimestamp = shard.getMaxSeenAutoIdTimestamp(); @@ -235,7 +235,10 @@ public void recoverToTarget(ActionListener listener) { maxSeqNoOfUpdatesOrDeletes, sendSnapshotStep); sendSnapshotStep.whenComplete( r -> IOUtils.close(phase2Snapshot), - e -> onFailure.accept(new RecoveryEngineException(shard.shardId(), 2, "phase2 failed", e))); + e -> { + IOUtils.closeWhileHandlingException(phase2Snapshot); + onFailure.accept(new RecoveryEngineException(shard.shardId(), 2, "phase2 failed", e)); + }); }, onFailure); @@ -490,9 +493,7 @@ void prepareTargetForTranslog(boolean fileBasedRecovery, int totalTranslogOps, A logger.trace("recovery [phase1]: remote engine start took [{}]", tookTime); listener.onResponse(tookTime); }, - e -> { - listener.onFailure(new RecoveryEngineException(shard.shardId(), 1, "prepare target for translog failed", e)); - }); + e -> listener.onFailure(new RecoveryEngineException(shard.shardId(), 1, "prepare target for translog failed", e))); // Send a request preparing the new shard's translog to receive operations. This ensures the shard engine is started and disables // garbage collection (not the JVM's GC!) of tombstone deletes. logger.trace("recovery [phase1]: prepare remote engine for translog");