diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index 9bed93c371696..63659126f8438 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -98,7 +98,6 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -142,16 +141,6 @@ public abstract class Engine implements Closeable { */ protected volatile long lastWriteNanos = System.nanoTime(); - /* - * This marker tracks the max seq_no of either update operations or delete operations have been processed in this engine. - * An index request is considered as an update if it overwrites existing documents with the same docId in the Lucene index. - * This marker is started uninitialized (-2), and the optimization using seq_no will be disabled if this marker is uninitialized. - * The value of this marker never goes backwards, and is updated/changed differently on primary and replica: - * 1. A primary initializes this marker once using the max_seq_no from its history, then advances when processing an update or delete. - * 2. A replica never advances this marker by itself but only inherits from its primary (via advanceMaxSeqNoOfUpdatesOrDeletes). - */ - private final AtomicLong maxSeqNoOfUpdatesOrDeletes = new AtomicLong(UNASSIGNED_SEQ_NO); - protected Engine(EngineConfig engineConfig) { Objects.requireNonNull(engineConfig.getStore(), "Store must be provided to the engine"); @@ -1961,25 +1950,13 @@ public interface TranslogRecoveryRunner { * Moreover, operations that are optimized using the MSU optimization must not be processed twice as this will create duplicates * in Lucene. To avoid this we check the local checkpoint tracker to see if an operation was already processed. * - * @see #reinitializeMaxSeqNoOfUpdatesOrDeletes() * @see #advanceMaxSeqNoOfUpdatesOrDeletes(long) */ - public final long getMaxSeqNoOfUpdatesOrDeletes() { - return maxSeqNoOfUpdatesOrDeletes.get(); - } - - /** - * A primary shard calls this method to re-initialize the max_seq_no_of_updates marker using the - * max_seq_no from Lucene index and translog before replaying the local translog in its local recovery. - */ - public abstract void reinitializeMaxSeqNoOfUpdatesOrDeletes(); + public abstract long getMaxSeqNoOfUpdatesOrDeletes(); /** * A replica shard receives a new max_seq_no_of_updates from its primary shard, then calls this method * to advance this marker to at least the given sequence number. */ - public final void advanceMaxSeqNoOfUpdatesOrDeletes(long seqNo) { - maxSeqNoOfUpdatesOrDeletes.updateAndGet(curr -> Math.max(curr, seqNo)); - assert maxSeqNoOfUpdatesOrDeletes.get() >= seqNo : maxSeqNoOfUpdatesOrDeletes.get() + " < " + seqNo; - } + public abstract void advanceMaxSeqNoOfUpdatesOrDeletes(long maxSeqNoOfUpdatesOnPrimary); } diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 654d31d22671a..bb301bc4addbb 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -47,7 +47,6 @@ import org.apache.lucene.util.InfoStream; import org.elasticsearch.Assertions; import org.elasticsearch.ExceptionsHelper; -import org.elasticsearch.Version; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.SuppressForbidden; @@ -146,6 +145,10 @@ public class InternalEngine extends Engine { private final AtomicLong maxUnsafeAutoIdTimestamp = new AtomicLong(-1); private final AtomicLong maxSeenAutoIdTimestamp = new AtomicLong(-1); private final AtomicLong maxSeqNoOfNonAppendOnlyOperations = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); + // max_seq_no_of_updates_or_deletes tracks the max seq_no of update or delete operations that have been processed in this engine. + // An index request is considered as an update if it overwrites existing documents with the same docId in the Lucene index. + // The value of this marker never goes backwards, and is tracked/updated differently on primary and replica. + private final AtomicLong maxSeqNoOfUpdatesOrDeletes; private final CounterMetric numVersionLookups = new CounterMetric(); private final CounterMetric numIndexVersionsLookups = new CounterMetric(); // Lucene operations since this engine was opened - not include operations from existing segments. @@ -228,6 +231,7 @@ public InternalEngine(EngineConfig engineConfig) { () -> acquireSearcher("create_local_checkpoint_tracker", SearcherScope.INTERNAL), localCheckpointTrackerSupplier); this.lastRefreshedCheckpointListener = new LastRefreshedCheckpointListener(localCheckpointTracker.getCheckpoint()); this.internalSearcherManager.addListener(lastRefreshedCheckpointListener); + maxSeqNoOfUpdatesOrDeletes = new AtomicLong(SequenceNumbers.max(localCheckpointTracker.getMaxSeqNo(), translog.getMaxSeqNo())); success = true; } finally { if (success == false) { @@ -405,7 +409,6 @@ public InternalEngine recoverFromTranslog(TranslogRecoveryRunner translogRecover flushLock.lock(); try (ReleasableLock lock = readLock.acquire()) { ensureOpen(); - assert getMaxSeqNoOfUpdatesOrDeletes() != SequenceNumbers.UNASSIGNED_SEQ_NO : "max_seq_no_of_updates is uninitialized"; if (pendingTranslogRecovery.get() == false) { throw new IllegalStateException("Engine has already been recovered"); } @@ -874,7 +877,7 @@ public IndexResult index(Index index) throws IOException { final boolean toAppend = plan.indexIntoLucene && plan.useLuceneUpdateDocument == false; if (toAppend == false) { - advanceMaxSeqNoOfUpdatesOrDeletes(index.seqNo()); + advanceMaxSeqNoOfUpdatesOrDeletesOnPrimary(index.seqNo()); } } else { markSeqNoAsSeen(index.seqNo()); @@ -981,7 +984,6 @@ protected IndexingStrategy indexingStrategyForOperation(final Index index) throw protected final IndexingStrategy planIndexingAsPrimary(Index index) throws IOException { assert index.origin() == Operation.Origin.PRIMARY : "planing as primary but origin isn't. got " + index.origin(); - assert getMaxSeqNoOfUpdatesOrDeletes() != SequenceNumbers.UNASSIGNED_SEQ_NO : "max_seq_no_of_updates is not initialized"; final IndexingStrategy plan; // resolve an external operation into an internal one which is safe to replay if (canOptimizeAddDocument(index)) { @@ -1322,7 +1324,6 @@ protected boolean assertNonPrimaryOrigin(final Operation operation) { protected final DeletionStrategy planDeletionAsPrimary(Delete delete) throws IOException { assert delete.origin() == Operation.Origin.PRIMARY : "planing as primary but got " + delete.origin(); - assert getMaxSeqNoOfUpdatesOrDeletes() != SequenceNumbers.UNASSIGNED_SEQ_NO : "max_seq_no_of_updates is not initialized"; // resolve operation from external to internal final VersionValue versionValue = resolveDocVersion(delete, delete.getIfSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO); assert incrementVersionLookup(); @@ -2718,13 +2719,22 @@ private void updateAutoIdTimestamp(long newTimestamp, boolean unsafe) { assert maxUnsafeAutoIdTimestamp.get() <= maxSeenAutoIdTimestamp.get(); } + @Override + public long getMaxSeqNoOfUpdatesOrDeletes() { + return maxSeqNoOfUpdatesOrDeletes.get(); + } + + @Override + public void advanceMaxSeqNoOfUpdatesOrDeletes(long maxSeqNoOfUpdatesOnPrimary) { + if (maxSeqNoOfUpdatesOnPrimary == SequenceNumbers.UNASSIGNED_SEQ_NO) { + assert false : "max_seq_no_of_updates on primary is unassigned"; + throw new IllegalArgumentException("max_seq_no_of_updates on primary is unassigned"); + } + this.maxSeqNoOfUpdatesOrDeletes.updateAndGet(curr -> Math.max(curr, maxSeqNoOfUpdatesOnPrimary)); + } + private boolean assertMaxSeqNoOfUpdatesIsAdvanced(Term id, long seqNo, boolean allowDeleted, boolean relaxIfGapInSeqNo) { final long maxSeqNoOfUpdates = getMaxSeqNoOfUpdatesOrDeletes(); - // If the primary is on an old version which does not replicate msu, we need to relax this assertion for that. - if (maxSeqNoOfUpdates == SequenceNumbers.UNASSIGNED_SEQ_NO) { - assert config().getIndexSettings().getIndexVersionCreated().before(Version.V_6_5_0); - return true; - } // We treat a delete on the tombstones on replicas as a regular document, then use updateDocument (not addDocument). if (allowDeleted) { final VersionValue versionValue = versionMap.getVersionForAssert(id.bytes()); @@ -2742,12 +2752,6 @@ private boolean assertMaxSeqNoOfUpdatesIsAdvanced(Term id, long seqNo, boolean a return true; } - @Override - public void reinitializeMaxSeqNoOfUpdatesOrDeletes() { - final long maxSeqNo = SequenceNumbers.max(localCheckpointTracker.getMaxSeqNo(), translog.getMaxSeqNo()); - advanceMaxSeqNoOfUpdatesOrDeletes(maxSeqNo); - } - private static void trimUnsafeCommits(EngineConfig engineConfig) throws IOException { final Store store = engineConfig.getStore(); final String translogUUID = store.readLastCommittedSegmentsInfo().getUserData().get(Translog.TRANSLOG_UUID_KEY); diff --git a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java index 777aff88e9dbc..7b47d60437fe1 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java @@ -456,11 +456,6 @@ public void updateMaxUnsafeAutoIdTimestamp(long newTimestamp) { } - @Override - public void reinitializeMaxSeqNoOfUpdatesOrDeletes() { - advanceMaxSeqNoOfUpdatesOrDeletes(seqNoStats.getMaxSeqNo()); - } - protected void processReaders(IndexReader reader, IndexReader previousReader) { searcherFactory.processReaders(reader, previousReader); } @@ -487,4 +482,15 @@ public Translog.Operation next() { } }; } + + @Override + public long getMaxSeqNoOfUpdatesOrDeletes() { + return seqNoStats.getMaxSeqNo(); + } + + @Override + public void advanceMaxSeqNoOfUpdatesOrDeletes(long maxSeqNoOfUpdatesOnPrimary) { + assert maxSeqNoOfUpdatesOnPrimary <= getMaxSeqNoOfUpdatesOrDeletes() : + maxSeqNoOfUpdatesOnPrimary + ">" + getMaxSeqNoOfUpdatesOrDeletes(); + } } diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 7d6faa73a9413..ee67597efe31a 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -532,14 +532,6 @@ public void updateShardState(final ShardRouting newRouting, * the reverted operations on this shard by replaying the translog to avoid losing acknowledged writes. */ final Engine engine = getEngine(); - if (getMaxSeqNoOfUpdatesOrDeletes() == UNASSIGNED_SEQ_NO) { - // If the old primary was on an old version that did not replicate the msu, - // we need to bootstrap it manually from its local history. - assert indexSettings.getIndexVersionCreated().before(Version.V_6_5_0); - engine.advanceMaxSeqNoOfUpdatesOrDeletes(seqNoStats().getMaxSeqNo()); - } - // in case we previously reset engine, we need to forward MSU before replaying translog. - engine.reinitializeMaxSeqNoOfUpdatesOrDeletes(); engine.restoreLocalHistoryFromTranslog((resettingEngine, snapshot) -> runTranslogRecovery(resettingEngine, snapshot, Engine.Operation.Origin.LOCAL_RESET, () -> {})); /* Rolling the translog generation is not strictly needed here (as we will never have collisions between @@ -1411,9 +1403,7 @@ public void openEngineAndRecoverFromTranslog() throws IOException { translogRecoveryStats::incrementRecoveredOperations); }; innerOpenEngineAndTranslog(); - final Engine engine = getEngine(); - engine.reinitializeMaxSeqNoOfUpdatesOrDeletes(); - engine.recoverFromTranslog(translogRecoveryRunner, Long.MAX_VALUE); + getEngine().recoverFromTranslog(translogRecoveryRunner, Long.MAX_VALUE); } /** @@ -2206,12 +2196,6 @@ public void activateWithPrimaryContext(final ReplicationTracker.PrimaryContext p getLocalCheckpoint() == primaryContext.getCheckpointStates().get(routingEntry().allocationId().getId()).getLocalCheckpoint(); synchronized (mutex) { replicationTracker.activateWithPrimaryContext(primaryContext); // make changes to primaryMode flag only under mutex - if (getMaxSeqNoOfUpdatesOrDeletes() == UNASSIGNED_SEQ_NO) { - // If the old primary was on an old version that did not replicate the msu, - // we need to bootstrap it manually from its local history. - assert indexSettings.getIndexVersionCreated().before(Version.V_6_5_0); - getEngine().advanceMaxSeqNoOfUpdatesOrDeletes(seqNoStats().getMaxSeqNo()); - } } } @@ -3138,7 +3122,6 @@ public void close() throws IOException { newEngineReference.set(engineFactory.newReadWriteEngine(newEngineConfig())); onNewEngine(newEngineReference.get()); } - newEngineReference.get().advanceMaxSeqNoOfUpdatesOrDeletes(globalCheckpoint); final Engine.TranslogRecoveryRunner translogRunner = (engine, snapshot) -> runTranslogRecovery( engine, snapshot, Engine.Operation.Origin.LOCAL_RESET, () -> { // TODO: add a dedicate recovery stats for the reset translog @@ -3185,11 +3168,7 @@ public long getMaxSeqNoOfUpdatesOrDeletes() { * @see RecoveryTarget#indexTranslogOperations(List, int, long, long, RetentionLeases, long, ActionListener) */ public void advanceMaxSeqNoOfUpdatesOrDeletes(long seqNo) { - assert seqNo != UNASSIGNED_SEQ_NO - || getMaxSeqNoOfUpdatesOrDeletes() == UNASSIGNED_SEQ_NO : - "replica has max_seq_no_of_updates=" + getMaxSeqNoOfUpdatesOrDeletes() + " but primary does not"; getEngine().advanceMaxSeqNoOfUpdatesOrDeletes(seqNo); - assert seqNo <= getMaxSeqNoOfUpdatesOrDeletes() : getMaxSeqNoOfUpdatesOrDeletes() + " < " + seqNo; } /** diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 23763426b3745..ae11500e54e5e 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -683,7 +683,6 @@ public IndexSearcher wrap(IndexSearcher searcher) throws EngineException { engine = new InternalEngine(engine.config()); assertTrue(engine.isRecovering()); - engine.reinitializeMaxSeqNoOfUpdatesOrDeletes(); engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); Engine.Searcher searcher = wrapper.wrap(engine.acquireSearcher("test")); assertThat(counter.get(), equalTo(2)); @@ -700,7 +699,6 @@ public void testFlushIsDisabledDuringTranslogRecovery() throws IOException { engine = new InternalEngine(engine.config()); expectThrows(IllegalStateException.class, () -> engine.flush(true, true)); assertTrue(engine.isRecovering()); - engine.reinitializeMaxSeqNoOfUpdatesOrDeletes(); engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); assertFalse(engine.isRecovering()); doc = testParsedDocument("2", null, testDocumentWithTextField(), SOURCE, null); @@ -732,7 +730,6 @@ public void testTranslogMultipleOperationsSameDocument() throws IOException { IOUtils.close(engine); } try (Engine recoveringEngine = new InternalEngine(engine.config())) { - recoveringEngine.reinitializeMaxSeqNoOfUpdatesOrDeletes(); recoveringEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); try (Engine.Searcher searcher = recoveringEngine.acquireSearcher("test")) { final TotalHitCountCollector collector = new TotalHitCountCollector(); @@ -768,7 +765,6 @@ protected void commitIndexWriter(IndexWriter writer, Translog translog, String s } }; assertThat(getTranslog(recoveringEngine).stats().getUncommittedOperations(), equalTo(docs)); - recoveringEngine.reinitializeMaxSeqNoOfUpdatesOrDeletes(); recoveringEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); assertTrue(committed.get()); } finally { @@ -802,7 +798,6 @@ public void testTranslogRecoveryWithMultipleGenerations() throws IOException { } initialEngine.close(); recoveringEngine = new InternalEngine(initialEngine.config()); - recoveringEngine.reinitializeMaxSeqNoOfUpdatesOrDeletes(); recoveringEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); try (Engine.Searcher searcher = recoveringEngine.acquireSearcher("test")) { TopDocs topDocs = searcher.searcher().search(new MatchAllDocsQuery(), docs); @@ -837,14 +832,12 @@ public void testRecoveryFromTranslogUpToSeqNo() throws IOException { engine.syncTranslog(); } try (InternalEngine engine = new InternalEngine(config)) { - engine.reinitializeMaxSeqNoOfUpdatesOrDeletes(); engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); assertThat(engine.getLocalCheckpoint(), equalTo(maxSeqNo)); assertThat(engine.getLocalCheckpointTracker().getMaxSeqNo(), equalTo(maxSeqNo)); } try (InternalEngine engine = new InternalEngine(config)) { long upToSeqNo = randomLongBetween(globalCheckpoint.get(), maxSeqNo); - engine.reinitializeMaxSeqNoOfUpdatesOrDeletes(); engine.recoverFromTranslog(translogHandler, upToSeqNo); assertThat(engine.getLocalCheckpoint(), equalTo(upToSeqNo)); assertThat(engine.getLocalCheckpointTracker().getMaxSeqNo(), equalTo(upToSeqNo)); @@ -1260,7 +1253,6 @@ public void testSyncedFlushSurvivesEngineRestart() throws IOException { store.associateIndexWithNewTranslog(translogUUID); } engine = new InternalEngine(config); - engine.reinitializeMaxSeqNoOfUpdatesOrDeletes(); engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); assertEquals(engine.getLastCommittedSegmentInfos().getUserData().get(Engine.SYNC_COMMIT_ID), syncId); } @@ -1280,7 +1272,6 @@ public void testSyncedFlushVanishesOnReplay() throws IOException { EngineConfig config = engine.config(); engine.close(); engine = new InternalEngine(config); - engine.reinitializeMaxSeqNoOfUpdatesOrDeletes(); engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); assertNull("Sync ID must be gone since we have a document to replay", engine.getLastCommittedSegmentInfos().getUserData().get(Engine.SYNC_COMMIT_ID)); @@ -2378,7 +2369,6 @@ public void testSeqNoAndCheckpoints() throws IOException { } try (InternalEngine recoveringEngine = new InternalEngine(initialEngine.config())) { - recoveringEngine.reinitializeMaxSeqNoOfUpdatesOrDeletes(); recoveringEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); assertEquals(primarySeqNo, recoveringEngine.getSeqNoStats(-1).getMaxSeqNo()); @@ -2733,7 +2723,6 @@ public void testCurrentTranslogIDisCommitted() throws IOException { assertEquals("3", userData.get(Translog.TRANSLOG_GENERATION_KEY)); } assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY)); - engine.reinitializeMaxSeqNoOfUpdatesOrDeletes(); engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); userData = engine.getLastCommittedSegmentInfos().getUserData(); assertEquals("3", userData.get(Translog.TRANSLOG_GENERATION_KEY)); @@ -2751,7 +2740,6 @@ public void testCurrentTranslogIDisCommitted() throws IOException { Map userData = engine.getLastCommittedSegmentInfos().getUserData(); assertEquals("1", userData.get(Translog.TRANSLOG_GENERATION_KEY)); assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY)); - engine.reinitializeMaxSeqNoOfUpdatesOrDeletes(); engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); assertEquals(2, engine.getTranslog().currentFileGeneration()); assertEquals(0L, engine.getTranslog().stats().getUncommittedOperations()); @@ -2765,7 +2753,6 @@ public void testCurrentTranslogIDisCommitted() throws IOException { Map userData = engine.getLastCommittedSegmentInfos().getUserData(); assertEquals("1", userData.get(Translog.TRANSLOG_GENERATION_KEY)); assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY)); - engine.reinitializeMaxSeqNoOfUpdatesOrDeletes(); engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); userData = engine.getLastCommittedSegmentInfos().getUserData(); assertEquals("no changes - nothing to commit", "1", @@ -2872,7 +2859,6 @@ protected void commitIndexWriter(IndexWriter writer, Translog translog, String s } } }) { - engine.reinitializeMaxSeqNoOfUpdatesOrDeletes(); engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); final ParsedDocument doc1 = testParsedDocument("1", null, testDocumentWithTextField(), SOURCE, null); @@ -2885,7 +2871,6 @@ protected void commitIndexWriter(IndexWriter writer, Translog translog, String s try (InternalEngine engine = new InternalEngine(config(indexSettings, store, translogPath, newMergePolicy(), null, null, globalCheckpointSupplier))) { - engine.reinitializeMaxSeqNoOfUpdatesOrDeletes(); engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); assertVisibleCount(engine, 1); final long committedGen = Long.valueOf( @@ -2954,7 +2939,6 @@ public void testTranslogReplay() throws IOException { engine.close(); // we need to reuse the engine config unless the parser.mappingModified won't work engine = new InternalEngine(copy(engine.config(), inSyncGlobalCheckpointSupplier)); - engine.reinitializeMaxSeqNoOfUpdatesOrDeletes(); engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); assertVisibleCount(engine, numDocs, false); @@ -3718,7 +3702,6 @@ public void testEngineMaxTimestampIsInitialized() throws IOException { InternalEngine engine = new InternalEngine(configSupplier.apply(store))) { assertEquals(IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, engine.segmentsStats(false, false).getMaxUnsafeAutoIdTimestamp()); - engine.reinitializeMaxSeqNoOfUpdatesOrDeletes(); engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); assertEquals(timestamp1, engine.segmentsStats(false, false).getMaxUnsafeAutoIdTimestamp()); final ParsedDocument doc = testParsedDocument("1", null, testDocumentWithTextField(), @@ -4085,7 +4068,6 @@ public void testSequenceNumberAdvancesToMaxSeqOnEngineOpenOnPrimary() throws Bro IOUtils.close(initialEngine); } try (Engine recoveringEngine = new InternalEngine(initialEngine.config())) { - recoveringEngine.reinitializeMaxSeqNoOfUpdatesOrDeletes(); recoveringEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); recoveringEngine.fillSeqNoGaps(2); assertThat(recoveringEngine.getLocalCheckpoint(), greaterThanOrEqualTo((long) (docs - 1))); @@ -4197,7 +4179,6 @@ protected long doGenerateSeqNoForOperation(Operation operation) { throw new UnsupportedOperationException(); } }; - noOpEngine.reinitializeMaxSeqNoOfUpdatesOrDeletes(); noOpEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); final int gapsFilled = noOpEngine.fillSeqNoGaps(primaryTerm.get()); final String reason = "filling gaps"; @@ -4433,7 +4414,6 @@ public void testRestoreLocalHistoryFromTranslog() throws IOException { totalTranslogOps = engine.getTranslog().totalOperations(); } try (InternalEngine engine = new InternalEngine(engineConfig)) { - engine.reinitializeMaxSeqNoOfUpdatesOrDeletes(); engine.recoverFromTranslog(translogHandler, globalCheckpoint.get()); engine.restoreLocalHistoryFromTranslog(translogHandler); assertThat(getDocIds(engine, true), equalTo(prevDocs)); @@ -4480,7 +4460,6 @@ public void testFillUpSequenceIdGapsOnRecovery() throws IOException { assertEquals(checkpointOnReplica, replicaEngine.getLocalCheckpoint()); recoveringEngine = new InternalEngine(copy(replicaEngine.config(), globalCheckpoint::get)); assertEquals(numDocsOnReplica, getTranslog(recoveringEngine).stats().getUncommittedOperations()); - recoveringEngine.reinitializeMaxSeqNoOfUpdatesOrDeletes(); recoveringEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); assertEquals(maxSeqIDOnReplica, recoveringEngine.getSeqNoStats(-1).getMaxSeqNo()); assertEquals(checkpointOnReplica, recoveringEngine.getLocalCheckpoint()); @@ -4516,7 +4495,6 @@ public void testFillUpSequenceIdGapsOnRecovery() throws IOException { if (flushed) { assertThat(recoveringEngine.getTranslogStats().getUncommittedOperations(), equalTo(0)); } - recoveringEngine.reinitializeMaxSeqNoOfUpdatesOrDeletes(); recoveringEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); assertEquals(maxSeqIDOnReplica, recoveringEngine.getSeqNoStats(-1).getMaxSeqNo()); assertEquals(maxSeqIDOnReplica, recoveringEngine.getLocalCheckpoint()); @@ -4711,7 +4689,6 @@ protected void commitIndexWriter(IndexWriter writer, Translog translog, String s super.commitIndexWriter(writer, translog, syncId); } }) { - engine.reinitializeMaxSeqNoOfUpdatesOrDeletes(); engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); int numDocs = scaledRandomIntBetween(10, 100); for (int docId = 0; docId < numDocs; docId++) { @@ -5485,8 +5462,7 @@ public void testTrackMaxSeqNoOfUpdatesOrDeletesOnPrimary() throws Exception { engine.close(); Set liveDocIds = new HashSet<>(); engine = new InternalEngine(engine.config()); - assertThat(engine.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(-2L)); - engine.reinitializeMaxSeqNoOfUpdatesOrDeletes(); + assertThat(engine.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(-1L)); int numOps = between(1, 500); for (int i = 0; i < numOps; i++) { long currentMaxSeqNoOfUpdates = engine.getMaxSeqNoOfUpdatesOrDeletes(); @@ -5556,7 +5532,6 @@ public void testRebuildLocalCheckpointTracker() throws Exception { "seq_no=" + op.seqNo() + " max_seq_no=" + tracker.getMaxSeqNo() + "checkpoint=" + tracker.getCheckpoint(), tracker.contains(op.seqNo()), equalTo(seqNosInSafeCommit.contains(op.seqNo()))); } - engine.reinitializeMaxSeqNoOfUpdatesOrDeletes(); engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); assertThat(getDocIds(engine, true), equalTo(docs)); } diff --git a/server/src/test/java/org/elasticsearch/index/engine/ReadOnlyEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/ReadOnlyEngineTests.java index b689400601dc6..f9437ac9251bf 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/ReadOnlyEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/ReadOnlyEngineTests.java @@ -92,7 +92,6 @@ public void testReadOnlyEngine() throws Exception { } // Close and reopen the main engine try (InternalEngine recoveringEngine = new InternalEngine(config)) { - recoveringEngine.reinitializeMaxSeqNoOfUpdatesOrDeletes(); recoveringEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); // the locked down engine should still point to the previous commit assertThat(readOnlyEngine.getLocalCheckpoint(), equalTo(lastSeqNoStats.getLocalCheckpoint())); @@ -224,7 +223,6 @@ public void testRecoverFromTranslogAppliesNoOperations() throws IOException { } try (ReadOnlyEngine readOnlyEngine = new ReadOnlyEngine(config, null , null, true, Function.identity())) { final TranslogHandler translogHandler = new TranslogHandler(xContentRegistry(), config.getIndexSettings()); - readOnlyEngine.reinitializeMaxSeqNoOfUpdatesOrDeletes(); readOnlyEngine.recoverFromTranslog(translogHandler, randomNonNegativeLong()); assertThat(translogHandler.appliedOperations(), equalTo(0L)); diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index bf1bfa668829e..b34f364bbed2c 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -1110,11 +1110,9 @@ public void testRestoreLocalHistoryFromTranslogOnPromotion() throws IOException, indexShard.updateGlobalCheckpointOnReplica(globalCheckpointOnReplica, "test"); final long globalCheckpoint = randomLongBetween(UNASSIGNED_SEQ_NO, indexShard.getLocalCheckpoint()); - final long currentMaxSeqNoOfUpdates = indexShard.getMaxSeqNoOfUpdatesOrDeletes(); final long maxSeqNoOfUpdatesOrDeletes = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, maxSeqNo); final Set docsBeforeRollback = getShardDocUIDs(indexShard); final CountDownLatch latch = new CountDownLatch(1); - final boolean shouldRollback = Math.max(globalCheckpointOnReplica, globalCheckpoint) < maxSeqNo; randomReplicaOperationPermitAcquisition(indexShard, indexShard.getPendingPrimaryTerm() + 1, globalCheckpoint, @@ -1133,13 +1131,7 @@ public void onFailure(Exception e) { }, ""); latch.await(); - if (shouldRollback) { - assertThat(indexShard.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(Collections.max( - Arrays.asList(maxSeqNoOfUpdatesOrDeletes, globalCheckpoint, globalCheckpointOnReplica)) - )); - } else { - assertThat(indexShard.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(Math.max(maxSeqNoOfUpdatesOrDeletes, currentMaxSeqNoOfUpdates))); - } + assertThat(indexShard.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(maxSeqNo)); final ShardRouting newRouting = indexShard.routingEntry().moveActiveReplicaToPrimary(); final CountDownLatch resyncLatch = new CountDownLatch(1); indexShard.updateShardState( @@ -1154,7 +1146,6 @@ public void onFailure(Exception e) { assertThat(indexShard.getLocalCheckpoint(), equalTo(maxSeqNo)); assertThat(indexShard.seqNoStats().getMaxSeqNo(), equalTo(maxSeqNo)); assertThat(getShardDocUIDs(indexShard), equalTo(docsBeforeRollback)); - // we conservatively roll MSU forward to maxSeqNo during restoreLocalHistory, ideally it should become just currentMaxSeqNoOfUpdates assertThat(indexShard.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(maxSeqNo)); closeShard(indexShard, false); } @@ -3653,6 +3644,7 @@ public void testSupplyTombstoneDoc() throws Exception { public void testResetEngine() throws Exception { IndexShard shard = newStartedShard(false); indexOnReplicaWithGaps(shard, between(0, 1000), Math.toIntExact(shard.getLocalCheckpoint())); + long maxSeqNoBeforeRollback = shard.seqNoStats().getMaxSeqNo(); final long globalCheckpoint = randomLongBetween(shard.getGlobalCheckpoint(), shard.getLocalCheckpoint()); shard.updateGlobalCheckpointOnReplica(globalCheckpoint, "test"); Set docBelowGlobalCheckpoint = getShardDocUIDs(shard).stream() @@ -3694,7 +3686,7 @@ public void testResetEngine() throws Exception { assertThat(getShardDocUIDs(shard), equalTo(docBelowGlobalCheckpoint)); assertThat(shard.seqNoStats().getMaxSeqNo(), equalTo(globalCheckpoint)); assertThat(shard.translogStats().estimatedNumberOfOperations(), equalTo(translogStats.estimatedNumberOfOperations())); - assertThat(shard.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(globalCheckpoint)); + assertThat(shard.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(maxSeqNoBeforeRollback)); done.set(true); thread.join(); closeShard(shard, false); diff --git a/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java b/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java index 0216756e65a84..e264d33ffed61 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java @@ -152,7 +152,6 @@ public void onFailedEngine(String reason, @Nullable Exception e) { () -> primaryTerm, EngineTestCase.tombstoneDocSupplier()); engine = new InternalEngine(config); - engine.reinitializeMaxSeqNoOfUpdatesOrDeletes(); engine.recoverFromTranslog((e, s) -> 0, Long.MAX_VALUE); listeners.setCurrentRefreshLocationSupplier(engine::getTranslogLastWriteLocation); } diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java index 2a5b110795628..afa319af7e1cf 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -515,7 +515,6 @@ protected InternalEngine createEngine(@Nullable IndexWriterFactory indexWriterFa } InternalEngine internalEngine = createInternalEngine(indexWriterFactory, localCheckpointTrackerSupplier, seqNoForOperation, config); - internalEngine.reinitializeMaxSeqNoOfUpdatesOrDeletes(); internalEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); return internalEngine; } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java index bbb0689a8a7e6..619e0a04baf9a 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java @@ -133,7 +133,8 @@ protected long generateSeqNoForOperationOnPrimary(final Operation operation) { @Override protected void advanceMaxSeqNoOfUpdatesOrDeletesOnPrimary(long seqNo) { - // ignore, this is not really a primary + assert getMaxSeqNoOfUpdatesOrDeletes() >= seqNo : seqNo + " < " + getMaxSeqNoOfUpdatesOrDeletes(); + super.advanceMaxSeqNoOfUpdatesOrDeletesOnPrimary(seqNo); // extra safe in production code } @Override diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java index e3d997886334b..4a56d6370eb91 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java @@ -289,7 +289,6 @@ private FollowingEngine createEngine(Store store, EngineConfig config) throws IO store.associateIndexWithNewTranslog(translogUuid); FollowingEngine followingEngine = new FollowingEngine(config); TranslogHandler translogHandler = new TranslogHandler(xContentRegistry(), config.getIndexSettings()); - followingEngine.reinitializeMaxSeqNoOfUpdatesOrDeletes(); followingEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); return followingEngine; } @@ -495,7 +494,6 @@ private void runFollowTest(CheckedBiConsumer