-
Notifications
You must be signed in to change notification settings - Fork 25k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Track max seq_no of updates or deletes on primary #33842
Changes from 2 commits
587ea79
2297846
d84de4a
1ffeaea
f27d5f8
45fc088
a7269dc
2dd8758
dcc23fd
516fbeb
94fa692
e92e71e
fbc4de2
fafd679
0b632a3
84a985f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -88,6 +88,7 @@ | |
import java.util.concurrent.CountDownLatch; | ||
import java.util.concurrent.TimeUnit; | ||
import java.util.concurrent.atomic.AtomicBoolean; | ||
import java.util.concurrent.atomic.AtomicLong; | ||
import java.util.concurrent.locks.Condition; | ||
import java.util.concurrent.locks.Lock; | ||
import java.util.concurrent.locks.ReentrantLock; | ||
|
@@ -126,6 +127,9 @@ public abstract class Engine implements Closeable { | |
* inactive shards. | ||
*/ | ||
protected volatile long lastWriteNanos = System.nanoTime(); | ||
// The maximum sequence number of either update or delete operations have been processed by this engine. | ||
// This value is started with an unassigned status (-2) and will be initialized from outside. | ||
private final AtomicLong maxSeqNoOfUpdatesOrDeletes = new AtomicLong(SequenceNumbers.UNASSIGNED_SEQ_NO); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we also clearly document that a -2 means the optimization should be disabled? |
||
|
||
protected Engine(EngineConfig engineConfig) { | ||
Objects.requireNonNull(engineConfig.getStore(), "Store must be provided to the engine"); | ||
|
@@ -1694,4 +1698,25 @@ public boolean isRecovering() { | |
public interface TranslogRecoveryRunner { | ||
int run(Engine engine, Translog.Snapshot snapshot) throws IOException; | ||
} | ||
|
||
/** | ||
* Returns the maximum sequence number of either update or delete operations have been processed | ||
* in this engine or the sequence number from {@link #advanceMaxSeqNoOfUpdatesOrDeletes(long)}. | ||
* <p> | ||
* For a primary engine, this value is initialized once, then advanced internally when it processes | ||
* an update or a delete operation. Whereas a replica engine never updates this value by itself but | ||
* only inherits the latest value from its primary. In both cases, this value never goes backwards. | ||
*/ | ||
public final long getMaxSeqNoOfUpdatesOrDeletes() { | ||
return maxSeqNoOfUpdatesOrDeletes.get(); | ||
} | ||
|
||
/** | ||
* Advances the max_seq_no_of_updates marker of this engine to at least the given sequence number. | ||
* @see #getMaxSeqNoOfUpdatesOrDeletes() | ||
*/ | ||
public final void advanceMaxSeqNoOfUpdatesOrDeletes(long seqNo) { | ||
maxSeqNoOfUpdatesOrDeletes.updateAndGet(curr -> Math.max(curr, seqNo)); | ||
assert maxSeqNoOfUpdatesOrDeletes.get() >= seqNo; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ehm - this needs to be the first line, no? Also, please add a message with the numbers There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We may advance MSU concurrently, thus this assertion may not hold as the precondition. |
||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -922,11 +922,13 @@ protected IndexingStrategy indexingStrategyForOperation(final Index index) throw | |
|
||
protected final IndexingStrategy planIndexingAsPrimary(Index index) throws IOException { | ||
assert index.origin() == Operation.Origin.PRIMARY : "planing as primary but origin isn't. got " + index.origin(); | ||
assert getMaxSeqNoOfUpdatesOrDeletes() != SequenceNumbers.UNASSIGNED_SEQ_NO : "max_seq_no_of_updates is not initialized"; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this may cause a bwc issue - if the primary is on an old node, it's a problem. Maybe only assert if the index created version is high enough? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We make sure that we always initialize max_seq_no_of_updates of the engine of a primary shard (in IndexShard). Please let me know if it's correct. |
||
final IndexingStrategy plan; | ||
// resolve an external operation into an internal one which is safe to replay | ||
if (canOptimizeAddDocument(index)) { | ||
if (mayHaveBeenIndexedBefore(index)) { | ||
plan = IndexingStrategy.overrideExistingAsIfNotThere(generateSeqNoForOperation(index), 1L); | ||
advanceMaxSeqNoOfUpdatesOrDeletes(plan.seqNoForIndexing); | ||
versionMap.enforceSafeAccess(); | ||
} else { | ||
plan = IndexingStrategy.optimizedAppendOnly(generateSeqNoForOperation(index)); | ||
|
@@ -954,6 +956,10 @@ protected final IndexingStrategy planIndexingAsPrimary(Index index) throws IOExc | |
generateSeqNoForOperation(index), | ||
index.versionType().updateVersion(currentVersion, index.version()) | ||
); | ||
final boolean toAppend = plan.indexIntoLucene && plan.useLuceneUpdateDocument == false; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I wonder if we can do this once at the end of the method based on the plan? It doesn't really matter how we came to the decision, right? |
||
if (toAppend == false) { | ||
advanceMaxSeqNoOfUpdatesOrDeletes(plan.seqNoForIndexing); | ||
} | ||
} | ||
} | ||
return plan; | ||
|
@@ -1245,6 +1251,7 @@ protected boolean assertNonPrimaryOrigin(final Operation operation) { | |
|
||
protected final DeletionStrategy planDeletionAsPrimary(Delete delete) throws IOException { | ||
assert delete.origin() == Operation.Origin.PRIMARY : "planing as primary but got " + delete.origin(); | ||
assert getMaxSeqNoOfUpdatesOrDeletes() != SequenceNumbers.UNASSIGNED_SEQ_NO : "max_seq_no_of_updates is not initialized"; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. same comment about index version and bwc. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. commenting here as I can comment on the relevant lines - can we add an assertion in the index method and delete method that if we update/delete lucene than the seq# of the delete/update is <= msu? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I guess you need the replica logic to be in place for that assertions to be added. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, we need to replicate msu first. I will add these assertions later. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We make sure that we always initialize max_seq_no_of_updates of the engine of a primary shard (in IndexShard). Please let me know if it's correct. |
||
// resolve operation from external to internal | ||
final VersionValue versionValue = resolveDocVersion(delete); | ||
assert incrementVersionLookup(); | ||
|
@@ -1266,6 +1273,7 @@ protected final DeletionStrategy planDeletionAsPrimary(Delete delete) throws IOE | |
currentlyDeleted, | ||
generateSeqNoForOperation(delete), | ||
delete.versionType().updateVersion(currentVersion, delete.version())); | ||
advanceMaxSeqNoOfUpdatesOrDeletes(plan.seqNoOfDeletion); | ||
} | ||
return plan; | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -516,6 +516,7 @@ public void updateShardState(final ShardRouting newRouting, | |
*/ | ||
engine.rollTranslogGeneration(); | ||
engine.fillSeqNoGaps(newPrimaryTerm); | ||
engine.advanceMaxSeqNoOfUpdatesOrDeletes(seqNoStats().getMaxSeqNo()); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why do we need to do this? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I guess you want to cover bwc - should we just do it if the index version is old? |
||
replicationTracker.updateLocalCheckpoint(currentRouting.allocationId().getId(), getLocalCheckpoint()); | ||
primaryReplicaSyncer.accept(this, new ActionListener<ResyncTask>() { | ||
@Override | ||
|
@@ -1324,6 +1325,7 @@ public void openEngineAndRecoverFromTranslog() throws IOException { | |
}; | ||
innerOpenEngineAndTranslog(); | ||
getEngine().recoverFromTranslog(translogRecoveryRunner, Long.MAX_VALUE); | ||
advanceMaxSeqNoOfUpdatesOrDeletes(seqNoStats().getMaxSeqNo()); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm a bit doubtful about this one. I'm thinking that we should set it before starting to recover (based on the max seq no in the translog / engine) - that would allow us to have the invariant that if the index is new enough, the msu is always set when indexing. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, I understand your point, but there is a reason that I have to put this initialization after we recover from translog.
We can use the "isRetry" flag of index operations to solve this, but I am not sure because I think |
||
} | ||
|
||
/** | ||
|
@@ -1947,6 +1949,7 @@ public void activateWithPrimaryContext(final ReplicationTracker.PrimaryContext p | |
getLocalCheckpoint() == primaryContext.getCheckpointStates().get(routingEntry().allocationId().getId()).getLocalCheckpoint(); | ||
synchronized (mutex) { | ||
replicationTracker.activateWithPrimaryContext(primaryContext); // make changes to primaryMode flag only under mutex | ||
advanceMaxSeqNoOfUpdatesOrDeletes(seqNoStats().getMaxSeqNo()); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. same question as about the other promotion, why is this needed? |
||
} | ||
} | ||
|
||
|
@@ -2720,4 +2723,27 @@ void resetEngineToGlobalCheckpoint() throws IOException { | |
}); | ||
newEngine.recoverFromTranslog(translogRunner, globalCheckpoint); | ||
} | ||
|
||
/** | ||
* Returns the maximum sequence number of either update operations (overwrite existing documents) or delete operations | ||
* have been processed in this shard or the sequence number from {@link #advanceMaxSeqNoOfUpdatesOrDeletes(long)}. | ||
* <p> | ||
* The primary captures this value after executes a replication request, then transfers it to a replica before executing | ||
* that replication request on a replica. | ||
*/ | ||
public long getMaxSeqNoOfUpdatesOrDeletes() { | ||
return getEngine().getMaxSeqNoOfUpdatesOrDeletes(); | ||
} | ||
|
||
/** | ||
* Advances the max_seq_no_of_updates marker of the engine of this shard to at least the given sequence number. | ||
* <p> | ||
* A primary calls this method only once to initialize this maker after being promoted or when it finishes its | ||
* recovery or relocation. Whereas a replica calls this method before executing a replication request or before | ||
* applying translog operations in peer-recovery. | ||
*/ | ||
public void advanceMaxSeqNoOfUpdatesOrDeletes(long seqNo) { | ||
getEngine().advanceMaxSeqNoOfUpdatesOrDeletes(seqNo); | ||
assert seqNo <= getMaxSeqNoOfUpdatesOrDeletes(); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -679,6 +679,7 @@ public void testFlushIsDisabledDuringTranslogRecovery() throws IOException { | |
expectThrows(IllegalStateException.class, () -> engine.flush(true, true)); | ||
assertTrue(engine.isRecovering()); | ||
engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); | ||
engine.advanceMaxSeqNoOfUpdatesOrDeletes(engine.getLocalCheckpointTracker().getMaxSeqNo()); | ||
assertFalse(engine.isRecovering()); | ||
doc = testParsedDocument("2", null, testDocumentWithTextField(), SOURCE, null); | ||
engine.index(indexForDoc(doc)); | ||
|
@@ -2678,6 +2679,7 @@ protected void commitIndexWriter(IndexWriter writer, Translog translog, String s | |
} | ||
}) { | ||
engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); | ||
engine.advanceMaxSeqNoOfUpdatesOrDeletes(engine.getLocalCheckpointTracker().getMaxSeqNo()); | ||
final ParsedDocument doc1 = testParsedDocument("1", null, testDocumentWithTextField(), SOURCE, null); | ||
engine.index(indexForDoc(doc1)); | ||
globalCheckpoint.set(engine.getLocalCheckpoint()); | ||
|
@@ -3461,9 +3463,11 @@ public void testEngineMaxTimestampIsInitialized() throws IOException { | |
engine.index(appendOnlyPrimary(doc, true, timestamp1)); | ||
assertEquals(timestamp1, engine.segmentsStats(false).getMaxUnsafeAutoIdTimestamp()); | ||
} | ||
try (Store store = createStore(newFSDirectory(storeDir)); Engine engine = new InternalEngine(configSupplier.apply(store))) { | ||
try (Store store = createStore(newFSDirectory(storeDir)); | ||
InternalEngine engine = new InternalEngine(configSupplier.apply(store))) { | ||
assertEquals(IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, engine.segmentsStats(false).getMaxUnsafeAutoIdTimestamp()); | ||
engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); | ||
engine.advanceMaxSeqNoOfUpdatesOrDeletes(engine.getLocalCheckpointTracker().getMaxSeqNo()); | ||
assertEquals(timestamp1, engine.segmentsStats(false).getMaxUnsafeAutoIdTimestamp()); | ||
final ParsedDocument doc = testParsedDocument("1", null, testDocumentWithTextField(), | ||
new BytesArray("{}".getBytes(Charset.defaultCharset())), null); | ||
|
@@ -4353,7 +4357,7 @@ public void testKeepTranslogAfterGlobalCheckpoint() throws Exception { | |
|
||
final EngineConfig engineConfig = config(indexSettings, store, translogPath, NoMergePolicy.INSTANCE, null, null, | ||
() -> globalCheckpoint.get()); | ||
try (Engine engine = new InternalEngine(engineConfig) { | ||
try (InternalEngine engine = new InternalEngine(engineConfig) { | ||
@Override | ||
protected void commitIndexWriter(IndexWriter writer, Translog translog, String syncId) throws IOException { | ||
// Advance the global checkpoint during the flush to create a lag between a persisted global checkpoint in the translog | ||
|
@@ -4365,6 +4369,7 @@ protected void commitIndexWriter(IndexWriter writer, Translog translog, String s | |
} | ||
}) { | ||
engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); | ||
engine.advanceMaxSeqNoOfUpdatesOrDeletes(engine.getLocalCheckpointTracker().getMaxSeqNo()); | ||
int numDocs = scaledRandomIntBetween(10, 100); | ||
for (int docId = 0; docId < numDocs; docId++) { | ||
ParseContext.Document document = testDocumentWithTextField(); | ||
|
@@ -5033,6 +5038,47 @@ public void testAcquireSearcherOnClosingEngine() throws Exception { | |
expectThrows(AlreadyClosedException.class, () -> engine.acquireSearcher("test")); | ||
} | ||
|
||
public void testTrackMaxSeqNoOfUpdatesOrDeletes() throws Exception { | ||
engine.close(); | ||
Set<String> liveDocIds = new HashSet<>(); | ||
engine = new InternalEngine(engine.config()); | ||
assertThat(engine.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(-2L)); | ||
engine.advanceMaxSeqNoOfUpdatesOrDeletes(randomLongBetween(-1L, 50L)); | ||
int numOps = between(1, 500); | ||
for (int i = 0; i < numOps; i++) { | ||
long currentMaxSeqNoOfUpdates = engine.getMaxSeqNoOfUpdatesOrDeletes(); | ||
ParsedDocument doc = createParsedDoc(Integer.toString(between(1, 100)), null); | ||
if (randomBoolean()) { | ||
if (randomBoolean()) { | ||
Engine.IndexResult result = engine.index(indexForDoc(doc)); | ||
if (liveDocIds.add(doc.id()) == false) { | ||
assertThat("update operations on primary must advance max_seq_no_of_updates", | ||
engine.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(Math.max(currentMaxSeqNoOfUpdates, result.getSeqNo()))); | ||
} else { | ||
assertThat(engine.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(currentMaxSeqNoOfUpdates)); | ||
} | ||
} else { | ||
Engine.DeleteResult result = engine.delete(new Engine.Delete(doc.type(), doc.id(), newUid(doc.id()), primaryTerm.get())); | ||
liveDocIds.remove(doc.id()); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. if this doesn't remove anything, the msu doesn't advance, right? I think this is caught by the max in the assertion, but I think we'd better be explicit. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I prefer to always advance MSU for delete operations for simplicity. However, if we prefer something else, I am happy to change. |
||
assertThat("delete operations on primary must advance max_seq_no_of_updates", | ||
engine.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(Math.max(currentMaxSeqNoOfUpdates, result.getSeqNo()))); | ||
} | ||
} else { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. crazy pants for mixing primary and replica ops :) I'm fine with it for as long we can get away with it. |
||
long maxSeqNo = engine.getLocalCheckpointTracker().getMaxSeqNo(); | ||
long seqNo = randomLongBetween(maxSeqNo + 1, maxSeqNo + 10); | ||
if (randomBoolean()) { | ||
engine.index(replicaIndexForDoc(doc, 1, seqNo, randomBoolean())); | ||
liveDocIds.add(doc.id()); | ||
} else { | ||
engine.delete(replicaDeleteForDoc(doc.id(), 1, seqNo, threadPool.relativeTimeInMillis())); | ||
liveDocIds.remove(doc.id()); | ||
} | ||
assertThat("non-primary operations should not advance max_seq_no_of_updates", | ||
engine.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(currentMaxSeqNoOfUpdates)); | ||
} | ||
} | ||
} | ||
|
||
static void trimUnsafeCommits(EngineConfig config) throws IOException { | ||
final Store store = config.getStore(); | ||
final TranslogConfig translogConfig = config.getTranslogConfig(); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we need to define updates - this is a Lucene term. In ES we have index and delete on the engine level (and updates to the users, which we don't mean here)