Skip to content

Commit

Permalink
Simplify initialization of max_seq_no of updates (#41161)
Browse files Browse the repository at this point in the history
Today we choose to initialize max_seq_no_of_updates on primaries only so
we can deal with a situation where a primary is on an old node (before
6.5) which does not have MUS while replicas on new nodes (6.5+).
However, this strategy is quite complex and can lead to bugs (for
example #40249) since we have to assign a correct value (not too low) to
MSU in all possible situations (before recovering from translog,
restoring history on promotion, and handing off relocation).

Fortunately, we don't have to deal with this BWC in 7.0+ since all nodes
in the cluster should have MSU. This change simplifies the
initialization of MSU by always assigning it a correct value in the
constructor of Engine regardless of whether it's a replica or primary.

Relates #33842
  • Loading branch information
dnhatn authored Apr 30, 2019
1 parent 7a407f5 commit 73bfdc4
Show file tree
Hide file tree
Showing 11 changed files with 40 additions and 112 deletions.
27 changes: 2 additions & 25 deletions server/src/main/java/org/elasticsearch/index/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,6 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
Expand Down Expand Up @@ -142,16 +141,6 @@ public abstract class Engine implements Closeable {
*/
protected volatile long lastWriteNanos = System.nanoTime();

/*
* This marker tracks the max seq_no of either update operations or delete operations have been processed in this engine.
* An index request is considered as an update if it overwrites existing documents with the same docId in the Lucene index.
* This marker is started uninitialized (-2), and the optimization using seq_no will be disabled if this marker is uninitialized.
* The value of this marker never goes backwards, and is updated/changed differently on primary and replica:
* 1. A primary initializes this marker once using the max_seq_no from its history, then advances when processing an update or delete.
* 2. A replica never advances this marker by itself but only inherits from its primary (via advanceMaxSeqNoOfUpdatesOrDeletes).
*/
private final AtomicLong maxSeqNoOfUpdatesOrDeletes = new AtomicLong(UNASSIGNED_SEQ_NO);

protected Engine(EngineConfig engineConfig) {
Objects.requireNonNull(engineConfig.getStore(), "Store must be provided to the engine");

Expand Down Expand Up @@ -1961,25 +1950,13 @@ public interface TranslogRecoveryRunner {
* Moreover, operations that are optimized using the MSU optimization must not be processed twice as this will create duplicates
* in Lucene. To avoid this we check the local checkpoint tracker to see if an operation was already processed.
*
* @see #reinitializeMaxSeqNoOfUpdatesOrDeletes()
* @see #advanceMaxSeqNoOfUpdatesOrDeletes(long)
*/
public final long getMaxSeqNoOfUpdatesOrDeletes() {
return maxSeqNoOfUpdatesOrDeletes.get();
}

/**
* A primary shard calls this method to re-initialize the max_seq_no_of_updates marker using the
* max_seq_no from Lucene index and translog before replaying the local translog in its local recovery.
*/
public abstract void reinitializeMaxSeqNoOfUpdatesOrDeletes();
public abstract long getMaxSeqNoOfUpdatesOrDeletes();

/**
* A replica shard receives a new max_seq_no_of_updates from its primary shard, then calls this method
* to advance this marker to at least the given sequence number.
*/
public final void advanceMaxSeqNoOfUpdatesOrDeletes(long seqNo) {
maxSeqNoOfUpdatesOrDeletes.updateAndGet(curr -> Math.max(curr, seqNo));
assert maxSeqNoOfUpdatesOrDeletes.get() >= seqNo : maxSeqNoOfUpdatesOrDeletes.get() + " < " + seqNo;
}
public abstract void advanceMaxSeqNoOfUpdatesOrDeletes(long maxSeqNoOfUpdatesOnPrimary);
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
import org.apache.lucene.util.InfoStream;
import org.elasticsearch.Assertions;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.SuppressForbidden;
Expand Down Expand Up @@ -146,6 +145,10 @@ public class InternalEngine extends Engine {
private final AtomicLong maxUnsafeAutoIdTimestamp = new AtomicLong(-1);
private final AtomicLong maxSeenAutoIdTimestamp = new AtomicLong(-1);
private final AtomicLong maxSeqNoOfNonAppendOnlyOperations = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
// max_seq_no_of_updates_or_deletes tracks the max seq_no of update or delete operations that have been processed in this engine.
// An index request is considered as an update if it overwrites existing documents with the same docId in the Lucene index.
// The value of this marker never goes backwards, and is tracked/updated differently on primary and replica.
private final AtomicLong maxSeqNoOfUpdatesOrDeletes;
private final CounterMetric numVersionLookups = new CounterMetric();
private final CounterMetric numIndexVersionsLookups = new CounterMetric();
// Lucene operations since this engine was opened - not include operations from existing segments.
Expand Down Expand Up @@ -228,6 +231,7 @@ public InternalEngine(EngineConfig engineConfig) {
() -> acquireSearcher("create_local_checkpoint_tracker", SearcherScope.INTERNAL), localCheckpointTrackerSupplier);
this.lastRefreshedCheckpointListener = new LastRefreshedCheckpointListener(localCheckpointTracker.getCheckpoint());
this.internalSearcherManager.addListener(lastRefreshedCheckpointListener);
maxSeqNoOfUpdatesOrDeletes = new AtomicLong(SequenceNumbers.max(localCheckpointTracker.getMaxSeqNo(), translog.getMaxSeqNo()));
success = true;
} finally {
if (success == false) {
Expand Down Expand Up @@ -405,7 +409,6 @@ public InternalEngine recoverFromTranslog(TranslogRecoveryRunner translogRecover
flushLock.lock();
try (ReleasableLock lock = readLock.acquire()) {
ensureOpen();
assert getMaxSeqNoOfUpdatesOrDeletes() != SequenceNumbers.UNASSIGNED_SEQ_NO : "max_seq_no_of_updates is uninitialized";
if (pendingTranslogRecovery.get() == false) {
throw new IllegalStateException("Engine has already been recovered");
}
Expand Down Expand Up @@ -874,7 +877,7 @@ public IndexResult index(Index index) throws IOException {

final boolean toAppend = plan.indexIntoLucene && plan.useLuceneUpdateDocument == false;
if (toAppend == false) {
advanceMaxSeqNoOfUpdatesOrDeletes(index.seqNo());
advanceMaxSeqNoOfUpdatesOrDeletesOnPrimary(index.seqNo());
}
} else {
markSeqNoAsSeen(index.seqNo());
Expand Down Expand Up @@ -981,7 +984,6 @@ protected IndexingStrategy indexingStrategyForOperation(final Index index) throw

protected final IndexingStrategy planIndexingAsPrimary(Index index) throws IOException {
assert index.origin() == Operation.Origin.PRIMARY : "planing as primary but origin isn't. got " + index.origin();
assert getMaxSeqNoOfUpdatesOrDeletes() != SequenceNumbers.UNASSIGNED_SEQ_NO : "max_seq_no_of_updates is not initialized";
final IndexingStrategy plan;
// resolve an external operation into an internal one which is safe to replay
if (canOptimizeAddDocument(index)) {
Expand Down Expand Up @@ -1322,7 +1324,6 @@ protected boolean assertNonPrimaryOrigin(final Operation operation) {

protected final DeletionStrategy planDeletionAsPrimary(Delete delete) throws IOException {
assert delete.origin() == Operation.Origin.PRIMARY : "planing as primary but got " + delete.origin();
assert getMaxSeqNoOfUpdatesOrDeletes() != SequenceNumbers.UNASSIGNED_SEQ_NO : "max_seq_no_of_updates is not initialized";
// resolve operation from external to internal
final VersionValue versionValue = resolveDocVersion(delete, delete.getIfSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO);
assert incrementVersionLookup();
Expand Down Expand Up @@ -2718,13 +2719,22 @@ private void updateAutoIdTimestamp(long newTimestamp, boolean unsafe) {
assert maxUnsafeAutoIdTimestamp.get() <= maxSeenAutoIdTimestamp.get();
}

@Override
public long getMaxSeqNoOfUpdatesOrDeletes() {
return maxSeqNoOfUpdatesOrDeletes.get();
}

@Override
public void advanceMaxSeqNoOfUpdatesOrDeletes(long maxSeqNoOfUpdatesOnPrimary) {
if (maxSeqNoOfUpdatesOnPrimary == SequenceNumbers.UNASSIGNED_SEQ_NO) {
assert false : "max_seq_no_of_updates on primary is unassigned";
throw new IllegalArgumentException("max_seq_no_of_updates on primary is unassigned");
}
this.maxSeqNoOfUpdatesOrDeletes.updateAndGet(curr -> Math.max(curr, maxSeqNoOfUpdatesOnPrimary));
}

private boolean assertMaxSeqNoOfUpdatesIsAdvanced(Term id, long seqNo, boolean allowDeleted, boolean relaxIfGapInSeqNo) {
final long maxSeqNoOfUpdates = getMaxSeqNoOfUpdatesOrDeletes();
// If the primary is on an old version which does not replicate msu, we need to relax this assertion for that.
if (maxSeqNoOfUpdates == SequenceNumbers.UNASSIGNED_SEQ_NO) {
assert config().getIndexSettings().getIndexVersionCreated().before(Version.V_6_5_0);
return true;
}
// We treat a delete on the tombstones on replicas as a regular document, then use updateDocument (not addDocument).
if (allowDeleted) {
final VersionValue versionValue = versionMap.getVersionForAssert(id.bytes());
Expand All @@ -2742,12 +2752,6 @@ private boolean assertMaxSeqNoOfUpdatesIsAdvanced(Term id, long seqNo, boolean a
return true;
}

@Override
public void reinitializeMaxSeqNoOfUpdatesOrDeletes() {
final long maxSeqNo = SequenceNumbers.max(localCheckpointTracker.getMaxSeqNo(), translog.getMaxSeqNo());
advanceMaxSeqNoOfUpdatesOrDeletes(maxSeqNo);
}

private static void trimUnsafeCommits(EngineConfig engineConfig) throws IOException {
final Store store = engineConfig.getStore();
final String translogUUID = store.readLastCommittedSegmentsInfo().getUserData().get(Translog.TRANSLOG_UUID_KEY);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -456,11 +456,6 @@ public void updateMaxUnsafeAutoIdTimestamp(long newTimestamp) {

}

@Override
public void reinitializeMaxSeqNoOfUpdatesOrDeletes() {
advanceMaxSeqNoOfUpdatesOrDeletes(seqNoStats.getMaxSeqNo());
}

protected void processReaders(IndexReader reader, IndexReader previousReader) {
searcherFactory.processReaders(reader, previousReader);
}
Expand All @@ -487,4 +482,15 @@ public Translog.Operation next() {
}
};
}

@Override
public long getMaxSeqNoOfUpdatesOrDeletes() {
return seqNoStats.getMaxSeqNo();
}

@Override
public void advanceMaxSeqNoOfUpdatesOrDeletes(long maxSeqNoOfUpdatesOnPrimary) {
assert maxSeqNoOfUpdatesOnPrimary <= getMaxSeqNoOfUpdatesOrDeletes() :
maxSeqNoOfUpdatesOnPrimary + ">" + getMaxSeqNoOfUpdatesOrDeletes();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -532,14 +532,6 @@ public void updateShardState(final ShardRouting newRouting,
* the reverted operations on this shard by replaying the translog to avoid losing acknowledged writes.
*/
final Engine engine = getEngine();
if (getMaxSeqNoOfUpdatesOrDeletes() == UNASSIGNED_SEQ_NO) {
// If the old primary was on an old version that did not replicate the msu,
// we need to bootstrap it manually from its local history.
assert indexSettings.getIndexVersionCreated().before(Version.V_6_5_0);
engine.advanceMaxSeqNoOfUpdatesOrDeletes(seqNoStats().getMaxSeqNo());
}
// in case we previously reset engine, we need to forward MSU before replaying translog.
engine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
engine.restoreLocalHistoryFromTranslog((resettingEngine, snapshot) ->
runTranslogRecovery(resettingEngine, snapshot, Engine.Operation.Origin.LOCAL_RESET, () -> {}));
/* Rolling the translog generation is not strictly needed here (as we will never have collisions between
Expand Down Expand Up @@ -1411,9 +1403,7 @@ public void openEngineAndRecoverFromTranslog() throws IOException {
translogRecoveryStats::incrementRecoveredOperations);
};
innerOpenEngineAndTranslog();
final Engine engine = getEngine();
engine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
engine.recoverFromTranslog(translogRecoveryRunner, Long.MAX_VALUE);
getEngine().recoverFromTranslog(translogRecoveryRunner, Long.MAX_VALUE);
}

/**
Expand Down Expand Up @@ -2206,12 +2196,6 @@ public void activateWithPrimaryContext(final ReplicationTracker.PrimaryContext p
getLocalCheckpoint() == primaryContext.getCheckpointStates().get(routingEntry().allocationId().getId()).getLocalCheckpoint();
synchronized (mutex) {
replicationTracker.activateWithPrimaryContext(primaryContext); // make changes to primaryMode flag only under mutex
if (getMaxSeqNoOfUpdatesOrDeletes() == UNASSIGNED_SEQ_NO) {
// If the old primary was on an old version that did not replicate the msu,
// we need to bootstrap it manually from its local history.
assert indexSettings.getIndexVersionCreated().before(Version.V_6_5_0);
getEngine().advanceMaxSeqNoOfUpdatesOrDeletes(seqNoStats().getMaxSeqNo());
}
}
}

Expand Down Expand Up @@ -3138,7 +3122,6 @@ public void close() throws IOException {
newEngineReference.set(engineFactory.newReadWriteEngine(newEngineConfig()));
onNewEngine(newEngineReference.get());
}
newEngineReference.get().advanceMaxSeqNoOfUpdatesOrDeletes(globalCheckpoint);
final Engine.TranslogRecoveryRunner translogRunner = (engine, snapshot) -> runTranslogRecovery(
engine, snapshot, Engine.Operation.Origin.LOCAL_RESET, () -> {
// TODO: add a dedicate recovery stats for the reset translog
Expand Down Expand Up @@ -3185,11 +3168,7 @@ public long getMaxSeqNoOfUpdatesOrDeletes() {
* @see RecoveryTarget#indexTranslogOperations(List, int, long, long, RetentionLeases, long, ActionListener)
*/
public void advanceMaxSeqNoOfUpdatesOrDeletes(long seqNo) {
assert seqNo != UNASSIGNED_SEQ_NO
|| getMaxSeqNoOfUpdatesOrDeletes() == UNASSIGNED_SEQ_NO :
"replica has max_seq_no_of_updates=" + getMaxSeqNoOfUpdatesOrDeletes() + " but primary does not";
getEngine().advanceMaxSeqNoOfUpdatesOrDeletes(seqNo);
assert seqNo <= getMaxSeqNoOfUpdatesOrDeletes() : getMaxSeqNoOfUpdatesOrDeletes() + " < " + seqNo;
}

/**
Expand Down
Loading

0 comments on commit 73bfdc4

Please sign in to comment.