Skip to content

Commit

Permalink
Cascading primary failure lead to MSU too low (#40249)
Browse files Browse the repository at this point in the history
If a replica were first reset due to one primary failover and then
promoted (before resync completes), its MSU would not include changes
since global checkpoint, leading to errors during translog replay.

Fixed by re-initializing MSU before restoring local history.
  • Loading branch information
henningandersen committed Mar 20, 2019
1 parent 9a16423 commit 536f430
Show file tree
Hide file tree
Showing 10 changed files with 58 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1952,18 +1952,18 @@ public interface TranslogRecoveryRunner {
* Moreover, operations that are optimized using the MSU optimization must not be processed twice as this will create duplicates
* in Lucene. To avoid this we check the local checkpoint tracker to see if an operation was already processed.
*
* @see #initializeMaxSeqNoOfUpdatesOrDeletes()
* @see #reinitializeMaxSeqNoOfUpdatesOrDeletes()
* @see #advanceMaxSeqNoOfUpdatesOrDeletes(long)
*/
public final long getMaxSeqNoOfUpdatesOrDeletes() {
return maxSeqNoOfUpdatesOrDeletes.get();
}

/**
* A primary shard calls this method once to initialize the max_seq_no_of_updates marker using the
* A primary shard calls this method to re-initialize the max_seq_no_of_updates marker using the
* max_seq_no from Lucene index and translog before replaying the local translog in its local recovery.
*/
public abstract void initializeMaxSeqNoOfUpdatesOrDeletes();
public abstract void reinitializeMaxSeqNoOfUpdatesOrDeletes();

/**
* A replica shard receives a new max_seq_no_of_updates from its primary shard, then calls this method
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2730,9 +2730,7 @@ private boolean assertMaxSeqNoOfUpdatesIsAdvanced(Term id, long seqNo, boolean a
}

@Override
public void initializeMaxSeqNoOfUpdatesOrDeletes() {
assert getMaxSeqNoOfUpdatesOrDeletes() == SequenceNumbers.UNASSIGNED_SEQ_NO :
"max_seq_no_of_updates is already initialized [" + getMaxSeqNoOfUpdatesOrDeletes() + "]";
public void reinitializeMaxSeqNoOfUpdatesOrDeletes() {
final long maxSeqNo = SequenceNumbers.max(localCheckpointTracker.getMaxSeqNo(), translog.getMaxSeqNo());
advanceMaxSeqNoOfUpdatesOrDeletes(maxSeqNo);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -450,7 +450,7 @@ public void updateMaxUnsafeAutoIdTimestamp(long newTimestamp) {
}

@Override
public void initializeMaxSeqNoOfUpdatesOrDeletes() {
public void reinitializeMaxSeqNoOfUpdatesOrDeletes() {
advanceMaxSeqNoOfUpdatesOrDeletes(seqNoStats.getMaxSeqNo());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -536,6 +536,8 @@ public void updateShardState(final ShardRouting newRouting,
assert indexSettings.getIndexVersionCreated().before(Version.V_6_5_0);
engine.advanceMaxSeqNoOfUpdatesOrDeletes(seqNoStats().getMaxSeqNo());
}
// in case we previously reset engine, we need to forward MSU before replaying translog.
engine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
engine.restoreLocalHistoryFromTranslog((resettingEngine, snapshot) ->
runTranslogRecovery(resettingEngine, snapshot, Engine.Operation.Origin.LOCAL_RESET, () -> {}));
/* Rolling the translog generation is not strictly needed here (as we will never have collisions between
Expand Down Expand Up @@ -1394,7 +1396,7 @@ public void openEngineAndRecoverFromTranslog() throws IOException {
};
innerOpenEngineAndTranslog();
final Engine engine = getEngine();
engine.initializeMaxSeqNoOfUpdatesOrDeletes();
engine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
engine.recoverFromTranslog(translogRecoveryRunner, Long.MAX_VALUE);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -677,7 +677,7 @@ public IndexSearcher wrap(IndexSearcher searcher) throws EngineException {
trimUnsafeCommits(engine.config());
engine = new InternalEngine(engine.config());
assertTrue(engine.isRecovering());
engine.initializeMaxSeqNoOfUpdatesOrDeletes();
engine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
Engine.Searcher searcher = wrapper.wrap(engine.acquireSearcher("test"));
assertThat(counter.get(), equalTo(2));
Expand All @@ -695,7 +695,7 @@ public void testFlushIsDisabledDuringTranslogRecovery() throws IOException {
engine = new InternalEngine(engine.config());
expectThrows(IllegalStateException.class, () -> engine.flush(true, true));
assertTrue(engine.isRecovering());
engine.initializeMaxSeqNoOfUpdatesOrDeletes();
engine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
assertFalse(engine.isRecovering());
doc = testParsedDocument("2", null, testDocumentWithTextField(), SOURCE, null);
Expand Down Expand Up @@ -728,7 +728,7 @@ public void testTranslogMultipleOperationsSameDocument() throws IOException {
}
trimUnsafeCommits(engine.config());
try (Engine recoveringEngine = new InternalEngine(engine.config())) {
recoveringEngine.initializeMaxSeqNoOfUpdatesOrDeletes();
recoveringEngine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
recoveringEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
try (Engine.Searcher searcher = recoveringEngine.acquireSearcher("test")) {
final TotalHitCountCollector collector = new TotalHitCountCollector();
Expand Down Expand Up @@ -765,7 +765,7 @@ protected void commitIndexWriter(IndexWriter writer, Translog translog, String s
}
};
assertThat(getTranslog(recoveringEngine).stats().getUncommittedOperations(), equalTo(docs));
recoveringEngine.initializeMaxSeqNoOfUpdatesOrDeletes();
recoveringEngine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
recoveringEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
assertTrue(committed.get());
} finally {
Expand Down Expand Up @@ -800,7 +800,7 @@ public void testTranslogRecoveryWithMultipleGenerations() throws IOException {
initialEngine.close();
trimUnsafeCommits(initialEngine.config());
recoveringEngine = new InternalEngine(initialEngine.config());
recoveringEngine.initializeMaxSeqNoOfUpdatesOrDeletes();
recoveringEngine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
recoveringEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
try (Engine.Searcher searcher = recoveringEngine.acquireSearcher("test")) {
TopDocs topDocs = searcher.searcher().search(new MatchAllDocsQuery(), docs);
Expand Down Expand Up @@ -836,15 +836,15 @@ public void testRecoveryFromTranslogUpToSeqNo() throws IOException {
}
trimUnsafeCommits(config);
try (InternalEngine engine = new InternalEngine(config)) {
engine.initializeMaxSeqNoOfUpdatesOrDeletes();
engine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
assertThat(engine.getLocalCheckpoint(), equalTo(maxSeqNo));
assertThat(engine.getLocalCheckpointTracker().getMaxSeqNo(), equalTo(maxSeqNo));
}
trimUnsafeCommits(config);
try (InternalEngine engine = new InternalEngine(config)) {
long upToSeqNo = randomLongBetween(globalCheckpoint.get(), maxSeqNo);
engine.initializeMaxSeqNoOfUpdatesOrDeletes();
engine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
engine.recoverFromTranslog(translogHandler, upToSeqNo);
assertThat(engine.getLocalCheckpoint(), equalTo(upToSeqNo));
assertThat(engine.getLocalCheckpointTracker().getMaxSeqNo(), equalTo(upToSeqNo));
Expand Down Expand Up @@ -1261,7 +1261,7 @@ public void testSyncedFlushSurvivesEngineRestart() throws IOException {
}
trimUnsafeCommits(config);
engine = new InternalEngine(config);
engine.initializeMaxSeqNoOfUpdatesOrDeletes();
engine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
assertEquals(engine.getLastCommittedSegmentInfos().getUserData().get(Engine.SYNC_COMMIT_ID), syncId);
}
Expand All @@ -1282,7 +1282,7 @@ public void testSyncedFlushVanishesOnReplay() throws IOException {
engine.close();
trimUnsafeCommits(config);
engine = new InternalEngine(config);
engine.initializeMaxSeqNoOfUpdatesOrDeletes();
engine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
assertNull("Sync ID must be gone since we have a document to replay",
engine.getLastCommittedSegmentInfos().getUserData().get(Engine.SYNC_COMMIT_ID));
Expand Down Expand Up @@ -2381,7 +2381,7 @@ public void testSeqNoAndCheckpoints() throws IOException {

trimUnsafeCommits(initialEngine.engineConfig);
try (InternalEngine recoveringEngine = new InternalEngine(initialEngine.config())) {
recoveringEngine.initializeMaxSeqNoOfUpdatesOrDeletes();
recoveringEngine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
recoveringEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);

assertEquals(primarySeqNo, recoveringEngine.getSeqNoStats(-1).getMaxSeqNo());
Expand Down Expand Up @@ -2737,7 +2737,7 @@ public void testCurrentTranslogIDisCommitted() throws IOException {
assertEquals("3", userData.get(Translog.TRANSLOG_GENERATION_KEY));
}
assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY));
engine.initializeMaxSeqNoOfUpdatesOrDeletes();
engine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
userData = engine.getLastCommittedSegmentInfos().getUserData();
assertEquals("3", userData.get(Translog.TRANSLOG_GENERATION_KEY));
Expand All @@ -2756,7 +2756,7 @@ public void testCurrentTranslogIDisCommitted() throws IOException {
Map<String, String> userData = engine.getLastCommittedSegmentInfos().getUserData();
assertEquals("1", userData.get(Translog.TRANSLOG_GENERATION_KEY));
assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY));
engine.initializeMaxSeqNoOfUpdatesOrDeletes();
engine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
assertEquals(2, engine.getTranslog().currentFileGeneration());
assertEquals(0L, engine.getTranslog().stats().getUncommittedOperations());
Expand All @@ -2771,7 +2771,7 @@ public void testCurrentTranslogIDisCommitted() throws IOException {
Map<String, String> userData = engine.getLastCommittedSegmentInfos().getUserData();
assertEquals("1", userData.get(Translog.TRANSLOG_GENERATION_KEY));
assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY));
engine.initializeMaxSeqNoOfUpdatesOrDeletes();
engine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
userData = engine.getLastCommittedSegmentInfos().getUserData();
assertEquals("no changes - nothing to commit", "1",
Expand Down Expand Up @@ -2879,7 +2879,7 @@ protected void commitIndexWriter(IndexWriter writer, Translog translog, String s
}
}
}) {
engine.initializeMaxSeqNoOfUpdatesOrDeletes();
engine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
final ParsedDocument doc1 = testParsedDocument("1", null,
testDocumentWithTextField(), SOURCE, null);
Expand All @@ -2892,7 +2892,7 @@ protected void commitIndexWriter(IndexWriter writer, Translog translog, String s
try (InternalEngine engine =
new InternalEngine(config(indexSettings, store, translogPath, newMergePolicy(), null, null,
globalCheckpointSupplier))) {
engine.initializeMaxSeqNoOfUpdatesOrDeletes();
engine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
assertVisibleCount(engine, 1);
final long committedGen = Long.valueOf(
Expand Down Expand Up @@ -2963,7 +2963,7 @@ public void testTranslogReplay() throws IOException {
trimUnsafeCommits(copy(engine.config(), inSyncGlobalCheckpointSupplier));
// we need to reuse the engine config unless the parser.mappingModified won't work
engine = new InternalEngine(copy(engine.config(), inSyncGlobalCheckpointSupplier));
engine.initializeMaxSeqNoOfUpdatesOrDeletes();
engine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);

assertVisibleCount(engine, numDocs, false);
Expand Down Expand Up @@ -3708,7 +3708,7 @@ public void testEngineMaxTimestampIsInitialized() throws IOException {
InternalEngine engine = new InternalEngine(configSupplier.apply(store))) {
assertEquals(IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP,
engine.segmentsStats(false).getMaxUnsafeAutoIdTimestamp());
engine.initializeMaxSeqNoOfUpdatesOrDeletes();
engine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
assertEquals(timestamp1, engine.segmentsStats(false).getMaxUnsafeAutoIdTimestamp());
final ParsedDocument doc = testParsedDocument("1", null, testDocumentWithTextField(),
Expand Down Expand Up @@ -4076,7 +4076,7 @@ public void testSequenceNumberAdvancesToMaxSeqOnEngineOpenOnPrimary() throws Bro
}
trimUnsafeCommits(initialEngine.config());
try (Engine recoveringEngine = new InternalEngine(initialEngine.config())) {
recoveringEngine.initializeMaxSeqNoOfUpdatesOrDeletes();
recoveringEngine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
recoveringEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
recoveringEngine.fillSeqNoGaps(2);
assertThat(recoveringEngine.getLocalCheckpoint(), greaterThanOrEqualTo((long) (docs - 1)));
Expand Down Expand Up @@ -4189,7 +4189,7 @@ protected long doGenerateSeqNoForOperation(Operation operation) {
throw new UnsupportedOperationException();
}
};
noOpEngine.initializeMaxSeqNoOfUpdatesOrDeletes();
noOpEngine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
noOpEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
final int gapsFilled = noOpEngine.fillSeqNoGaps(primaryTerm.get());
final String reason = "filling gaps";
Expand Down Expand Up @@ -4426,7 +4426,7 @@ public void testRestoreLocalHistoryFromTranslog() throws IOException {
}
trimUnsafeCommits(engineConfig);
try (InternalEngine engine = new InternalEngine(engineConfig)) {
engine.initializeMaxSeqNoOfUpdatesOrDeletes();
engine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
engine.recoverFromTranslog(translogHandler, globalCheckpoint.get());
engine.restoreLocalHistoryFromTranslog(translogHandler);
assertThat(getDocIds(engine, true), equalTo(prevDocs));
Expand Down Expand Up @@ -4474,7 +4474,7 @@ public void testFillUpSequenceIdGapsOnRecovery() throws IOException {
trimUnsafeCommits(copy(replicaEngine.config(), globalCheckpoint::get));
recoveringEngine = new InternalEngine(copy(replicaEngine.config(), globalCheckpoint::get));
assertEquals(numDocsOnReplica, getTranslog(recoveringEngine).stats().getUncommittedOperations());
recoveringEngine.initializeMaxSeqNoOfUpdatesOrDeletes();
recoveringEngine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
recoveringEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
assertEquals(maxSeqIDOnReplica, recoveringEngine.getSeqNoStats(-1).getMaxSeqNo());
assertEquals(checkpointOnReplica, recoveringEngine.getLocalCheckpoint());
Expand Down Expand Up @@ -4511,7 +4511,7 @@ public void testFillUpSequenceIdGapsOnRecovery() throws IOException {
if (flushed) {
assertThat(recoveringEngine.getTranslogStats().getUncommittedOperations(), equalTo(0));
}
recoveringEngine.initializeMaxSeqNoOfUpdatesOrDeletes();
recoveringEngine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
recoveringEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
assertEquals(maxSeqIDOnReplica, recoveringEngine.getSeqNoStats(-1).getMaxSeqNo());
assertEquals(maxSeqIDOnReplica, recoveringEngine.getLocalCheckpoint());
Expand Down Expand Up @@ -4706,7 +4706,7 @@ protected void commitIndexWriter(IndexWriter writer, Translog translog, String s
super.commitIndexWriter(writer, translog, syncId);
}
}) {
engine.initializeMaxSeqNoOfUpdatesOrDeletes();
engine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
int numDocs = scaledRandomIntBetween(10, 100);
for (int docId = 0; docId < numDocs; docId++) {
Expand Down Expand Up @@ -5482,7 +5482,7 @@ public void testTrackMaxSeqNoOfUpdatesOrDeletesOnPrimary() throws Exception {
Set<String> liveDocIds = new HashSet<>();
engine = new InternalEngine(engine.config());
assertThat(engine.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(-2L));
engine.initializeMaxSeqNoOfUpdatesOrDeletes();
engine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
int numOps = between(1, 500);
for (int i = 0; i < numOps; i++) {
long currentMaxSeqNoOfUpdates = engine.getMaxSeqNoOfUpdatesOrDeletes();
Expand Down Expand Up @@ -5553,7 +5553,7 @@ public void testRebuildLocalCheckpointTracker() throws Exception {
"seq_no=" + op.seqNo() + " max_seq_no=" + tracker.getMaxSeqNo() + "checkpoint=" + tracker.getCheckpoint(),
tracker.contains(op.seqNo()), equalTo(seqNosInSafeCommit.contains(op.seqNo())));
}
engine.initializeMaxSeqNoOfUpdatesOrDeletes();
engine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
assertThat(getDocIds(engine, true), equalTo(docs));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public void testReadOnlyEngine() throws Exception {
// Close and reopen the main engine
InternalEngineTests.trimUnsafeCommits(config);
try (InternalEngine recoveringEngine = new InternalEngine(config)) {
recoveringEngine.initializeMaxSeqNoOfUpdatesOrDeletes();
recoveringEngine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
recoveringEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
// the locked down engine should still point to the previous commit
assertThat(readOnlyEngine.getLocalCheckpoint(), equalTo(lastSeqNoStats.getLocalCheckpoint()));
Expand Down Expand Up @@ -235,7 +235,7 @@ public void testRecoverFromTranslogAppliesNoOperations() throws IOException {
}
try (ReadOnlyEngine readOnlyEngine = new ReadOnlyEngine(config, null , null, true, Function.identity())) {
final TranslogHandler translogHandler = new TranslogHandler(xContentRegistry(), config.getIndexSettings());
readOnlyEngine.initializeMaxSeqNoOfUpdatesOrDeletes();
readOnlyEngine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
readOnlyEngine.recoverFromTranslog(translogHandler, randomNonNegativeLong());

assertThat(translogHandler.appliedOperations(), equalTo(0L));
Expand Down
Loading

0 comments on commit 536f430

Please sign in to comment.