Skip to content

Commit

Permalink
Rebuild version map when opening internal engine
Browse files Browse the repository at this point in the history
  • Loading branch information
dnhatn committed Jun 13, 2019
1 parent e343ed1 commit 42c8233
Show file tree
Hide file tree
Showing 10 changed files with 232 additions and 111 deletions.
38 changes: 0 additions & 38 deletions server/src/main/java/org/elasticsearch/common/lucene/Lucene.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.apache.lucene.codecs.DocValuesFormat;
import org.apache.lucene.codecs.PostingsFormat;
import org.apache.lucene.document.LatLonDocValuesField;
import org.apache.lucene.document.LongPoint;
import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.index.BinaryDocValues;
import org.apache.lucene.index.CorruptIndexException;
Expand Down Expand Up @@ -95,7 +94,6 @@
import org.elasticsearch.index.analysis.AnalyzerScope;
import org.elasticsearch.index.analysis.NamedAnalyzer;
import org.elasticsearch.index.fielddata.IndexFieldData;
import org.elasticsearch.index.mapper.SeqNoFieldMapper;

import java.io.IOException;
import java.text.ParseException;
Expand All @@ -105,7 +103,6 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.LongConsumer;

public class Lucene {
public static final String LATEST_DOC_VALUES_FORMAT = "Lucene70";
Expand Down Expand Up @@ -1050,39 +1047,4 @@ public CacheHelper getReaderCacheHelper() {
}
};
}

