Reset replica engine before primary-replica resync #32867

Closed
wants to merge 24 commits into from
Changes from 1 commit
@@ -121,16 +121,14 @@ protected WriteReplicaResult shardOperationOnReplica(ResyncReplicationRequest re
public static Translog.Location performOnReplica(ResyncReplicationRequest request, IndexShard replica) throws Exception {
Translog.Location location = null;
for (Translog.Operation operation : request.getOperations()) {
- final Engine.Result operationResult = replica.applyTranslogOperation(operation, Engine.Operation.Origin.REPLICA);
+ final Engine.Result operationResult = replica.applyResyncOperation(operation);
if (operationResult.getResultType() == Engine.Result.Type.MAPPING_UPDATE_REQUIRED) {
throw new TransportReplicationAction.RetryOnReplicaException(replica.shardId(),
"Mappings are not available on the replica yet, triggered update: " + operationResult.getRequiredMappingUpdate());
}
location = syncOperationResultOrThrow(operationResult, location);
}
- if (request.getTrimAboveSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) {
- replica.trimOperationOfPreviousPrimaryTerms(request.getTrimAboveSeqNo());
- }
+ replica.afterApplyResyncOperationsBulk(request.getTrimAboveSeqNo());
return location;
}

@@ -1623,16 +1623,21 @@ public interface Warmer {
public abstract int fillSeqNoGaps(long primaryTerm) throws IOException;

/**
- * Performs recovery from the transaction log.
+ * Performs recovery from the transaction log up to {@code recoverUpToSeqNo} .
* This operation will close the engine if the recovery fails.
*/
- public abstract Engine recoverFromTranslog() throws IOException;
+ public abstract Engine recoverFromTranslog(long recoverUpToSeqNo) throws IOException;

/**
* Do not replay translog operations, but make the engine be ready.
*/
public abstract void skipTranslogRecovery();

+ /**
+ * Returns a {@link ReadOnlyEngine} that points to the last commit of the current engine.
+ */
+ public abstract Engine lockDownEngine() throws IOException;

/**
* Returns <code>true</code> iff this engine is currently recovering from translog.
*/
@@ -364,15 +364,15 @@ private void bootstrapAppendOnlyInfoFromWriter(IndexWriter writer) {
}

@Override
- public InternalEngine recoverFromTranslog() throws IOException {
+ public InternalEngine recoverFromTranslog(long recoverUpToSeqNo) throws IOException {
flushLock.lock();
try (ReleasableLock lock = readLock.acquire()) {
ensureOpen();
if (pendingTranslogRecovery.get() == false) {
throw new IllegalStateException("Engine has already been recovered");
}
try {
- recoverFromTranslogInternal();
+ recoverFromTranslogInternal(recoverUpToSeqNo);
} catch (Exception e) {
try {
pendingTranslogRecovery.set(true); // just play safe and never allow commits on this see #ensureCanFlush
@@ -388,17 +388,23 @@ public InternalEngine recoverFromTranslog() throws IOException {
return this;
}

+ // for testing
+ final Engine recoverFromTranslog() throws IOException {
Review comment (Contributor): Is this necessary? Can we just use recoverFromTranslog(Long.MAX_VALUE) in the tests instead?

+ return recoverFromTranslog(Long.MAX_VALUE);
+ }

@Override
public void skipTranslogRecovery() {
assert pendingTranslogRecovery.get() : "translogRecovery is not pending but should be";
pendingTranslogRecovery.set(false); // we are good - now we can commit
}

- private void recoverFromTranslogInternal() throws IOException {
+ private void recoverFromTranslogInternal(long recoverUpToSeqNo) throws IOException {
Translog.TranslogGeneration translogGeneration = translog.getGeneration();
final int opsRecovered;
final long translogGen = Long.parseLong(lastCommittedSegmentInfos.getUserData().get(Translog.TRANSLOG_GENERATION_KEY));
- try (Translog.Snapshot snapshot = translog.newSnapshotFromGen(translogGen)) {
+ try (Translog.Snapshot snapshot = translog.newSnapshotFromGen(
+ new Translog.TranslogGeneration(translog.getTranslogUUID(), translogGen), recoverUpToSeqNo)) {
opsRecovered = config().getTranslogRecoveryRunner().run(this, snapshot);
} catch (Exception e) {
throw new EngineException(shardId, "failed to recover from translog", e);
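
Note on the review question above: calling recoverFromTranslog(Long.MAX_VALUE) from tests works only if Long.MAX_VALUE behaves as "no upper bound" on the replayed sequence numbers. A minimal sketch of that idea follows. Op, BoundedReplaySketch and replayUpTo are hypothetical names for illustration, not the real Translog API, and the inclusive bound is an assumption that the diff itself does not confirm.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for a translog operation; only the sequence number matters here.
final class Op {
    final long seqNo;

    Op(long seqNo) {
        this.seqNo = seqNo;
    }
}

final class BoundedReplaySketch {
    // Returns the operations whose seqNo is <= recoverUpToSeqNo, in translog order.
    // Assumption: the bound is inclusive.
    static List<Op> replayUpTo(List<Op> translog, long recoverUpToSeqNo) {
        List<Op> replayed = new ArrayList<>();
        for (Op op : translog) {
            if (op.seqNo <= recoverUpToSeqNo) {
                replayed.add(op);
            }
        }
        return replayed;
    }

    public static void main(String[] args) {
        List<Op> translog = Arrays.asList(new Op(0), new Op(1), new Op(2), new Op(3));
        System.out.println(replayUpTo(translog, 1).size());              // prints 2
        System.out.println(replayUpTo(translog, Long.MAX_VALUE).size()); // prints 4
    }
}
```

With a bound of Long.MAX_VALUE no sequence number can exceed the limit, so the bounded call degenerates to a full replay, which is what the test-only overload in this hunk relies on.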
@@ -2234,6 +2240,22 @@ public SeqNoStats getSeqNoStats(long globalCheckpoint) {
return localCheckpointTracker.getStats(globalCheckpoint);
}

+ @Override
+ public Engine lockDownEngine() throws IOException {
Review comment (Contributor): Ideally we would do this entirely outside of the engine, and maybe just pass an engine to the ctor of ReadOnlyEngine? Do we need to make sure we don't receive writes after we did this, or why do we acquire a write lock?

Reply (Member, Author): I will move this to the ctor of ReadOnlyEngine.

+ try (ReleasableLock ignored = writeLock.acquire()) {
+ refresh("lockdown", SearcherScope.INTERNAL);
+ Searcher searcher = acquireSearcher("lockdown", SearcherScope.INTERNAL);
+ try {
+ ReadOnlyEngine readOnlyEngine = new ReadOnlyEngine(this.engineConfig, lastCommittedSegmentInfos, searcher,
+ getSeqNoStats(getLastSyncedGlobalCheckpoint()), getTranslogStats());
+ searcher = null;
+ return readOnlyEngine;
+ } finally {
+ IOUtils.close(searcher);
+ }
+ }
+ }

/**
* Returns the number of times a version was looked up either from the index.
* Note this is only available if assertions are enabled
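
Note on the lockDownEngine hunk above: the searcher = null assignment followed by IOUtils.close(searcher) in the finally block reads as an ownership handoff. If the ReadOnlyEngine constructor succeeds, the new engine keeps the searcher; if it throws, the finally block releases it. The sketch below isolates that pattern under stated assumptions. FakeSearcher, ReadOnlyWrapper and OwnershipHandoffSketch are hypothetical stand-ins, not Elasticsearch classes, and the explicit null check takes the place of a null-tolerant close helper.

```java
import java.io.Closeable;

// Hypothetical stand-in for an acquired searcher; records whether it was closed.
final class FakeSearcher implements Closeable {
    boolean closed = false;

    @Override
    public void close() {
        closed = true;
    }
}

// Hypothetical read-only wrapper that takes ownership of the searcher it is given.
final class ReadOnlyWrapper implements Closeable {
    private final FakeSearcher searcher;

    ReadOnlyWrapper(FakeSearcher searcher, boolean failInCtor) {
        if (failInCtor) {
            throw new IllegalStateException("simulated constructor failure");
        }
        this.searcher = searcher;
    }

    @Override
    public void close() {
        searcher.close();
    }
}

final class OwnershipHandoffSketch {
    // On success the wrapper owns the searcher; on failure the finally block closes it.
    static ReadOnlyWrapper lockDown(FakeSearcher acquired, boolean failInCtor) {
        FakeSearcher searcher = acquired;
        try {
            ReadOnlyWrapper wrapper = new ReadOnlyWrapper(searcher, failInCtor);
            searcher = null; // ownership transferred, so the cleanup below is skipped
            return wrapper;
        } finally {
            if (searcher != null) {
                searcher.close(); // only reached with a live reference when the ctor threw
            }
        }
    }

    public static void main(String[] args) {
        FakeSearcher searcher = new FakeSearcher();
        ReadOnlyWrapper wrapper = lockDown(searcher, false);
        System.out.println(searcher.closed); // prints false
        wrapper.close();
        System.out.println(searcher.closed); // prints true
    }
}
```

Moving the construction into the ReadOnlyEngine ctor, as agreed in the thread, would presumably shift this cleanup responsibility into the constructor itself; the sketch does not attempt to show that version.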