Reset replica engine before primary-replica resync #32867
Conversation
When a replica starts following a newly promoted primary, it may have some operations which don't exist on the new primary. We need to reset replicas to the global checkpoint before executing the primary-replica resync; these two steps align the replicas with the primary. This change resets the engine of a replica to the safe commit when it detects a new primary term, then reindexes operations from the local translog up to the global checkpoint.
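To make the intended flow concrete, here is a rough sketch of the two steps; the method names (rollbackReplicaOnNewPrimaryTerm, resetEngineToSafeCommit, newTranslogSnapshotUpTo) are illustrative only and not the API introduced by this PR.

// Illustrative sketch only -- method names are hypothetical, not the real IndexShard API.
void rollbackReplicaOnNewPrimaryTerm(long globalCheckpoint) throws IOException {
    // Step 1: drop operations above the global checkpoint by reopening the engine
    // from the last safe commit (a commit whose max_seq_no is at most the global checkpoint).
    final Engine resetEngine = resetEngineToSafeCommit();

    // Step 2: replay the replica's local translog up to the global checkpoint so that
    // every acknowledged operation at or below it is restored before the resync runs.
    try (Translog.Snapshot snapshot = resetEngine.newTranslogSnapshotUpTo(globalCheckpoint)) {
        runTranslogRecovery(resetEngine, snapshot);
    }
}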
Pinging @elastic/es-distributed
/cc @not-napoleon
I did an initial pass and left some comments
@@ -388,17 +388,23 @@ public InternalEngine recoverFromTranslog() throws IOException {
        return this;
    }

    // for testing
    final Engine recoverFromTranslog() throws IOException {
is this necessary? can we just use the recoverFromTranslog(Long.MAX_VALUE) instead in the tests?
@@ -196,6 +196,11 @@
    protected volatile long pendingPrimaryTerm; // see JavaDocs for getPendingPrimaryTerm
    protected volatile long operationPrimaryTerm;
    protected final AtomicReference<Engine> currentEngineReference = new AtomicReference<>();

    private final AtomicReference<Engine> resettingEngineReference = new AtomicReference<>();
To me, having two AtomicReferences in flight is very confusing. I think we can simplify this by introducing an EngineReference class that we make final here and add some of the reset logic internally. Or, alternatively, keep the AtomicReference<EngineReference>.
It could look like this:
class EngineReference {
    private volatile Engine activeEngine;
    private volatile Engine pendingEngine;

    synchronized boolean hasPendingEngine() {
        return pendingEngine != null;
    }

    synchronized void makeActiveReadOnly() {
        // do the lockdown thing...
    }

    synchronized void swapPendingEngine() {
        // do the swap... and close the current etc.
    }
}
this looks more contained and we can maybe test it in isolation?
+1
        return applyIndexOperation(seqNo, operationPrimaryTerm, version, null, autoGeneratedTimeStamp, isRetry,
            Engine.Operation.Origin.REPLICA, sourceToParse);
            boolean isRetry, SourceToParse sourceToParse) throws IOException {
        return applyIndexOperation(getEngine(), seqNo, operationPrimaryTerm, version, null, autoGeneratedTimeStamp,
I was wondering why you did that, and I think I understand what you are trying to do: you try to make sure we always get the latest engine, i.e. the locked-down one if we swap it. But there is still a race imo: inside applyIndexOperation you might have an engine that is already closed unless you put a lock around it. The swap might be atomic, but the reference might still receive writes after you locked it down, is this ok?
Before swapping engines, we drain all IndexShardOperationPermits (backed by a Semaphore), and a write operation requires one of those permits. I think we are okay here.
correct.
++ maybe we add this as a comment somewhere or even as an assertion?
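For reference, a minimal sketch of such an assertion, assuming IndexShard exposes the number of held operation permits via something like getActiveOperationsCount() (the exact accessor is an assumption here):

// sketch: assert that all IndexShardOperationPermits have been drained before swapping engines
private void assertNoHeldOperationPermits() {
    assert getActiveOperationsCount() == 0 :
        "engine is being swapped while [" + getActiveOperationsCount() + "] operation permits are still held";
}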
Don't we need to use getEngineForResync here? Assume that there are documents already replicated by the new primary before this replica has received all resync operations. Also wondering why there have been no test failures, maybe test coverage is not good? Also note that getEngineForResync is probably not the best name for this. I think there's a bigger issue here, let's sync about this tomorrow.
@@ -2234,6 +2240,22 @@ public SeqNoStats getSeqNoStats(long globalCheckpoint) {
        return localCheckpointTracker.getStats(globalCheckpoint);
    }

    @Override
    public Engine lockDownEngine() throws IOException {
Ideally we would do this entirely outside of the engine and maybe just pass an engine to the ctor of ReadOnlyEngine? Do we need to make sure we don't receive writes after we did this, or why do we acquire a write lock?
I will move this to the ctor of ReadOnlyEngine.
I've just given this an initial look.
    final boolean canResetEngine() {
        // TODO: do not reset the following shard
        return indexSettings.getIndexVersionCreated().onOrAfter(Version.V_6_4_0);
AFAICS (correct me if I'm wrong) you had to do it this way because we don't know on what node version the primary is (i.e. if it is going to send maxSeqNo or not), and the shard is reset when we acquire the replica operation permit (i.e. possibly before we receive the first resync request). It's a shame because it means we can't ensure consistency for older indices. The only other solution I can think of right now would be to always send the maximum sequence number with the replication request (same as we do for the global checkpoint). We could then pass this to acquireReplicaOperationPermit (same as the global checkpoint).
@ywelsch Yeah, you understood it correctly. I had the same thought but did not go with that option as I wasn't sure if it's the right trade-off. I am glad that you suggested it. Should we make that change in this PR or in a separate prerequisite PR to reduce noise here?
    }

    private void resetEngineUpToLocalCheckpoint(long recoverUpToSeqNo) throws IOException {
        synchronized (mutex) {
If I see this correctly, you're doing recoverFromTranslog under the mutex here? This can potentially block the cluster state update thread for minutes.
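A rough sketch of the usual fix, splitting the reset into a cheap swap under the mutex and the expensive translog replay outside of it (createResettingEngine is a hypothetical helper; resettingEngineReference is the field from the diff above):

// sketch: keep the mutex for the cheap state changes only, and replay the translog
// without holding it, so the cluster state update thread is never blocked for minutes
private void resetEngineUpToSeqNo(long recoverUpToSeqNo) throws IOException {
    final Engine resettingEngine;
    synchronized (mutex) {
        resettingEngine = createResettingEngine();       // cheap: open an engine on the safe commit
        resettingEngineReference.set(resettingEngine);
    }
    resettingEngine.recoverFromTranslog(recoverUpToSeqNo);  // expensive: run outside the mutex
}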
            // the resetting engine will be activated only if its local_checkpoint is at least this guard
            minRequiredCheckpointForResettingEngine.set(currentMaxSeqNo);
            resettingEngineReference.set(resettingEngine);
            changeState(IndexShardState.RECOVERING, "reset engine from=" + currentMaxSeqNo + " to=" + globalCheckpoint);
why move the state back to RECOVERING?
    }

    private void completeResettingEngineWithLocalHistory() throws IOException {
        synchronized (mutex) {
Same comment as above: you can't do stuff that possibly blocks the mutex for minutes.
    @Override
    public void refresh(String source) throws EngineException {
        // noop
refreshes should not be happening? If so, should we throw an UnsupportedOperationException here?
    @Override
    public CommitId flush(boolean force, boolean waitIfOngoing) throws EngineException {
        throw new UnsupportedOperationException();
do we want to assert that all of these methods are never called?
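If we go that route, each unsupported method could combine an assertion with the exception, e.g. (sketch):

@Override
public CommitId flush(boolean force, boolean waitIfOngoing) throws EngineException {
    // fail loudly in tests, and still refuse the call in production
    assert false : "flush is not supported on a search-only engine";
    throw new UnsupportedOperationException("flush is not supported on a search-only engine");
}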
/**
 * An engine that does not accept writes, and always points stats and searchers at the last commit.
 */
final class ReadOnlyEngine extends Engine {
I wonder if we should give this a different name, in particular because we might have something similar for frozen indices. There it might be a more complete version of read-only, with the possibility to take a Translog.Snapshot. Maybe we could call this SearchOnlyEngine.
        return applyIndexOperation(seqNo, operationPrimaryTerm, version, null, autoGeneratedTimeStamp, isRetry,
            Engine.Operation.Origin.REPLICA, sourceToParse);
            boolean isRetry, SourceToParse sourceToParse) throws IOException {
        return applyIndexOperation(getEngine(), seqNo, operationPrimaryTerm, version, null, autoGeneratedTimeStamp,
correct.
I left some comments. looks good!
    @Override
    protected void closeNoLock(String reason, CountDownLatch closedLatch) {
        try {
I think you should protect this against double counting down the closedLatch by wrapping this entire try block in:

if (isClosed.compareAndSet(false, true)) {
}
        this.seqNoStats = engine.getSeqNoStats(engine.getLastSyncedGlobalCheckpoint());
        this.translogStats = engine.getTranslogStats();
        this.lastCommittedSegmentInfos = engine.getLastCommittedSegmentInfos();
        Searcher searcher = engine.acquireSearcher("lockdown", SearcherScope.INTERNAL);
can you leave a comment here that we keep a reference to the store implicitly through the searcher? I do wonder if we should make it explicit
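If we make it explicit, a minimal sketch would be to take a dedicated store reference in the constructor and release it in closeNoLock, instead of relying on the reference the searcher holds (releaseStoreOnClose is a hypothetical field):

// sketch: hold an explicit store reference for the lifetime of this engine
final Store store = engine.config().getStore();
store.incRef();                                // explicit reference...
this.releaseStoreOnClose = store::decRef;      // ...released in closeNoLock
// then acquire the last-commit searcher, stats, etc. as before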
        this.lastCommittedSegmentInfos = engine.getLastCommittedSegmentInfos();
        Searcher searcher = engine.acquireSearcher("lockdown", SearcherScope.INTERNAL);
        try {
            this.searcherManager = new SearcherManager(searcher.getDirectoryReader(),
this searcher manager seems to be unclosed. I think you should close it as well in the closeNoLock method?
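Putting the two suggestions together, closeNoLock could look roughly like this (a sketch, reusing the fields shown in the diffs above):

@Override
protected void closeNoLock(String reason, CountDownLatch closedLatch) {
    if (isClosed.compareAndSet(false, true)) {   // guard against counting down twice
        try {
            // release the searcher manager, the last-commit searcher, and the store reference
            IOUtils.close(searcherManager, lastCommitSearcher, store::decRef);
        } catch (Exception ex) {
            logger.warn("failed to close search-only engine", ex);
        } finally {
            closedLatch.countDown();
        }
    }
}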
        store.incRef();
        Releasable releasable = store::decRef;
        try (ReleasableLock ignored = readLock.acquire()) {
            final EngineSearcher searcher = new EngineSearcher(source, searcherManager, store, logger);
can you try to exercise this method to make sure we open a new searcher and close / release everything
Ah, the getDocIds method in SearchOnlyEngineTests#testSearchOnlyEngine acquires searchers.
👍
    private final SearcherManager searcherManager;
    private final Searcher lastCommitSearcher;

    public SearchOnlyEngine(Engine engine) throws IOException {
I do wonder if it would make more sense to open this entire thing off a store directly and maybe just pass an EngineConfig to this. It would make it more generic and less bound to an engine. WDYT?
something like this:
public SearchOnlyEngine(EngineConfig config) {
    super(config);
    try {
        Store store = config.getStore();
        store.incRef();
        DirectoryReader reader = null;
        boolean success = false;
        try {
            this.lastCommittedSegmentInfos = Lucene.readSegmentInfos(store.directory());
            this.translogStats = new TranslogStats(0, 0, 0, 0, 0);
            final SequenceNumbers.CommitInfo seqNoStats =
                SequenceNumbers.loadSeqNoInfoFromLuceneCommit(lastCommittedSegmentInfos.userData.entrySet());
            long maxSeqNo = seqNoStats.maxSeqNo;
            long localCheckpoint = seqNoStats.localCheckpoint;
            this.seqNoStats = new SeqNoStats(maxSeqNo, localCheckpoint, localCheckpoint);
            reader = SeqIdGeneratingDirectoryReader.wrap(ElasticsearchDirectoryReader.wrap(DirectoryReader
                .open(store.directory()), config.getShardId()), config.getPrimaryTermSupplier().getAsLong());
            this.indexCommit = reader.getIndexCommit();
            this.searcherManager = new SearcherManager(reader, new SearcherFactory());
            success = true;
        } finally {
            if (success == false) {
                IOUtils.close(reader, store::decRef);
            }
        }
    } catch (IOException e) {
        throw new UncheckedIOException(e); // this is stupid
    }
}
I did something similar a while back so I had it ready... I am not sure it is safe to use 💯
I adopted this, but I have to pass SeqNoStats from outside because we use a "reset" local checkpoint which may not equal the value from an index commit.
    @Override
    public void maybePruneDeletes() {

nit: extra newline
@@ -1266,14 +1269,16 @@ public void trimOperationOfPreviousPrimaryTerms(long aboveSeqNo) {

    // package-private for testing
    int runTranslogRecovery(Engine engine, Translog.Snapshot snapshot) throws IOException {
        recoveryState.getTranslog().totalOperations(snapshot.totalOperations());
        recoveryState.getTranslog().totalOperationsOnStart(snapshot.totalOperations());
        if (isEngineResetting() == false) {
any reason we can't just run this the same way we do if we are not resetting?
    private Engine createNewEngine(EngineConfig config) throws IOException {
        assert Thread.holdsLock(mutex);
        if (state == IndexShardState.CLOSED) {
            throw new AlreadyClosedException(shardId + " can't create engine - shard is closed");
not sure, should we throw IndexShardClosedException instead?
Yes, we should throw IndexShardClosedException. AlreadyClosedException was a left-over from when we folded Engine into IndexShard.
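i.e. something along the lines of (sketch):

if (state == IndexShardState.CLOSED) {
    // surface the shard-level exception rather than the Lucene-level AlreadyClosedException
    throw new IndexShardClosedException(shardId);
}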
    }

    @Override
    public void close() throws IOException {
make this synchronized too. it's safer since you modify both references
good catch!
            .map(BaseTranslogReader::newSnapshot).toArray(TranslogSnapshot[]::new);
        return newMultiSnapshot(snapshots);
        Snapshot snapshot = newMultiSnapshot(snapshots);
maybe you just return the snapshot if upToSeqNo == Long.MAX_VALUE?
Done
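The resulting shape, roughly (the seq-no bounded wrapper name below is illustrative, not the actual class):

Snapshot snapshot = newMultiSnapshot(snapshots);
if (upToSeqNo == Long.MAX_VALUE) {
    return snapshot;                                       // unbounded request: no filtering needed
} else {
    return new SeqNoBoundedSnapshot(snapshot, upToSeqNo);  // hypothetical wrapper that stops at upToSeqNo
}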
    synchronized void closePendingEngine() throws IOException {
        final Engine closing = this.pendingEngine;
        this.pendingEngine = null;
        IOUtils.close(closing);
can you restrict the mutex to the first two lines and call close outside the mutex?
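A sketch of that refactoring, keeping the field swap under the lock and the potentially slow close outside of it:

void closePendingEngine() throws IOException {
    final Engine closing;
    synchronized (this) {           // only the field swap needs the lock
        closing = this.pendingEngine;
        this.pendingEngine = null;
    }
    IOUtils.close(closing);         // closing can be slow; do it without holding the lock
}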
    }

    @Override
    public void close() throws IOException {
should this be synchronized so we get a consistent snapshot of the two engines?
Also, again, please do the closing outside the lock.
+1
This PR is too much to review in one sitting. Can you open a PR just for the recoverFromTranslog change where we can now specify an upper bound?
            getEngine().flush(true, true); // force=true to make sure that we roll a translog generation
            getEngine().resetLocalCheckpoint(localCheckpoint);
        }
        logger.info("detected new primary with primary term [{}], resetting local checkpoint from [{}] to [{}]",
this log message does not contain the right "before" local checkpoint as you moved it to after the local checkpoint reset
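One way to keep the message accurate is to capture the previous local checkpoint before resetting, e.g. (sketch; newPrimaryTerm stands in for whatever variable actually carries the new term here):

final long previousLocalCheckpoint = getEngine().getLocalCheckpoint();  // capture before the reset
getEngine().resetLocalCheckpoint(localCheckpoint);
getEngine().rollTranslogGeneration();
logger.info("detected new primary with primary term [{}], resetting local checkpoint from [{}] to [{}]",
    newPrimaryTerm, previousLocalCheckpoint, localCheckpoint);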
        getEngine().resetLocalCheckpoint(localCheckpoint);
        getEngine().rollTranslogGeneration();
        if (isEngineResetting()) {
            engineHolder.closePendingEngine();
I wonder if this is insufficient in the presence of cascading primary failures. Assume that you have a primary failover, which wants to index sequence number range 2 to 5 (because global checkpoint on new primary was 2, and resync trim-off is 5). Now, while resyncing, the global checkpoint moves from 2 to 3, and the new primary fails. Another primary is selected, which, for our purposes, has the global checkpoint 3. In that case the doc with sequence number 3 will only be in the translog and the pending Lucene index. By throwing the pending Lucene index away here, we now have to reset the local checkpoint and replay from sequence number 2 (to seq number 3).
What the implementation does here though is to not reset the local checkpoint to number 2, but leave it at 3, which, if this new IndexCommit is flushed, will lead to the situation where the local checkpoint info in the index commit is wrong (i.e. it might not contain the operation number 3).
I think this is okay because we start another engine after that in this case. Moreover, we stick with the "reset" local checkpoint (expose the local checkpoint of the active engine) while resetting the engine; thus the global checkpoint won't advance.
            return null;
        } else {
            engineHolder.makeActiveEngineSearchOnly();
            final Engine pendingEngine = createNewEngine(newEngineConfig());
this trims unsafe commits, possibly cleaning up segments that are referenced by the active search-only engine?
We open a directory reader in the constructor of a search-only engine and keep that reader until we manually close the search-only engine. Holding that reader prevents the segment files of the last commit from being deleted while trimming unsafe commits.
    private void completePendingEngineWithLocalHistory() throws IOException {
        final Engine pendingEngine;
        synchronized (mutex) {
why do you need to do this under the mutex?
                this.pendingEngine = null;
            }
        }
        IOUtils.close(closing);
How do we ensure that searches are not accessing acquireSearcher on the closed engine while switching to the new engine? Also, is there a test that checks that searches (with preference set to this node) continue to work during this transition?
There is a small interval in which callers might acquire searchers from the closed engine and hit an AlreadyClosedException. We can avoid this entirely by retrying on AlreadyClosedException if the engine being accessed is different from the current active engine. However, I am not sure if we should do it.
There is a test which continuously acquires searchers and makes sure that all acknowledged writes are maintained during the transition: https://github.com/elastic/elasticsearch/pull/32867/files#diff-b268f5fefa5837ece96b957e46f628cbR674 (getShardDocUIDs acquires a searcher and uses that searcher to collect document IDs).
However, I am not sure if we should do it.
why is that? We're building all this machinery to have search availability during the transition, except for this very short moment?
I had the same idea about retrying. An alternative would be to do refcounting for closing the engine, to ensure that we only actually close once all in-flight acquireSearcher calls have been completed.
Sorry, I did not think this through carefully. I thought we had to implement the retry on all methods that we support in the search-only engine, but I was wrong. We only need to implement the retry for "get" and "acquire searcher". These two methods should be simple. Thanks for this great question.
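A rough sketch of such a retry for acquireSearcher (getEngine() is assumed to return the currently active engine):

public Engine.Searcher acquireSearcher(String source) {
    while (true) {
        final Engine engine = getEngine();          // currently active engine
        try {
            return engine.acquireSearcher(source);
        } catch (AlreadyClosedException e) {
            // the engine was swapped between getEngine() and acquireSearcher();
            // retry only if a different engine is active now, otherwise rethrow
            if (getEngine() == engine) {
                throw e;
            }
        }
    }
}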
This change allows an engine to recover from its local translog up to a given seqno. The extended API can be used in these use cases:
1. When a replica starts following a new primary, it resets its index to the safe commit, then replays its local translog up to the current global checkpoint (see elastic#32867).
2. When a replica starts a peer-recovery, it can initialize the start_sequence_number to the persisted global checkpoint instead of the local checkpoint of the safe commit. A replica will then replay its local translog up to that global checkpoint before accepting remote translog operations from the primary. This change will increase the chance of operation-based recovery. I will make this change in a follow-up.
Relates elastic#32867
This commit allows us to use a different TranslogRecoveryRunner when recovering an engine from its local translog. This change is a prerequisite for the commit-based rollback PR (elastic#32867). See elastic#32867 (comment)
This commit allows us to use a different TranslogRecoveryRunner when recovering an engine from its local translog. This change is a prerequisite for the commit-based rollback PR. Relates #32867
Discussed this with @bleskes on another channel. We are going to split this PR into 3 smaller PRs so we can review them.
This change adds an engine implementation that opens a reader on an existing index but doesn't permit any refreshes or modifications to the index. Relates to elastic#32867 Relates to elastic#32844
When a replica starts following a newly promoted primary, it may have some operations which don't exist on the new primary. Thus we need to throw away those operations to align the replica with the new primary. This can be done by first resetting the engine from the safe commit, then replaying the local translog up to the global checkpoint. Relates #32867
If a shard was serving as a replica when another shard was promoted to primary, then its Lucene index was reset to the global checkpoint. However, if the new primary fails before the primary/replica resync completes and we are now being promoted, we have to restore the reverted operations by replaying the translog to avoid losing acknowledged writes. Relates elastic#32867
If a shard was serving as a replica when another shard was promoted to primary, then its Lucene index was reset to the global checkpoint. However, if the new primary fails before the primary/replica resync completes and we are now being promoted, we have to restore the reverted operations by replaying the translog to avoid losing acknowledged writes. Relates #33473 Relates #32867
Today we expose a new engine immediately during Lucene rollback. The new engine is started from a safe commit which might not include all acknowledged operations. With this change, we won't expose the new engine until it has recovered from the local translog. Note that this solution is not complete since it can preserve only acknowledged operations at or below the global checkpoint. This is because we replay the translog up to the global checkpoint during rollback. A per-doc Lucene rollback would solve this issue entirely. Relates #32867
All subtasks of this PR were done and merged. I am closing this.