/**
* Scans sequence numbers (i.e., {@link SeqNoFieldMapper#NAME}) between {@code fromSeqNo}(inclusive) and {@code toSeqNo}(inclusive)
* in the provided directory reader. This method invokes the callback {@code onNewSeqNo} whenever a sequence number value is found.
*
* @param directoryReader the directory reader to scan
* @param fromSeqNo the lower bound of a range of seq_no to scan (inclusive)
* @param toSeqNo the upper bound of a range of seq_no to scan (inclusive)
* @param onNewSeqNo the callback to be called whenever a new valid sequence number is found
*/
public static void scanSeqNosInReader(DirectoryReader directoryReader, long fromSeqNo, long toSeqNo,
LongConsumer onNewSeqNo) throws IOException {
final DirectoryReader reader = Lucene.wrapAllDocsLive(directoryReader);
final IndexSearcher searcher = new IndexSearcher(reader);
searcher.setQueryCache(null);
final Query query = LongPoint.newRangeQuery(SeqNoFieldMapper.NAME, fromSeqNo, toSeqNo);
final Weight weight = searcher.createWeight(query, ScoreMode.COMPLETE_NO_SCORES, 1.0f);
for (LeafReaderContext leaf : reader.leaves()) {
final Scorer scorer = weight.scorer(leaf);
if (scorer == null) {
continue;
}
final DocIdSetIterator docIdSetIterator = scorer.iterator();
final NumericDocValues seqNoDocValues = leaf.reader().getNumericDocValues(SeqNoFieldMapper.NAME);
int docId;
while ((docId = docIdSetIterator.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) {
if (seqNoDocValues == null || seqNoDocValues.advanceExact(docId) == false) {
throw new IllegalStateException("seq_no doc_values not found for doc_id=" + docId);
}
final long seqNo = seqNoDocValues.longValue();
assert fromSeqNo <= seqNo && seqNo <= toSeqNo : "from_seq_no=" + fromSeqNo + " seq_no=" + seqNo + " to_seq_no=" + toSeqNo;
onNewSeqNo.accept(seqNo);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.codec.CodecService;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.seqno.RetentionLeases;
import org.elasticsearch.index.shard.ShardId;
Expand Down Expand Up @@ -81,6 +82,7 @@ public final class EngineConfig {
private final CircuitBreakerService circuitBreakerService;
private final LongSupplier globalCheckpointSupplier;
private final Supplier<RetentionLeases> retentionLeasesSupplier;
private final MapperService mapperService;

/**
* A supplier of the outstanding retention leases. This is used during merged operations to determine which operations that have been
Expand Down Expand Up @@ -132,7 +134,8 @@ public EngineConfig(ShardId shardId, String allocationId, ThreadPool threadPool,
CircuitBreakerService circuitBreakerService, LongSupplier globalCheckpointSupplier,
Supplier<RetentionLeases> retentionLeasesSupplier,
LongSupplier primaryTermSupplier,
TombstoneDocSupplier tombstoneDocSupplier) {
TombstoneDocSupplier tombstoneDocSupplier,
MapperService mapperService) {
this.shardId = shardId;
this.allocationId = allocationId;
this.indexSettings = indexSettings;
Expand Down Expand Up @@ -171,6 +174,7 @@ public EngineConfig(ShardId shardId, String allocationId, ThreadPool threadPool,
this.retentionLeasesSupplier = Objects.requireNonNull(retentionLeasesSupplier);
this.primaryTermSupplier = primaryTermSupplier;
this.tombstoneDocSupplier = tombstoneDocSupplier;
this.mapperService = mapperService;
}

/**
Expand Down Expand Up @@ -381,4 +385,8 @@ public interface TombstoneDocSupplier {
public TombstoneDocSupplier getTombstoneDocSupplier() {
return tombstoneDocSupplier;
}

public MapperService getMapperService() {
return mapperService;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.lucene.codecs.blocktree.BlockTreeTermsReader;
import org.apache.lucene.codecs.blocktree.BlockTreeTermsReader.FSTLoadMode;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.LongPoint;
import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexCommit;
Expand All @@ -31,17 +32,24 @@
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.IndexableField;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.LiveIndexWriterConfig;
import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.index.NumericDocValues;
import org.apache.lucene.index.SegmentCommitInfo;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.index.SoftDeletesRetentionMergePolicy;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.ReferenceManager;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.Scorer;
import org.apache.lucene.search.SearcherFactory;
import org.apache.lucene.search.SearcherManager;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.search.Weight;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FilterDirectory;
Expand All @@ -68,12 +76,15 @@
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.fieldvisitor.FieldsVisitor;
import org.elasticsearch.index.mapper.IdFieldMapper;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.ParseContext;
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
import org.elasticsearch.index.mapper.SourceFieldMapper;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.mapper.VersionFieldMapper;
import org.elasticsearch.index.merge.MergeStats;
import org.elasticsearch.index.merge.OnGoingMerge;
import org.elasticsearch.index.seqno.LocalCheckpointTracker;
Expand All @@ -94,7 +105,6 @@
import java.io.IOException;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -109,7 +119,7 @@
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiFunction;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class InternalEngine extends Engine {
Expand Down Expand Up @@ -203,6 +213,7 @@ public InternalEngine(EngineConfig engineConfig) {
this.softDeletesPolicy = newSoftDeletesPolicy();
this.combinedDeletionPolicy =
new CombinedDeletionPolicy(logger, translogDeletionPolicy, softDeletesPolicy, translog::getLastSyncedGlobalCheckpoint);
this.localCheckpointTracker = createLocalCheckpointTracker(localCheckpointTrackerSupplier);
writer = createWriter();
bootstrapAppendOnlyInfoFromWriter(writer);
historyUUID = loadHistoryUUID(writer);
Expand Down Expand Up @@ -232,11 +243,17 @@ public InternalEngine(EngineConfig engineConfig) {
for (ReferenceManager.RefreshListener listener: engineConfig.getInternalRefreshListener()) {
this.internalSearcherManager.addListener(listener);
}
this.localCheckpointTracker = createLocalCheckpointTracker(engineConfig, lastCommittedSegmentInfos, logger,
() -> acquireSearcher("create_local_checkpoint_tracker", SearcherScope.INTERNAL), localCheckpointTrackerSupplier);
this.lastRefreshedCheckpointListener = new LastRefreshedCheckpointListener(localCheckpointTracker.getCheckpoint());
this.internalSearcherManager.addListener(lastRefreshedCheckpointListener);
maxSeqNoOfUpdatesOrDeletes = new AtomicLong(SequenceNumbers.max(localCheckpointTracker.getMaxSeqNo(), translog.getMaxSeqNo()));
if (softDeleteEnabled && localCheckpointTracker.getCheckpoint() < localCheckpointTracker.getMaxSeqNo()) {
try (Searcher searcher = acquireSearcher("restore_version_map_and_checkpoint_tracker", SearcherScope.INTERNAL)) {
restoreVersionMapAndCheckpointTracker(Lucene.wrapAllDocsLive(searcher.getDirectoryReader()));
} catch (IOException e) {
throw new EngineCreationFailureException(config().getShardId(),
"failed to restore version map and local checkpoint tracker", e);
}
}
success = true;
} finally {
if (success == false) {
Expand All @@ -250,30 +267,16 @@ public InternalEngine(EngineConfig engineConfig) {
logger.trace("created new InternalEngine");
}

private static LocalCheckpointTracker createLocalCheckpointTracker(EngineConfig engineConfig, SegmentInfos lastCommittedSegmentInfos,
Logger logger, Supplier<Searcher> searcherSupplier, BiFunction<Long, Long, LocalCheckpointTracker> localCheckpointTrackerSupplier) {
try {
final SequenceNumbers.CommitInfo seqNoStats =
SequenceNumbers.loadSeqNoInfoFromLuceneCommit(lastCommittedSegmentInfos.userData.entrySet());
final long maxSeqNo = seqNoStats.maxSeqNo;
final long localCheckpoint = seqNoStats.localCheckpoint;
logger.trace("recovered maximum sequence number [{}] and local checkpoint [{}]", maxSeqNo, localCheckpoint);
final LocalCheckpointTracker tracker = localCheckpointTrackerSupplier.apply(maxSeqNo, localCheckpoint);
// Operations that are optimized using max_seq_no_of_updates optimization must not be processed twice; otherwise, they will
// create duplicates in Lucene. To avoid this we check the LocalCheckpointTracker to see if an operation was already processed.
// Thus, we need to restore the LocalCheckpointTracker bit by bit to ensure the consistency between LocalCheckpointTracker and
// Lucene index. This is not the only solution since we can bootstrap max_seq_no_of_updates with max_seq_no of the commit to
// disable the MSU optimization during recovery. Here we prefer to maintain the consistency of LocalCheckpointTracker.
if (localCheckpoint < maxSeqNo && engineConfig.getIndexSettings().isSoftDeleteEnabled()) {
try (Searcher searcher = searcherSupplier.get()) {
Lucene.scanSeqNosInReader(searcher.getDirectoryReader(), localCheckpoint + 1, maxSeqNo,
tracker::markSeqNoAsCompleted);
}
}
return tracker;
} catch (IOException ex) {
throw new EngineCreationFailureException(engineConfig.getShardId(), "failed to create local checkpoint tracker", ex);
}
private LocalCheckpointTracker createLocalCheckpointTracker(
BiFunction<Long, Long, LocalCheckpointTracker> localCheckpointTrackerSupplier) throws IOException {
final long maxSeqNo;
final long localCheckpoint;
final SequenceNumbers.CommitInfo seqNoStats =
SequenceNumbers.loadSeqNoInfoFromLuceneCommit(store.readLastCommittedSegmentsInfo().userData.entrySet());
maxSeqNo = seqNoStats.maxSeqNo;
localCheckpoint = seqNoStats.localCheckpoint;
logger.trace("recovered maximum sequence number [{}] and local checkpoint [{}]", maxSeqNo, localCheckpoint);
return localCheckpointTrackerSupplier.apply(maxSeqNo, localCheckpoint);
}

private SoftDeletesPolicy newSoftDeletesPolicy() throws IOException {
Expand Down Expand Up @@ -1875,8 +1878,9 @@ void clearDeletedTombstones() {
}

// for testing
final Collection<DeleteVersionValue> getDeletedTombstones() {
return versionMap.getAllTombstones().values();
final Map<BytesRef, VersionValue> getVersionMap() {
return Stream.concat(versionMap.getAllCurrent().entrySet().stream(), versionMap.getAllTombstones().entrySet().stream())
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}

@Override
Expand Down Expand Up @@ -2507,10 +2511,6 @@ private boolean incrementIndexVersionLookup() {
return true;
}

int getVersionMapSize() {
return versionMap.getAllCurrent().size();
}

boolean isSafeAccessRequired() {
return versionMap.isSafeAccessRequired();
}
Expand Down Expand Up @@ -2773,4 +2773,64 @@ private static void trimUnsafeCommits(EngineConfig engineConfig) throws IOExcept
final long minRetainedTranslogGen = Translog.readMinTranslogGeneration(translogPath, translogUUID);
store.trimUnsafeCommits(globalCheckpoint, minRetainedTranslogGen, engineConfig.getIndexSettings().getIndexVersionCreated());
}

/**
* Restores the live version map and local checkpoint of this engine using documents (including soft-deleted)
* after the local checkpoint in the safe commit. This step ensures the live version map and checkpoint tracker
* in sync with Lucene commit.
*/
private void restoreVersionMapAndCheckpointTracker(DirectoryReader directoryReader) throws IOException {
final IndexSearcher searcher = new IndexSearcher(directoryReader);
searcher.setQueryCache(null);
final Query query = LongPoint.newRangeQuery(SeqNoFieldMapper.NAME, getLocalCheckpoint() + 1, Long.MAX_VALUE);
final Weight weight = searcher.createWeight(query, ScoreMode.COMPLETE_NO_SCORES, 1.0f);
for (LeafReaderContext leaf : directoryReader.leaves()) {
final LeafReader reader = leaf.reader();
final Scorer scorer = weight.scorer(leaf);
if (scorer == null) {
continue;
}
final DocIdSetIterator docIdSetIterator = scorer.iterator();
final NumericDocValues seqNoDV = Objects.requireNonNull(reader.getNumericDocValues(SeqNoFieldMapper.NAME));
final NumericDocValues primaryTermDV = Objects.requireNonNull(reader.getNumericDocValues(SeqNoFieldMapper.PRIMARY_TERM_NAME));
final NumericDocValues tombstoneDV = reader.getNumericDocValues(SeqNoFieldMapper.TOMBSTONE_NAME);
final NumericDocValues versionDV = reader.getNumericDocValues(VersionFieldMapper.NAME);
int docId;
while ((docId = docIdSetIterator.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) {
if (primaryTermDV.advanceExact(docId) == false) {
continue; // skip children docs which do not have primary term
}
final long primaryTerm = primaryTermDV.longValue();
if (seqNoDV.advanceExact(docId) == false) {
throw new IllegalStateException("seq_no not found for doc_id=" + docId);
}
final long seqNo = seqNoDV.longValue();
localCheckpointTracker.markSeqNoAsCompleted(seqNo);
final boolean isTombstone = tombstoneDV != null && tombstoneDV.advanceExact(docId);
final FieldsVisitor fields = new FieldsVisitor(false, SourceFieldMapper.NAME);
reader.document(docId, fields);
fields.postProcess(engineConfig.getMapperService());
if (fields.uid() == null) {
assert isTombstone; // a noop
continue;
}
final BytesRef uid = new Term(IdFieldMapper.NAME, Uid.encodeId(fields.uid().id())).bytes();
if (versionDV == null || versionDV.advanceExact(docId) == false) {
throw new IllegalStateException("version not found for doc_id=" + docId);
}
final long version = versionDV.longValue();
try (Releasable ignored = versionMap.acquireLock(uid)) {
final VersionValue curr = versionMap.getUnderLock(uid);
if (curr == null || curr.term < primaryTerm || (curr.term == primaryTerm && curr.seqNo <= seqNo)) {
if (isTombstone) {
versionMap.putDeleteUnderLock(uid,
new DeleteVersionValue(version, seqNo, primaryTerm, engineConfig.getThreadPool().relativeTimeInMillis()));
} else {
versionMap.putIndexUnderLock(uid, new IndexVersionValue(null, version, seqNo, primaryTerm));
}
}
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2479,7 +2479,7 @@ private EngineConfig newEngineConfig() {
Collections.singletonList(refreshListeners),
Collections.singletonList(new RefreshMetricUpdater(refreshMetric)),
indexSort, circuitBreakerService, replicationTracker, replicationTracker::getRetentionLeases,
() -> getOperationPrimaryTerm(), tombstoneDocSupplier());
() -> getOperationPrimaryTerm(), tombstoneDocSupplier(), mapperService);
}

/**
Expand Down
Loading

0 comments on commit 42c8233

Please sign in to comment.