Use soft-deleted docs to resolve strategy for engine operation #35230

Merged: 16 commits, Nov 7, 2018
39 changes: 0 additions & 39 deletions server/src/main/java/org/elasticsearch/common/lucene/Lucene.java
@@ -27,7 +27,6 @@
import org.apache.lucene.codecs.DocValuesFormat;
import org.apache.lucene.codecs.PostingsFormat;
import org.apache.lucene.document.LatLonDocValuesField;
import org.apache.lucene.document.LongPoint;
import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.DirectoryReader;
@@ -43,7 +42,6 @@
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.NoMergePolicy;
import org.apache.lucene.index.NumericDocValues;
import org.apache.lucene.index.SegmentCommitInfo;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.index.SegmentReader;
@@ -84,7 +82,6 @@
import org.elasticsearch.index.analysis.AnalyzerScope;
import org.elasticsearch.index.analysis.NamedAnalyzer;
import org.elasticsearch.index.fielddata.IndexFieldData;
import org.elasticsearch.index.mapper.SeqNoFieldMapper;

import java.io.IOException;
import java.text.ParseException;
@@ -94,7 +91,6 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.LongConsumer;

public class Lucene {
public static final String LATEST_DOC_VALUES_FORMAT = "Lucene70";
@@ -972,39 +968,4 @@ public CacheHelper getReaderCacheHelper() {
public static NumericDocValuesField newSoftDeletesField() {
return new NumericDocValuesField(SOFT_DELETES_FIELD, 1);
}
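
    // A sketch (assumed usage, not part of this change): with the soft-deletes field
    // configured on the IndexWriter, an update retains the previous copy of a document
    // as soft-deleted, which is what keeps its seq_no visible to history scans such as
    // scanSeqNosInReader below.
    static void softUpdate(IndexWriter writer, Term uid, Iterable<IndexableField> doc) throws IOException {
        writer.softUpdateDocument(uid, doc, newSoftDeletesField());
    }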

/**
* Scans sequence numbers (i.e., {@link SeqNoFieldMapper#NAME}) between {@code fromSeqNo} (inclusive) and {@code toSeqNo} (inclusive)
* in the provided directory reader. This method invokes the callback {@code onNewSeqNo} whenever a sequence number value is found.
*
* @param directoryReader the directory reader to scan
* @param fromSeqNo the lower bound of a range of seq_no to scan (inclusive)
* @param toSeqNo the upper bound of a range of seq_no to scan (inclusive)
* @param onNewSeqNo the callback to be called whenever a new valid sequence number is found
*/
public static void scanSeqNosInReader(DirectoryReader directoryReader, long fromSeqNo, long toSeqNo,
LongConsumer onNewSeqNo) throws IOException {
final DirectoryReader reader = Lucene.wrapAllDocsLive(directoryReader);
final IndexSearcher searcher = new IndexSearcher(reader);
searcher.setQueryCache(null);
final Query query = LongPoint.newRangeQuery(SeqNoFieldMapper.NAME, fromSeqNo, toSeqNo);
final Weight weight = searcher.createWeight(query, ScoreMode.COMPLETE_NO_SCORES, 1.0f);
for (LeafReaderContext leaf : reader.leaves()) {
final Scorer scorer = weight.scorer(leaf);
if (scorer == null) {
continue;
}
final DocIdSetIterator docIdSetIterator = scorer.iterator();
final NumericDocValues seqNoDocValues = leaf.reader().getNumericDocValues(SeqNoFieldMapper.NAME);
int docId;
while ((docId = docIdSetIterator.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) {
if (seqNoDocValues == null || seqNoDocValues.advanceExact(docId) == false) {
throw new IllegalStateException("seq_no doc_values not found for doc_id=" + docId);
}
final long seqNo = seqNoDocValues.longValue();
assert fromSeqNo <= seqNo && seqNo <= toSeqNo : "from_seq_no=" + fromSeqNo + " seq_no=" + seqNo + " to_seq_no=" + toSeqNo;
onNewSeqNo.accept(seqNo);
}
}
}
}
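
For context, the deleted engine code below was this helper's caller: on engine start it replayed every surviving seq_no back into the LocalCheckpointTracker. A condensed sketch, using the names from that deleted code:

    try (Searcher searcher = searcherSupplier.get()) {
        // Mark each seq_no between the local checkpoint and max_seq_no as completed,
        // so the tracker matches what the Lucene commit actually contains.
        Lucene.scanSeqNosInReader(searcher.getDirectoryReader(), localCheckpoint + 1, maxSeqNo,
            tracker::markSeqNoAsCompleted);
    }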
server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
@@ -103,7 +103,6 @@
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiFunction;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
import java.util.stream.Stream;

public class InternalEngine extends Engine {
@@ -192,6 +191,7 @@ public InternalEngine(EngineConfig engineConfig) {
translog = openTranslog(engineConfig, translogDeletionPolicy, engineConfig.getGlobalCheckpointSupplier());
assert translog.getGeneration() != null;
this.translog = translog;
this.localCheckpointTracker = createLocalCheckpointTracker(localCheckpointTrackerSupplier);
this.softDeleteEnabled = engineConfig.getIndexSettings().isSoftDeleteEnabled();
this.softDeletesPolicy = newSoftDeletesPolicy();
this.combinedDeletionPolicy =
@@ -225,8 +225,6 @@ public InternalEngine(EngineConfig engineConfig) {
for (ReferenceManager.RefreshListener listener: engineConfig.getInternalRefreshListener()) {
this.internalSearcherManager.addListener(listener);
}
this.localCheckpointTracker = createLocalCheckpointTracker(engineConfig, lastCommittedSegmentInfos, logger,
() -> acquireSearcher("create_local_checkpoint_tracker", SearcherScope.INTERNAL), localCheckpointTrackerSupplier);
this.lastRefreshedCheckpointListener = new LastRefreshedCheckpointListener(localCheckpointTracker.getCheckpoint());
this.internalSearcherManager.addListener(lastRefreshedCheckpointListener);
success = true;
@@ -242,29 +240,16 @@ public InternalEngine(EngineConfig engineConfig) {
logger.trace("created new InternalEngine");
}

private static LocalCheckpointTracker createLocalCheckpointTracker(EngineConfig engineConfig, SegmentInfos lastCommittedSegmentInfos,
Logger logger, Supplier<Searcher> searcherSupplier, BiFunction<Long, Long, LocalCheckpointTracker> localCheckpointTrackerSupplier) {
try {
final SequenceNumbers.CommitInfo seqNoStats =
SequenceNumbers.loadSeqNoInfoFromLuceneCommit(lastCommittedSegmentInfos.userData.entrySet());
final long maxSeqNo = seqNoStats.maxSeqNo;
final long localCheckpoint = seqNoStats.localCheckpoint;
logger.trace("recovered maximum sequence number [{}] and local checkpoint [{}]", maxSeqNo, localCheckpoint);
final LocalCheckpointTracker tracker = localCheckpointTrackerSupplier.apply(maxSeqNo, localCheckpoint);
// Operations optimized via the max_seq_no_of_updates (MSU) optimization must not be processed twice; otherwise, they will
// create duplicates in Lucene. To avoid this we check the LocalCheckpointTracker to see whether an operation was already processed.
// Thus, we need to restore the LocalCheckpointTracker bit by bit to ensure consistency between the LocalCheckpointTracker and the
// Lucene index. This is not the only solution: we could instead bootstrap max_seq_no_of_updates with the max_seq_no of the commit,
// disabling the MSU optimization during recovery. Here we prefer to maintain the consistency of the LocalCheckpointTracker.
if (localCheckpoint < maxSeqNo && engineConfig.getIndexSettings().isSoftDeleteEnabled()) {
try (Searcher searcher = searcherSupplier.get()) {
Lucene.scanSeqNosInReader(searcher.getDirectoryReader(), localCheckpoint + 1, maxSeqNo, tracker::markSeqNoAsCompleted);
}
}
return tracker;
} catch (IOException ex) {
throw new EngineCreationFailureException(engineConfig.getShardId(), "failed to create local checkpoint tracker", ex);
}
private LocalCheckpointTracker createLocalCheckpointTracker(
BiFunction<Long, Long, LocalCheckpointTracker> localCheckpointTrackerSupplier) throws IOException {
final long maxSeqNo;
final long localCheckpoint;
final SequenceNumbers.CommitInfo seqNoStats =
SequenceNumbers.loadSeqNoInfoFromLuceneCommit(store.readLastCommittedSegmentsInfo().userData.entrySet());
maxSeqNo = seqNoStats.maxSeqNo;
localCheckpoint = seqNoStats.localCheckpoint;
logger.trace("recovered maximum sequence number [{}] and local checkpoint [{}]", maxSeqNo, localCheckpoint);
return localCheckpointTrackerSupplier.apply(maxSeqNo, localCheckpoint);
}
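
    // A minimal sketch, assuming LocalCheckpointTracker's two-argument constructor
    // (an illustration, not part of this diff): callers typically satisfy the supplier
    // like this, seeding the tracker from the commit's recovered seq_no stats.
    BiFunction<Long, Long, LocalCheckpointTracker> trackerSupplier =
        (maxSeqNo, localCheckpoint) -> new LocalCheckpointTracker(maxSeqNo, localCheckpoint);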

private SoftDeletesPolicy newSoftDeletesPolicy() throws IOException {
@@ -682,8 +667,6 @@ private OpVsLuceneDocStatus compareOpToLuceneDocBasedOnSeqNo(final Operation op) {
} else if (op.seqNo() > docAndSeqNo.seqNo) {
status = OpVsLuceneDocStatus.OP_NEWER;
} else if (op.seqNo() == docAndSeqNo.seqNo) {
assert localCheckpointTracker.contains(op.seqNo()) || softDeleteEnabled == false :
"local checkpoint tracker is not updated seq_no=" + op.seqNo() + " id=" + op.id();
// load term to tie break
final long existingTerm = VersionsAndSeqNoResolver.loadPrimaryTerm(docAndSeqNo, op.uid().field());
if (op.primaryTerm() > existingTerm) {
server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java
@@ -402,9 +402,9 @@ class PrepareForTranslogOperationsRequestHandler implements TransportRequestHandler
@Override
public void messageReceived(RecoveryPrepareForTranslogOperationsRequest request, TransportChannel channel,
Task task) throws Exception {
try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId()
)) {
recoveryRef.target().prepareForTranslogOperations(request.isFileBasedRecovery(), request.totalTranslogOps());
try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId())) {
recoveryRef.target().prepareForTranslogOperations(
request.isFileBasedRecovery(), request.totalTranslogOps(), request.maxSeqNo());
}
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}
server/src/main/java/org/elasticsearch/indices/recovery/RecoveryPrepareForTranslogOperationsRequest.java
@@ -23,6 +23,7 @@
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.transport.TransportRequest;

@@ -34,12 +35,15 @@ class RecoveryPrepareForTranslogOperationsRequest extends TransportRequest {
private final ShardId shardId;
private final int totalTranslogOps;
private final boolean fileBasedRecovery;
private final long maxSeqNo;

RecoveryPrepareForTranslogOperationsRequest(long recoveryId, ShardId shardId, int totalTranslogOps, boolean fileBasedRecovery) {
RecoveryPrepareForTranslogOperationsRequest(long recoveryId, ShardId shardId, int totalTranslogOps,
boolean fileBasedRecovery, long maxSeqNo) {
this.recoveryId = recoveryId;
this.shardId = shardId;
this.totalTranslogOps = totalTranslogOps;
this.fileBasedRecovery = fileBasedRecovery;
this.maxSeqNo = maxSeqNo;
}

RecoveryPrepareForTranslogOperationsRequest(StreamInput in) throws IOException {
@@ -55,6 +59,11 @@ class RecoveryPrepareForTranslogOperationsRequest extends TransportRequest {
} else {
fileBasedRecovery = true;
}
if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
maxSeqNo = in.readZLong();
} else {
maxSeqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
}
}

public long recoveryId() {
@@ -76,6 +85,11 @@ public boolean isFileBasedRecovery() {
return fileBasedRecovery;
}

/** Returns the max sequence number of the primary */
public long maxSeqNo() {
return maxSeqNo;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
@@ -88,5 +102,8 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.V_6_2_0)) {
out.writeBoolean(fileBasedRecovery);
}
if (out.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
out.writeZLong(maxSeqNo);
}
}
}
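
A test-style sketch of the wire round-trip for the new field; BytesStreamOutput is Elasticsearch's in-memory stream, and the concrete ShardId and values here are made up for illustration:

    BytesStreamOutput out = new BytesStreamOutput();
    out.setVersion(Version.V_7_0_0_alpha1);
    new RecoveryPrepareForTranslogOperationsRequest(1L, new ShardId("index", "_na_", 0), 10, true, 42L)
        .writeTo(out);
    StreamInput in = out.bytes().streamInput();
    in.setVersion(Version.V_7_0_0_alpha1);
    // On a 7.0-aware stream the field survives the round-trip; on an older stream it
    // would read back as SequenceNumbers.UNASSIGNED_SEQ_NO.
    assert new RecoveryPrepareForTranslogOperationsRequest(in).maxSeqNo() == 42L;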
server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java
@@ -187,7 +187,7 @@ public RecoveryResponse recoverToTarget() throws IOException {
try {
// For a sequence-number-based recovery, the target can keep its local translog
prepareTargetForTranslog(isSequenceNumberBasedRecovery == false,
shard.estimateNumberOfHistoryOperations("peer-recovery", startingSeqNo));
shard.estimateNumberOfHistoryOperations("peer-recovery", startingSeqNo), shard.seqNoStats().getMaxSeqNo());
} catch (final Exception e) {
throw new RecoveryEngineException(shard.shardId(), 1, "prepare target for translog failed", e);
}
@@ -427,13 +427,13 @@ public void phase1(final IndexCommit snapshot, final Supplier<Integer> translogOps)
}
}

void prepareTargetForTranslog(final boolean fileBasedRecovery, final int totalTranslogOps) throws IOException {
void prepareTargetForTranslog(final boolean fileBasedRecovery, final int totalTranslogOps, final long maxSeqNo) throws IOException {
StopWatch stopWatch = new StopWatch().start();
logger.trace("recovery [phase1]: prepare remote engine for translog");
final long startEngineStart = stopWatch.totalTime().millis();
// Send a request preparing the new shard's translog to receive operations. This ensures the shard engine is started and disables
// garbage collection (not the JVM's GC!) of tombstone deletes.
cancellableThreads.executeIO(() -> recoveryTarget.prepareForTranslogOperations(fileBasedRecovery, totalTranslogOps));
cancellableThreads.executeIO(() -> recoveryTarget.prepareForTranslogOperations(fileBasedRecovery, totalTranslogOps, maxSeqNo));
stopWatch.stop();

response.startTime = stopWatch.totalTime().millis() - startEngineStart;
server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java
@@ -361,9 +361,14 @@ private void ensureRefCount() {
/** Implementation of {@link RecoveryTargetHandler} */

@Override
public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps) throws IOException {
public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps, long maxSeqNo) throws IOException {
state().getTranslog().totalOperations(totalTranslogOps);
indexShard().openEngineAndSkipTranslogRecovery();
// We must disable the MSU optimization for all operations that potentially exist in the index commit, because the
// LocalCheckpointTracker does not fully reflect the state of an index commit until recovery is finished. Since the
// max_seq_no of the primary is at least the max_seq_no of the index commit, we can bootstrap this replica's
// max_seq_no_of_updates to it before adding the replica to the replication group or replaying translog operations
// from the primary.
indexShard().advanceMaxSeqNoOfUpdatesOrDeletes(maxSeqNo);
}

@Override
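
The bootstrap above relies on advanceMaxSeqNoOfUpdatesOrDeletes being a monotonic advance. A minimal sketch of the assumed semantics, with an AtomicLong-backed watermark:

    private final AtomicLong maxSeqNoOfUpdatesOrDeletes = new AtomicLong(SequenceNumbers.UNASSIGNED_SEQ_NO);

    public void advanceMaxSeqNoOfUpdatesOrDeletes(long seqNo) {
        // Only ever move the watermark forward; safe under concurrent callers.
        maxSeqNoOfUpdatesOrDeletes.updateAndGet(current -> Math.max(current, seqNo));
    }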
server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java
@@ -32,10 +32,11 @@ public interface RecoveryTargetHandler {

/**
* Prepares the target to receive translog operations, after all files have been copied
* @param fileBasedRecovery whether or not this call is part of a file-based recovery
* @param totalTranslogOps total translog operations expected to be sent
* @param fileBasedRecovery whether or not this call is part of a file-based recovery
* @param totalTranslogOps  total translog operations expected to be sent
* @param maxSeqNo          the maximum sequence number on the primary
*/
void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps) throws IOException;
void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps, long maxSeqNo) throws IOException;

/**
* The finalize request refreshes the engine now that new segments are available, enables garbage collection of tombstone files, and
server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java
@@ -76,9 +76,9 @@ public RemoteRecoveryTargetHandler(long recoveryId, ShardId shardId, TransportService
}

@Override
public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps) throws IOException {
public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps, long maxSeqNo) throws IOException {
transportService.submitRequest(targetNode, PeerRecoveryTargetService.Actions.PREPARE_TRANSLOG,
new RecoveryPrepareForTranslogOperationsRequest(recoveryId, shardId, totalTranslogOps, fileBasedRecovery),
new RecoveryPrepareForTranslogOperationsRequest(recoveryId, shardId, totalTranslogOps, fileBasedRecovery, maxSeqNo),
TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionTimeout()).build(),
EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
}