-
Notifications
You must be signed in to change notification settings - Fork 25k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Reset replica engine before primary-replica resync #32867
Changes from 1 commit
1105179
2e10680
216b890
0ebb7e3
95f3e9f
ce10e16
92021f7
b60d0e8
ca7e57f
f0a1496
8f47769
a224c6b
7e08bd3
eacd9b2
aa01fbd
b3ac7e1
ef98712
021f94b
607535d
06da55a
b0ac93a
72ef838
379509e
0f19a28
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -364,15 +364,15 @@ private void bootstrapAppendOnlyInfoFromWriter(IndexWriter writer) { | |
} | ||
|
||
@Override | ||
public InternalEngine recoverFromTranslog() throws IOException { | ||
public InternalEngine recoverFromTranslog(long recoverUpToSeqNo) throws IOException { | ||
flushLock.lock(); | ||
try (ReleasableLock lock = readLock.acquire()) { | ||
ensureOpen(); | ||
if (pendingTranslogRecovery.get() == false) { | ||
throw new IllegalStateException("Engine has already been recovered"); | ||
} | ||
try { | ||
recoverFromTranslogInternal(); | ||
recoverFromTranslogInternal(recoverUpToSeqNo); | ||
} catch (Exception e) { | ||
try { | ||
pendingTranslogRecovery.set(true); // just play safe and never allow commits on this see #ensureCanFlush | ||
|
@@ -388,17 +388,23 @@ public InternalEngine recoverFromTranslog() throws IOException { | |
return this; | ||
} | ||
|
||
// for testing | ||
final Engine recoverFromTranslog() throws IOException { | ||
return recoverFromTranslog(Long.MAX_VALUE); | ||
} | ||
|
||
@Override | ||
public void skipTranslogRecovery() { | ||
assert pendingTranslogRecovery.get() : "translogRecovery is not pending but should be"; | ||
pendingTranslogRecovery.set(false); // we are good - now we can commit | ||
} | ||
|
||
private void recoverFromTranslogInternal() throws IOException { | ||
private void recoverFromTranslogInternal(long recoverUpToSeqNo) throws IOException { | ||
Translog.TranslogGeneration translogGeneration = translog.getGeneration(); | ||
final int opsRecovered; | ||
final long translogGen = Long.parseLong(lastCommittedSegmentInfos.getUserData().get(Translog.TRANSLOG_GENERATION_KEY)); | ||
try (Translog.Snapshot snapshot = translog.newSnapshotFromGen(translogGen)) { | ||
try (Translog.Snapshot snapshot = translog.newSnapshotFromGen( | ||
new Translog.TranslogGeneration(translog.getTranslogUUID(), translogGen), recoverUpToSeqNo)) { | ||
opsRecovered = config().getTranslogRecoveryRunner().run(this, snapshot); | ||
} catch (Exception e) { | ||
throw new EngineException(shardId, "failed to recover from translog", e); | ||
|
@@ -2234,6 +2240,22 @@ public SeqNoStats getSeqNoStats(long globalCheckpoint) { | |
return localCheckpointTracker.getStats(globalCheckpoint); | ||
} | ||
|
||
@Override | ||
public Engine lockDownEngine() throws IOException { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ideally we would do this entirely outside of the engine and maybe just pass and engine to the ctor of ReadOnlyEngine? do we need to make sure we don't receive writes after we did this or why do we acquire a write lock? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I will move this to ctor of ReadOnlyEngine. |
||
try (ReleasableLock ignored = writeLock.acquire()) { | ||
refresh("lockdown", SearcherScope.INTERNAL); | ||
Searcher searcher = acquireSearcher("lockdown", SearcherScope.INTERNAL); | ||
try { | ||
ReadOnlyEngine readOnlyEngine = new ReadOnlyEngine(this.engineConfig, lastCommittedSegmentInfos, searcher, | ||
getSeqNoStats(getLastSyncedGlobalCheckpoint()), getTranslogStats()); | ||
searcher = null; | ||
return readOnlyEngine; | ||
} finally { | ||
IOUtils.close(searcher); | ||
} | ||
} | ||
} | ||
|
||
/** | ||
* Returns the number of times a version was looked up either from the index. | ||
* Note this is only available if assertions are enabled | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is this necessary? can we just use the
recoverFromTranslog(Long.MAX_VALUE)
instead in the tests?