diff --git a/server/src/main/java/org/elasticsearch/common/lucene/Lucene.java b/server/src/main/java/org/elasticsearch/common/lucene/Lucene.java index 88d7c57f5e424..4b0ec9ea08773 100644 --- a/server/src/main/java/org/elasticsearch/common/lucene/Lucene.java +++ b/server/src/main/java/org/elasticsearch/common/lucene/Lucene.java @@ -27,7 +27,6 @@ import org.apache.lucene.codecs.DocValuesFormat; import org.apache.lucene.codecs.PostingsFormat; import org.apache.lucene.document.LatLonDocValuesField; -import org.apache.lucene.document.LongPoint; import org.apache.lucene.document.NumericDocValuesField; import org.apache.lucene.index.BinaryDocValues; import org.apache.lucene.index.CorruptIndexException; @@ -95,7 +94,6 @@ import org.elasticsearch.index.analysis.AnalyzerScope; import org.elasticsearch.index.analysis.NamedAnalyzer; import org.elasticsearch.index.fielddata.IndexFieldData; -import org.elasticsearch.index.mapper.SeqNoFieldMapper; import java.io.IOException; import java.text.ParseException; @@ -105,7 +103,6 @@ import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.function.LongConsumer; public class Lucene { public static final String LATEST_DOC_VALUES_FORMAT = "Lucene70"; @@ -1050,39 +1047,4 @@ public CacheHelper getReaderCacheHelper() { } }; } - - /** - * Scans sequence numbers (i.e., {@link SeqNoFieldMapper#NAME}) between {@code fromSeqNo}(inclusive) and {@code toSeqNo}(inclusive) - * in the provided directory reader. This method invokes the callback {@code onNewSeqNo} whenever a sequence number value is found. 
- * - * @param directoryReader the directory reader to scan - * @param fromSeqNo the lower bound of a range of seq_no to scan (inclusive) - * @param toSeqNo the upper bound of a range of seq_no to scan (inclusive) - * @param onNewSeqNo the callback to be called whenever a new valid sequence number is found - */ - public static void scanSeqNosInReader(DirectoryReader directoryReader, long fromSeqNo, long toSeqNo, - LongConsumer onNewSeqNo) throws IOException { - final DirectoryReader reader = Lucene.wrapAllDocsLive(directoryReader); - final IndexSearcher searcher = new IndexSearcher(reader); - searcher.setQueryCache(null); - final Query query = LongPoint.newRangeQuery(SeqNoFieldMapper.NAME, fromSeqNo, toSeqNo); - final Weight weight = searcher.createWeight(query, ScoreMode.COMPLETE_NO_SCORES, 1.0f); - for (LeafReaderContext leaf : reader.leaves()) { - final Scorer scorer = weight.scorer(leaf); - if (scorer == null) { - continue; - } - final DocIdSetIterator docIdSetIterator = scorer.iterator(); - final NumericDocValues seqNoDocValues = leaf.reader().getNumericDocValues(SeqNoFieldMapper.NAME); - int docId; - while ((docId = docIdSetIterator.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) { - if (seqNoDocValues == null || seqNoDocValues.advanceExact(docId) == false) { - throw new IllegalStateException("seq_no doc_values not found for doc_id=" + docId); - } - final long seqNo = seqNoDocValues.longValue(); - assert fromSeqNo <= seqNo && seqNo <= toSeqNo : "from_seq_no=" + fromSeqNo + " seq_no=" + seqNo + " to_seq_no=" + toSeqNo; - onNewSeqNo.accept(seqNo); - } - } - } } diff --git a/server/src/main/java/org/elasticsearch/index/engine/CombinedDocValues.java b/server/src/main/java/org/elasticsearch/index/engine/CombinedDocValues.java new file mode 100644 index 0000000000000..009dc9799d52a --- /dev/null +++ b/server/src/main/java/org/elasticsearch/index/engine/CombinedDocValues.java @@ -0,0 +1,90 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license 
agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.engine; + +import org.apache.lucene.index.LeafReader; +import org.apache.lucene.index.NumericDocValues; +import org.elasticsearch.index.mapper.SeqNoFieldMapper; +import org.elasticsearch.index.mapper.SourceFieldMapper; +import org.elasticsearch.index.mapper.VersionFieldMapper; + +import java.io.IOException; +import java.util.Objects; + +final class CombinedDocValues { + private final NumericDocValues versionDV; + private final NumericDocValues seqNoDV; + private final NumericDocValues primaryTermDV; + private final NumericDocValues tombstoneDV; + private final NumericDocValues recoverySource; + + CombinedDocValues(LeafReader leafReader) throws IOException { + this.versionDV = Objects.requireNonNull(leafReader.getNumericDocValues(VersionFieldMapper.NAME), "VersionDV is missing"); + this.seqNoDV = Objects.requireNonNull(leafReader.getNumericDocValues(SeqNoFieldMapper.NAME), "SeqNoDV is missing"); + this.primaryTermDV = Objects.requireNonNull( + leafReader.getNumericDocValues(SeqNoFieldMapper.PRIMARY_TERM_NAME), "PrimaryTermDV is missing"); + this.tombstoneDV = leafReader.getNumericDocValues(SeqNoFieldMapper.TOMBSTONE_NAME); + this.recoverySource = leafReader.getNumericDocValues(SourceFieldMapper.RECOVERY_SOURCE_NAME); + 
} + + long docVersion(int segmentDocId) throws IOException { + assert versionDV.docID() < segmentDocId; + if (versionDV.advanceExact(segmentDocId) == false) { + throw new IllegalStateException("DocValues for field [" + VersionFieldMapper.NAME + "] is not found"); + } + return versionDV.longValue(); + } + + long docSeqNo(int segmentDocId) throws IOException { + assert seqNoDV.docID() < segmentDocId; + if (seqNoDV.advanceExact(segmentDocId) == false) { + throw new IllegalStateException("DocValues for field [" + SeqNoFieldMapper.NAME + "] is not found"); + } + return seqNoDV.longValue(); + } + + long docPrimaryTerm(int segmentDocId) throws IOException { + if (primaryTermDV == null) { + return -1L; + } + assert primaryTermDV.docID() < segmentDocId; + // Use -1 for docs which don't have primary term. The caller considers those docs as nested docs. + if (primaryTermDV.advanceExact(segmentDocId) == false) { + return -1; + } + return primaryTermDV.longValue(); + } + + boolean isTombstone(int segmentDocId) throws IOException { + if (tombstoneDV == null) { + return false; + } + assert tombstoneDV.docID() < segmentDocId; + return tombstoneDV.advanceExact(segmentDocId) && tombstoneDV.longValue() > 0; + } + + boolean hasRecoverySource(int segmentDocId) throws IOException { + if (recoverySource == null) { + return false; + } + assert recoverySource.docID() < segmentDocId; + return recoverySource.advanceExact(segmentDocId); + } +} diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 408f70d70d1ce..cdd6cd86e6afb 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -23,6 +23,7 @@ import org.apache.lucene.codecs.blocktree.BlockTreeTermsReader; import org.apache.lucene.codecs.blocktree.BlockTreeTermsReader.FSTLoadMode; import org.apache.lucene.document.Field; 
+import org.apache.lucene.document.LongPoint; import org.apache.lucene.document.NumericDocValuesField; import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.IndexCommit; @@ -31,17 +32,23 @@ import org.apache.lucene.index.IndexWriterConfig; import org.apache.lucene.index.IndexableField; import org.apache.lucene.index.LeafReader; +import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.index.LiveIndexWriterConfig; import org.apache.lucene.index.MergePolicy; import org.apache.lucene.index.SegmentCommitInfo; import org.apache.lucene.index.SegmentInfos; import org.apache.lucene.index.SoftDeletesRetentionMergePolicy; import org.apache.lucene.index.Term; +import org.apache.lucene.search.DocIdSetIterator; import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.Query; import org.apache.lucene.search.ReferenceManager; +import org.apache.lucene.search.ScoreMode; +import org.apache.lucene.search.Scorer; import org.apache.lucene.search.SearcherFactory; import org.apache.lucene.search.SearcherManager; import org.apache.lucene.search.TermQuery; +import org.apache.lucene.search.Weight; import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.store.Directory; import org.apache.lucene.store.FilterDirectory; @@ -68,12 +75,14 @@ import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.VersionType; +import org.elasticsearch.index.fieldvisitor.IdOnlyFieldVisitor; import org.elasticsearch.index.mapper.IdFieldMapper; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.mapper.ParseContext; import org.elasticsearch.index.mapper.ParsedDocument; import org.elasticsearch.index.mapper.SeqNoFieldMapper; import org.elasticsearch.index.mapper.SourceFieldMapper; +import org.elasticsearch.index.mapper.Uid; import org.elasticsearch.index.merge.MergeStats; import org.elasticsearch.index.merge.OnGoingMerge; 
import org.elasticsearch.index.seqno.LocalCheckpointTracker; @@ -94,7 +103,6 @@ import java.io.IOException; import java.nio.file.Path; import java.util.Arrays; -import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -109,7 +117,7 @@ import java.util.concurrent.locks.ReentrantLock; import java.util.function.BiFunction; import java.util.function.LongSupplier; -import java.util.function.Supplier; +import java.util.stream.Collectors; import java.util.stream.Stream; public class InternalEngine extends Engine { @@ -203,6 +211,7 @@ public InternalEngine(EngineConfig engineConfig) { this.softDeletesPolicy = newSoftDeletesPolicy(); this.combinedDeletionPolicy = new CombinedDeletionPolicy(logger, translogDeletionPolicy, softDeletesPolicy, translog::getLastSyncedGlobalCheckpoint); + this.localCheckpointTracker = createLocalCheckpointTracker(localCheckpointTrackerSupplier); writer = createWriter(); bootstrapAppendOnlyInfoFromWriter(writer); historyUUID = loadHistoryUUID(writer); @@ -232,11 +241,17 @@ public InternalEngine(EngineConfig engineConfig) { for (ReferenceManager.RefreshListener listener: engineConfig.getInternalRefreshListener()) { this.internalSearcherManager.addListener(listener); } - this.localCheckpointTracker = createLocalCheckpointTracker(engineConfig, lastCommittedSegmentInfos, logger, - () -> acquireSearcher("create_local_checkpoint_tracker", SearcherScope.INTERNAL), localCheckpointTrackerSupplier); this.lastRefreshedCheckpointListener = new LastRefreshedCheckpointListener(localCheckpointTracker.getCheckpoint()); this.internalSearcherManager.addListener(lastRefreshedCheckpointListener); maxSeqNoOfUpdatesOrDeletes = new AtomicLong(SequenceNumbers.max(localCheckpointTracker.getMaxSeqNo(), translog.getMaxSeqNo())); + if (softDeleteEnabled && localCheckpointTracker.getCheckpoint() < localCheckpointTracker.getMaxSeqNo()) { + try (Searcher searcher = acquireSearcher("restore_version_map_and_checkpoint_tracker", 
SearcherScope.INTERNAL)) { + restoreVersionMapAndCheckpointTracker(Lucene.wrapAllDocsLive(searcher.getDirectoryReader())); + } catch (IOException e) { + throw new EngineCreationFailureException(config().getShardId(), + "failed to restore version map and local checkpoint tracker", e); + } + } success = true; } finally { if (success == false) { @@ -250,30 +265,16 @@ public InternalEngine(EngineConfig engineConfig) { logger.trace("created new InternalEngine"); } - private static LocalCheckpointTracker createLocalCheckpointTracker(EngineConfig engineConfig, SegmentInfos lastCommittedSegmentInfos, - Logger logger, Supplier searcherSupplier, BiFunction localCheckpointTrackerSupplier) { - try { - final SequenceNumbers.CommitInfo seqNoStats = - SequenceNumbers.loadSeqNoInfoFromLuceneCommit(lastCommittedSegmentInfos.userData.entrySet()); - final long maxSeqNo = seqNoStats.maxSeqNo; - final long localCheckpoint = seqNoStats.localCheckpoint; - logger.trace("recovered maximum sequence number [{}] and local checkpoint [{}]", maxSeqNo, localCheckpoint); - final LocalCheckpointTracker tracker = localCheckpointTrackerSupplier.apply(maxSeqNo, localCheckpoint); - // Operations that are optimized using max_seq_no_of_updates optimization must not be processed twice; otherwise, they will - // create duplicates in Lucene. To avoid this we check the LocalCheckpointTracker to see if an operation was already processed. - // Thus, we need to restore the LocalCheckpointTracker bit by bit to ensure the consistency between LocalCheckpointTracker and - // Lucene index. This is not the only solution since we can bootstrap max_seq_no_of_updates with max_seq_no of the commit to - // disable the MSU optimization during recovery. Here we prefer to maintain the consistency of LocalCheckpointTracker. 
- if (localCheckpoint < maxSeqNo && engineConfig.getIndexSettings().isSoftDeleteEnabled()) { - try (Searcher searcher = searcherSupplier.get()) { - Lucene.scanSeqNosInReader(searcher.getDirectoryReader(), localCheckpoint + 1, maxSeqNo, - tracker::markSeqNoAsCompleted); - } - } - return tracker; - } catch (IOException ex) { - throw new EngineCreationFailureException(engineConfig.getShardId(), "failed to create local checkpoint tracker", ex); - } + private LocalCheckpointTracker createLocalCheckpointTracker( + BiFunction localCheckpointTrackerSupplier) throws IOException { + final long maxSeqNo; + final long localCheckpoint; + final SequenceNumbers.CommitInfo seqNoStats = + SequenceNumbers.loadSeqNoInfoFromLuceneCommit(store.readLastCommittedSegmentsInfo().userData.entrySet()); + maxSeqNo = seqNoStats.maxSeqNo; + localCheckpoint = seqNoStats.localCheckpoint; + logger.trace("recovered maximum sequence number [{}] and local checkpoint [{}]", maxSeqNo, localCheckpoint); + return localCheckpointTrackerSupplier.apply(maxSeqNo, localCheckpoint); } private SoftDeletesPolicy newSoftDeletesPolicy() throws IOException { @@ -676,21 +677,26 @@ enum OpVsLuceneDocStatus { LUCENE_DOC_NOT_FOUND } + private static OpVsLuceneDocStatus compareOpToVersionMapOnSeqNo(String id, long seqNo, long primaryTerm, VersionValue versionValue) { + Objects.requireNonNull(versionValue); + if (seqNo > versionValue.seqNo) { + return OpVsLuceneDocStatus.OP_NEWER; + } else if (seqNo == versionValue.seqNo) { + assert versionValue.term == primaryTerm : "primary term not matched; id=" + id + " seq_no=" + seqNo + + " op_term=" + primaryTerm + " existing_term=" + versionValue.term; + return OpVsLuceneDocStatus.OP_STALE_OR_EQUAL; + } else { + return OpVsLuceneDocStatus.OP_STALE_OR_EQUAL; + } + } + private OpVsLuceneDocStatus compareOpToLuceneDocBasedOnSeqNo(final Operation op) throws IOException { assert op.seqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO : "resolving ops based on seq# but no seqNo is found"; 
final OpVsLuceneDocStatus status; VersionValue versionValue = getVersionFromMap(op.uid().bytes()); assert incrementVersionLookup(); if (versionValue != null) { - if (op.seqNo() > versionValue.seqNo) { - status = OpVsLuceneDocStatus.OP_NEWER; - } else if (op.seqNo() == versionValue.seqNo) { - assert versionValue.term == op.primaryTerm() : "primary term not matched; id=" + op.id() + " seq_no=" + op.seqNo() - + " op_term=" + op.primaryTerm() + " existing_term=" + versionValue.term; - status = OpVsLuceneDocStatus.OP_STALE_OR_EQUAL; - } else { - status = OpVsLuceneDocStatus.OP_STALE_OR_EQUAL; - } + status = compareOpToVersionMapOnSeqNo(op.id(), op.seqNo(), op.primaryTerm(), versionValue); } else { // load from index assert incrementIndexVersionLookup(); @@ -1875,8 +1881,9 @@ void clearDeletedTombstones() { } // for testing - final Collection getDeletedTombstones() { - return versionMap.getAllTombstones().values(); + final Map getVersionMap() { + return Stream.concat(versionMap.getAllCurrent().entrySet().stream(), versionMap.getAllTombstones().entrySet().stream()) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); } @Override @@ -2507,10 +2514,6 @@ private boolean incrementIndexVersionLookup() { return true; } - int getVersionMapSize() { - return versionMap.getAllCurrent().size(); - } - boolean isSafeAccessRequired() { return versionMap.isSafeAccessRequired(); } @@ -2773,4 +2776,57 @@ private static void trimUnsafeCommits(EngineConfig engineConfig) throws IOExcept final long minRetainedTranslogGen = Translog.readMinTranslogGeneration(translogPath, translogUUID); store.trimUnsafeCommits(globalCheckpoint, minRetainedTranslogGen, engineConfig.getIndexSettings().getIndexVersionCreated()); } + + /** + * Restores the live version map and local checkpoint of this engine using documents (including soft-deleted) + * after the local checkpoint in the safe commit. 
This step ensures the live version map and checkpoint tracker + * are in sync with the Lucene commit. + */ + private void restoreVersionMapAndCheckpointTracker(DirectoryReader directoryReader) throws IOException { + final IndexSearcher searcher = new IndexSearcher(directoryReader); + searcher.setQueryCache(null); + final Query query = LongPoint.newRangeQuery(SeqNoFieldMapper.NAME, getLocalCheckpoint() + 1, Long.MAX_VALUE); + final Weight weight = searcher.createWeight(query, ScoreMode.COMPLETE_NO_SCORES, 1.0f); + for (LeafReaderContext leaf : directoryReader.leaves()) { + final Scorer scorer = weight.scorer(leaf); + if (scorer == null) { + continue; + } + final CombinedDocValues dv = new CombinedDocValues(leaf.reader()); + final IdOnlyFieldVisitor idFieldVisitor = new IdOnlyFieldVisitor(); + final DocIdSetIterator iterator = scorer.iterator(); + int docId; + while ((docId = iterator.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) { + final long primaryTerm = dv.docPrimaryTerm(docId); + if (primaryTerm == -1L) { + continue; // skip children docs which do not have primary term + } + final long seqNo = dv.docSeqNo(docId); + localCheckpointTracker.markSeqNoAsCompleted(seqNo); + idFieldVisitor.reset(); + leaf.reader().document(docId, idFieldVisitor); + if (idFieldVisitor.getId() == null) { + assert dv.isTombstone(docId); + continue; + } + final BytesRef uid = new Term(IdFieldMapper.NAME, Uid.encodeId(idFieldVisitor.getId())).bytes(); + try (Releasable ignored = versionMap.acquireLock(uid)) { + final VersionValue curr = versionMap.getUnderLock(uid); + if (curr == null || + compareOpToVersionMapOnSeqNo(idFieldVisitor.getId(), seqNo, primaryTerm, curr) == OpVsLuceneDocStatus.OP_NEWER) { + if (dv.isTombstone(docId)) { + // use 0L for the start time so we can prune this delete tombstone quickly + // when the local checkpoint advances (i.e., after a recovery completed). 
+ final long startTime = 0L; + versionMap.putDeleteUnderLock(uid, new DeleteVersionValue(dv.docVersion(docId), seqNo, primaryTerm, startTime)); + } else { + versionMap.putIndexUnderLock(uid, new IndexVersionValue(null, dv.docVersion(docId), seqNo, primaryTerm)); + } + } + } + } + } + // remove live entries in the version map + refresh("restore_version_map_and_checkpoint_tracker", SearcherScope.INTERNAL, true); + } } diff --git a/server/src/main/java/org/elasticsearch/index/engine/LuceneChangesSnapshot.java b/server/src/main/java/org/elasticsearch/index/engine/LuceneChangesSnapshot.java index a3e86ab1606df..cb0862ff13171 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/LuceneChangesSnapshot.java +++ b/server/src/main/java/org/elasticsearch/index/engine/LuceneChangesSnapshot.java @@ -40,14 +40,12 @@ import org.elasticsearch.index.mapper.SeqNoFieldMapper; import org.elasticsearch.index.mapper.SourceFieldMapper; import org.elasticsearch.index.mapper.Uid; -import org.elasticsearch.index.mapper.VersionFieldMapper; import org.elasticsearch.index.translog.Translog; import java.io.Closeable; import java.io.IOException; import java.util.Comparator; import java.util.List; -import java.util.Objects; import java.util.concurrent.atomic.AtomicBoolean; /** @@ -305,64 +303,4 @@ private static final class ParallelArray { } } - private static final class CombinedDocValues { - private final NumericDocValues versionDV; - private final NumericDocValues seqNoDV; - private final NumericDocValues primaryTermDV; - private final NumericDocValues tombstoneDV; - private final NumericDocValues recoverySource; - - CombinedDocValues(LeafReader leafReader) throws IOException { - this.versionDV = Objects.requireNonNull(leafReader.getNumericDocValues(VersionFieldMapper.NAME), "VersionDV is missing"); - this.seqNoDV = Objects.requireNonNull(leafReader.getNumericDocValues(SeqNoFieldMapper.NAME), "SeqNoDV is missing"); - this.primaryTermDV = Objects.requireNonNull( - 
leafReader.getNumericDocValues(SeqNoFieldMapper.PRIMARY_TERM_NAME), "PrimaryTermDV is missing"); - this.tombstoneDV = leafReader.getNumericDocValues(SeqNoFieldMapper.TOMBSTONE_NAME); - this.recoverySource = leafReader.getNumericDocValues(SourceFieldMapper.RECOVERY_SOURCE_NAME); - } - - long docVersion(int segmentDocId) throws IOException { - assert versionDV.docID() < segmentDocId; - if (versionDV.advanceExact(segmentDocId) == false) { - throw new IllegalStateException("DocValues for field [" + VersionFieldMapper.NAME + "] is not found"); - } - return versionDV.longValue(); - } - - long docSeqNo(int segmentDocId) throws IOException { - assert seqNoDV.docID() < segmentDocId; - if (seqNoDV.advanceExact(segmentDocId) == false) { - throw new IllegalStateException("DocValues for field [" + SeqNoFieldMapper.NAME + "] is not found"); - } - return seqNoDV.longValue(); - } - - long docPrimaryTerm(int segmentDocId) throws IOException { - if (primaryTermDV == null) { - return -1L; - } - assert primaryTermDV.docID() < segmentDocId; - // Use -1 for docs which don't have primary term. The caller considers those docs as nested docs. 
- if (primaryTermDV.advanceExact(segmentDocId) == false) { - return -1; - } - return primaryTermDV.longValue(); - } - - boolean isTombstone(int segmentDocId) throws IOException { - if (tombstoneDV == null) { - return false; - } - assert tombstoneDV.docID() < segmentDocId; - return tombstoneDV.advanceExact(segmentDocId) && tombstoneDV.longValue() > 0; - } - - boolean hasRecoverySource(int segmentDocId) throws IOException { - if (recoverySource == null) { - return false; - } - assert recoverySource.docID() < segmentDocId; - return recoverySource.advanceExact(segmentDocId); - } - } } diff --git a/server/src/main/java/org/elasticsearch/index/fieldvisitor/IdOnlyFieldVisitor.java b/server/src/main/java/org/elasticsearch/index/fieldvisitor/IdOnlyFieldVisitor.java new file mode 100644 index 0000000000000..13566b841c832 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/index/fieldvisitor/IdOnlyFieldVisitor.java @@ -0,0 +1,60 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.elasticsearch.index.fieldvisitor; + +import org.apache.lucene.index.FieldInfo; +import org.apache.lucene.index.StoredFieldVisitor; +import org.elasticsearch.index.mapper.IdFieldMapper; +import org.elasticsearch.index.mapper.Uid; + +public final class IdOnlyFieldVisitor extends StoredFieldVisitor { + private String id = null; + private boolean visited = false; + + @Override + public Status needsField(FieldInfo fieldInfo) { + if (visited) { + return Status.STOP; + } + if (IdFieldMapper.NAME.equals(fieldInfo.name)) { + visited = true; + return Status.YES; + } else { + return Status.NO; + } + } + + @Override + public void binaryField(FieldInfo fieldInfo, byte[] value) { + assert IdFieldMapper.NAME.equals(fieldInfo.name) : fieldInfo; + if (IdFieldMapper.NAME.equals(fieldInfo.name)) { + id = Uid.decodeId(value); + } + } + + public String getId() { + return id; + } + + public void reset() { + id = null; + visited = false; + } +} diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 02f2039202bc4..9aaf6c704beae 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -223,7 +223,7 @@ public void testVersionMapAfterAutoIDDocument() throws IOException { Engine.Index update = indexForDoc(doc); engine.index(update); assertTrue(engine.isSafeAccessRequired()); - assertEquals(1, engine.getVersionMapSize()); + assertThat(engine.getVersionMap().values(), hasSize(1)); try (Engine.Searcher searcher = engine.acquireSearcher("test")) { assertEquals(0, searcher.reader().numDocs()); } @@ -255,7 +255,7 @@ public void testVersionMapAfterAutoIDDocument() throws IOException { : appendOnlyReplica(doc, false, 1, generateNewSeqNo(engine)); engine.index(operation); assertTrue("safe access should be required", engine.isSafeAccessRequired()); 
- assertEquals(1, engine.getVersionMapSize()); // now we add this to the map + assertThat(engine.getVersionMap().values(), hasSize(1)); // now we add this to the map engine.refresh("test"); if (randomBoolean()) { // randomly refresh here again engine.refresh("test"); @@ -3869,7 +3869,7 @@ public void run() { } catch (InterruptedException e) { throw new AssertionError(e); } - assertEquals(0, engine.getVersionMapSize()); + assertThat(engine.getVersionMap().values(), empty()); int docOffset; while ((docOffset = offset.incrementAndGet()) < docs.size()) { try { @@ -5168,18 +5168,19 @@ public void testPruneOnlyDeletesAtMostLocalCheckpoint() throws Exception { engine.delete(replicaDeleteForDoc(UUIDs.randomBase64UUID(), 1, seqno, threadPool.relativeTimeInMillis())); } } - List tombstones = new ArrayList<>(engine.getDeletedTombstones()); + + List tombstones = new ArrayList<>(tombstonesInVersionMap(engine).values()); engine.config().setEnableGcDeletes(true); // Prune tombstones whose seqno < gap_seqno and timestamp < clock-gcInterval. clock.set(randomLongBetween(gcInterval, deleteBatch + gcInterval)); engine.refresh("test"); tombstones.removeIf(v -> v.seqNo < gapSeqNo && v.time < clock.get() - gcInterval); - assertThat(engine.getDeletedTombstones(), containsInAnyOrder(tombstones.toArray())); + assertThat(tombstonesInVersionMap(engine).values(), containsInAnyOrder(tombstones.toArray())); // Prune tombstones whose seqno at most the local checkpoint (eg. seqno < gap_seqno). clock.set(randomLongBetween(deleteBatch + gcInterval * 4/3, 100)); // Need a margin for gcInterval/4. engine.refresh("test"); tombstones.removeIf(v -> v.seqNo < gapSeqNo); - assertThat(engine.getDeletedTombstones(), containsInAnyOrder(tombstones.toArray())); + assertThat(tombstonesInVersionMap(engine).values(), containsInAnyOrder(tombstones.toArray())); // Fill the seqno gap - should prune all tombstones. 
clock.set(between(0, 100)); if (randomBoolean()) { @@ -5191,7 +5192,7 @@ public void testPruneOnlyDeletesAtMostLocalCheckpoint() throws Exception { } clock.set(randomLongBetween(100 + gcInterval * 4/3, Long.MAX_VALUE)); // Need a margin for gcInterval/4. engine.refresh("test"); - assertThat(engine.getDeletedTombstones(), empty()); + assertThat(tombstonesInVersionMap(engine).values(), empty()); } } @@ -5605,9 +5606,10 @@ public void testTrackMaxSeqNoOfUpdatesOrDeletesOnPrimary() throws Exception { } } - public void testRebuildLocalCheckpointTracker() throws Exception { + public void testRebuildLocalCheckpointTrackerAndVersionMap() throws Exception { Settings.Builder settings = Settings.builder() .put(defaultSettings.getSettings()) + .put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING.getKey(), 10000) .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true); final IndexMetaData indexMetaData = IndexMetaData.builder(defaultSettings.getIndexMetaData()).settings(settings).build(); final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(indexMetaData); @@ -5624,32 +5626,54 @@ public void testRebuildLocalCheckpointTracker() throws Exception { for (Engine.Operation op : operations) { flushedOperations.add(op); applyOperation(engine, op); + if (randomBoolean()) { + globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), engine.getLocalCheckpoint())); + engine.syncTranslog(); + } if (randomInt(100) < 10) { engine.refresh("test"); } if (randomInt(100) < 5) { - engine.flush(); + engine.flush(true, true); + flushedOperations.sort(Comparator.comparing(Engine.Operation::seqNo)); commits.add(new ArrayList<>(flushedOperations)); - globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), engine.getLocalCheckpoint())); } } - globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), engine.getLocalCheckpoint())); - engine.syncTranslog(); docs = getDocIds(engine, true); } - Set seqNosInSafeCommit = null; + List 
operationsInSafeCommit = null; for (int i = commits.size() - 1; i >= 0; i--) { if (commits.get(i).stream().allMatch(op -> op.seqNo() <= globalCheckpoint.get())) { - seqNosInSafeCommit = commits.get(i).stream().map(Engine.Operation::seqNo).collect(Collectors.toSet()); + operationsInSafeCommit = commits.get(i); break; } } - assertThat(seqNosInSafeCommit, notNullValue()); + assertThat(operationsInSafeCommit, notNullValue()); try (InternalEngine engine = new InternalEngine(config)) { // do not recover from translog + final Map deletesAfterCheckpoint = new HashMap<>(); + for (Engine.Operation op : operationsInSafeCommit) { + if (op instanceof Engine.NoOp == false && op.seqNo() > engine.getLocalCheckpoint()) { + deletesAfterCheckpoint.put(new Term(IdFieldMapper.NAME, Uid.encodeId(op.id())).bytes(), op); + } + } + deletesAfterCheckpoint.values().removeIf(o -> o instanceof Engine.Delete == false); + final Map versionMap = engine.getVersionMap(); + for (BytesRef uid : deletesAfterCheckpoint.keySet()) { + final VersionValue versionValue = versionMap.get(uid); + final Engine.Operation op = deletesAfterCheckpoint.get(uid); + final String msg = versionValue + " vs " + + "op[" + op.operationType() + "id=" + op.id() + " seqno=" + op.seqNo() + " term=" + op.primaryTerm() + "]"; + assertThat(versionValue, instanceOf(DeleteVersionValue.class)); + assertThat(msg, versionValue.seqNo, equalTo(op.seqNo())); + assertThat(msg, versionValue.term, equalTo(op.primaryTerm())); + assertThat(msg, versionValue.version, equalTo(op.version())); + } + assertThat(versionMap.keySet(), equalTo(deletesAfterCheckpoint.keySet())); final LocalCheckpointTracker tracker = engine.getLocalCheckpointTracker(); + final Set seqNosInSafeCommit = operationsInSafeCommit.stream().map(op -> op.seqNo()).collect(Collectors.toSet()); for (Engine.Operation op : operations) { assertThat( - "seq_no=" + op.seqNo() + " max_seq_no=" + tracker.getMaxSeqNo() + "checkpoint=" + tracker.getCheckpoint(), + "seq_no=" + op.seqNo() + 
" max_seq_no=" + tracker.getMaxSeqNo() + " checkpoint=" + tracker.getCheckpoint(), tracker.contains(op.seqNo()), equalTo(seqNosInSafeCommit.contains(op.seqNo()))); } engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); @@ -5918,4 +5942,43 @@ public void testPruneAwayDeletedButRetainedIds() throws Exception { } } } + + public void testRecoverFromLocalTranslog() throws Exception { + final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); + Path translogPath = createTempDir(); + List operations = generateHistoryOnReplica(between(1, 500), randomBoolean(), randomBoolean(), randomBoolean()); + try (Store store = createStore()) { + EngineConfig config = config(defaultSettings, store, translogPath, newMergePolicy(), null, null, globalCheckpoint::get); + final List docs; + try (InternalEngine engine = createEngine(config)) { + for (Engine.Operation op : operations) { + applyOperation(engine, op); + if (randomBoolean()) { + globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), engine.getLocalCheckpoint())); + engine.syncTranslog(); + } + if (randomInt(100) < 10) { + engine.refresh("test"); + } + if (randomInt(100) < 5) { + engine.flush(); + } + if (randomInt(100) < 5) { + engine.forceMerge(randomBoolean(), 1, false, false, false); + } + } + docs = getDocIds(engine, true); + } + try (InternalEngine engine = new InternalEngine(config)) { + engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); + assertThat(getDocIds(engine, randomBoolean()), equalTo(docs)); + } + } + } + + private Map tombstonesInVersionMap(InternalEngine engine) { + return engine.getVersionMap().entrySet().stream() + .filter(e -> e.getValue() instanceof DeleteVersionValue) + .collect(Collectors.toMap(e -> e.getKey(), e -> (DeleteVersionValue) e.getValue())); + } } diff --git a/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java 
b/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java index d499cf6e83f90..79d2f004cc95a 100644 --- a/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java +++ b/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java @@ -26,6 +26,7 @@ import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.flush.FlushRequest; +import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest; import org.elasticsearch.action.bulk.BulkShardRequest; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.index.IndexRequest; @@ -725,6 +726,9 @@ public void testAddNewReplicas() throws Exception { if (randomInt(100) < 10) { shards.getPrimary().flush(new FlushRequest()); } + if (randomInt(100) < 5) { + shards.getPrimary().forceMerge(new ForceMergeRequest().flush(randomBoolean()).maxNumSegments(1)); + } } catch (Exception ex) { throw new AssertionError(ex); } diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java index f7042448e7576..02fc3ae9b45af 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -37,6 +37,7 @@ import org.apache.lucene.index.MergePolicy; import org.apache.lucene.index.NumericDocValues; import org.apache.lucene.index.Term; +import org.apache.lucene.search.DocIdSetIterator; import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.MatchAllDocsQuery; import org.apache.lucene.search.ReferenceManager; @@ -1111,10 +1112,8 @@ public static void assertMaxSeqNoInCommitUserData(Engine engine) throws Exceptio List commits = DirectoryReader.listCommits(engine.store.directory()); for 
(IndexCommit commit : commits) {
             try (DirectoryReader reader = DirectoryReader.open(commit)) {
-                AtomicLong maxSeqNoFromDocs = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
-                Lucene.scanSeqNosInReader(reader, 0, Long.MAX_VALUE, n -> maxSeqNoFromDocs.set(Math.max(n, maxSeqNoFromDocs.get())));
                 assertThat(Long.parseLong(commit.getUserData().get(SequenceNumbers.MAX_SEQ_NO)),
-                    greaterThanOrEqualTo(maxSeqNoFromDocs.get()));
+                    greaterThanOrEqualTo(maxSeqNosInReader(reader)));
             }
         }
     }
