Track max seq_no of updates or deletes on primary #33842

Merged (16 commits) on Sep 22, 2018
25 changes: 25 additions & 0 deletions server/src/main/java/org/elasticsearch/index/engine/Engine.java
@@ -88,6 +88,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -126,6 +127,9 @@ public abstract class Engine implements Closeable {
* inactive shards.
*/
protected volatile long lastWriteNanos = System.nanoTime();
// The maximum sequence number of update or delete operations that have been processed by this engine.
Contributor: We need to define "updates" here: this is a Lucene term. In ES we have index and delete at the engine level (and updates for users, which is not what we mean here).
// This value starts as unassigned (-2) and is initialized from outside.
private final AtomicLong maxSeqNoOfUpdatesOrDeletes = new AtomicLong(SequenceNumbers.UNASSIGNED_SEQ_NO);
Contributor: Can we also clearly document that -2 means the optimization should be disabled?

protected Engine(EngineConfig engineConfig) {
Objects.requireNonNull(engineConfig.getStore(), "Store must be provided to the engine");
@@ -1694,4 +1698,25 @@ public boolean isRecovering() {
public interface TranslogRecoveryRunner {
int run(Engine engine, Translog.Snapshot snapshot) throws IOException;
}

/**
* Returns the maximum sequence number of update or delete operations that have been processed
* in this engine, or the value most recently set via {@link #advanceMaxSeqNoOfUpdatesOrDeletes(long)}.
* <p>
* A primary engine initializes this value once, then advances it internally when it processes
* an update or a delete operation. A replica engine never advances this value by itself; it only
* inherits the latest value from its primary. In either case, this value never goes backwards.
*/
public final long getMaxSeqNoOfUpdatesOrDeletes() {
return maxSeqNoOfUpdatesOrDeletes.get();
}

/**
* Advances the max_seq_no_of_updates marker of this engine to at least the given sequence number.
* @see #getMaxSeqNoOfUpdatesOrDeletes()
*/
public final void advanceMaxSeqNoOfUpdatesOrDeletes(long seqNo) {
maxSeqNoOfUpdatesOrDeletes.updateAndGet(curr -> Math.max(curr, seqNo));
assert maxSeqNoOfUpdatesOrDeletes.get() >= seqNo;
Contributor: Ehm, this needs to be the first line, no? Also, please add a message with the numbers.

Member Author: We may advance MSU concurrently, thus this assertion may not hold as a precondition.
}
}
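The getter/advance pair above amounts to a monotonic marker built on `AtomicLong`. A minimal standalone sketch (the class and method names are illustrative, not from the PR) shows why the assertion has to come after `updateAndGet`, as discussed in the review thread:

```java
import java.util.concurrent.atomic.AtomicLong;

// Illustrative sketch (names are not from the PR) of the monotonic
// max_seq_no_of_updates marker: it starts unassigned (-2) and only ever
// moves forward, even under concurrent advances.
class MaxSeqNoMarker {
    private static final long UNASSIGNED_SEQ_NO = -2L;
    private final AtomicLong maxSeqNo = new AtomicLong(UNASSIGNED_SEQ_NO);

    long get() {
        return maxSeqNo.get();
    }

    void advance(long seqNo) {
        // updateAndGet retries its CAS on contention, so the marker never
        // goes backwards even when several threads advance it concurrently.
        maxSeqNo.updateAndGet(curr -> Math.max(curr, seqNo));
        // This must be asserted after the update, not before: another thread
        // may advance the marker between our read and our write, so a
        // precondition-style check would not hold.
        assert maxSeqNo.get() >= seqNo : maxSeqNo.get() + " < " + seqNo;
    }
}
```

With this shape, `advance(5)` after `advance(10)` leaves the marker at 10, matching the "never goes backwards" contract in the javadoc.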
@@ -922,11 +922,13 @@ protected IndexingStrategy indexingStrategyForOperation(final Index index) throw

protected final IndexingStrategy planIndexingAsPrimary(Index index) throws IOException {
assert index.origin() == Operation.Origin.PRIMARY : "planing as primary but origin isn't. got " + index.origin();
assert getMaxSeqNoOfUpdatesOrDeletes() != SequenceNumbers.UNASSIGNED_SEQ_NO : "max_seq_no_of_updates is not initialized";
Contributor: I think this may cause a bwc issue: if the primary is on an old node, it's a problem. Maybe only assert if the index created version is high enough?

Member Author: We make sure that we always initialize max_seq_no_of_updates of the engine of a primary shard (in IndexShard). Please let me know if this is correct.
final IndexingStrategy plan;
// resolve an external operation into an internal one which is safe to replay
if (canOptimizeAddDocument(index)) {
if (mayHaveBeenIndexedBefore(index)) {
plan = IndexingStrategy.overrideExistingAsIfNotThere(generateSeqNoForOperation(index), 1L);
advanceMaxSeqNoOfUpdatesOrDeletes(plan.seqNoForIndexing);
versionMap.enforceSafeAccess();
} else {
plan = IndexingStrategy.optimizedAppendOnly(generateSeqNoForOperation(index));
@@ -954,6 +956,10 @@ protected final IndexingStrategy planIndexingAsPrimary(Index index) throws IOExc
generateSeqNoForOperation(index),
index.versionType().updateVersion(currentVersion, index.version())
);
final boolean toAppend = plan.indexIntoLucene && plan.useLuceneUpdateDocument == false;
Contributor: I wonder if we can do this once at the end of the method, based on the plan? It doesn't really matter how we came to the decision, right?
if (toAppend == false) {
advanceMaxSeqNoOfUpdatesOrDeletes(plan.seqNoForIndexing);
}
}
}
return plan;
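The reviewer's suggestion above (decide once, at the end, from the resulting plan rather than in each branch) could look roughly like the following. This is a hypothetical sketch: the class and method names are invented, and the `IndexingStrategy` fields are reduced to plain booleans.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical condensed form of the reviewer's suggestion: advance
// max_seq_no_of_updates once, based purely on the final plan. It does not
// matter how the plan was reached; only whether the operation ends up as a
// plain append into Lucene.
class PlanBasedAdvance {
    static void advanceFromPlan(boolean indexIntoLucene, boolean useLuceneUpdateDocument,
                                long seqNoForIndexing, AtomicLong maxSeqNoOfUpdates) {
        boolean isPlainAppend = indexIntoLucene && useLuceneUpdateDocument == false;
        if (isPlainAppend == false) {
            // a Lucene update (or an op not indexed into Lucene) advances the marker
            maxSeqNoOfUpdates.updateAndGet(curr -> Math.max(curr, seqNoForIndexing));
        }
    }
}
```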
@@ -1245,6 +1251,7 @@ protected boolean assertNonPrimaryOrigin(final Operation operation) {

protected final DeletionStrategy planDeletionAsPrimary(Delete delete) throws IOException {
assert delete.origin() == Operation.Origin.PRIMARY : "planing as primary but got " + delete.origin();
assert getMaxSeqNoOfUpdatesOrDeletes() != SequenceNumbers.UNASSIGNED_SEQ_NO : "max_seq_no_of_updates is not initialized";
Contributor: Same comment about index version and bwc.

Contributor: Commenting here as I can't comment on the relevant lines: can we add an assertion in the index and delete methods that if we update/delete Lucene then the seq# of the delete/update is <= msu?

Contributor: I guess you need the replica logic to be in place for those assertions to be added.

Member Author: Yes, we need to replicate msu first. I will add these assertions later.

Member Author: We make sure that we always initialize max_seq_no_of_updates of the engine of a primary shard (in IndexShard). Please let me know if this is correct.
// resolve operation from external to internal
final VersionValue versionValue = resolveDocVersion(delete);
assert incrementVersionLookup();
Expand All @@ -1266,6 +1273,7 @@ protected final DeletionStrategy planDeletionAsPrimary(Delete delete) throws IOE
currentlyDeleted,
generateSeqNoForOperation(delete),
delete.versionType().updateVersion(currentVersion, delete.version()));
advanceMaxSeqNoOfUpdatesOrDeletes(plan.seqNoOfDeletion);
}
return plan;
}
26 changes: 26 additions & 0 deletions server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
@@ -516,6 +516,7 @@ public void updateShardState(final ShardRouting newRouting,
*/
engine.rollTranslogGeneration();
engine.fillSeqNoGaps(newPrimaryTerm);
engine.advanceMaxSeqNoOfUpdatesOrDeletes(seqNoStats().getMaxSeqNo());
Contributor: Why do we need to do this?

Contributor: I guess you want to cover bwc; should we just do it if the index version is old?
replicationTracker.updateLocalCheckpoint(currentRouting.allocationId().getId(), getLocalCheckpoint());
primaryReplicaSyncer.accept(this, new ActionListener<ResyncTask>() {
@Override
@@ -1324,6 +1325,7 @@ public void openEngineAndRecoverFromTranslog() throws IOException {
};
innerOpenEngineAndTranslog();
getEngine().recoverFromTranslog(translogRecoveryRunner, Long.MAX_VALUE);
advanceMaxSeqNoOfUpdatesOrDeletes(seqNoStats().getMaxSeqNo());
Contributor: I'm a bit doubtful about this one. I'm thinking that we should set it before starting to recover (based on the max seq no in the translog/engine); that would allow us to have the invariant that, if the index is new enough, the msu is always set when indexing.

Member Author: Yeah, I understand your point, but there is a reason that I have to put this initialization after we recover from translog.

  1. We don't restore the LocalCheckpointTracker when opening an engine; we only bootstrap local_checkpoint and max_seq_no from the safe commit.

  2. If we replay the translog with max_seq_no_of_updates set to the max_seq_no, we may duplicate operations whose seq_no is between the max_seq_no from the safe commit and the max_seq_no from the translog.

We can use the "isRetry" flag of index operations to solve this, but I am not sure because I think isRetry is attached to autoGeneratedIdTimestamp.
}

/**
@@ -1947,6 +1949,7 @@ public void activateWithPrimaryContext(final ReplicationTracker.PrimaryContext p
getLocalCheckpoint() == primaryContext.getCheckpointStates().get(routingEntry().allocationId().getId()).getLocalCheckpoint();
synchronized (mutex) {
replicationTracker.activateWithPrimaryContext(primaryContext); // make changes to primaryMode flag only under mutex
advanceMaxSeqNoOfUpdatesOrDeletes(seqNoStats().getMaxSeqNo());
Contributor: Same question as about the other promotion: why is this needed?
}
}

@@ -2720,4 +2723,27 @@ void resetEngineToGlobalCheckpoint() throws IOException {
});
newEngine.recoverFromTranslog(translogRunner, globalCheckpoint);
}

/**
* Returns the maximum sequence number of update operations (which overwrite existing documents) or delete operations
* that have been processed in this shard, or the value most recently set via {@link #advanceMaxSeqNoOfUpdatesOrDeletes(long)}.
* <p>
* The primary captures this value after executing a replication request, then transfers it to a replica before
* that replication request is executed on the replica.
*/
public long getMaxSeqNoOfUpdatesOrDeletes() {
return getEngine().getMaxSeqNoOfUpdatesOrDeletes();
}

/**
* Advances the max_seq_no_of_updates marker of this shard's engine to at least the given sequence number.
* <p>
* A primary calls this method only once, to initialize the marker after being promoted or when it finishes its
* recovery or relocation. A replica calls this method before executing a replication request or before
* applying translog operations in peer recovery.
*/
public void advanceMaxSeqNoOfUpdatesOrDeletes(long seqNo) {
getEngine().advanceMaxSeqNoOfUpdatesOrDeletes(seqNo);
assert seqNo <= getMaxSeqNoOfUpdatesOrDeletes();
}
}
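The hand-off the two javadocs above describe (the primary captures the value after executing an operation; the replica inherits it before executing) can be sketched as follows. The class and method names are hypothetical, not from the PR:

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of the primary-to-replica hand-off of
// max_seq_no_of_updates described in the javadocs above.
class ReplicationHandOff {
    // On the primary: capture the marker after the operation has executed,
    // so the shipped value already covers that operation.
    static long captureOnPrimary(AtomicLong primaryMsu) {
        return primaryMsu.get();
    }

    // On the replica: inherit the primary's value before executing the
    // replicated operation; a replica never advances the marker on its own.
    static void inheritOnReplica(long msuFromPrimary, AtomicLong replicaMsu) {
        replicaMsu.updateAndGet(curr -> Math.max(curr, msuFromPrimary));
    }
}
```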
@@ -679,6 +679,7 @@ public void testFlushIsDisabledDuringTranslogRecovery() throws IOException {
expectThrows(IllegalStateException.class, () -> engine.flush(true, true));
assertTrue(engine.isRecovering());
engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
engine.advanceMaxSeqNoOfUpdatesOrDeletes(engine.getLocalCheckpointTracker().getMaxSeqNo());
assertFalse(engine.isRecovering());
doc = testParsedDocument("2", null, testDocumentWithTextField(), SOURCE, null);
engine.index(indexForDoc(doc));
@@ -2678,6 +2679,7 @@ protected void commitIndexWriter(IndexWriter writer, Translog translog, String s
}
}) {
engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
engine.advanceMaxSeqNoOfUpdatesOrDeletes(engine.getLocalCheckpointTracker().getMaxSeqNo());
final ParsedDocument doc1 = testParsedDocument("1", null, testDocumentWithTextField(), SOURCE, null);
engine.index(indexForDoc(doc1));
globalCheckpoint.set(engine.getLocalCheckpoint());
@@ -3461,9 +3463,11 @@ public void testEngineMaxTimestampIsInitialized() throws IOException {
engine.index(appendOnlyPrimary(doc, true, timestamp1));
assertEquals(timestamp1, engine.segmentsStats(false).getMaxUnsafeAutoIdTimestamp());
}
try (Store store = createStore(newFSDirectory(storeDir)); Engine engine = new InternalEngine(configSupplier.apply(store))) {
try (Store store = createStore(newFSDirectory(storeDir));
InternalEngine engine = new InternalEngine(configSupplier.apply(store))) {
assertEquals(IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, engine.segmentsStats(false).getMaxUnsafeAutoIdTimestamp());
engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
engine.advanceMaxSeqNoOfUpdatesOrDeletes(engine.getLocalCheckpointTracker().getMaxSeqNo());
assertEquals(timestamp1, engine.segmentsStats(false).getMaxUnsafeAutoIdTimestamp());
final ParsedDocument doc = testParsedDocument("1", null, testDocumentWithTextField(),
new BytesArray("{}".getBytes(Charset.defaultCharset())), null);
@@ -4353,7 +4357,7 @@ public void testKeepTranslogAfterGlobalCheckpoint() throws Exception {

final EngineConfig engineConfig = config(indexSettings, store, translogPath, NoMergePolicy.INSTANCE, null, null,
() -> globalCheckpoint.get());
try (Engine engine = new InternalEngine(engineConfig) {
try (InternalEngine engine = new InternalEngine(engineConfig) {
@Override
protected void commitIndexWriter(IndexWriter writer, Translog translog, String syncId) throws IOException {
// Advance the global checkpoint during the flush to create a lag between a persisted global checkpoint in the translog
@@ -4365,6 +4369,7 @@ protected void commitIndexWriter(IndexWriter writer, Translog translog, String s
}
}) {
engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
engine.advanceMaxSeqNoOfUpdatesOrDeletes(engine.getLocalCheckpointTracker().getMaxSeqNo());
int numDocs = scaledRandomIntBetween(10, 100);
for (int docId = 0; docId < numDocs; docId++) {
ParseContext.Document document = testDocumentWithTextField();
@@ -5033,6 +5038,47 @@ public void testAcquireSearcherOnClosingEngine() throws Exception {
expectThrows(AlreadyClosedException.class, () -> engine.acquireSearcher("test"));
}

public void testTrackMaxSeqNoOfUpdatesOrDeletes() throws Exception {
engine.close();
Set<String> liveDocIds = new HashSet<>();
engine = new InternalEngine(engine.config());
assertThat(engine.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(-2L));
engine.advanceMaxSeqNoOfUpdatesOrDeletes(randomLongBetween(-1L, 50L));
int numOps = between(1, 500);
for (int i = 0; i < numOps; i++) {
long currentMaxSeqNoOfUpdates = engine.getMaxSeqNoOfUpdatesOrDeletes();
ParsedDocument doc = createParsedDoc(Integer.toString(between(1, 100)), null);
if (randomBoolean()) {
if (randomBoolean()) {
Engine.IndexResult result = engine.index(indexForDoc(doc));
if (liveDocIds.add(doc.id()) == false) {
assertThat("update operations on primary must advance max_seq_no_of_updates",
engine.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(Math.max(currentMaxSeqNoOfUpdates, result.getSeqNo())));
} else {
assertThat(engine.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(currentMaxSeqNoOfUpdates));
}
} else {
Engine.DeleteResult result = engine.delete(new Engine.Delete(doc.type(), doc.id(), newUid(doc.id()), primaryTerm.get()));
liveDocIds.remove(doc.id());
Contributor: If this doesn't remove anything, the msu doesn't advance, right? I think this is caught by the max in the assertion, but I think we'd better be explicit.

Member Author: I prefer to always advance MSU for delete operations for simplicity. However, if we prefer something else, I am happy to change.
assertThat("delete operations on primary must advance max_seq_no_of_updates",
engine.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(Math.max(currentMaxSeqNoOfUpdates, result.getSeqNo())));
}
} else {
Contributor: Crazy pants for mixing primary and replica ops :) I'm fine with it as long as we can get away with it.
long maxSeqNo = engine.getLocalCheckpointTracker().getMaxSeqNo();
long seqNo = randomLongBetween(maxSeqNo + 1, maxSeqNo + 10);
if (randomBoolean()) {
engine.index(replicaIndexForDoc(doc, 1, seqNo, randomBoolean()));
liveDocIds.add(doc.id());
} else {
engine.delete(replicaDeleteForDoc(doc.id(), 1, seqNo, threadPool.relativeTimeInMillis()));
liveDocIds.remove(doc.id());
}
assertThat("non-primary operations should not advance max_seq_no_of_updates",
engine.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(currentMaxSeqNoOfUpdates));
}
}
}

static void trimUnsafeCommits(EngineConfig config) throws IOException {
final Store store = config.getStore();
final TranslogConfig translogConfig = config.getTranslogConfig();
@@ -947,6 +947,7 @@ public void onFailure(Exception e) {
resyncLatch.await();
assertThat(indexShard.getLocalCheckpoint(), equalTo(maxSeqNo));
assertThat(indexShard.seqNoStats().getMaxSeqNo(), equalTo(maxSeqNo));
assertThat(indexShard.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(maxSeqNo));
closeShard(indexShard, false);
}

@@ -128,6 +128,7 @@ indexSettings, null, store, newMergePolicy(), iwc.getAnalyzer(), iwc.getSimilari
EngineTestCase.tombstoneDocSupplier());
engine = new InternalEngine(config);
engine.recoverFromTranslog((e, s) -> 0, Long.MAX_VALUE);
engine.advanceMaxSeqNoOfUpdatesOrDeletes(engine.getSeqNoStats(-1).getMaxSeqNo());
listeners.setCurrentRefreshLocationSupplier(engine::getTranslogLastWriteLocation);
}

@@ -239,7 +239,8 @@ public void testUnallocatedShardsDoesNotHang() throws InterruptedException {

private void indexDoc(Engine engine, String id) throws IOException {
final ParsedDocument doc = InternalEngineTests.createParsedDoc(id, null);
final Engine.IndexResult indexResult = engine.index(new Engine.Index(new Term("_id", Uid.encodeId(doc.id())), 1L, doc));
final Engine.IndexResult indexResult = engine.index(new Engine.Index(new Term("_id", Uid.encodeId(doc.id())), doc,
engine.getLocalCheckpoint() + 1, 1L, 1L, null, Engine.Operation.Origin.REPLICA, randomLong(), -1L, false));
assertThat(indexResult.getFailure(), nullValue());
}

@@ -478,6 +478,8 @@ private InternalEngine createEngine(@Nullable IndexWriterFactory indexWriterFact
}
InternalEngine internalEngine = createInternalEngine(indexWriterFactory, localCheckpointTrackerSupplier, seqNoForOperation, config);
internalEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
// IndexShard initializes this value after replaying local translog.
internalEngine.advanceMaxSeqNoOfUpdatesOrDeletes(internalEngine.getLocalCheckpointTracker().getMaxSeqNo());
return internalEngine;
}

@@ -94,6 +94,7 @@

import static org.elasticsearch.cluster.routing.TestShardRouting.newShardRouting;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;

/**
@@ -444,6 +445,7 @@ protected IndexShard newStartedShard(CheckedFunction<Boolean, IndexShard, IOExce
IndexShard shard = shardFunction.apply(primary);
if (primary) {
recoverShardFromStore(shard);
assertThat(shard.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(shard.seqNoStats().getMaxSeqNo()));
} else {
recoveryEmptyReplica(shard, true);
}