From 1ca6666ccab1217a07edbbc3aa89635caf3bf3b8 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Mon, 7 Jan 2019 12:54:30 -0500 Subject: [PATCH 1/2] Do not mutable RecoveryResponse --- .../recovery/PeerRecoveryTargetService.java | 4 +- .../indices/recovery/RecoveryResponse.java | 99 +++----- .../recovery/RecoverySourceHandler.java | 239 +++++++++--------- .../recovery/RecoverySourceHandlerTests.java | 18 +- 4 files changed, 174 insertions(+), 186 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java index 8598917b82313..8e52a05e2ac30 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java @@ -304,9 +304,7 @@ public String executor() { @Override public RecoveryResponse read(StreamInput in) throws IOException { - RecoveryResponse recoveryResponse = new RecoveryResponse(); - recoveryResponse.readFrom(in); - return recoveryResponse; + return new RecoveryResponse(in); } }) ); diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryResponse.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryResponse.java index 9018f6f0be199..dfc14e3f58271 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryResponse.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryResponse.java @@ -19,62 +19,57 @@ package org.elasticsearch.indices.recovery; +import org.elasticsearch.Version; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.transport.TransportResponse; import java.io.IOException; -import java.util.ArrayList; import java.util.List; -class RecoveryResponse extends TransportResponse { +final class RecoveryResponse extends TransportResponse { - List phase1FileNames = new ArrayList<>(); - List phase1FileSizes = new ArrayList<>(); - List phase1ExistingFileNames = new ArrayList<>(); - List phase1ExistingFileSizes = new ArrayList<>(); - long phase1TotalSize; - long phase1ExistingTotalSize; - long phase1Time; - long phase1ThrottlingWaitTime; + final List phase1FileNames; + final List phase1FileSizes; + final List phase1ExistingFileNames; + final List phase1ExistingFileSizes; + final long phase1TotalSize; + final long phase1ExistingTotalSize; + final long phase1Time; + final long phase1ThrottlingWaitTime = 0L; // not used - long startTime; + final long startTime; - int phase2Operations; - long phase2Time; + final int phase2Operations; + final long phase2Time; - RecoveryResponse() { + RecoveryResponse(List phase1FileNames, List phase1FileSizes, List phase1ExistingFileNames, + List phase1ExistingFileSizes, long phase1TotalSize, long phase1ExistingTotalSize, + long phase1Time, long startTime, int phase2Operations, long phase2Time) { + this.phase1FileNames = phase1FileNames; + this.phase1FileSizes = phase1FileSizes; + this.phase1ExistingFileNames = phase1ExistingFileNames; + this.phase1ExistingFileSizes = phase1ExistingFileSizes; + this.phase1TotalSize = phase1TotalSize; + this.phase1ExistingTotalSize = phase1ExistingTotalSize; + this.phase1Time = phase1Time; + this.startTime = startTime; + this.phase2Operations = phase2Operations; + this.phase2Time = phase2Time; } - @Override - public void readFrom(StreamInput in) throws IOException { + RecoveryResponse(StreamInput in) throws IOException { super.readFrom(in); - int size = in.readVInt(); - phase1FileNames = new ArrayList<>(size); - for (int i = 0; i < size; i++) { - phase1FileNames.add(in.readString()); - } - size = in.readVInt(); - phase1FileSizes = new ArrayList<>(size); - for (int i = 0; i < size; i++) { - phase1FileSizes.add(in.readVLong()); - } - - size = in.readVInt(); - phase1ExistingFileNames = new ArrayList<>(size); - for (int i = 0; i < size; i++) { - phase1ExistingFileNames.add(in.readString()); - } - size = in.readVInt(); - phase1ExistingFileSizes = new ArrayList<>(size); - for (int i = 0; i < size; i++) { - phase1ExistingFileSizes.add(in.readVLong()); - } - + phase1FileNames = in.readList(StreamInput::readString); + phase1FileSizes = in.readList(StreamInput::readVLong); + phase1ExistingFileNames = in.readList(StreamInput::readString); + phase1ExistingFileSizes = in.readList(StreamInput::readVLong); phase1TotalSize = in.readVLong(); phase1ExistingTotalSize = in.readVLong(); phase1Time = in.readVLong(); - phase1ThrottlingWaitTime = in.readVLong(); + if (in.getVersion().before(Version.V_7_0_0)) { + in.readVLong(); // phase1ThrottlingWaitTime - not used + } startTime = in.readVLong(); phase2Operations = in.readVInt(); phase2Time = in.readVLong(); @@ -83,28 +78,16 @@ public void readFrom(StreamInput in) throws IOException { @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); - out.writeVInt(phase1FileNames.size()); - for (String name : phase1FileNames) { - out.writeString(name); - } - out.writeVInt(phase1FileSizes.size()); - for (long size : phase1FileSizes) { - out.writeVLong(size); - } - - out.writeVInt(phase1ExistingFileNames.size()); - for (String name : phase1ExistingFileNames) { - out.writeString(name); - } - out.writeVInt(phase1ExistingFileSizes.size()); - for (long size : phase1ExistingFileSizes) { - out.writeVLong(size); - } - + out.writeStringList(phase1FileNames); + out.writeCollection(phase1FileSizes, StreamOutput::writeVLong); + out.writeStringList(phase1ExistingFileNames); + out.writeCollection(phase1ExistingFileSizes, StreamOutput::writeVLong); out.writeVLong(phase1TotalSize); out.writeVLong(phase1ExistingTotalSize); out.writeVLong(phase1Time); - out.writeVLong(phase1ThrottlingWaitTime); + if (out.getVersion().before(Version.V_7_0_0)) { + out.writeVLong(0L); // phase1ThrottlingWaitTime - not used + } out.writeVLong(startTime); out.writeVInt(phase2Operations); out.writeVLong(phase2Time); diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index f6196e36ba2d3..b1fbd1ce68532 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -41,6 +41,7 @@ import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.lucene.store.InputStreamIndexInput; import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.CancellableThreads; import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.core.internal.io.IOUtils; @@ -64,6 +65,7 @@ import java.io.IOException; import java.io.OutputStream; import java.util.ArrayList; +import java.util.Collections; import java.util.Comparator; import java.util.List; import java.util.Locale; @@ -95,8 +97,6 @@ public class RecoverySourceHandler { private final int chunkSizeInBytes; private final RecoveryTargetHandler recoveryTarget; - protected final RecoveryResponse response; - private final CancellableThreads cancellableThreads = new CancellableThreads() { @Override protected void onCancel(String reason, @Nullable Exception suppressedException) { @@ -122,7 +122,6 @@ public RecoverySourceHandler(final IndexShard shard, RecoveryTargetHandler recov this.shardId = this.request.shardId().id(); this.logger = Loggers.getLogger(getClass(), request.shardId(), "recover to " + request.targetNode().getName()); this.chunkSizeInBytes = fileChunkSizeInBytes; - this.response = new RecoveryResponse(); } public StartRecoveryRequest getRequest() { @@ -149,10 +148,12 @@ public RecoveryResponse recoverToTarget() throws IOException { final long requiredSeqNoRangeStart; final boolean isSequenceNumberBasedRecovery = request.startingSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO && isTargetSameHistory() && shard.hasCompleteHistoryOperations("peer-recovery", request.startingSeqNo()); + final SendFileResult sendFileResult; if (isSequenceNumberBasedRecovery) { logger.trace("performing sequence numbers based recovery. starting at [{}]", request.startingSeqNo()); startingSeqNo = request.startingSeqNo(); requiredSeqNoRangeStart = startingSeqNo; + sendFileResult = SendFileResult.EMPTY; } else { final Engine.IndexCommitRef phase1Snapshot; try { @@ -169,7 +170,7 @@ public RecoveryResponse recoverToTarget() throws IOException { startingSeqNo = shard.indexSettings().isSoftDeleteEnabled() ? requiredSeqNoRangeStart : 0; try { final int estimateNumOps = shard.estimateNumberOfHistoryOperations("peer-recovery", startingSeqNo); - phase1(phase1Snapshot.getIndexCommit(), () -> estimateNumOps); + sendFileResult = phase1(phase1Snapshot.getIndexCommit(), () -> estimateNumOps); } catch (final Exception e) { throw new RecoveryEngineException(shard.shardId(), 1, "phase1 failed", e); } finally { @@ -184,9 +185,10 @@ public RecoveryResponse recoverToTarget() throws IOException { assert requiredSeqNoRangeStart >= startingSeqNo : "requiredSeqNoRangeStart [" + requiredSeqNoRangeStart + "] is lower than [" + startingSeqNo + "]"; + final TimeValue prepareEngineTime; try { // For a sequence based recovery, the target can keep its local translog - prepareTargetForTranslog(isSequenceNumberBasedRecovery == false, + prepareEngineTime = prepareTargetForTranslog(isSequenceNumberBasedRecovery == false, shard.estimateNumberOfHistoryOperations("peer-recovery", startingSeqNo)); } catch (final Exception e) { throw new RecoveryEngineException(shard.shardId(), 1, "prepare target for translog failed", e); @@ -213,21 +215,24 @@ public RecoveryResponse recoverToTarget() throws IOException { logger.trace("snapshot translog for recovery; current size is [{}]", shard.estimateNumberOfHistoryOperations("peer-recovery", startingSeqNo)); } - final long targetLocalCheckpoint; + final SendSnapshotResult sendSnapshotResult; try (Translog.Snapshot snapshot = shard.getHistoryOperations("peer-recovery", startingSeqNo)) { // we have to capture the max_seen_auto_id_timestamp and the max_seq_no_of_updates to make sure that these values // are at least as high as the corresponding values on the primary when any of these operations were executed on it. final long maxSeenAutoIdTimestamp = shard.getMaxSeenAutoIdTimestamp(); final long maxSeqNoOfUpdatesOrDeletes = shard.getMaxSeqNoOfUpdatesOrDeletes(); - targetLocalCheckpoint = phase2(startingSeqNo, requiredSeqNoRangeStart, endingSeqNo, snapshot, + sendSnapshotResult = phase2(startingSeqNo, requiredSeqNoRangeStart, endingSeqNo, snapshot, maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes); } catch (Exception e) { throw new RecoveryEngineException(shard.shardId(), 2, "phase2 failed", e); } - finalizeRecovery(targetLocalCheckpoint); + finalizeRecovery(sendSnapshotResult.targetLocalCheckpoint); + return new RecoveryResponse(sendFileResult.phase1FileNames, sendFileResult.phase1FileSizes, + sendFileResult.phase1ExistingFileNames, sendFileResult.phase1ExistingFileSizes, sendFileResult.totalSize, + sendFileResult.existingTotalSize, sendFileResult.took.millis(), prepareEngineTime.millis(), + sendSnapshotResult.totalOperations, sendSnapshotResult.tookTime.millis()); } - return response; } private boolean isTargetSameHistory() { @@ -276,6 +281,32 @@ public void onFailure(Exception e) { }); } + static final class SendFileResult { + final List phase1FileNames; + final List phase1FileSizes; + final long totalSize; + + final List phase1ExistingFileNames; + final List phase1ExistingFileSizes; + final long existingTotalSize; + + final TimeValue took; + + SendFileResult(List phase1FileNames, List phase1FileSizes, long totalSize, + List phase1ExistingFileNames, List phase1ExistingFileSizes, long existingTotalSize, TimeValue took) { + this.phase1FileNames = phase1FileNames; + this.phase1FileSizes = phase1FileSizes; + this.totalSize = totalSize; + this.phase1ExistingFileNames = phase1ExistingFileNames; + this.phase1ExistingFileSizes = phase1ExistingFileSizes; + this.existingTotalSize = existingTotalSize; + this.took = took; + } + + static final SendFileResult EMPTY = new SendFileResult(Collections.emptyList(), Collections.emptyList(), 0L, + Collections.emptyList(), Collections.emptyList(), 0L, TimeValue.ZERO); + } + /** * Perform phase1 of the recovery operations. Once this {@link IndexCommit} * snapshot has been performed no commit operations (files being fsync'd) @@ -285,12 +316,16 @@ public void onFailure(Exception e) { * segments that are missing. Only segments that have the same size and * checksum can be reused */ - public void phase1(final IndexCommit snapshot, final Supplier translogOps) { + public SendFileResult phase1(final IndexCommit snapshot, final Supplier translogOps) { cancellableThreads.checkForCancel(); // Total size of segment files that are recovered long totalSize = 0; // Total size of segment files that were able to be re-used long existingTotalSize = 0; + final List phase1FileNames = new ArrayList<>(); + final List phase1FileSizes = new ArrayList<>(); + final List phase1ExistingFileNames = new ArrayList<>(); + final List phase1ExistingFileSizes = new ArrayList<>(); final Store store = shard.store(); store.incRef(); try { @@ -331,8 +366,8 @@ public void phase1(final IndexCommit snapshot, final Supplier translogO } else { final Store.RecoveryDiff diff = recoverySourceMetadata.recoveryDiff(request.metadataSnapshot()); for (StoreFileMetaData md : diff.identical) { - response.phase1ExistingFileNames.add(md.name()); - response.phase1ExistingFileSizes.add(md.length()); + phase1ExistingFileNames.add(md.name()); + phase1ExistingFileSizes.add(md.length()); existingTotalSize += md.length(); if (logger.isTraceEnabled()) { logger.trace("recovery [phase1]: not recovering [{}], exist in local store and has checksum [{}]," + @@ -350,20 +385,16 @@ public void phase1(final IndexCommit snapshot, final Supplier translogO } else { logger.trace("recovery [phase1]: recovering [{}], does not exist in remote", md.name()); } - response.phase1FileNames.add(md.name()); - response.phase1FileSizes.add(md.length()); + phase1FileNames.add(md.name()); + phase1FileSizes.add(md.length()); totalSize += md.length(); } - response.phase1TotalSize = totalSize; - response.phase1ExistingTotalSize = existingTotalSize; - logger.trace("recovery [phase1]: recovering_files [{}] with total_size [{}], reusing_files [{}] with total_size [{}]", - response.phase1FileNames.size(), - new ByteSizeValue(totalSize), response.phase1ExistingFileNames.size(), new ByteSizeValue(existingTotalSize)); - cancellableThreads.execute(() -> - recoveryTarget.receiveFileInfo(response.phase1FileNames, response.phase1FileSizes, response.phase1ExistingFileNames, - response.phase1ExistingFileSizes, translogOps.get())); + phase1FileNames.size(), new ByteSizeValue(totalSize), + phase1ExistingFileNames.size(), new ByteSizeValue(existingTotalSize)); + cancellableThreads.execute(() -> recoveryTarget.receiveFileInfo( + phase1FileNames, phase1FileSizes, phase1ExistingFileNames, phase1ExistingFileSizes, translogOps.get())); // How many bytes we've copied since we last called RateLimiter.pause final Function outputStreamFactories = md -> new BufferedOutputStream(new RecoveryOutputStream(md, translogOps), chunkSizeInBytes); @@ -417,27 +448,27 @@ public void phase1(final IndexCommit snapshot, final Supplier translogO } } } - - logger.trace("recovery [phase1]: took [{}]", stopWatch.totalTime()); - response.phase1Time = stopWatch.totalTime().millis(); + final TimeValue took = stopWatch.totalTime(); + logger.trace("recovery [phase1]: took [{}]", took); + return new SendFileResult(phase1FileNames, phase1FileSizes, totalSize, phase1ExistingFileNames, + phase1ExistingFileSizes, existingTotalSize, took); } catch (Exception e) { - throw new RecoverFilesRecoveryException(request.shardId(), response.phase1FileNames.size(), new ByteSizeValue(totalSize), e); + throw new RecoverFilesRecoveryException(request.shardId(), phase1FileNames.size(), new ByteSizeValue(totalSize), e); } finally { store.decRef(); } } - void prepareTargetForTranslog(final boolean fileBasedRecovery, final int totalTranslogOps) throws IOException { + TimeValue prepareTargetForTranslog(final boolean fileBasedRecovery, final int totalTranslogOps) throws IOException { StopWatch stopWatch = new StopWatch().start(); logger.trace("recovery [phase1]: prepare remote engine for translog"); - final long startEngineStart = stopWatch.totalTime().millis(); // Send a request preparing the new shard's translog to receive operations. This ensures the shard engine is started and disables // garbage collection (not the JVM's GC!) of tombstone deletes. cancellableThreads.executeIO(() -> recoveryTarget.prepareForTranslogOperations(fileBasedRecovery, totalTranslogOps)); stopWatch.stop(); - - response.startTime = stopWatch.totalTime().millis() - startEngineStart; - logger.trace("recovery [phase1]: remote engine start took [{}]", stopWatch.totalTime()); + final TimeValue tookTime = stopWatch.totalTime(); + logger.trace("recovery [phase1]: remote engine start took [{}]", tookTime); + return tookTime; } /** @@ -454,102 +485,23 @@ void prepareTargetForTranslog(final boolean fileBasedRecovery, final int totalTr * @param snapshot a snapshot of the translog * @param maxSeenAutoIdTimestamp the max auto_id_timestamp of append-only requests on the primary * @param maxSeqNoOfUpdatesOrDeletes the max seq_no of updates or deletes on the primary after these operations were executed on it. - * @return the local checkpoint on the target + * @return the send snapshot result */ - long phase2(final long startingSeqNo, long requiredSeqNoRangeStart, long endingSeqNo, final Translog.Snapshot snapshot, - final long maxSeenAutoIdTimestamp, final long maxSeqNoOfUpdatesOrDeletes) - throws IOException { + SendSnapshotResult phase2(long startingSeqNo, long requiredSeqNoRangeStart, long endingSeqNo, Translog.Snapshot snapshot, + long maxSeenAutoIdTimestamp, long maxSeqNoOfUpdatesOrDeletes) throws IOException { + assert requiredSeqNoRangeStart <= endingSeqNo + 1: + "requiredSeqNoRangeStart " + requiredSeqNoRangeStart + " is larger than endingSeqNo " + endingSeqNo; + assert startingSeqNo <= requiredSeqNoRangeStart : + "startingSeqNo " + startingSeqNo + " is larger than requiredSeqNoRangeStart " + requiredSeqNoRangeStart; if (shard.state() == IndexShardState.CLOSED) { throw new IndexShardClosedException(request.shardId()); } - cancellableThreads.checkForCancel(); final StopWatch stopWatch = new StopWatch().start(); logger.trace("recovery [phase2]: sending transaction log operations (seq# from [" + startingSeqNo + "], " + "required [" + requiredSeqNoRangeStart + ":" + endingSeqNo + "]"); - // send all the snapshot's translog operations to the target - final SendSnapshotResult result = sendSnapshot( - startingSeqNo, requiredSeqNoRangeStart, endingSeqNo, snapshot, maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes); - - stopWatch.stop(); - logger.trace("recovery [phase2]: took [{}]", stopWatch.totalTime()); - response.phase2Time = stopWatch.totalTime().millis(); - response.phase2Operations = result.totalOperations; - return result.targetLocalCheckpoint; - } - - /* - * finalizes the recovery process - */ - public void finalizeRecovery(final long targetLocalCheckpoint) throws IOException { - if (shard.state() == IndexShardState.CLOSED) { - throw new IndexShardClosedException(request.shardId()); - } - cancellableThreads.checkForCancel(); - StopWatch stopWatch = new StopWatch().start(); - logger.trace("finalizing recovery"); - /* - * Before marking the shard as in-sync we acquire an operation permit. We do this so that there is a barrier between marking a - * shard as in-sync and relocating a shard. If we acquire the permit then no relocation handoff can complete before we are done - * marking the shard as in-sync. If the relocation handoff holds all the permits then after the handoff completes and we acquire - * the permit then the state of the shard will be relocated and this recovery will fail. - */ - runUnderPrimaryPermit(() -> shard.markAllocationIdAsInSync(request.targetAllocationId(), targetLocalCheckpoint), - shardId + " marking " + request.targetAllocationId() + " as in sync", shard, cancellableThreads, logger); - final long globalCheckpoint = shard.getGlobalCheckpoint(); - cancellableThreads.executeIO(() -> recoveryTarget.finalizeRecovery(globalCheckpoint)); - runUnderPrimaryPermit(() -> shard.updateGlobalCheckpointForShard(request.targetAllocationId(), globalCheckpoint), - shardId + " updating " + request.targetAllocationId() + "'s global checkpoint", shard, cancellableThreads, logger); - - if (request.isPrimaryRelocation()) { - logger.trace("performing relocation hand-off"); - // this acquires all IndexShard operation permits and will thus delay new recoveries until it is done - cancellableThreads.execute(() -> shard.relocated(recoveryTarget::handoffPrimaryContext)); - /* - * if the recovery process fails after disabling primary mode on the source shard, both relocation source and - * target are failed (see {@link IndexShard#updateRoutingEntry}). - */ - } - stopWatch.stop(); - logger.trace("finalizing recovery took [{}]", stopWatch.totalTime()); - } - - static class SendSnapshotResult { - - final long targetLocalCheckpoint; - final int totalOperations; - - SendSnapshotResult(final long targetLocalCheckpoint, final int totalOperations) { - this.targetLocalCheckpoint = targetLocalCheckpoint; - this.totalOperations = totalOperations; - } - - } - - /** - * Send the given snapshot's operations with a sequence number greater than the specified staring sequence number to this handler's - * target node. - *

- * Operations are bulked into a single request depending on an operation count limit or size-in-bytes limit. - * - * @param startingSeqNo the sequence number for which only operations with a sequence number greater than this will be sent - * @param requiredSeqNoRangeStart the lower sequence number of the required range - * @param endingSeqNo the upper bound of the sequence number range to be sent (inclusive) - * @param snapshot the translog snapshot to replay operations from @return the local checkpoint on the target and the - * total number of operations sent - * @param maxSeenAutoIdTimestamp the max auto_id_timestamp of append-only requests on the primary - * @param maxSeqNoOfUpdatesOrDeletes the max seq_no of updates or deletes on the primary after these operations were executed on it. - * @throws IOException if an I/O exception occurred reading the translog snapshot - */ - protected SendSnapshotResult sendSnapshot(final long startingSeqNo, long requiredSeqNoRangeStart, long endingSeqNo, - final Translog.Snapshot snapshot, final long maxSeenAutoIdTimestamp, - final long maxSeqNoOfUpdatesOrDeletes) throws IOException { - assert requiredSeqNoRangeStart <= endingSeqNo + 1: - "requiredSeqNoRangeStart " + requiredSeqNoRangeStart + " is larger than endingSeqNo " + endingSeqNo; - assert startingSeqNo <= requiredSeqNoRangeStart : - "startingSeqNo " + startingSeqNo + " is larger than requiredSeqNoRangeStart " + requiredSeqNoRangeStart; int ops = 0; long size = 0; int skippedOps = 0; @@ -615,7 +567,58 @@ protected SendSnapshotResult sendSnapshot(final long startingSeqNo, long require logger.trace("sent final batch of [{}][{}] (total: [{}]) translog operations", ops, new ByteSizeValue(size), expectedTotalOps); - return new SendSnapshotResult(targetLocalCheckpoint.get(), totalSentOps); + stopWatch.stop(); + final TimeValue tookTime = stopWatch.totalTime(); + logger.trace("recovery [phase2]: took [{}]", tookTime); + return new SendSnapshotResult(targetLocalCheckpoint.get(), totalSentOps, tookTime); + } + + /* + * finalizes the recovery process + */ + public void finalizeRecovery(final long targetLocalCheckpoint) throws IOException { + if (shard.state() == IndexShardState.CLOSED) { + throw new IndexShardClosedException(request.shardId()); + } + cancellableThreads.checkForCancel(); + StopWatch stopWatch = new StopWatch().start(); + logger.trace("finalizing recovery"); + /* + * Before marking the shard as in-sync we acquire an operation permit. We do this so that there is a barrier between marking a + * shard as in-sync and relocating a shard. If we acquire the permit then no relocation handoff can complete before we are done + * marking the shard as in-sync. If the relocation handoff holds all the permits then after the handoff completes and we acquire + * the permit then the state of the shard will be relocated and this recovery will fail. + */ + runUnderPrimaryPermit(() -> shard.markAllocationIdAsInSync(request.targetAllocationId(), targetLocalCheckpoint), + shardId + " marking " + request.targetAllocationId() + " as in sync", shard, cancellableThreads, logger); + final long globalCheckpoint = shard.getGlobalCheckpoint(); + cancellableThreads.executeIO(() -> recoveryTarget.finalizeRecovery(globalCheckpoint)); + runUnderPrimaryPermit(() -> shard.updateGlobalCheckpointForShard(request.targetAllocationId(), globalCheckpoint), + shardId + " updating " + request.targetAllocationId() + "'s global checkpoint", shard, cancellableThreads, logger); + + if (request.isPrimaryRelocation()) { + logger.trace("performing relocation hand-off"); + // this acquires all IndexShard operation permits and will thus delay new recoveries until it is done + cancellableThreads.execute(() -> shard.relocated(recoveryTarget::handoffPrimaryContext)); + /* + * if the recovery process fails after disabling primary mode on the source shard, both relocation source and + * target are failed (see {@link IndexShard#updateRoutingEntry}). + */ + } + stopWatch.stop(); + logger.trace("finalizing recovery took [{}]", stopWatch.totalTime()); + } + + static final class SendSnapshotResult { + final long targetLocalCheckpoint; + final int totalOperations; + final TimeValue tookTime; + + SendSnapshotResult(final long targetLocalCheckpoint, final int totalOperations, final TimeValue tookTime) { + this.targetLocalCheckpoint = targetLocalCheckpoint; + this.totalOperations = totalOperations; + this.tookTime = tookTime; + } } /** diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java index af4dc59ca1a76..3f6a8072d86d5 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java @@ -46,6 +46,7 @@ import org.elasticsearch.common.lucene.uid.Versions; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.CancellableThreads; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.core.internal.io.IOUtils; @@ -190,7 +191,7 @@ public void testSendSnapshotSendsOps() throws IOException { final long startingSeqNo = randomIntBetween(0, numberOfDocsWithValidSequenceNumbers - 1); final long requiredStartingSeqNo = randomIntBetween((int) startingSeqNo, numberOfDocsWithValidSequenceNumbers - 1); final long endingSeqNo = randomIntBetween((int) requiredStartingSeqNo - 1, numberOfDocsWithValidSequenceNumbers - 1); - RecoverySourceHandler.SendSnapshotResult result = handler.sendSnapshot(startingSeqNo, requiredStartingSeqNo, + RecoverySourceHandler.SendSnapshotResult result = handler.phase2(startingSeqNo, requiredStartingSeqNo, endingSeqNo, new Translog.Snapshot() { @Override public void close() { @@ -229,7 +230,7 @@ public Translog.Operation next() throws IOException { .filter(o -> o.seqNo() >= requiredStartingSeqNo && o.seqNo() <= endingSeqNo).collect(Collectors.toList()); List opsToSkip = randomSubsetOf(randomIntBetween(1, requiredOps.size()), requiredOps); expectThrows(IllegalStateException.class, () -> - handler.sendSnapshot(startingSeqNo, requiredStartingSeqNo, + handler.phase2(startingSeqNo, requiredStartingSeqNo, endingSeqNo, new Translog.Snapshot() { @Override public void close() { @@ -412,20 +413,23 @@ public void testThrowExceptionOnPrimaryRelocatedBeforePhase1Started() throws IOE recoverySettings.getChunkSize().bytesAsInt()) { @Override - public void phase1(final IndexCommit snapshot, final Supplier translogOps) { + public SendFileResult phase1(final IndexCommit snapshot, final Supplier translogOps) { phase1Called.set(true); + return super.phase1(snapshot, translogOps); } @Override - void prepareTargetForTranslog(final boolean fileBasedRecovery, final int totalTranslogOps) throws IOException { + TimeValue prepareTargetForTranslog(final boolean fileBasedRecovery, final int totalTranslogOps) throws IOException { prepareTargetForTranslogCalled.set(true); + return super.prepareTargetForTranslog(fileBasedRecovery, totalTranslogOps); } @Override - long phase2(long startingSeqNo, long requiredSeqNoRangeStart, long endingSeqNo, Translog.Snapshot snapshot, - long maxSeenAutoIdTimestamp, long maxSeqNoOfUpdatesOrDeletes) { + SendSnapshotResult phase2(long startingSeqNo, long requiredSeqNoRangeStart, long endingSeqNo, Translog.Snapshot snapshot, + long maxSeenAutoIdTimestamp, long maxSeqNoOfUpdatesOrDeletes) throws IOException { phase2Called.set(true); - return SequenceNumbers.UNASSIGNED_SEQ_NO; + return super.phase2(startingSeqNo, requiredSeqNoRangeStart, endingSeqNo, snapshot, + maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes); } }; From be20f2f28fd702d024fc13ed92ac7f712a99caa4 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Tue, 8 Jan 2019 10:44:00 -0500 Subject: [PATCH 2/2] feedback --- .../indices/recovery/RecoveryResponse.java | 16 ++++++---------- .../indices/recovery/RecoverySourceHandler.java | 3 ++- 2 files changed, 8 insertions(+), 11 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryResponse.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryResponse.java index dfc14e3f58271..02d4ff5dbc13b 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryResponse.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryResponse.java @@ -19,7 +19,6 @@ package org.elasticsearch.indices.recovery; -import org.elasticsearch.Version; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.transport.TransportResponse; @@ -36,7 +35,7 @@ final class RecoveryResponse extends TransportResponse { final long phase1TotalSize; final long phase1ExistingTotalSize; final long phase1Time; - final long phase1ThrottlingWaitTime = 0L; // not used + final long phase1ThrottlingWaitTime; final long startTime; @@ -45,7 +44,7 @@ final class RecoveryResponse extends TransportResponse { RecoveryResponse(List phase1FileNames, List phase1FileSizes, List phase1ExistingFileNames, List phase1ExistingFileSizes, long phase1TotalSize, long phase1ExistingTotalSize, - long phase1Time, long startTime, int phase2Operations, long phase2Time) { + long phase1Time, long phase1ThrottlingWaitTime, long startTime, int phase2Operations, long phase2Time) { this.phase1FileNames = phase1FileNames; this.phase1FileSizes = phase1FileSizes; this.phase1ExistingFileNames = phase1ExistingFileNames; @@ -53,13 +52,14 @@ final class RecoveryResponse extends TransportResponse { this.phase1TotalSize = phase1TotalSize; this.phase1ExistingTotalSize = phase1ExistingTotalSize; this.phase1Time = phase1Time; + this.phase1ThrottlingWaitTime = phase1ThrottlingWaitTime; this.startTime = startTime; this.phase2Operations = phase2Operations; this.phase2Time = phase2Time; } RecoveryResponse(StreamInput in) throws IOException { - super.readFrom(in); + super(in); phase1FileNames = in.readList(StreamInput::readString); phase1FileSizes = in.readList(StreamInput::readVLong); phase1ExistingFileNames = in.readList(StreamInput::readString); @@ -67,9 +67,7 @@ final class RecoveryResponse extends TransportResponse { phase1TotalSize = in.readVLong(); phase1ExistingTotalSize = in.readVLong(); phase1Time = in.readVLong(); - if (in.getVersion().before(Version.V_7_0_0)) { - in.readVLong(); // phase1ThrottlingWaitTime - not used - } + phase1ThrottlingWaitTime = in.readVLong(); startTime = in.readVLong(); phase2Operations = in.readVInt(); phase2Time = in.readVLong(); @@ -85,9 +83,7 @@ public void writeTo(StreamOutput out) throws IOException { out.writeVLong(phase1TotalSize); out.writeVLong(phase1ExistingTotalSize); out.writeVLong(phase1Time); - if (out.getVersion().before(Version.V_7_0_0)) { - out.writeVLong(0L); // phase1ThrottlingWaitTime - not used - } + out.writeVLong(phase1ThrottlingWaitTime); out.writeVLong(startTime); out.writeVInt(phase2Operations); out.writeVLong(phase2Time); diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index b1fbd1ce68532..315af6b4ae084 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -228,9 +228,10 @@ public RecoveryResponse recoverToTarget() throws IOException { } finalizeRecovery(sendSnapshotResult.targetLocalCheckpoint); + final long phase1ThrottlingWaitTime = 0L; // TODO: return the actual throttle time return new RecoveryResponse(sendFileResult.phase1FileNames, sendFileResult.phase1FileSizes, sendFileResult.phase1ExistingFileNames, sendFileResult.phase1ExistingFileSizes, sendFileResult.totalSize, - sendFileResult.existingTotalSize, sendFileResult.took.millis(), prepareEngineTime.millis(), + sendFileResult.existingTotalSize, sendFileResult.took.millis(), phase1ThrottlingWaitTime, prepareEngineTime.millis(), sendSnapshotResult.totalOperations, sendSnapshotResult.tookTime.millis()); } }