@@ -1177,4 +1176,23 @@ public long getAsLong() {
             return get();
         }
     }
+
+    /**
+     * Returns the largest value found in the {@link SeqNoFieldMapper#NAME} numeric doc values across all leaves of
+     * {@code reader}, or {@link SequenceNumbers#NO_OPS_PERFORMED} when no leaf holds a value for that field.
+     */
+    static long maxSeqNosInReader(DirectoryReader reader) throws IOException {
+        long maxSeqNo = SequenceNumbers.NO_OPS_PERFORMED;
+        for (LeafReaderContext leaf : reader.leaves()) {
+            final NumericDocValues seqNoDocValues = leaf.reader().getNumericDocValues(SeqNoFieldMapper.NAME);
+            if (seqNoDocValues == null) {
+                // getNumericDocValues returns null when a leaf has no values for the field; skip it rather than NPE
+                continue;
+            }
+            while (seqNoDocValues.nextDoc() != DocIdSetIterator.NO_MORE_DOCS) {
+                maxSeqNo = SequenceNumbers.max(maxSeqNo, seqNoDocValues.longValue());
+            }
+        }
+        return maxSeqNo;
+    }
 }
diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/FollowerFailOverIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/FollowerFailOverIT.java
index 22b22c8779ce6..f951c51844bdc 100644
--- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/FollowerFailOverIT.java
+++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/FollowerFailOverIT.java
@@ -214,6 +214,9 @@ public void testAddNewReplicasOnFollower() throws Exception {
                 if (rarely()) {
                     followerClient().admin().indices().prepareFlush("follower-index").get();
                 }
+                if (rarely()) {
+                    followerClient().admin().indices().prepareForceMerge("follower-index").setMaxNumSegments(1).get();
+                }
                 if (rarely()) {
                     followerClient().admin().indices().prepareRefresh("follower-index").get();
                 }