diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java
index 4c9e966e39fc2..3527eaf9cee66 100644
--- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java
+++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java
@@ -1952,7 +1952,7 @@ public interface TranslogRecoveryRunner {
      * Moreover, operations that are optimized using the MSU optimization must not be processed twice as this will create duplicates
      * in Lucene. To avoid this we check the local checkpoint tracker to see if an operation was already processed.
      *
-     * @see #initializeMaxSeqNoOfUpdatesOrDeletes()
+     * @see #reinitializeMaxSeqNoOfUpdatesOrDeletes()
      * @see #advanceMaxSeqNoOfUpdatesOrDeletes(long)
      */
     public final long getMaxSeqNoOfUpdatesOrDeletes() {
@@ -1960,10 +1960,10 @@ public final long getMaxSeqNoOfUpdatesOrDeletes() {
     }

     /**
-     * A primary shard calls this method once to initialize the max_seq_no_of_updates marker using the
+     * A primary shard calls this method to re-initialize the max_seq_no_of_updates marker using the
      * max_seq_no from Lucene index and translog before replaying the local translog in its local recovery.
      */
-    public abstract void initializeMaxSeqNoOfUpdatesOrDeletes();
+    public abstract void reinitializeMaxSeqNoOfUpdatesOrDeletes();

     /**
      * A replica shard receives a new max_seq_no_of_updates from its primary shard, then calls this method
diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
index c60cb1a23fde8..f7c9077766ebe 100644
--- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
+++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
@@ -2730,9 +2730,7 @@ private boolean assertMaxSeqNoOfUpdatesIsAdvanced(Term id, long seqNo, boolean a
     }

     @Override
-    public void initializeMaxSeqNoOfUpdatesOrDeletes() {
-        assert getMaxSeqNoOfUpdatesOrDeletes() == SequenceNumbers.UNASSIGNED_SEQ_NO :
-            "max_seq_no_of_updates is already initialized [" + getMaxSeqNoOfUpdatesOrDeletes() + "]";
+    public void reinitializeMaxSeqNoOfUpdatesOrDeletes() {
         final long maxSeqNo = SequenceNumbers.max(localCheckpointTracker.getMaxSeqNo(), translog.getMaxSeqNo());
         advanceMaxSeqNoOfUpdatesOrDeletes(maxSeqNo);
     }
diff --git a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java
index 62650a470e2f8..5b10db69e94c7 100644
--- a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java
+++ b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java
@@ -450,7 +450,7 @@ public void updateMaxUnsafeAutoIdTimestamp(long newTimestamp) {
     }

     @Override
-    public void initializeMaxSeqNoOfUpdatesOrDeletes() {
+    public void reinitializeMaxSeqNoOfUpdatesOrDeletes() {
         advanceMaxSeqNoOfUpdatesOrDeletes(seqNoStats.getMaxSeqNo());
     }

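Note, not part of the patch: the InternalEngine hunk above works because advanceMaxSeqNoOfUpdatesOrDeletes is monotonic, so re-initializing the marker any number of times can only move it forward; the removed assertion (marker must still be unassigned) is what made a second initialization trip before this change. A minimal sketch of the assumed semantics, using a hypothetical MaxSeqNoOfUpdatesMarker class:

```java
// Illustrative sketch only; simplified names, not the actual Engine implementation.
import java.util.concurrent.atomic.AtomicLong;

final class MaxSeqNoOfUpdatesMarker {
    private static final long UNASSIGNED_SEQ_NO = -2; // mirrors SequenceNumbers.UNASSIGNED_SEQ_NO

    private final AtomicLong maxSeqNoOfUpdatesOrDeletes = new AtomicLong(UNASSIGNED_SEQ_NO);

    long get() {
        return maxSeqNoOfUpdatesOrDeletes.get();
    }

    // Monotonic: a repeated or stale call can never lower the marker, which is why
    // "reinitialize" (rather than initialize-exactly-once) is safe to call on each recovery.
    void advance(long seqNo) {
        maxSeqNoOfUpdatesOrDeletes.accumulateAndGet(seqNo, Math::max);
    }

    // Re-initialize by pinning the marker to the highest sequence number known from the
    // Lucene index and the translog, as the InternalEngine hunk above does.
    void reinitialize(long maxSeqNoFromLucene, long maxSeqNoFromTranslog) {
        advance(Math.max(maxSeqNoFromLucene, maxSeqNoFromTranslog));
    }
}
```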
diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
index debd04c4f9ae5..99489ccac691f 100644
--- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
+++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
@@ -536,6 +536,8 @@ public void updateShardState(final ShardRouting newRouting,
                         assert indexSettings.getIndexVersionCreated().before(Version.V_6_5_0);
                         engine.advanceMaxSeqNoOfUpdatesOrDeletes(seqNoStats().getMaxSeqNo());
                     }
+                    // in case we previously reset the engine, we need to forward the MSU before replaying the translog.
+                    engine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
                     engine.restoreLocalHistoryFromTranslog((resettingEngine, snapshot) ->
                         runTranslogRecovery(resettingEngine, snapshot, Engine.Operation.Origin.LOCAL_RESET, () -> {}));
                     /* Rolling the translog generation is not strictly needed here (as we will never have collisions between
@@ -1394,7 +1396,7 @@ public void openEngineAndRecoverFromTranslog() throws IOException {
         };
         innerOpenEngineAndTranslog();
         final Engine engine = getEngine();
-        engine.initializeMaxSeqNoOfUpdatesOrDeletes();
+        engine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
         engine.recoverFromTranslog(translogRecoveryRunner, Long.MAX_VALUE);
     }

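Not part of the patch: a worked toy example (numbers assumed) of the situation the new comment in updateShardState describes, where the engine was reset to the safe commit and the MSU marker lags operations that still sit in the local translog:

```java
// Hypothetical numbers for illustration; MsuForwardingExample is not a class in the codebase.
public final class MsuForwardingExample {
    public static void main(String[] args) {
        long msuAfterReset = 90;        // marker as restored from the safe commit
        long maxSeqNoInLucene = 95;     // highest seq_no in the Lucene index
        long maxSeqNoInTranslog = 120;  // highest seq_no still in the local translog

        // reinitializeMaxSeqNoOfUpdatesOrDeletes() conservatively forwards the marker to the max of
        // both sources, so replaying ops 91..120 (some of which may be updates or deletes) never
        // observes an MSU that is behind their seq_no.
        long msuBeforeReplay = Math.max(msuAfterReset, Math.max(maxSeqNoInLucene, maxSeqNoInTranslog));
        System.out.println("MSU before replaying local history: " + msuBeforeReplay); // prints 120
    }
}
```

This is also why the call sits before engine.restoreLocalHistoryFromTranslog rather than after it.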
diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
index a48216ecaa73a..76c479f051a47 100644
--- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
+++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
@@ -677,7 +677,7 @@ public IndexSearcher wrap(IndexSearcher searcher) throws EngineException {
         trimUnsafeCommits(engine.config());
         engine = new InternalEngine(engine.config());
         assertTrue(engine.isRecovering());
-        engine.initializeMaxSeqNoOfUpdatesOrDeletes();
+        engine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
         engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
         Engine.Searcher searcher = wrapper.wrap(engine.acquireSearcher("test"));
         assertThat(counter.get(), equalTo(2));
@@ -695,7 +695,7 @@ public void testFlushIsDisabledDuringTranslogRecovery() throws IOException {
         engine = new InternalEngine(engine.config());
         expectThrows(IllegalStateException.class, () -> engine.flush(true, true));
         assertTrue(engine.isRecovering());
-        engine.initializeMaxSeqNoOfUpdatesOrDeletes();
+        engine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
         engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
         assertFalse(engine.isRecovering());
         doc = testParsedDocument("2", null, testDocumentWithTextField(), SOURCE, null);
@@ -728,7 +728,7 @@ public void testTranslogMultipleOperationsSameDocument() throws IOException {
         }
         trimUnsafeCommits(engine.config());
         try (Engine recoveringEngine = new InternalEngine(engine.config())) {
-            recoveringEngine.initializeMaxSeqNoOfUpdatesOrDeletes();
+            recoveringEngine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
             recoveringEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
             try (Engine.Searcher searcher = recoveringEngine.acquireSearcher("test")) {
                 final TotalHitCountCollector collector = new TotalHitCountCollector();
@@ -765,7 +765,7 @@ protected void commitIndexWriter(IndexWriter writer, Translog translog, String s
                 }
             };
             assertThat(getTranslog(recoveringEngine).stats().getUncommittedOperations(), equalTo(docs));
-            recoveringEngine.initializeMaxSeqNoOfUpdatesOrDeletes();
+            recoveringEngine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
             recoveringEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
             assertTrue(committed.get());
         } finally {
@@ -800,7 +800,7 @@ public void testTranslogRecoveryWithMultipleGenerations() throws IOException {
             initialEngine.close();
             trimUnsafeCommits(initialEngine.config());
             recoveringEngine = new InternalEngine(initialEngine.config());
-            recoveringEngine.initializeMaxSeqNoOfUpdatesOrDeletes();
+            recoveringEngine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
             recoveringEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
             try (Engine.Searcher searcher = recoveringEngine.acquireSearcher("test")) {
                 TopDocs topDocs = searcher.searcher().search(new MatchAllDocsQuery(), docs);
@@ -836,7 +836,7 @@ public void testRecoveryFromTranslogUpToSeqNo() throws IOException {
             }
             trimUnsafeCommits(config);
             try (InternalEngine engine = new InternalEngine(config)) {
-                engine.initializeMaxSeqNoOfUpdatesOrDeletes();
+                engine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
                 engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
                 assertThat(engine.getLocalCheckpoint(), equalTo(maxSeqNo));
                 assertThat(engine.getLocalCheckpointTracker().getMaxSeqNo(), equalTo(maxSeqNo));
@@ -844,7 +844,7 @@ public void testRecoveryFromTranslogUpToSeqNo() throws IOException {
             trimUnsafeCommits(config);
             try (InternalEngine engine = new InternalEngine(config)) {
                 long upToSeqNo = randomLongBetween(globalCheckpoint.get(), maxSeqNo);
-                engine.initializeMaxSeqNoOfUpdatesOrDeletes();
+                engine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
                 engine.recoverFromTranslog(translogHandler, upToSeqNo);
                 assertThat(engine.getLocalCheckpoint(), equalTo(upToSeqNo));
                 assertThat(engine.getLocalCheckpointTracker().getMaxSeqNo(), equalTo(upToSeqNo));
@@ -1261,7 +1261,7 @@ public void testSyncedFlushSurvivesEngineRestart() throws IOException {
         }
         trimUnsafeCommits(config);
         engine = new InternalEngine(config);
-        engine.initializeMaxSeqNoOfUpdatesOrDeletes();
+        engine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
         engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
         assertEquals(engine.getLastCommittedSegmentInfos().getUserData().get(Engine.SYNC_COMMIT_ID), syncId);
     }
@@ -1282,7 +1282,7 @@ public void testSyncedFlushVanishesOnReplay() throws IOException {
         engine.close();
         trimUnsafeCommits(config);
         engine = new InternalEngine(config);
-        engine.initializeMaxSeqNoOfUpdatesOrDeletes();
+        engine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
         engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
         assertNull("Sync ID must be gone since we have a document to replay",
             engine.getLastCommittedSegmentInfos().getUserData().get(Engine.SYNC_COMMIT_ID));
@@ -2381,7 +2381,7 @@ public void testSeqNoAndCheckpoints() throws IOException {

         trimUnsafeCommits(initialEngine.engineConfig);
         try (InternalEngine recoveringEngine = new InternalEngine(initialEngine.config())) {
-            recoveringEngine.initializeMaxSeqNoOfUpdatesOrDeletes();
+            recoveringEngine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
             recoveringEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);

             assertEquals(primarySeqNo, recoveringEngine.getSeqNoStats(-1).getMaxSeqNo());
@@ -2737,7 +2737,7 @@ public void testCurrentTranslogIDisCommitted() throws IOException {
                     assertEquals("3", userData.get(Translog.TRANSLOG_GENERATION_KEY));
                 }
                 assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY));
-                engine.initializeMaxSeqNoOfUpdatesOrDeletes();
+                engine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
                 engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
                 userData = engine.getLastCommittedSegmentInfos().getUserData();
                 assertEquals("3", userData.get(Translog.TRANSLOG_GENERATION_KEY));
@@ -2756,7 +2756,7 @@ public void testCurrentTranslogIDisCommitted() throws IOException {
                 Map<String, String> userData = engine.getLastCommittedSegmentInfos().getUserData();
                 assertEquals("1", userData.get(Translog.TRANSLOG_GENERATION_KEY));
                 assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY));
-                engine.initializeMaxSeqNoOfUpdatesOrDeletes();
+                engine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
                 engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
                 assertEquals(2, engine.getTranslog().currentFileGeneration());
                 assertEquals(0L, engine.getTranslog().stats().getUncommittedOperations());
@@ -2771,7 +2771,7 @@ public void testCurrentTranslogIDisCommitted() throws IOException {
                 Map<String, String> userData = engine.getLastCommittedSegmentInfos().getUserData();
                 assertEquals("1", userData.get(Translog.TRANSLOG_GENERATION_KEY));
                 assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY));
-                engine.initializeMaxSeqNoOfUpdatesOrDeletes();
+                engine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
                 engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
                 userData = engine.getLastCommittedSegmentInfos().getUserData();
                 assertEquals("no changes - nothing to commit", "1",
@@ -2879,7 +2879,7 @@ protected void commitIndexWriter(IndexWriter writer, Translog translog, String s
                 }
             }
         }) {
-            engine.initializeMaxSeqNoOfUpdatesOrDeletes();
+            engine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
             engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
             final ParsedDocument doc1 = testParsedDocument("1", null, testDocumentWithTextField(),
                 SOURCE, null);
@@ -2892,7 +2892,7 @@ protected void commitIndexWriter(IndexWriter writer, Translog translog, String s
         }
         try (InternalEngine engine = new InternalEngine(config(indexSettings, store, translogPath, newMergePolicy(), null, null,
             globalCheckpointSupplier))) {
-            engine.initializeMaxSeqNoOfUpdatesOrDeletes();
+            engine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
             engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
             assertVisibleCount(engine, 1);
             final long committedGen = Long.valueOf(
@@ -2963,7 +2963,7 @@ public void testTranslogReplay() throws IOException {
         trimUnsafeCommits(copy(engine.config(), inSyncGlobalCheckpointSupplier));
         // we need to reuse the engine config unless the parser.mappingModified won't work
         engine = new InternalEngine(copy(engine.config(), inSyncGlobalCheckpointSupplier));
-        engine.initializeMaxSeqNoOfUpdatesOrDeletes();
+        engine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
         engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);

         assertVisibleCount(engine, numDocs, false);
@@ -3708,7 +3708,7 @@ public void testEngineMaxTimestampIsInitialized() throws IOException {
              InternalEngine engine = new InternalEngine(configSupplier.apply(store))) {
             assertEquals(IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP,
                 engine.segmentsStats(false).getMaxUnsafeAutoIdTimestamp());
-            engine.initializeMaxSeqNoOfUpdatesOrDeletes();
+            engine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
             engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
             assertEquals(timestamp1, engine.segmentsStats(false).getMaxUnsafeAutoIdTimestamp());
             final ParsedDocument doc = testParsedDocument("1", null, testDocumentWithTextField(),
@@ -4076,7 +4076,7 @@ public void testSequenceNumberAdvancesToMaxSeqOnEngineOpenOnPrimary() throws Bro
         }
         trimUnsafeCommits(initialEngine.config());
         try (Engine recoveringEngine = new InternalEngine(initialEngine.config())) {
-            recoveringEngine.initializeMaxSeqNoOfUpdatesOrDeletes();
+            recoveringEngine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
             recoveringEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
             recoveringEngine.fillSeqNoGaps(2);
             assertThat(recoveringEngine.getLocalCheckpoint(), greaterThanOrEqualTo((long) (docs - 1)));
@@ -4189,7 +4189,7 @@ protected long doGenerateSeqNoForOperation(Operation operation) {
                 throw new UnsupportedOperationException();
             }
         };
-        noOpEngine.initializeMaxSeqNoOfUpdatesOrDeletes();
+        noOpEngine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
         noOpEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
         final int gapsFilled = noOpEngine.fillSeqNoGaps(primaryTerm.get());
         final String reason = "filling gaps";
@@ -4426,7 +4426,7 @@ public void testRestoreLocalHistoryFromTranslog() throws IOException {
             }
             trimUnsafeCommits(engineConfig);
             try (InternalEngine engine = new InternalEngine(engineConfig)) {
-                engine.initializeMaxSeqNoOfUpdatesOrDeletes();
+                engine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
                 engine.recoverFromTranslog(translogHandler, globalCheckpoint.get());
                 engine.restoreLocalHistoryFromTranslog(translogHandler);
                 assertThat(getDocIds(engine, true), equalTo(prevDocs));
@@ -4474,7 +4474,7 @@ public void testFillUpSequenceIdGapsOnRecovery() throws IOException {
            trimUnsafeCommits(copy(replicaEngine.config(), globalCheckpoint::get));
            recoveringEngine = new InternalEngine(copy(replicaEngine.config(), globalCheckpoint::get));
            assertEquals(numDocsOnReplica, getTranslog(recoveringEngine).stats().getUncommittedOperations());
-            recoveringEngine.initializeMaxSeqNoOfUpdatesOrDeletes();
+            recoveringEngine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
            recoveringEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
            assertEquals(maxSeqIDOnReplica, recoveringEngine.getSeqNoStats(-1).getMaxSeqNo());
            assertEquals(checkpointOnReplica, recoveringEngine.getLocalCheckpoint());
@@ -4511,7 +4511,7 @@ public void testFillUpSequenceIdGapsOnRecovery() throws IOException {
            if (flushed) {
                assertThat(recoveringEngine.getTranslogStats().getUncommittedOperations(), equalTo(0));
            }
-            recoveringEngine.initializeMaxSeqNoOfUpdatesOrDeletes();
+            recoveringEngine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
            recoveringEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
            assertEquals(maxSeqIDOnReplica, recoveringEngine.getSeqNoStats(-1).getMaxSeqNo());
            assertEquals(maxSeqIDOnReplica, recoveringEngine.getLocalCheckpoint());
@@ -4706,7 +4706,7 @@ protected void commitIndexWriter(IndexWriter writer, Translog translog, String s
                 super.commitIndexWriter(writer, translog, syncId);
             }
         }) {
-            engine.initializeMaxSeqNoOfUpdatesOrDeletes();
+            engine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
             engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
             int numDocs = scaledRandomIntBetween(10, 100);
             for (int docId = 0; docId < numDocs; docId++) {
@@ -5482,7 +5482,7 @@ public void testTrackMaxSeqNoOfUpdatesOrDeletesOnPrimary() throws Exception {
         Set<String> liveDocIds = new HashSet<>();
         engine = new InternalEngine(engine.config());
         assertThat(engine.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(-2L));
-        engine.initializeMaxSeqNoOfUpdatesOrDeletes();
+        engine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
         int numOps = between(1, 500);
         for (int i = 0; i < numOps; i++) {
             long currentMaxSeqNoOfUpdates = engine.getMaxSeqNoOfUpdatesOrDeletes();
@@ -5553,7 +5553,7 @@ public void testRebuildLocalCheckpointTracker() throws Exception {
                     "seq_no=" + op.seqNo() + " max_seq_no=" + tracker.getMaxSeqNo() + "checkpoint=" + tracker.getCheckpoint(),
                     tracker.contains(op.seqNo()), equalTo(seqNosInSafeCommit.contains(op.seqNo())));
             }
-            engine.initializeMaxSeqNoOfUpdatesOrDeletes();
+            engine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
             engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
             assertThat(getDocIds(engine, true), equalTo(docs));
         }
diff --git a/server/src/test/java/org/elasticsearch/index/engine/ReadOnlyEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/ReadOnlyEngineTests.java
index 147885ca36411..ce54259c2deea 100644
--- a/server/src/test/java/org/elasticsearch/index/engine/ReadOnlyEngineTests.java
+++ b/server/src/test/java/org/elasticsearch/index/engine/ReadOnlyEngineTests.java
@@ -96,7 +96,7 @@ public void testReadOnlyEngine() throws Exception {
                 // Close and reopen the main engine
                 InternalEngineTests.trimUnsafeCommits(config);
                 try (InternalEngine recoveringEngine = new InternalEngine(config)) {
-                    recoveringEngine.initializeMaxSeqNoOfUpdatesOrDeletes();
+                    recoveringEngine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
                     recoveringEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
                     // the locked down engine should still point to the previous commit
                     assertThat(readOnlyEngine.getLocalCheckpoint(), equalTo(lastSeqNoStats.getLocalCheckpoint()));
@@ -235,7 +235,7 @@ public void testRecoverFromTranslogAppliesNoOperations() throws IOException {
             }
             try (ReadOnlyEngine readOnlyEngine = new ReadOnlyEngine(config, null , null, true, Function.identity())) {
                 final TranslogHandler translogHandler = new TranslogHandler(xContentRegistry(), config.getIndexSettings());
-                readOnlyEngine.initializeMaxSeqNoOfUpdatesOrDeletes();
+                readOnlyEngine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
                 readOnlyEngine.recoverFromTranslog(translogHandler, randomNonNegativeLong());
                 assertThat(translogHandler.appliedOperations(), equalTo(0L));

diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
index 4b2fc5205fa76..39121a62d1ffc 100644
--- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
+++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
@@ -546,7 +546,7 @@ public void testPrimaryFillsSeqNoGapsOnPromotion() throws Exception {

         // most of the time this is large enough that most of the time there will be at least one gap
         final int operations = 1024 - scaledRandomIntBetween(0, 1024);
-        final Result result = indexOnReplicaWithGaps(indexShard, operations, Math.toIntExact(SequenceNumbers.NO_OPS_PERFORMED));
+        final Result result = indexOnReplicaWithGaps(indexShard, operations, Math.toIntExact(SequenceNumbers.NO_OPS_PERFORMED), false);

         final int maxSeqNo = result.maxSeqNo;

@@ -1093,7 +1093,7 @@ public void testGlobalCheckpointSync() throws IOException {
     public void testRestoreLocalHistoryFromTranslogOnPromotion() throws IOException, InterruptedException {
         final IndexShard indexShard = newStartedShard(false);
         final int operations = 1024 - scaledRandomIntBetween(0, 1024);
-        indexOnReplicaWithGaps(indexShard, operations, Math.toIntExact(SequenceNumbers.NO_OPS_PERFORMED));
+        indexOnReplicaWithGaps(indexShard, operations, Math.toIntExact(SequenceNumbers.NO_OPS_PERFORMED), true);

         final long maxSeqNo = indexShard.seqNoStats().getMaxSeqNo();
         final long globalCheckpointOnReplica = randomLongBetween(UNASSIGNED_SEQ_NO, indexShard.getLocalCheckpoint());
@@ -1145,9 +1145,9 @@ public void onFailure(Exception e) {
         assertThat(indexShard.seqNoStats().getMaxSeqNo(), equalTo(maxSeqNo));
         assertThat(getShardDocUIDs(indexShard), equalTo(docsBeforeRollback));
         if (shouldRollback) {
-            assertThat(indexShard.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(Collections.max(
-                Arrays.asList(currentMaxSeqNoOfUpdates, maxSeqNoOfUpdatesOrDeletes, globalCheckpoint, globalCheckpointOnReplica))
-            ));
+            // we conservatively roll MSU forward to maxSeqNo during restoreLocalHistory; ideally it should become just
+            // currentMaxSeqNoOfUpdates
+            assertThat(indexShard.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(maxSeqNo));
         } else {
             assertThat(indexShard.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(Math.max(currentMaxSeqNoOfUpdates, maxSeqNoOfUpdatesOrDeletes)));
         }
@@ -1159,7 +1159,9 @@ public void testRollbackReplicaEngineOnPromotion() throws IOException, Interrupt

         // most of the time this is large enough that most of the time there will be at least one gap
         final int operations = 1024 - scaledRandomIntBetween(0, 1024);
-        indexOnReplicaWithGaps(indexShard, operations, Math.toIntExact(SequenceNumbers.NO_OPS_PERFORMED));
+        // TODO: all tests should run with allowUpdates=true, but this specific test sometimes fails during Lucene commit when updates are
+        // added (seed = F37E9647ABE5928)
+        indexOnReplicaWithGaps(indexShard, operations, Math.toIntExact(SequenceNumbers.NO_OPS_PERFORMED), false);

         final long globalCheckpointOnReplica = randomLongBetween(UNASSIGNED_SEQ_NO, indexShard.getLocalCheckpoint());
         indexShard.updateGlobalCheckpointOnReplica(globalCheckpointOnReplica, "test");
@@ -1202,7 +1204,7 @@ public void onFailure(final Exception e) {
         }
         assertThat(indexShard.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(newMaxSeqNoOfUpdates));
         // ensure that after the local checkpoint throw back and indexing again, the local checkpoint advances
-        final Result result = indexOnReplicaWithGaps(indexShard, operations, Math.toIntExact(indexShard.getLocalCheckpoint()));
+        final Result result = indexOnReplicaWithGaps(indexShard, operations, Math.toIntExact(indexShard.getLocalCheckpoint()), false);
         assertThat(indexShard.getLocalCheckpoint(), equalTo((long) result.localCheckpoint));
         closeShard(indexShard, false);
     }
@@ -3126,19 +3128,25 @@ class Result {
     * @param indexShard the shard
     * @param operations the number of operations
     * @param offset     the starting sequence number
+    * @param allowUpdates whether updates should be added.
     * @return a pair of the maximum sequence number and whether or not a gap was introduced
     * @throws IOException if an I/O exception occurs while indexing on the shard
     */
    private Result indexOnReplicaWithGaps(
            final IndexShard indexShard,
            final int operations,
-            final int offset) throws IOException {
+            final int offset,
+            boolean allowUpdates) throws IOException {
        int localCheckpoint = offset;
        int max = offset;
        boolean gap = false;
+        Set<String> ids = new HashSet<>();
        for (int i = offset + 1; i < operations; i++) {
            if (!rarely() || i == operations - 1) { // last operation can't be a gap as it's not a gap anymore
-                final String id = Integer.toString(i);
+                final String id = ids.isEmpty() || randomBoolean() ? Integer.toString(i) : randomFrom(ids);
+                if (allowUpdates && ids.add(id) == false) { // this is an update
+                    indexShard.advanceMaxSeqNoOfUpdatesOrDeletes(i);
+                }
                SourceToParse sourceToParse = new SourceToParse(indexShard.shardId().getIndexName(), "_doc", id,
                        new BytesArray("{}"), XContentType.JSON);
                indexShard.applyIndexOperationOnReplica(i, 1,
@@ -3593,7 +3601,7 @@ public void testSupplyTombstoneDoc() throws Exception {

    public void testResetEngine() throws Exception {
        IndexShard shard = newStartedShard(false);
-        indexOnReplicaWithGaps(shard, between(0, 1000), Math.toIntExact(shard.getLocalCheckpoint()));
+        indexOnReplicaWithGaps(shard, between(0, 1000), Math.toIntExact(shard.getLocalCheckpoint()), false);
        final long globalCheckpoint = randomLongBetween(shard.getGlobalCheckpoint(), shard.getLocalCheckpoint());
        shard.updateGlobalCheckpointOnReplica(globalCheckpoint, "test");
        Set<String> docBelowGlobalCheckpoint = getShardDocUIDs(shard).stream()
@@ -3633,7 +3641,7 @@ public void testResetEngine() throws Exception {

    public void testConcurrentAcquireAllReplicaOperationsPermitsWithPrimaryTermUpdate() throws Exception {
        final IndexShard replica = newStartedShard(false);
-        indexOnReplicaWithGaps(replica, between(0, 1000), Math.toIntExact(replica.getLocalCheckpoint()));
+        indexOnReplicaWithGaps(replica, between(0, 1000), Math.toIntExact(replica.getLocalCheckpoint()), false);

        final int nbTermUpdates = randomIntBetween(1, 5);

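Not part of the patch: a stand-alone sketch of the update-detection idiom that the new allowUpdates branch in indexOnReplicaWithGaps (hunk @@ -3126 above) relies on. The scaffolding below is assumed; the real helper drives an IndexShard:

```java
// Set.add returns false when the id is already present, i.e. the operation is an update of an
// existing doc, and the engine asserts that updates/deletes never arrive with a seq_no above MSU
// (see the assertMaxSeqNoOfUpdatesIsAdvanced hunk header earlier in this diff), so the test
// advances the marker first, as a primary would have done before replicating the operation.
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;

public final class ReplicaUpdateSketch {
    public static void main(String[] args) {
        Random random = new Random();
        Set<String> ids = new HashSet<>();
        long maxSeqNoOfUpdates = -1; // NO_OPS_PERFORMED
        for (long seqNo = 0; seqNo < 20; seqNo++) {
            List<String> known = new ArrayList<>(ids);
            String id = ids.isEmpty() || random.nextBoolean()
                ? Long.toString(seqNo)                     // fresh id -> plain append
                : known.get(random.nextInt(known.size())); // reused id -> update
            if (ids.add(id) == false) {                    // id already known: this is an update
                maxSeqNoOfUpdates = Math.max(maxSeqNoOfUpdates, seqNo);
            }
            // ... apply the operation with (seqNo, id) to the replica shard ...
        }
        System.out.println("MSU after indexing: " + maxSeqNoOfUpdates);
    }
}
```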
diff --git a/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java b/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java
index c80b3b5074921..4ad95c43b7077 100644
--- a/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java
+++ b/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java
@@ -148,7 +148,7 @@ public void onFailedEngine(String reason, @Nullable Exception e) {
                 () -> primaryTerm, EngineTestCase.tombstoneDocSupplier());
         engine = new InternalEngine(config);
-        engine.initializeMaxSeqNoOfUpdatesOrDeletes();
+        engine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
         engine.recoverFromTranslog((e, s) -> 0, Long.MAX_VALUE);
         listeners.setCurrentRefreshLocationSupplier(engine::getTranslogLastWriteLocation);
     }

diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java
index e7b3f39747124..25854b614057f 100644
--- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java
+++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java
@@ -514,7 +514,7 @@ private InternalEngine createEngine(@Nullable IndexWriterFactory indexWriterFact
         }
         InternalEngine internalEngine =
             createInternalEngine(indexWriterFactory, localCheckpointTrackerSupplier, seqNoForOperation, config);
-        internalEngine.initializeMaxSeqNoOfUpdatesOrDeletes();
+        internalEngine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
         internalEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
         return internalEngine;
     }
diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java
index b8d44f76026f2..dfac5ef2654b8 100644
--- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java
+++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java
@@ -289,7 +289,7 @@ private FollowingEngine createEngine(Store store, EngineConfig config) throws IO
         store.associateIndexWithNewTranslog(translogUuid);
         FollowingEngine followingEngine = new FollowingEngine(config);
         TranslogHandler translogHandler = new TranslogHandler(xContentRegistry(), config.getIndexSettings());
-        followingEngine.initializeMaxSeqNoOfUpdatesOrDeletes();
+        followingEngine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
         followingEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
         return followingEngine;
     }
@@ -495,7 +495,7 @@ private void runFollowTest(CheckedBiConsumer