From 42c8233c840401f703ba125814dc581cd2075787 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Wed, 12 Jun 2019 16:14:12 -0400 Subject: [PATCH 1/8] Rebuild version map when opening internal engine --- .../elasticsearch/common/lucene/Lucene.java | 38 ------ .../index/engine/EngineConfig.java | 10 +- .../index/engine/InternalEngine.java | 128 +++++++++++++----- .../elasticsearch/index/shard/IndexShard.java | 2 +- .../index/engine/InternalEngineTests.java | 97 ++++++++++--- .../RecoveryDuringReplicationTests.java | 4 + .../index/shard/RefreshListenersTests.java | 3 +- .../index/engine/EngineTestCase.java | 54 +++++--- .../xpack/ccr/FollowerFailOverIT.java | 3 + .../index/engine/FollowingEngineTests.java | 4 +- 10 files changed, 232 insertions(+), 111 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/common/lucene/Lucene.java b/server/src/main/java/org/elasticsearch/common/lucene/Lucene.java index 88d7c57f5e424..4b0ec9ea08773 100644 --- a/server/src/main/java/org/elasticsearch/common/lucene/Lucene.java +++ b/server/src/main/java/org/elasticsearch/common/lucene/Lucene.java @@ -27,7 +27,6 @@ import org.apache.lucene.codecs.DocValuesFormat; import org.apache.lucene.codecs.PostingsFormat; import org.apache.lucene.document.LatLonDocValuesField; -import org.apache.lucene.document.LongPoint; import org.apache.lucene.document.NumericDocValuesField; import org.apache.lucene.index.BinaryDocValues; import org.apache.lucene.index.CorruptIndexException; @@ -95,7 +94,6 @@ import org.elasticsearch.index.analysis.AnalyzerScope; import org.elasticsearch.index.analysis.NamedAnalyzer; import org.elasticsearch.index.fielddata.IndexFieldData; -import org.elasticsearch.index.mapper.SeqNoFieldMapper; import java.io.IOException; import java.text.ParseException; @@ -105,7 +103,6 @@ import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.function.LongConsumer; public class Lucene { public static final String LATEST_DOC_VALUES_FORMAT = "Lucene70"; @@ -1050,39 +1047,4 @@ public CacheHelper getReaderCacheHelper() { } }; } - - /** - * Scans sequence numbers (i.e., {@link SeqNoFieldMapper#NAME}) between {@code fromSeqNo}(inclusive) and {@code toSeqNo}(inclusive) - * in the provided directory reader. This method invokes the callback {@code onNewSeqNo} whenever a sequence number value is found. 
- * - * @param directoryReader the directory reader to scan - * @param fromSeqNo the lower bound of a range of seq_no to scan (inclusive) - * @param toSeqNo the upper bound of a range of seq_no to scan (inclusive) - * @param onNewSeqNo the callback to be called whenever a new valid sequence number is found - */ - public static void scanSeqNosInReader(DirectoryReader directoryReader, long fromSeqNo, long toSeqNo, - LongConsumer onNewSeqNo) throws IOException { - final DirectoryReader reader = Lucene.wrapAllDocsLive(directoryReader); - final IndexSearcher searcher = new IndexSearcher(reader); - searcher.setQueryCache(null); - final Query query = LongPoint.newRangeQuery(SeqNoFieldMapper.NAME, fromSeqNo, toSeqNo); - final Weight weight = searcher.createWeight(query, ScoreMode.COMPLETE_NO_SCORES, 1.0f); - for (LeafReaderContext leaf : reader.leaves()) { - final Scorer scorer = weight.scorer(leaf); - if (scorer == null) { - continue; - } - final DocIdSetIterator docIdSetIterator = scorer.iterator(); - final NumericDocValues seqNoDocValues = leaf.reader().getNumericDocValues(SeqNoFieldMapper.NAME); - int docId; - while ((docId = docIdSetIterator.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) { - if (seqNoDocValues == null || seqNoDocValues.advanceExact(docId) == false) { - throw new IllegalStateException("seq_no doc_values not found for doc_id=" + docId); - } - final long seqNo = seqNoDocValues.longValue(); - assert fromSeqNo <= seqNo && seqNo <= toSeqNo : "from_seq_no=" + fromSeqNo + " seq_no=" + seqNo + " to_seq_no=" + toSeqNo; - onNewSeqNo.accept(seqNo); - } - } - } } diff --git a/server/src/main/java/org/elasticsearch/index/engine/EngineConfig.java b/server/src/main/java/org/elasticsearch/index/engine/EngineConfig.java index 7696d545649d5..979f8697af760 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/EngineConfig.java +++ b/server/src/main/java/org/elasticsearch/index/engine/EngineConfig.java @@ -34,6 +34,7 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.codec.CodecService; +import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.mapper.ParsedDocument; import org.elasticsearch.index.seqno.RetentionLeases; import org.elasticsearch.index.shard.ShardId; @@ -81,6 +82,7 @@ public final class EngineConfig { private final CircuitBreakerService circuitBreakerService; private final LongSupplier globalCheckpointSupplier; private final Supplier retentionLeasesSupplier; + private final MapperService mapperService; /** * A supplier of the outstanding retention leases. 
This is used during merged operations to determine which operations that have been @@ -132,7 +134,8 @@ public EngineConfig(ShardId shardId, String allocationId, ThreadPool threadPool, CircuitBreakerService circuitBreakerService, LongSupplier globalCheckpointSupplier, Supplier retentionLeasesSupplier, LongSupplier primaryTermSupplier, - TombstoneDocSupplier tombstoneDocSupplier) { + TombstoneDocSupplier tombstoneDocSupplier, + MapperService mapperService) { this.shardId = shardId; this.allocationId = allocationId; this.indexSettings = indexSettings; @@ -171,6 +174,7 @@ public EngineConfig(ShardId shardId, String allocationId, ThreadPool threadPool, this.retentionLeasesSupplier = Objects.requireNonNull(retentionLeasesSupplier); this.primaryTermSupplier = primaryTermSupplier; this.tombstoneDocSupplier = tombstoneDocSupplier; + this.mapperService = mapperService; } /** @@ -381,4 +385,8 @@ public interface TombstoneDocSupplier { public TombstoneDocSupplier getTombstoneDocSupplier() { return tombstoneDocSupplier; } + + public MapperService getMapperService() { + return mapperService; + } } diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 408f70d70d1ce..8d3edbeae38e0 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -23,6 +23,7 @@ import org.apache.lucene.codecs.blocktree.BlockTreeTermsReader; import org.apache.lucene.codecs.blocktree.BlockTreeTermsReader.FSTLoadMode; import org.apache.lucene.document.Field; +import org.apache.lucene.document.LongPoint; import org.apache.lucene.document.NumericDocValuesField; import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.IndexCommit; @@ -31,17 +32,24 @@ import org.apache.lucene.index.IndexWriterConfig; import org.apache.lucene.index.IndexableField; import org.apache.lucene.index.LeafReader; +import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.index.LiveIndexWriterConfig; import org.apache.lucene.index.MergePolicy; +import org.apache.lucene.index.NumericDocValues; import org.apache.lucene.index.SegmentCommitInfo; import org.apache.lucene.index.SegmentInfos; import org.apache.lucene.index.SoftDeletesRetentionMergePolicy; import org.apache.lucene.index.Term; +import org.apache.lucene.search.DocIdSetIterator; import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.Query; import org.apache.lucene.search.ReferenceManager; +import org.apache.lucene.search.ScoreMode; +import org.apache.lucene.search.Scorer; import org.apache.lucene.search.SearcherFactory; import org.apache.lucene.search.SearcherManager; import org.apache.lucene.search.TermQuery; +import org.apache.lucene.search.Weight; import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.store.Directory; import org.apache.lucene.store.FilterDirectory; @@ -68,12 +76,15 @@ import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.VersionType; +import org.elasticsearch.index.fieldvisitor.FieldsVisitor; import org.elasticsearch.index.mapper.IdFieldMapper; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.mapper.ParseContext; import org.elasticsearch.index.mapper.ParsedDocument; import org.elasticsearch.index.mapper.SeqNoFieldMapper; import org.elasticsearch.index.mapper.SourceFieldMapper; 
+import org.elasticsearch.index.mapper.Uid; +import org.elasticsearch.index.mapper.VersionFieldMapper; import org.elasticsearch.index.merge.MergeStats; import org.elasticsearch.index.merge.OnGoingMerge; import org.elasticsearch.index.seqno.LocalCheckpointTracker; @@ -94,7 +105,6 @@ import java.io.IOException; import java.nio.file.Path; import java.util.Arrays; -import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -109,7 +119,7 @@ import java.util.concurrent.locks.ReentrantLock; import java.util.function.BiFunction; import java.util.function.LongSupplier; -import java.util.function.Supplier; +import java.util.stream.Collectors; import java.util.stream.Stream; public class InternalEngine extends Engine { @@ -203,6 +213,7 @@ public InternalEngine(EngineConfig engineConfig) { this.softDeletesPolicy = newSoftDeletesPolicy(); this.combinedDeletionPolicy = new CombinedDeletionPolicy(logger, translogDeletionPolicy, softDeletesPolicy, translog::getLastSyncedGlobalCheckpoint); + this.localCheckpointTracker = createLocalCheckpointTracker(localCheckpointTrackerSupplier); writer = createWriter(); bootstrapAppendOnlyInfoFromWriter(writer); historyUUID = loadHistoryUUID(writer); @@ -232,11 +243,17 @@ public InternalEngine(EngineConfig engineConfig) { for (ReferenceManager.RefreshListener listener: engineConfig.getInternalRefreshListener()) { this.internalSearcherManager.addListener(listener); } - this.localCheckpointTracker = createLocalCheckpointTracker(engineConfig, lastCommittedSegmentInfos, logger, - () -> acquireSearcher("create_local_checkpoint_tracker", SearcherScope.INTERNAL), localCheckpointTrackerSupplier); this.lastRefreshedCheckpointListener = new LastRefreshedCheckpointListener(localCheckpointTracker.getCheckpoint()); this.internalSearcherManager.addListener(lastRefreshedCheckpointListener); maxSeqNoOfUpdatesOrDeletes = new AtomicLong(SequenceNumbers.max(localCheckpointTracker.getMaxSeqNo(), translog.getMaxSeqNo())); + if (softDeleteEnabled && localCheckpointTracker.getCheckpoint() < localCheckpointTracker.getMaxSeqNo()) { + try (Searcher searcher = acquireSearcher("restore_version_map_and_checkpoint_tracker", SearcherScope.INTERNAL)) { + restoreVersionMapAndCheckpointTracker(Lucene.wrapAllDocsLive(searcher.getDirectoryReader())); + } catch (IOException e) { + throw new EngineCreationFailureException(config().getShardId(), + "failed to restore version map and local checkpoint tracker", e); + } + } success = true; } finally { if (success == false) { @@ -250,30 +267,16 @@ public InternalEngine(EngineConfig engineConfig) { logger.trace("created new InternalEngine"); } - private static LocalCheckpointTracker createLocalCheckpointTracker(EngineConfig engineConfig, SegmentInfos lastCommittedSegmentInfos, - Logger logger, Supplier searcherSupplier, BiFunction localCheckpointTrackerSupplier) { - try { - final SequenceNumbers.CommitInfo seqNoStats = - SequenceNumbers.loadSeqNoInfoFromLuceneCommit(lastCommittedSegmentInfos.userData.entrySet()); - final long maxSeqNo = seqNoStats.maxSeqNo; - final long localCheckpoint = seqNoStats.localCheckpoint; - logger.trace("recovered maximum sequence number [{}] and local checkpoint [{}]", maxSeqNo, localCheckpoint); - final LocalCheckpointTracker tracker = localCheckpointTrackerSupplier.apply(maxSeqNo, localCheckpoint); - // Operations that are optimized using max_seq_no_of_updates optimization must not be processed twice; otherwise, they will - // create duplicates in Lucene. 
To avoid this we check the LocalCheckpointTracker to see if an operation was already processed. - // Thus, we need to restore the LocalCheckpointTracker bit by bit to ensure the consistency between LocalCheckpointTracker and - // Lucene index. This is not the only solution since we can bootstrap max_seq_no_of_updates with max_seq_no of the commit to - // disable the MSU optimization during recovery. Here we prefer to maintain the consistency of LocalCheckpointTracker. - if (localCheckpoint < maxSeqNo && engineConfig.getIndexSettings().isSoftDeleteEnabled()) { - try (Searcher searcher = searcherSupplier.get()) { - Lucene.scanSeqNosInReader(searcher.getDirectoryReader(), localCheckpoint + 1, maxSeqNo, - tracker::markSeqNoAsCompleted); - } - } - return tracker; - } catch (IOException ex) { - throw new EngineCreationFailureException(engineConfig.getShardId(), "failed to create local checkpoint tracker", ex); - } + private LocalCheckpointTracker createLocalCheckpointTracker( + BiFunction localCheckpointTrackerSupplier) throws IOException { + final long maxSeqNo; + final long localCheckpoint; + final SequenceNumbers.CommitInfo seqNoStats = + SequenceNumbers.loadSeqNoInfoFromLuceneCommit(store.readLastCommittedSegmentsInfo().userData.entrySet()); + maxSeqNo = seqNoStats.maxSeqNo; + localCheckpoint = seqNoStats.localCheckpoint; + logger.trace("recovered maximum sequence number [{}] and local checkpoint [{}]", maxSeqNo, localCheckpoint); + return localCheckpointTrackerSupplier.apply(maxSeqNo, localCheckpoint); } private SoftDeletesPolicy newSoftDeletesPolicy() throws IOException { @@ -1875,8 +1878,9 @@ void clearDeletedTombstones() { } // for testing - final Collection getDeletedTombstones() { - return versionMap.getAllTombstones().values(); + final Map getVersionMap() { + return Stream.concat(versionMap.getAllCurrent().entrySet().stream(), versionMap.getAllTombstones().entrySet().stream()) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); } @Override @@ -2507,10 +2511,6 @@ private boolean incrementIndexVersionLookup() { return true; } - int getVersionMapSize() { - return versionMap.getAllCurrent().size(); - } - boolean isSafeAccessRequired() { return versionMap.isSafeAccessRequired(); } @@ -2773,4 +2773,64 @@ private static void trimUnsafeCommits(EngineConfig engineConfig) throws IOExcept final long minRetainedTranslogGen = Translog.readMinTranslogGeneration(translogPath, translogUUID); store.trimUnsafeCommits(globalCheckpoint, minRetainedTranslogGen, engineConfig.getIndexSettings().getIndexVersionCreated()); } + + /** + * Restores the live version map and local checkpoint of this engine using documents (including soft-deleted) + * after the local checkpoint in the safe commit. This step ensures the live version map and checkpoint tracker + * are in sync with the Lucene commit.
+ */ + private void restoreVersionMapAndCheckpointTracker(DirectoryReader directoryReader) throws IOException { + final IndexSearcher searcher = new IndexSearcher(directoryReader); + searcher.setQueryCache(null); + final Query query = LongPoint.newRangeQuery(SeqNoFieldMapper.NAME, getLocalCheckpoint() + 1, Long.MAX_VALUE); + final Weight weight = searcher.createWeight(query, ScoreMode.COMPLETE_NO_SCORES, 1.0f); + for (LeafReaderContext leaf : directoryReader.leaves()) { + final LeafReader reader = leaf.reader(); + final Scorer scorer = weight.scorer(leaf); + if (scorer == null) { + continue; + } + final DocIdSetIterator docIdSetIterator = scorer.iterator(); + final NumericDocValues seqNoDV = Objects.requireNonNull(reader.getNumericDocValues(SeqNoFieldMapper.NAME)); + final NumericDocValues primaryTermDV = Objects.requireNonNull(reader.getNumericDocValues(SeqNoFieldMapper.PRIMARY_TERM_NAME)); + final NumericDocValues tombstoneDV = reader.getNumericDocValues(SeqNoFieldMapper.TOMBSTONE_NAME); + final NumericDocValues versionDV = reader.getNumericDocValues(VersionFieldMapper.NAME); + int docId; + while ((docId = docIdSetIterator.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) { + if (primaryTermDV.advanceExact(docId) == false) { + continue; // skip children docs which do not have primary term + } + final long primaryTerm = primaryTermDV.longValue(); + if (seqNoDV.advanceExact(docId) == false) { + throw new IllegalStateException("seq_no not found for doc_id=" + docId); + } + final long seqNo = seqNoDV.longValue(); + localCheckpointTracker.markSeqNoAsCompleted(seqNo); + final boolean isTombstone = tombstoneDV != null && tombstoneDV.advanceExact(docId); + final FieldsVisitor fields = new FieldsVisitor(false, SourceFieldMapper.NAME); + reader.document(docId, fields); + fields.postProcess(engineConfig.getMapperService()); + if (fields.uid() == null) { + assert isTombstone; // a noop + continue; + } + final BytesRef uid = new Term(IdFieldMapper.NAME, Uid.encodeId(fields.uid().id())).bytes(); + if (versionDV == null || versionDV.advanceExact(docId) == false) { + throw new IllegalStateException("version not found for doc_id=" + docId); + } + final long version = versionDV.longValue(); + try (Releasable ignored = versionMap.acquireLock(uid)) { + final VersionValue curr = versionMap.getUnderLock(uid); + if (curr == null || curr.term < primaryTerm || (curr.term == primaryTerm && curr.seqNo <= seqNo)) { + if (isTombstone) { + versionMap.putDeleteUnderLock(uid, + new DeleteVersionValue(version, seqNo, primaryTerm, engineConfig.getThreadPool().relativeTimeInMillis())); + } else { + versionMap.putIndexUnderLock(uid, new IndexVersionValue(null, version, seqNo, primaryTerm)); + } + } + } + } + } + } } diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index fdd95614756b7..03940b17a6f25 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -2479,7 +2479,7 @@ private EngineConfig newEngineConfig() { Collections.singletonList(refreshListeners), Collections.singletonList(new RefreshMetricUpdater(refreshMetric)), indexSort, circuitBreakerService, replicationTracker, replicationTracker::getRetentionLeases, - () -> getOperationPrimaryTerm(), tombstoneDocSupplier()); + () -> getOperationPrimaryTerm(), tombstoneDocSupplier(), mapperService); } /** diff --git 
a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index f4e1ecd2514b3..4208b3c8d322c 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -152,6 +152,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Base64; +import java.util.Collection; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; @@ -223,7 +224,7 @@ public void testVersionMapAfterAutoIDDocument() throws IOException { Engine.Index update = indexForDoc(doc); engine.index(update); assertTrue(engine.isSafeAccessRequired()); - assertEquals(1, engine.getVersionMapSize()); + assertThat(engine.getVersionMap().values(), hasSize(1)); try (Engine.Searcher searcher = engine.acquireSearcher("test")) { assertEquals(0, searcher.reader().numDocs()); } @@ -255,7 +256,7 @@ public void testVersionMapAfterAutoIDDocument() throws IOException { : appendOnlyReplica(doc, false, 1, generateNewSeqNo(engine)); engine.index(operation); assertTrue("safe access should be required", engine.isSafeAccessRequired()); - assertEquals(1, engine.getVersionMapSize()); // now we add this to the map + assertThat(engine.getVersionMap().values(), hasSize(1)); // now we add this to the map engine.refresh("test"); if (randomBoolean()) { // randomly refresh here again engine.refresh("test"); @@ -3108,7 +3109,8 @@ public void testRecoverFromForeignTranslog() throws IOException { () -> UNASSIGNED_SEQ_NO, () -> RetentionLeases.EMPTY, primaryTerm::get, - tombstoneDocSupplier()); + tombstoneDocSupplier(), + createMapperService("test")); expectThrows(EngineCreationFailureException.class, () -> new InternalEngine(brokenConfig)); engine = createEngine(store, primaryTranslogDir); // and recover again! @@ -3807,7 +3809,7 @@ public void run() { } catch (InterruptedException e) { throw new AssertionError(e); } - assertEquals(0, engine.getVersionMapSize()); + assertThat(engine.getVersionMap().values(), empty()); int docOffset; while ((docOffset = offset.incrementAndGet()) < docs.size()) { try { @@ -5106,18 +5108,22 @@ public void testPruneOnlyDeletesAtMostLocalCheckpoint() throws Exception { engine.delete(replicaDeleteForDoc(UUIDs.randomBase64UUID(), 1, seqno, threadPool.relativeTimeInMillis())); } } - List tombstones = new ArrayList<>(engine.getDeletedTombstones()); + Supplier> tombstonesInVersionMap = () -> engine.getVersionMap().entrySet().stream() + .filter(e -> e.getValue() instanceof DeleteVersionValue) + .map(e -> (DeleteVersionValue) (e.getValue())).collect(Collectors.toList()); + + List tombstones = new ArrayList<>(tombstonesInVersionMap.get()); engine.config().setEnableGcDeletes(true); // Prune tombstones whose seqno < gap_seqno and timestamp < clock-gcInterval. clock.set(randomLongBetween(gcInterval, deleteBatch + gcInterval)); engine.refresh("test"); tombstones.removeIf(v -> v.seqNo < gapSeqNo && v.time < clock.get() - gcInterval); - assertThat(engine.getDeletedTombstones(), containsInAnyOrder(tombstones.toArray())); + assertThat(tombstonesInVersionMap.get(), containsInAnyOrder(tombstones.toArray())); // Prune tombstones whose seqno at most the local checkpoint (eg. seqno < gap_seqno). clock.set(randomLongBetween(deleteBatch + gcInterval * 4/3, 100)); // Need a margin for gcInterval/4. 
engine.refresh("test"); tombstones.removeIf(v -> v.seqNo < gapSeqNo); - assertThat(engine.getDeletedTombstones(), containsInAnyOrder(tombstones.toArray())); + assertThat(tombstonesInVersionMap.get(), containsInAnyOrder(tombstones.toArray())); // Fill the seqno gap - should prune all tombstones. clock.set(between(0, 100)); if (randomBoolean()) { @@ -5129,7 +5135,7 @@ public void testPruneOnlyDeletesAtMostLocalCheckpoint() throws Exception { } clock.set(randomLongBetween(100 + gcInterval * 4/3, Long.MAX_VALUE)); // Need a margin for gcInterval/4. engine.refresh("test"); - assertThat(engine.getDeletedTombstones(), empty()); + assertThat(tombstonesInVersionMap.get(), empty()); } } @@ -5543,9 +5549,10 @@ public void testTrackMaxSeqNoOfUpdatesOrDeletesOnPrimary() throws Exception { } } - public void testRebuildLocalCheckpointTracker() throws Exception { + public void testRebuildLocalCheckpointTrackerAndVersionMap() throws Exception { Settings.Builder settings = Settings.builder() .put(defaultSettings.getSettings()) + .put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING.getKey(), 10000) .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true); final IndexMetaData indexMetaData = IndexMetaData.builder(defaultSettings.getIndexMetaData()).settings(settings).build(); final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(indexMetaData); @@ -5562,32 +5569,57 @@ public void testRebuildLocalCheckpointTracker() throws Exception { for (Engine.Operation op : operations) { flushedOperations.add(op); applyOperation(engine, op); + if (randomBoolean()) { + globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), engine.getLocalCheckpoint())); + engine.syncTranslog(); + } if (randomInt(100) < 10) { engine.refresh("test"); } if (randomInt(100) < 5) { - engine.flush(); + engine.flush(true, true); commits.add(new ArrayList<>(flushedOperations)); globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), engine.getLocalCheckpoint())); } } - globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), engine.getLocalCheckpoint())); - engine.syncTranslog(); docs = getDocIds(engine, true); } - Set seqNosInSafeCommit = null; + List operationsInSafeCommit = null; for (int i = commits.size() - 1; i >= 0; i--) { if (commits.get(i).stream().allMatch(op -> op.seqNo() <= globalCheckpoint.get())) { - seqNosInSafeCommit = commits.get(i).stream().map(Engine.Operation::seqNo).collect(Collectors.toSet()); + operationsInSafeCommit = commits.get(i); break; } } - assertThat(seqNosInSafeCommit, notNullValue()); + assertThat(operationsInSafeCommit, notNullValue()); + operationsInSafeCommit.sort(Comparator.comparing(Engine.Operation::seqNo)); try (InternalEngine engine = new InternalEngine(config)) { // do not recover from translog + final Map operationsAfterCheckpoint = new HashMap<>(); + for (Engine.Operation op : operationsInSafeCommit) { + if (op instanceof Engine.NoOp == false && op.seqNo() > engine.getLocalCheckpoint()) { + operationsAfterCheckpoint.put(new Term(IdFieldMapper.NAME, Uid.encodeId(op.id())).bytes(), op); + } + } + final Map versionMap = engine.getVersionMap(); + assertThat(versionMap.keySet(), equalTo(operationsAfterCheckpoint.keySet())); + for (BytesRef uid : operationsAfterCheckpoint.keySet()) { + final VersionValue versionValue = versionMap.get(uid); + final Engine.Operation op = operationsAfterCheckpoint.get(uid); + final String msg = versionValue + " vs " + op.operationType() + " seqno=" + op.seqNo() + " term=" + op.primaryTerm(); + if (op instanceof 
Engine.Delete) { + assertThat(msg, versionValue, instanceOf(DeleteVersionValue.class)); + } else { + assertThat(msg, versionValue, instanceOf(IndexVersionValue.class)); + } + assertThat(msg, versionValue.seqNo, equalTo(op.seqNo())); + assertThat(msg, versionValue.term, equalTo(op.primaryTerm())); + assertThat(msg, versionValue.version, equalTo(op.version())); + } final LocalCheckpointTracker tracker = engine.getLocalCheckpointTracker(); + final Set seqNosInSafeCommit = operationsInSafeCommit.stream().map(op -> op.seqNo()).collect(Collectors.toSet()); for (Engine.Operation op : operations) { assertThat( - "seq_no=" + op.seqNo() + " max_seq_no=" + tracker.getMaxSeqNo() + "checkpoint=" + tracker.getCheckpoint(), + "seq_no=" + op.seqNo() + " max_seq_no=" + tracker.getMaxSeqNo() + " checkpoint=" + tracker.getCheckpoint(), tracker.contains(op.seqNo()), equalTo(seqNosInSafeCommit.contains(op.seqNo()))); } engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); @@ -5856,4 +5888,37 @@ public void testPruneAwayDeletedButRetainedIds() throws Exception { } } } + + public void testRecoverFromLocalTranslog() throws Exception { + final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); + Path translogPath = createTempDir(); + List operations = generateHistoryOnReplica(between(1, 500), randomBoolean(), randomBoolean(), randomBoolean()); + try (Store store = createStore()) { + EngineConfig config = config(defaultSettings, store, translogPath, newMergePolicy(), null, null, globalCheckpoint::get); + final List docs; + try (InternalEngine engine = createEngine(config)) { + for (Engine.Operation op : operations) { + applyOperation(engine, op); + if (randomBoolean()) { + globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), engine.getLocalCheckpoint())); + engine.syncTranslog(); + } + if (randomInt(100) < 10) { + engine.refresh("test"); + } + if (randomInt(100) < 5) { + engine.flush(); + } + if (randomInt(100) < 5) { + engine.forceMerge(randomBoolean(), 1, false, false, false); + } + } + docs = getDocIds(engine, true); + } + try (InternalEngine engine = new InternalEngine(config)) { + engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); + assertThat(getDocIds(engine, randomBoolean()), equalTo(docs)); + } + } + } } diff --git a/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java b/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java index d499cf6e83f90..79d2f004cc95a 100644 --- a/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java +++ b/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java @@ -26,6 +26,7 @@ import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.flush.FlushRequest; +import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest; import org.elasticsearch.action.bulk.BulkShardRequest; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.index.IndexRequest; @@ -725,6 +726,9 @@ public void testAddNewReplicas() throws Exception { if (randomInt(100) < 10) { shards.getPrimary().flush(new FlushRequest()); } + if (randomInt(100) < 5) { + shards.getPrimary().forceMerge(new ForceMergeRequest().flush(randomBoolean()).maxNumSegments(1)); + } } catch (Exception ex) { throw new AssertionError(ex); } diff --git 
a/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java b/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java index e264d33ffed61..a8cbed9fa8f5c 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java @@ -150,7 +150,8 @@ public void onFailedEngine(String reason, @Nullable Exception e) { () -> SequenceNumbers.NO_OPS_PERFORMED, () -> RetentionLeases.EMPTY, () -> primaryTerm, - EngineTestCase.tombstoneDocSupplier()); + EngineTestCase.tombstoneDocSupplier(), + EngineTestCase.createMapperService("test")); engine = new InternalEngine(config); engine.recoverFromTranslog((e, s) -> 0, Long.MAX_VALUE); listeners.setCurrentRefreshLocationSupplier(engine::getTranslogLastWriteLocation); diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java index f7042448e7576..fd5204d534c8c 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -37,6 +37,7 @@ import org.apache.lucene.index.MergePolicy; import org.apache.lucene.index.NumericDocValues; import org.apache.lucene.index.Term; +import org.apache.lucene.search.DocIdSetIterator; import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.MatchAllDocsQuery; import org.apache.lucene.search.ReferenceManager; @@ -104,6 +105,7 @@ import org.junit.Before; import java.io.IOException; +import java.io.UncheckedIOException; import java.nio.charset.Charset; import java.nio.file.Path; import java.util.ArrayList; @@ -232,7 +234,7 @@ public EngineConfig copy(EngineConfig config, LongSupplier globalCheckpointSuppl config.getTranslogConfig(), config.getFlushMergesAfter(), config.getExternalRefreshListener(), Collections.emptyList(), config.getIndexSort(), config.getCircuitBreakerService(), globalCheckpointSupplier, config.retentionLeasesSupplier(), - config.getPrimaryTermSupplier(), tombstoneDocSupplier()); + config.getPrimaryTermSupplier(), tombstoneDocSupplier(), config.getMapperService()); } public EngineConfig copy(EngineConfig config, Analyzer analyzer) { @@ -242,7 +244,7 @@ public EngineConfig copy(EngineConfig config, Analyzer analyzer) { config.getTranslogConfig(), config.getFlushMergesAfter(), config.getExternalRefreshListener(), Collections.emptyList(), config.getIndexSort(), config.getCircuitBreakerService(), config.getGlobalCheckpointSupplier(), config.retentionLeasesSupplier(), - config.getPrimaryTermSupplier(), config.getTombstoneDocSupplier()); + config.getPrimaryTermSupplier(), config.getTombstoneDocSupplier(), config.getMapperService()); } public EngineConfig copy(EngineConfig config, MergePolicy mergePolicy) { @@ -252,7 +254,7 @@ public EngineConfig copy(EngineConfig config, MergePolicy mergePolicy) { config.getTranslogConfig(), config.getFlushMergesAfter(), config.getExternalRefreshListener(), Collections.emptyList(), config.getIndexSort(), config.getCircuitBreakerService(), config.getGlobalCheckpointSupplier(), config.retentionLeasesSupplier(), - config.getPrimaryTermSupplier(), config.getTombstoneDocSupplier()); + config.getPrimaryTermSupplier(), config.getTombstoneDocSupplier(), config.getMapperService()); } @Override @@ -697,7 +699,8 @@ public EngineConfig config( globalCheckpointSupplier, retentionLeasesSupplier, primaryTerm, 
- tombstoneDocSupplier()); + tombstoneDocSupplier(), + createMapperService("test")); } protected EngineConfig config(EngineConfig config, Store store, Path translogPath, @@ -712,7 +715,7 @@ protected EngineConfig config(EngineConfig config, Store store, Path translogPat translogConfig, config.getFlushMergesAfter(), config.getExternalRefreshListener(), config.getInternalRefreshListener(), config.getIndexSort(), config.getCircuitBreakerService(), config.getGlobalCheckpointSupplier(), config.retentionLeasesSupplier(), - config.getPrimaryTermSupplier(), tombstoneDocSupplier); + config.getPrimaryTermSupplier(), tombstoneDocSupplier, createMapperService("test")); } protected EngineConfig noOpConfig(IndexSettings indexSettings, Store store, Path translogPath) { @@ -1111,25 +1114,27 @@ public static void assertMaxSeqNoInCommitUserData(Engine engine) throws Exceptio List commits = DirectoryReader.listCommits(engine.store.directory()); for (IndexCommit commit : commits) { try (DirectoryReader reader = DirectoryReader.open(commit)) { - AtomicLong maxSeqNoFromDocs = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); - Lucene.scanSeqNosInReader(reader, 0, Long.MAX_VALUE, n -> maxSeqNoFromDocs.set(Math.max(n, maxSeqNoFromDocs.get()))); assertThat(Long.parseLong(commit.getUserData().get(SequenceNumbers.MAX_SEQ_NO)), - greaterThanOrEqualTo(maxSeqNoFromDocs.get())); + greaterThanOrEqualTo(maxSeqNosInReader(reader))); } } } - public static MapperService createMapperService(String type) throws IOException { - IndexMetaData indexMetaData = IndexMetaData.builder("test") - .settings(Settings.builder() - .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) - .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)) - .putMapping(type, "{\"properties\": {}}") - .build(); - MapperService mapperService = MapperTestUtils.newMapperService(new NamedXContentRegistry(ClusterModule.getNamedXWriteables()), - createTempDir(), Settings.EMPTY, "test"); - mapperService.merge(indexMetaData, MapperService.MergeReason.MAPPING_UPDATE); - return mapperService; + public static MapperService createMapperService(String type) { + try { + IndexMetaData indexMetaData = IndexMetaData.builder("test") + .settings(Settings.builder() + .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)) + .putMapping(type, "{\"properties\": {}}") + .build(); + MapperService mapperService = MapperTestUtils.newMapperService(new NamedXContentRegistry(ClusterModule.getNamedXWriteables()), + createTempDir(), Settings.EMPTY, "test"); + mapperService.merge(indexMetaData, MapperService.MergeReason.MAPPING_UPDATE); + return mapperService; + } catch (IOException e) { + throw new UncheckedIOException(e); + } } /** @@ -1177,4 +1182,15 @@ public long getAsLong() { return get(); } } + + static long maxSeqNosInReader(DirectoryReader reader) throws IOException { + long maxSeqNo = SequenceNumbers.NO_OPS_PERFORMED; + for (LeafReaderContext leaf : reader.leaves()) { + final NumericDocValues seqNoDocValues = leaf.reader().getNumericDocValues(SeqNoFieldMapper.NAME); + while (seqNoDocValues.nextDoc() != DocIdSetIterator.NO_MORE_DOCS) { + maxSeqNo = SequenceNumbers.max(maxSeqNo, seqNoDocValues.longValue()); + } + } + return maxSeqNo; + } } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/FollowerFailOverIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/FollowerFailOverIT.java index 
22b22c8779ce6..f951c51844bdc 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/FollowerFailOverIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/FollowerFailOverIT.java @@ -214,6 +214,9 @@ public void testAddNewReplicasOnFollower() throws Exception { if (rarely()) { followerClient().admin().indices().prepareFlush("follower-index").get(); } + if (rarely()) { + followerClient().admin().indices().prepareForceMerge("follower-index").setMaxNumSegments(1).get(); + } if (rarely()) { followerClient().admin().indices().prepareRefresh("follower-index").get(); } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java index 4a56d6370eb91..a14354b6907d3 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java @@ -58,6 +58,7 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; +import static org.elasticsearch.index.engine.EngineTestCase.createMapperService; import static org.elasticsearch.index.engine.EngineTestCase.getDocIds; import static org.elasticsearch.index.engine.EngineTestCase.getTranslog; import static org.hamcrest.Matchers.containsString; @@ -274,7 +275,8 @@ public void onFailedEngine(String reason, Exception e) { globalCheckpoint::longValue, () -> RetentionLeases.EMPTY, () -> primaryTerm.get(), - EngineTestCase.tombstoneDocSupplier()); + EngineTestCase.tombstoneDocSupplier(), + createMapperService("test")); } private static Store createStore( From 711b00c7a0d496c226f3bf3e26e67c1ed28a9390 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Thu, 13 Jun 2019 17:07:33 -0400 Subject: [PATCH 2/8] =?UTF-8?q?simon=E2=80=99s=20feedback?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../index/engine/EngineConfig.java | 10 +--- .../index/engine/InternalEngine.java | 11 ++-- .../fieldvisitor/IdOnlyFieldVisitor.java | 55 +++++++++++++++++++ .../elasticsearch/index/shard/IndexShard.java | 2 +- .../index/engine/InternalEngineTests.java | 3 +- .../index/shard/RefreshListenersTests.java | 3 +- .../index/engine/EngineTestCase.java | 38 ++++++------- .../index/engine/FollowingEngineTests.java | 3 +- 8 files changed, 81 insertions(+), 44 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/index/fieldvisitor/IdOnlyFieldVisitor.java diff --git a/server/src/main/java/org/elasticsearch/index/engine/EngineConfig.java b/server/src/main/java/org/elasticsearch/index/engine/EngineConfig.java index 979f8697af760..7696d545649d5 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/EngineConfig.java +++ b/server/src/main/java/org/elasticsearch/index/engine/EngineConfig.java @@ -34,7 +34,6 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.codec.CodecService; -import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.mapper.ParsedDocument; import org.elasticsearch.index.seqno.RetentionLeases; import org.elasticsearch.index.shard.ShardId; @@ -82,7 +81,6 @@ public final class EngineConfig { private final CircuitBreakerService circuitBreakerService; private final LongSupplier globalCheckpointSupplier; private final Supplier retentionLeasesSupplier; - private 
final MapperService mapperService; /** * A supplier of the outstanding retention leases. This is used during merged operations to determine which operations that have been @@ -134,8 +132,7 @@ public EngineConfig(ShardId shardId, String allocationId, ThreadPool threadPool, CircuitBreakerService circuitBreakerService, LongSupplier globalCheckpointSupplier, Supplier retentionLeasesSupplier, LongSupplier primaryTermSupplier, - TombstoneDocSupplier tombstoneDocSupplier, - MapperService mapperService) { + TombstoneDocSupplier tombstoneDocSupplier) { this.shardId = shardId; this.allocationId = allocationId; this.indexSettings = indexSettings; @@ -174,7 +171,6 @@ public EngineConfig(ShardId shardId, String allocationId, ThreadPool threadPool, this.retentionLeasesSupplier = Objects.requireNonNull(retentionLeasesSupplier); this.primaryTermSupplier = primaryTermSupplier; this.tombstoneDocSupplier = tombstoneDocSupplier; - this.mapperService = mapperService; } /** @@ -385,8 +381,4 @@ public interface TombstoneDocSupplier { public TombstoneDocSupplier getTombstoneDocSupplier() { return tombstoneDocSupplier; } - - public MapperService getMapperService() { - return mapperService; - } } diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 8d3edbeae38e0..1f8a4b5d4a155 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -76,7 +76,7 @@ import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.VersionType; -import org.elasticsearch.index.fieldvisitor.FieldsVisitor; +import org.elasticsearch.index.fieldvisitor.IdOnlyFieldVisitor; import org.elasticsearch.index.mapper.IdFieldMapper; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.mapper.ParseContext; @@ -2807,14 +2807,13 @@ private void restoreVersionMapAndCheckpointTracker(DirectoryReader directoryRead final long seqNo = seqNoDV.longValue(); localCheckpointTracker.markSeqNoAsCompleted(seqNo); final boolean isTombstone = tombstoneDV != null && tombstoneDV.advanceExact(docId); - final FieldsVisitor fields = new FieldsVisitor(false, SourceFieldMapper.NAME); - reader.document(docId, fields); - fields.postProcess(engineConfig.getMapperService()); - if (fields.uid() == null) { + final IdOnlyFieldVisitor idFieldVisitor = new IdOnlyFieldVisitor(); + reader.document(docId, idFieldVisitor); + if (idFieldVisitor.getId() == null) { assert isTombstone; // a noop continue; } - final BytesRef uid = new Term(IdFieldMapper.NAME, Uid.encodeId(fields.uid().id())).bytes(); + final BytesRef uid = new Term(IdFieldMapper.NAME, Uid.encodeId(idFieldVisitor.getId())).bytes(); if (versionDV == null || versionDV.advanceExact(docId) == false) { throw new IllegalStateException("version not found for doc_id=" + docId); } diff --git a/server/src/main/java/org/elasticsearch/index/fieldvisitor/IdOnlyFieldVisitor.java b/server/src/main/java/org/elasticsearch/index/fieldvisitor/IdOnlyFieldVisitor.java new file mode 100644 index 0000000000000..a4e6796f6c335 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/index/fieldvisitor/IdOnlyFieldVisitor.java @@ -0,0 +1,55 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. 
Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.fieldvisitor; + +import org.apache.lucene.index.FieldInfo; +import org.apache.lucene.index.StoredFieldVisitor; +import org.elasticsearch.index.mapper.IdFieldMapper; +import org.elasticsearch.index.mapper.Uid; + +public final class IdOnlyFieldVisitor extends StoredFieldVisitor { + private String id = null; + private boolean visited = false; + + @Override + public Status needsField(FieldInfo fieldInfo) { + if (visited) { + return Status.STOP; + } + if (IdFieldMapper.NAME.equals(fieldInfo.name)) { + visited = true; + return Status.YES; + } else { + return Status.NO; + } + } + + @Override + public void binaryField(FieldInfo fieldInfo, byte[] value) { + if (IdFieldMapper.NAME.equals(fieldInfo.name)) { + id = Uid.decodeId(value); + } + assert IdFieldMapper.NAME.equals(fieldInfo.name) : fieldInfo; + } + + public String getId() { + return id; + } +} diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 03940b17a6f25..fdd95614756b7 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -2479,7 +2479,7 @@ private EngineConfig newEngineConfig() { Collections.singletonList(refreshListeners), Collections.singletonList(new RefreshMetricUpdater(refreshMetric)), indexSort, circuitBreakerService, replicationTracker, replicationTracker::getRetentionLeases, - () -> getOperationPrimaryTerm(), tombstoneDocSupplier(), mapperService); + () -> getOperationPrimaryTerm(), tombstoneDocSupplier()); } /** diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 4208b3c8d322c..632180bdaea8a 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -3109,8 +3109,7 @@ public void testRecoverFromForeignTranslog() throws IOException { () -> UNASSIGNED_SEQ_NO, () -> RetentionLeases.EMPTY, primaryTerm::get, - tombstoneDocSupplier(), - createMapperService("test")); + tombstoneDocSupplier()); expectThrows(EngineCreationFailureException.class, () -> new InternalEngine(brokenConfig)); engine = createEngine(store, primaryTranslogDir); // and recover again! 
diff --git a/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java b/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java index a8cbed9fa8f5c..e264d33ffed61 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java @@ -150,8 +150,7 @@ public void onFailedEngine(String reason, @Nullable Exception e) { () -> SequenceNumbers.NO_OPS_PERFORMED, () -> RetentionLeases.EMPTY, () -> primaryTerm, - EngineTestCase.tombstoneDocSupplier(), - EngineTestCase.createMapperService("test")); + EngineTestCase.tombstoneDocSupplier()); engine = new InternalEngine(config); engine.recoverFromTranslog((e, s) -> 0, Long.MAX_VALUE); listeners.setCurrentRefreshLocationSupplier(engine::getTranslogLastWriteLocation); diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java index fd5204d534c8c..02fc3ae9b45af 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -105,7 +105,6 @@ import org.junit.Before; import java.io.IOException; -import java.io.UncheckedIOException; import java.nio.charset.Charset; import java.nio.file.Path; import java.util.ArrayList; @@ -234,7 +233,7 @@ public EngineConfig copy(EngineConfig config, LongSupplier globalCheckpointSuppl config.getTranslogConfig(), config.getFlushMergesAfter(), config.getExternalRefreshListener(), Collections.emptyList(), config.getIndexSort(), config.getCircuitBreakerService(), globalCheckpointSupplier, config.retentionLeasesSupplier(), - config.getPrimaryTermSupplier(), tombstoneDocSupplier(), config.getMapperService()); + config.getPrimaryTermSupplier(), tombstoneDocSupplier()); } public EngineConfig copy(EngineConfig config, Analyzer analyzer) { @@ -244,7 +243,7 @@ public EngineConfig copy(EngineConfig config, Analyzer analyzer) { config.getTranslogConfig(), config.getFlushMergesAfter(), config.getExternalRefreshListener(), Collections.emptyList(), config.getIndexSort(), config.getCircuitBreakerService(), config.getGlobalCheckpointSupplier(), config.retentionLeasesSupplier(), - config.getPrimaryTermSupplier(), config.getTombstoneDocSupplier(), config.getMapperService()); + config.getPrimaryTermSupplier(), config.getTombstoneDocSupplier()); } public EngineConfig copy(EngineConfig config, MergePolicy mergePolicy) { @@ -254,7 +253,7 @@ public EngineConfig copy(EngineConfig config, MergePolicy mergePolicy) { config.getTranslogConfig(), config.getFlushMergesAfter(), config.getExternalRefreshListener(), Collections.emptyList(), config.getIndexSort(), config.getCircuitBreakerService(), config.getGlobalCheckpointSupplier(), config.retentionLeasesSupplier(), - config.getPrimaryTermSupplier(), config.getTombstoneDocSupplier(), config.getMapperService()); + config.getPrimaryTermSupplier(), config.getTombstoneDocSupplier()); } @Override @@ -699,8 +698,7 @@ public EngineConfig config( globalCheckpointSupplier, retentionLeasesSupplier, primaryTerm, - tombstoneDocSupplier(), - createMapperService("test")); + tombstoneDocSupplier()); } protected EngineConfig config(EngineConfig config, Store store, Path translogPath, @@ -715,7 +713,7 @@ protected EngineConfig config(EngineConfig config, Store store, Path translogPat translogConfig, config.getFlushMergesAfter(), 
config.getExternalRefreshListener(), config.getInternalRefreshListener(), config.getIndexSort(), config.getCircuitBreakerService(), config.getGlobalCheckpointSupplier(), config.retentionLeasesSupplier(), - config.getPrimaryTermSupplier(), tombstoneDocSupplier, createMapperService("test")); + config.getPrimaryTermSupplier(), tombstoneDocSupplier); } protected EngineConfig noOpConfig(IndexSettings indexSettings, Store store, Path translogPath) { @@ -1120,21 +1118,17 @@ public static void assertMaxSeqNoInCommitUserData(Engine engine) throws Exceptio } } - public static MapperService createMapperService(String type) { - try { - IndexMetaData indexMetaData = IndexMetaData.builder("test") - .settings(Settings.builder() - .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) - .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)) - .putMapping(type, "{\"properties\": {}}") - .build(); - MapperService mapperService = MapperTestUtils.newMapperService(new NamedXContentRegistry(ClusterModule.getNamedXWriteables()), - createTempDir(), Settings.EMPTY, "test"); - mapperService.merge(indexMetaData, MapperService.MergeReason.MAPPING_UPDATE); - return mapperService; - } catch (IOException e) { - throw new UncheckedIOException(e); - } + public static MapperService createMapperService(String type) throws IOException { + IndexMetaData indexMetaData = IndexMetaData.builder("test") + .settings(Settings.builder() + .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)) + .putMapping(type, "{\"properties\": {}}") + .build(); + MapperService mapperService = MapperTestUtils.newMapperService(new NamedXContentRegistry(ClusterModule.getNamedXWriteables()), + createTempDir(), Settings.EMPTY, "test"); + mapperService.merge(indexMetaData, MapperService.MergeReason.MAPPING_UPDATE); + return mapperService; } /** diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java index a14354b6907d3..620f52849a093 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java @@ -275,8 +275,7 @@ public void onFailedEngine(String reason, Exception e) { globalCheckpoint::longValue, () -> RetentionLeases.EMPTY, () -> primaryTerm.get(), - EngineTestCase.tombstoneDocSupplier(), - createMapperService("test")); + EngineTestCase.tombstoneDocSupplier()); } private static Store createStore( From 14b6ebfad59f45624553257fd4aafef8a2cfec25 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Thu, 13 Jun 2019 17:25:46 -0400 Subject: [PATCH 3/8] =?UTF-8?q?yannick=E2=80=99s=20feedback?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../index/engine/CombinedDocValues.java | 90 +++++++++++++++++++ .../index/engine/InternalEngine.java | 67 +++++++------- .../index/engine/LuceneChangesSnapshot.java | 62 ------------- .../index/engine/InternalEngineTests.java | 41 +++++---- .../index/engine/FollowingEngineTests.java | 1 - 5 files changed, 141 insertions(+), 120 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/index/engine/CombinedDocValues.java diff --git a/server/src/main/java/org/elasticsearch/index/engine/CombinedDocValues.java 
b/server/src/main/java/org/elasticsearch/index/engine/CombinedDocValues.java new file mode 100644 index 0000000000000..009dc9799d52a --- /dev/null +++ b/server/src/main/java/org/elasticsearch/index/engine/CombinedDocValues.java @@ -0,0 +1,90 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.engine; + +import org.apache.lucene.index.LeafReader; +import org.apache.lucene.index.NumericDocValues; +import org.elasticsearch.index.mapper.SeqNoFieldMapper; +import org.elasticsearch.index.mapper.SourceFieldMapper; +import org.elasticsearch.index.mapper.VersionFieldMapper; + +import java.io.IOException; +import java.util.Objects; + +final class CombinedDocValues { + private final NumericDocValues versionDV; + private final NumericDocValues seqNoDV; + private final NumericDocValues primaryTermDV; + private final NumericDocValues tombstoneDV; + private final NumericDocValues recoverySource; + + CombinedDocValues(LeafReader leafReader) throws IOException { + this.versionDV = Objects.requireNonNull(leafReader.getNumericDocValues(VersionFieldMapper.NAME), "VersionDV is missing"); + this.seqNoDV = Objects.requireNonNull(leafReader.getNumericDocValues(SeqNoFieldMapper.NAME), "SeqNoDV is missing"); + this.primaryTermDV = Objects.requireNonNull( + leafReader.getNumericDocValues(SeqNoFieldMapper.PRIMARY_TERM_NAME), "PrimaryTermDV is missing"); + this.tombstoneDV = leafReader.getNumericDocValues(SeqNoFieldMapper.TOMBSTONE_NAME); + this.recoverySource = leafReader.getNumericDocValues(SourceFieldMapper.RECOVERY_SOURCE_NAME); + } + + long docVersion(int segmentDocId) throws IOException { + assert versionDV.docID() < segmentDocId; + if (versionDV.advanceExact(segmentDocId) == false) { + throw new IllegalStateException("DocValues for field [" + VersionFieldMapper.NAME + "] is not found"); + } + return versionDV.longValue(); + } + + long docSeqNo(int segmentDocId) throws IOException { + assert seqNoDV.docID() < segmentDocId; + if (seqNoDV.advanceExact(segmentDocId) == false) { + throw new IllegalStateException("DocValues for field [" + SeqNoFieldMapper.NAME + "] is not found"); + } + return seqNoDV.longValue(); + } + + long docPrimaryTerm(int segmentDocId) throws IOException { + if (primaryTermDV == null) { + return -1L; + } + assert primaryTermDV.docID() < segmentDocId; + // Use -1 for docs which don't have primary term. The caller considers those docs as nested docs. 
+ if (primaryTermDV.advanceExact(segmentDocId) == false) { + return -1; + } + return primaryTermDV.longValue(); + } + + boolean isTombstone(int segmentDocId) throws IOException { + if (tombstoneDV == null) { + return false; + } + assert tombstoneDV.docID() < segmentDocId; + return tombstoneDV.advanceExact(segmentDocId) && tombstoneDV.longValue() > 0; + } + + boolean hasRecoverySource(int segmentDocId) throws IOException { + if (recoverySource == null) { + return false; + } + assert recoverySource.docID() < segmentDocId; + return recoverySource.advanceExact(segmentDocId); + } +} diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 1f8a4b5d4a155..f9da22042c43c 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -35,7 +35,6 @@ import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.index.LiveIndexWriterConfig; import org.apache.lucene.index.MergePolicy; -import org.apache.lucene.index.NumericDocValues; import org.apache.lucene.index.SegmentCommitInfo; import org.apache.lucene.index.SegmentInfos; import org.apache.lucene.index.SoftDeletesRetentionMergePolicy; @@ -84,7 +83,6 @@ import org.elasticsearch.index.mapper.SeqNoFieldMapper; import org.elasticsearch.index.mapper.SourceFieldMapper; import org.elasticsearch.index.mapper.Uid; -import org.elasticsearch.index.mapper.VersionFieldMapper; import org.elasticsearch.index.merge.MergeStats; import org.elasticsearch.index.merge.OnGoingMerge; import org.elasticsearch.index.seqno.LocalCheckpointTracker; @@ -679,21 +677,26 @@ enum OpVsLuceneDocStatus { LUCENE_DOC_NOT_FOUND } + private OpVsLuceneDocStatus compareOpToVersionMapOnSeqNo(String id, long seqNo, long primaryTerm, VersionValue versionValue) { + Objects.requireNonNull(versionValue); + if (seqNo > versionValue.seqNo) { + return OpVsLuceneDocStatus.OP_NEWER; + } else if (seqNo == versionValue.seqNo) { + assert versionValue.term == primaryTerm : "primary term not matched; id=" + id + " seq_no=" + seqNo + + " op_term=" + primaryTerm + " existing_term=" + versionValue.term; + return OpVsLuceneDocStatus.OP_STALE_OR_EQUAL; + } else { + return OpVsLuceneDocStatus.OP_STALE_OR_EQUAL; + } + } + private OpVsLuceneDocStatus compareOpToLuceneDocBasedOnSeqNo(final Operation op) throws IOException { assert op.seqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO : "resolving ops based on seq# but no seqNo is found"; final OpVsLuceneDocStatus status; VersionValue versionValue = getVersionFromMap(op.uid().bytes()); assert incrementVersionLookup(); if (versionValue != null) { - if (op.seqNo() > versionValue.seqNo) { - status = OpVsLuceneDocStatus.OP_NEWER; - } else if (op.seqNo() == versionValue.seqNo) { - assert versionValue.term == op.primaryTerm() : "primary term not matched; id=" + op.id() + " seq_no=" + op.seqNo() - + " op_term=" + op.primaryTerm() + " existing_term=" + versionValue.term; - status = OpVsLuceneDocStatus.OP_STALE_OR_EQUAL; - } else { - status = OpVsLuceneDocStatus.OP_STALE_OR_EQUAL; - } + status = compareOpToVersionMapOnSeqNo(op.id(), op.seqNo(), op.primaryTerm(), versionValue); } else { // load from index assert incrementIndexVersionLookup(); @@ -2785,51 +2788,43 @@ private void restoreVersionMapAndCheckpointTracker(DirectoryReader directoryRead final Query query = LongPoint.newRangeQuery(SeqNoFieldMapper.NAME, getLocalCheckpoint() + 1, Long.MAX_VALUE); final 
Weight weight = searcher.createWeight(query, ScoreMode.COMPLETE_NO_SCORES, 1.0f); for (LeafReaderContext leaf : directoryReader.leaves()) { - final LeafReader reader = leaf.reader(); final Scorer scorer = weight.scorer(leaf); if (scorer == null) { continue; } - final DocIdSetIterator docIdSetIterator = scorer.iterator(); - final NumericDocValues seqNoDV = Objects.requireNonNull(reader.getNumericDocValues(SeqNoFieldMapper.NAME)); - final NumericDocValues primaryTermDV = Objects.requireNonNull(reader.getNumericDocValues(SeqNoFieldMapper.PRIMARY_TERM_NAME)); - final NumericDocValues tombstoneDV = reader.getNumericDocValues(SeqNoFieldMapper.TOMBSTONE_NAME); - final NumericDocValues versionDV = reader.getNumericDocValues(VersionFieldMapper.NAME); + final CombinedDocValues dv = new CombinedDocValues(leaf.reader()); + final DocIdSetIterator iterator = scorer.iterator(); int docId; - while ((docId = docIdSetIterator.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) { - if (primaryTermDV.advanceExact(docId) == false) { + while ((docId = iterator.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) { + final long primaryTerm = dv.docPrimaryTerm(docId); + if (primaryTerm == -1L) { continue; // skip children docs which do not have primary term } - final long primaryTerm = primaryTermDV.longValue(); - if (seqNoDV.advanceExact(docId) == false) { - throw new IllegalStateException("seq_no not found for doc_id=" + docId); - } - final long seqNo = seqNoDV.longValue(); + final long seqNo = dv.docSeqNo(docId); localCheckpointTracker.markSeqNoAsCompleted(seqNo); - final boolean isTombstone = tombstoneDV != null && tombstoneDV.advanceExact(docId); final IdOnlyFieldVisitor idFieldVisitor = new IdOnlyFieldVisitor(); - reader.document(docId, idFieldVisitor); + leaf.reader().document(docId, idFieldVisitor); if (idFieldVisitor.getId() == null) { - assert isTombstone; // a noop + assert dv.isTombstone(docId); continue; } final BytesRef uid = new Term(IdFieldMapper.NAME, Uid.encodeId(idFieldVisitor.getId())).bytes(); - if (versionDV == null || versionDV.advanceExact(docId) == false) { - throw new IllegalStateException("version not found for doc_id=" + docId); - } - final long version = versionDV.longValue(); try (Releasable ignored = versionMap.acquireLock(uid)) { final VersionValue curr = versionMap.getUnderLock(uid); - if (curr == null || curr.term < primaryTerm || (curr.term == primaryTerm && curr.seqNo <= seqNo)) { - if (isTombstone) { - versionMap.putDeleteUnderLock(uid, - new DeleteVersionValue(version, seqNo, primaryTerm, engineConfig.getThreadPool().relativeTimeInMillis())); + if (curr == null || + compareOpToVersionMapOnSeqNo(idFieldVisitor.getId(), seqNo, primaryTerm, curr) == OpVsLuceneDocStatus.OP_NEWER) { + if (dv.isTombstone(docId)) { + // use the current timestamp to honor GC deletes in case this engine becomes primary. 
+ final long startTime = config().getThreadPool().relativeTimeInMillis(); + versionMap.putDeleteUnderLock(uid, new DeleteVersionValue(dv.docVersion(docId), seqNo, primaryTerm, startTime)); } else { - versionMap.putIndexUnderLock(uid, new IndexVersionValue(null, version, seqNo, primaryTerm)); + versionMap.putIndexUnderLock(uid, new IndexVersionValue(null, dv.docVersion(docId), seqNo, primaryTerm)); } } } } } + // remove live entries in the version map + refresh("restore_version_map_and_checkpoint_tracker", SearcherScope.INTERNAL, true); } } diff --git a/server/src/main/java/org/elasticsearch/index/engine/LuceneChangesSnapshot.java b/server/src/main/java/org/elasticsearch/index/engine/LuceneChangesSnapshot.java index a3e86ab1606df..cb0862ff13171 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/LuceneChangesSnapshot.java +++ b/server/src/main/java/org/elasticsearch/index/engine/LuceneChangesSnapshot.java @@ -40,14 +40,12 @@ import org.elasticsearch.index.mapper.SeqNoFieldMapper; import org.elasticsearch.index.mapper.SourceFieldMapper; import org.elasticsearch.index.mapper.Uid; -import org.elasticsearch.index.mapper.VersionFieldMapper; import org.elasticsearch.index.translog.Translog; import java.io.Closeable; import java.io.IOException; import java.util.Comparator; import java.util.List; -import java.util.Objects; import java.util.concurrent.atomic.AtomicBoolean; /** @@ -305,64 +303,4 @@ private static final class ParallelArray { } } - private static final class CombinedDocValues { - private final NumericDocValues versionDV; - private final NumericDocValues seqNoDV; - private final NumericDocValues primaryTermDV; - private final NumericDocValues tombstoneDV; - private final NumericDocValues recoverySource; - - CombinedDocValues(LeafReader leafReader) throws IOException { - this.versionDV = Objects.requireNonNull(leafReader.getNumericDocValues(VersionFieldMapper.NAME), "VersionDV is missing"); - this.seqNoDV = Objects.requireNonNull(leafReader.getNumericDocValues(SeqNoFieldMapper.NAME), "SeqNoDV is missing"); - this.primaryTermDV = Objects.requireNonNull( - leafReader.getNumericDocValues(SeqNoFieldMapper.PRIMARY_TERM_NAME), "PrimaryTermDV is missing"); - this.tombstoneDV = leafReader.getNumericDocValues(SeqNoFieldMapper.TOMBSTONE_NAME); - this.recoverySource = leafReader.getNumericDocValues(SourceFieldMapper.RECOVERY_SOURCE_NAME); - } - - long docVersion(int segmentDocId) throws IOException { - assert versionDV.docID() < segmentDocId; - if (versionDV.advanceExact(segmentDocId) == false) { - throw new IllegalStateException("DocValues for field [" + VersionFieldMapper.NAME + "] is not found"); - } - return versionDV.longValue(); - } - - long docSeqNo(int segmentDocId) throws IOException { - assert seqNoDV.docID() < segmentDocId; - if (seqNoDV.advanceExact(segmentDocId) == false) { - throw new IllegalStateException("DocValues for field [" + SeqNoFieldMapper.NAME + "] is not found"); - } - return seqNoDV.longValue(); - } - - long docPrimaryTerm(int segmentDocId) throws IOException { - if (primaryTermDV == null) { - return -1L; - } - assert primaryTermDV.docID() < segmentDocId; - // Use -1 for docs which don't have primary term. The caller considers those docs as nested docs. 
- if (primaryTermDV.advanceExact(segmentDocId) == false) { - return -1; - } - return primaryTermDV.longValue(); - } - - boolean isTombstone(int segmentDocId) throws IOException { - if (tombstoneDV == null) { - return false; - } - assert tombstoneDV.docID() < segmentDocId; - return tombstoneDV.advanceExact(segmentDocId) && tombstoneDV.longValue() > 0; - } - - boolean hasRecoverySource(int segmentDocId) throws IOException { - if (recoverySource == null) { - return false; - } - assert recoverySource.docID() < segmentDocId; - return recoverySource.advanceExact(segmentDocId); - } - } } diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 632180bdaea8a..e12356183edc4 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -152,7 +152,6 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Base64; -import java.util.Collection; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; @@ -5107,22 +5106,19 @@ public void testPruneOnlyDeletesAtMostLocalCheckpoint() throws Exception { engine.delete(replicaDeleteForDoc(UUIDs.randomBase64UUID(), 1, seqno, threadPool.relativeTimeInMillis())); } } - Supplier> tombstonesInVersionMap = () -> engine.getVersionMap().entrySet().stream() - .filter(e -> e.getValue() instanceof DeleteVersionValue) - .map(e -> (DeleteVersionValue) (e.getValue())).collect(Collectors.toList()); - List tombstones = new ArrayList<>(tombstonesInVersionMap.get()); + List tombstones = new ArrayList<>(tombstonesInVersionMap(engine).values()); engine.config().setEnableGcDeletes(true); // Prune tombstones whose seqno < gap_seqno and timestamp < clock-gcInterval. clock.set(randomLongBetween(gcInterval, deleteBatch + gcInterval)); engine.refresh("test"); tombstones.removeIf(v -> v.seqNo < gapSeqNo && v.time < clock.get() - gcInterval); - assertThat(tombstonesInVersionMap.get(), containsInAnyOrder(tombstones.toArray())); + assertThat(tombstonesInVersionMap(engine).values(), containsInAnyOrder(tombstones.toArray())); // Prune tombstones whose seqno at most the local checkpoint (eg. seqno < gap_seqno). clock.set(randomLongBetween(deleteBatch + gcInterval * 4/3, 100)); // Need a margin for gcInterval/4. engine.refresh("test"); tombstones.removeIf(v -> v.seqNo < gapSeqNo); - assertThat(tombstonesInVersionMap.get(), containsInAnyOrder(tombstones.toArray())); + assertThat(tombstonesInVersionMap(engine).values(), containsInAnyOrder(tombstones.toArray())); // Fill the seqno gap - should prune all tombstones. clock.set(between(0, 100)); if (randomBoolean()) { @@ -5134,7 +5130,7 @@ public void testPruneOnlyDeletesAtMostLocalCheckpoint() throws Exception { } clock.set(randomLongBetween(100 + gcInterval * 4/3, Long.MAX_VALUE)); // Need a margin for gcInterval/4. 
engine.refresh("test"); - assertThat(tombstonesInVersionMap.get(), empty()); + assertThat(tombstonesInVersionMap(engine).values(), empty()); } } @@ -5577,8 +5573,8 @@ public void testRebuildLocalCheckpointTrackerAndVersionMap() throws Exception { } if (randomInt(100) < 5) { engine.flush(true, true); + flushedOperations.sort(Comparator.comparing(Engine.Operation::seqNo)); commits.add(new ArrayList<>(flushedOperations)); - globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), engine.getLocalCheckpoint())); } } docs = getDocIds(engine, true); @@ -5591,29 +5587,26 @@ public void testRebuildLocalCheckpointTrackerAndVersionMap() throws Exception { } } assertThat(operationsInSafeCommit, notNullValue()); - operationsInSafeCommit.sort(Comparator.comparing(Engine.Operation::seqNo)); try (InternalEngine engine = new InternalEngine(config)) { // do not recover from translog - final Map operationsAfterCheckpoint = new HashMap<>(); + final Map deletesAfterCheckpoint = new HashMap<>(); for (Engine.Operation op : operationsInSafeCommit) { if (op instanceof Engine.NoOp == false && op.seqNo() > engine.getLocalCheckpoint()) { - operationsAfterCheckpoint.put(new Term(IdFieldMapper.NAME, Uid.encodeId(op.id())).bytes(), op); + deletesAfterCheckpoint.put(new Term(IdFieldMapper.NAME, Uid.encodeId(op.id())).bytes(), op); } } + deletesAfterCheckpoint.values().removeIf(o -> o instanceof Engine.Delete == false); final Map versionMap = engine.getVersionMap(); - assertThat(versionMap.keySet(), equalTo(operationsAfterCheckpoint.keySet())); - for (BytesRef uid : operationsAfterCheckpoint.keySet()) { + for (BytesRef uid : deletesAfterCheckpoint.keySet()) { final VersionValue versionValue = versionMap.get(uid); - final Engine.Operation op = operationsAfterCheckpoint.get(uid); - final String msg = versionValue + " vs " + op.operationType() + " seqno=" + op.seqNo() + " term=" + op.primaryTerm(); - if (op instanceof Engine.Delete) { - assertThat(msg, versionValue, instanceOf(DeleteVersionValue.class)); - } else { - assertThat(msg, versionValue, instanceOf(IndexVersionValue.class)); - } + final Engine.Operation op = deletesAfterCheckpoint.get(uid); + final String msg = versionValue + " vs " + + "op[" + op.operationType() + "id=" + op.id() + " seqno=" + op.seqNo() + " term=" + op.primaryTerm() + "]"; + assertThat(versionValue, instanceOf(DeleteVersionValue.class)); assertThat(msg, versionValue.seqNo, equalTo(op.seqNo())); assertThat(msg, versionValue.term, equalTo(op.primaryTerm())); assertThat(msg, versionValue.version, equalTo(op.version())); } + assertThat(versionMap.keySet(), equalTo(deletesAfterCheckpoint.keySet())); final LocalCheckpointTracker tracker = engine.getLocalCheckpointTracker(); final Set seqNosInSafeCommit = operationsInSafeCommit.stream().map(op -> op.seqNo()).collect(Collectors.toSet()); for (Engine.Operation op : operations) { @@ -5920,4 +5913,10 @@ public void testRecoverFromLocalTranslog() throws Exception { } } } + + private Map tombstonesInVersionMap(InternalEngine engine) { + return engine.getVersionMap().entrySet().stream() + .filter(e -> e.getValue() instanceof DeleteVersionValue) + .collect(Collectors.toMap(e -> e.getKey(), e -> (DeleteVersionValue) e.getValue())); + } } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java index 620f52849a093..4a56d6370eb91 100644 --- 
a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java @@ -58,7 +58,6 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; -import static org.elasticsearch.index.engine.EngineTestCase.createMapperService; import static org.elasticsearch.index.engine.EngineTestCase.getDocIds; import static org.elasticsearch.index.engine.EngineTestCase.getTranslog; import static org.hamcrest.Matchers.containsString; From e67fa1e16b739e28d5fceb7ef67ad8f3a805e339 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Fri, 14 Jun 2019 09:12:57 -0400 Subject: [PATCH 4/8] make it static --- .../java/org/elasticsearch/index/engine/InternalEngine.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index f9da22042c43c..7da6218bd8c9c 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -677,7 +677,7 @@ enum OpVsLuceneDocStatus { LUCENE_DOC_NOT_FOUND } - private OpVsLuceneDocStatus compareOpToVersionMapOnSeqNo(String id, long seqNo, long primaryTerm, VersionValue versionValue) { + private static OpVsLuceneDocStatus compareOpToVersionMapOnSeqNo(String id, long seqNo, long primaryTerm, VersionValue versionValue) { Objects.requireNonNull(versionValue); if (seqNo > versionValue.seqNo) { return OpVsLuceneDocStatus.OP_NEWER; From f9d622990c335e9b86a1e3f219175e56e778e340 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Fri, 14 Jun 2019 09:16:33 -0400 Subject: [PATCH 5/8] use 0L so we can prune quickly --- .../java/org/elasticsearch/index/engine/InternalEngine.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 7da6218bd8c9c..45831087eef10 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -2814,8 +2814,9 @@ private void restoreVersionMapAndCheckpointTracker(DirectoryReader directoryRead if (curr == null || compareOpToVersionMapOnSeqNo(idFieldVisitor.getId(), seqNo, primaryTerm, curr) == OpVsLuceneDocStatus.OP_NEWER) { if (dv.isTombstone(docId)) { - // use the current timestamp to honor GC deletes in case this engine becomes primary. - final long startTime = config().getThreadPool().relativeTimeInMillis(); + // use 0L for the start time so we can prune this delete tombstone quickly + // when the local checkpoint advances (i.e., after a recovery completed). 
+ final long startTime = 0L; versionMap.putDeleteUnderLock(uid, new DeleteVersionValue(dv.docVersion(docId), seqNo, primaryTerm, startTime)); } else { versionMap.putIndexUnderLock(uid, new IndexVersionValue(null, dv.docVersion(docId), seqNo, primaryTerm)); From a7b2bea2966c063b978ce5c75deb0411d2b6e113 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Fri, 14 Jun 2019 09:17:58 -0400 Subject: [PATCH 6/8] Reuse field visitor --- .../java/org/elasticsearch/index/engine/InternalEngine.java | 3 ++- .../elasticsearch/index/fieldvisitor/IdOnlyFieldVisitor.java | 5 +++++ 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 45831087eef10..47102da617f42 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -2793,6 +2793,7 @@ private void restoreVersionMapAndCheckpointTracker(DirectoryReader directoryRead continue; } final CombinedDocValues dv = new CombinedDocValues(leaf.reader()); + final IdOnlyFieldVisitor idFieldVisitor = new IdOnlyFieldVisitor(); final DocIdSetIterator iterator = scorer.iterator(); int docId; while ((docId = iterator.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) { @@ -2802,7 +2803,7 @@ private void restoreVersionMapAndCheckpointTracker(DirectoryReader directoryRead } final long seqNo = dv.docSeqNo(docId); localCheckpointTracker.markSeqNoAsCompleted(seqNo); - final IdOnlyFieldVisitor idFieldVisitor = new IdOnlyFieldVisitor(); + idFieldVisitor.reset(); leaf.reader().document(docId, idFieldVisitor); if (idFieldVisitor.getId() == null) { assert dv.isTombstone(docId); diff --git a/server/src/main/java/org/elasticsearch/index/fieldvisitor/IdOnlyFieldVisitor.java b/server/src/main/java/org/elasticsearch/index/fieldvisitor/IdOnlyFieldVisitor.java index a4e6796f6c335..06ed01774b0a9 100644 --- a/server/src/main/java/org/elasticsearch/index/fieldvisitor/IdOnlyFieldVisitor.java +++ b/server/src/main/java/org/elasticsearch/index/fieldvisitor/IdOnlyFieldVisitor.java @@ -52,4 +52,9 @@ public void binaryField(FieldInfo fieldInfo, byte[] value) { public String getId() { return id; } + + public void reset() { + id = null; + visited = false; + } } From a24c25a24862dea446ae72f5094d6d0d030dff78 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Mon, 17 Jun 2019 12:55:50 -0400 Subject: [PATCH 7/8] wording --- .../java/org/elasticsearch/index/engine/InternalEngine.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 47102da617f42..cdd6cd86e6afb 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -2780,7 +2780,7 @@ private static void trimUnsafeCommits(EngineConfig engineConfig) throws IOExcept /** * Restores the live version map and local checkpoint of this engine using documents (including soft-deleted) * after the local checkpoint in the safe commit. This step ensures the live version map and checkpoint tracker - * in sync with Lucene commit. + * are in sync with the Lucene commit. 
*/ private void restoreVersionMapAndCheckpointTracker(DirectoryReader directoryReader) throws IOException { final IndexSearcher searcher = new IndexSearcher(directoryReader); From 1cb64368b1d33b498e5bd48e3c72a04e67de5c58 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Mon, 17 Jun 2019 13:36:22 -0400 Subject: [PATCH 8/8] assertion first --- .../elasticsearch/index/fieldvisitor/IdOnlyFieldVisitor.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/index/fieldvisitor/IdOnlyFieldVisitor.java b/server/src/main/java/org/elasticsearch/index/fieldvisitor/IdOnlyFieldVisitor.java index 06ed01774b0a9..13566b841c832 100644 --- a/server/src/main/java/org/elasticsearch/index/fieldvisitor/IdOnlyFieldVisitor.java +++ b/server/src/main/java/org/elasticsearch/index/fieldvisitor/IdOnlyFieldVisitor.java @@ -43,10 +43,10 @@ public Status needsField(FieldInfo fieldInfo) { @Override public void binaryField(FieldInfo fieldInfo, byte[] value) { + assert IdFieldMapper.NAME.equals(fieldInfo.name) : fieldInfo; if (IdFieldMapper.NAME.equals(fieldInfo.name)) { id = Uid.decodeId(value); } - assert IdFieldMapper.NAME.equals(fieldInfo.name) : fieldInfo; } public String getId() {
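// ---------------------------------------------------------------------------
// Illustrative sketch (not part of the patches above): the seq_no-based
// resolution rule that compareOpToVersionMapOnSeqNo applies and that the
// version map rebuild reuses when deciding whether a Lucene document should
// replace an existing live version map entry. An operation supersedes the
// entry only when its seq_no is strictly greater; an equal or lower seq_no is
// treated as stale or a replay. The names SeqNoResolution/resolve are
// hypothetical and exist only for this sketch.
// ---------------------------------------------------------------------------
final class SeqNoResolution {

    enum Status { OP_NEWER, OP_STALE_OR_EQUAL }

    static Status resolve(long opSeqNo, long opPrimaryTerm, long entrySeqNo, long entryPrimaryTerm) {
        if (opSeqNo > entrySeqNo) {
            return Status.OP_NEWER; // a later seq_no always wins
        }
        if (opSeqNo == entrySeqNo) {
            // two deliveries of the same operation must carry the same primary term
            assert opPrimaryTerm == entryPrimaryTerm
                : "seq_no=" + opSeqNo + " op_term=" + opPrimaryTerm + " existing_term=" + entryPrimaryTerm;
        }
        return Status.OP_STALE_OR_EQUAL; // equal or older seq_no: ignore as stale/duplicate
    }

    public static void main(String[] args) {
        System.out.println(resolve(5, 1, 7, 1)); // OP_STALE_OR_EQUAL: a replayed older op is dropped
        System.out.println(resolve(8, 2, 7, 1)); // OP_NEWER: a fresher op replaces the map entry
    }
}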
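// ---------------------------------------------------------------------------
// Illustrative sketch (not part of the patches above) of the reasoning behind
// "use 0L so we can prune quickly" (PATCH 5/8): a delete tombstone is pruned
// only once it is both older than the GC window and at or below the local
// checkpoint, as exercised by testPruneOnlyDeletesAtMostLocalCheckpoint. With
// a 0L start time the age condition is already satisfied for any clock value
// past the GC window, so pruning effectively waits only for the local
// checkpoint to advance. The names below are hypothetical, not the engine API.
// ---------------------------------------------------------------------------
final class TombstonePruneSketch {

    static boolean canPrune(long tombstoneSeqNo, long tombstoneTimeMillis,
                            long nowMillis, long gcDeletesMillis, long localCheckpoint) {
        return (nowMillis - tombstoneTimeMillis) > gcDeletesMillis // old enough (true early when timeMillis == 0L)
            && tombstoneSeqNo <= localCheckpoint;                  // the delete is fully accounted for
    }

    public static void main(String[] args) {
        // A tombstone restored by the rebuild: seq_no 10, start time 0L, GC window 60s, clock at 2 minutes.
        System.out.println(canPrune(10, 0L, 120_000, 60_000, 5));  // false: local checkpoint has not caught up
        System.out.println(canPrune(10, 0L, 120_000, 60_000, 10)); // true: prunable once the checkpoint reaches it
    }
}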