-
Notifications
You must be signed in to change notification settings - Fork 25k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Track max seq_no of updates or deletes on primary #33842
Conversation
This PR is the first step to use seq_no to optimize indexing operations. The idea is to track the max seq_no of either update or delete ops on a primary, and transfer this information to replicas, and replicas use it to optimize plan for indexing operations (with assigned seq_no). The max_seq_no_of_updates on primary is initialized once when a primary finishes its local recovery or peer recovery in relocation or being promoted. After that, the max_seq_no_of_updates is only advanced inside an engine internally when processing an update or a delete operation.
Pinging @elastic/es-distributed |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks great. I left some minor comments and questions.
@@ -126,6 +127,9 @@ | |||
* inactive shards. | |||
*/ | |||
protected volatile long lastWriteNanos = System.nanoTime(); | |||
// The maximum sequence number of either update or delete operations have been processed by this engine. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we need to define updates - this is a lucne term. in ES we have index and delete on the engine level (and updates to the users, which we don't mean here)
*/ | ||
public final void advanceMaxSeqNoOfUpdatesOrDeletes(long seqNo) { | ||
maxSeqNoOfUpdatesOrDeletes.updateAndGet(curr -> Math.max(curr, seqNo)); | ||
assert maxSeqNoOfUpdatesOrDeletes.get() >= seqNo; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ehm - this needs to be the first line,no? also please add a message with the numbers
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We may advance MSU concurrently, thus this assertion may not hold as the precondition.
@@ -922,11 +922,13 @@ protected IndexingStrategy indexingStrategyForOperation(final Index index) throw | |||
|
|||
protected final IndexingStrategy planIndexingAsPrimary(Index index) throws IOException { | |||
assert index.origin() == Operation.Origin.PRIMARY : "planing as primary but origin isn't. got " + index.origin(); | |||
assert getMaxSeqNoOfUpdatesOrDeletes() != SequenceNumbers.UNASSIGNED_SEQ_NO : "max_seq_no_of_updates is not initialized"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this may cause a bwc issue - if the primary is on an old node, it's a problem. Maybe only assert if the index created version is high enough?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We make sure that we always initialize max_seq_no_of_updates of the engine of a primary shard (in IndexShard). Please let me know if it's correct.
@@ -954,6 +956,10 @@ protected final IndexingStrategy planIndexingAsPrimary(Index index) throws IOExc | |||
generateSeqNoForOperation(index), | |||
index.versionType().updateVersion(currentVersion, index.version()) | |||
); | |||
final boolean toAppend = plan.indexIntoLucene && plan.useLuceneUpdateDocument == false; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if we can do this once at the end of the method based on the plan? It doesn't really matter how we came to the decision, right?
@@ -1245,6 +1251,7 @@ protected boolean assertNonPrimaryOrigin(final Operation operation) { | |||
|
|||
protected final DeletionStrategy planDeletionAsPrimary(Delete delete) throws IOException { | |||
assert delete.origin() == Operation.Origin.PRIMARY : "planing as primary but got " + delete.origin(); | |||
assert getMaxSeqNoOfUpdatesOrDeletes() != SequenceNumbers.UNASSIGNED_SEQ_NO : "max_seq_no_of_updates is not initialized"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same comment about index version and bwc.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
commenting here as I can comment on the relevant lines - can we add an assertion in the index method and delete method that if we update/delete lucene than the seq# of the delete/update is <= msu?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess you need the replica logic to be in place for that assertions to be added.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, we need to replicate msu first. I will add these assertions later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We make sure that we always initialize max_seq_no_of_updates of the engine of a primary shard (in IndexShard). Please let me know if it's correct.
@@ -1947,6 +1949,7 @@ public void activateWithPrimaryContext(final ReplicationTracker.PrimaryContext p | |||
getLocalCheckpoint() == primaryContext.getCheckpointStates().get(routingEntry().allocationId().getId()).getLocalCheckpoint(); | |||
synchronized (mutex) { | |||
replicationTracker.activateWithPrimaryContext(primaryContext); // make changes to primaryMode flag only under mutex | |||
advanceMaxSeqNoOfUpdatesOrDeletes(seqNoStats().getMaxSeqNo()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same question as about the other promotion, why is this needed?
@@ -126,6 +127,9 @@ | |||
* inactive shards. | |||
*/ | |||
protected volatile long lastWriteNanos = System.nanoTime(); | |||
// The maximum sequence number of either update or delete operations have been processed by this engine. | |||
// This value is started with an unassigned status (-2) and will be initialized from outside. | |||
private final AtomicLong maxSeqNoOfUpdatesOrDeletes = new AtomicLong(SequenceNumbers.UNASSIGNED_SEQ_NO); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can also clearly document that a -2 means the optimization should be disabled?
@@ -1324,6 +1325,7 @@ public void openEngineAndRecoverFromTranslog() throws IOException { | |||
}; | |||
innerOpenEngineAndTranslog(); | |||
getEngine().recoverFromTranslog(translogRecoveryRunner, Long.MAX_VALUE); | |||
advanceMaxSeqNoOfUpdatesOrDeletes(seqNoStats().getMaxSeqNo()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm a bit doubtful about this one. I'm thinking that we should set it before starting to recover (base on the max seq no in the translog / engine) - that would allow us to have the invariant that of the index is new enough the msu is always set when indexing.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, I understand your point, but there is a reason that I have to put this initialization after we recover from translog.
-
We don't restore LocalCheckpointTracker when opening an engine but only bootstrap local_checkpoint and max_seq_no from the safe commit.
-
If we replay translog with the max_seq_no_updates set to the max_seq_no. We may duplicate operations whose seq_no between the max_seq_no from the safe commit and the max_seq_no from translog.
We can use the "isRetry" flag of index operations to solve this, but I am not sure because I think isRetry
is attached to autoGeneratedIdTimestamp
.
} | ||
} else { | ||
Engine.DeleteResult result = engine.delete(new Engine.Delete(doc.type(), doc.id(), newUid(doc.id()), primaryTerm.get())); | ||
liveDocIds.remove(doc.id()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if this doesn't remove anything nothing, the msu doesn't advance, right? I think this is catched by the max in the assertion but I think we better be explicit.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I prefer to always advance MSU for delete operations for simplicity. However, if we prefer something else, I am happy to change.
assertThat("delete operations on primary must advance max_seq_no_of_updates", | ||
engine.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(Math.max(currentMaxSeqNoOfUpdates, result.getSeqNo()))); | ||
} | ||
} else { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
crazy pants for mixing primary and replica ops :) I'm fine with it for as long we can get away with it.
@bleskes Could you please give it another go? Thank you! |
@bleskes I think we still have BWC issue with 3 copies:
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. Thanks Nhat!
|
||
@Override | ||
public void initializeMaxSeqNoOfUpdatesOrDeletes() { | ||
assert getMaxSeqNoOfUpdatesOrDeletes() == SequenceNumbers.UNASSIGNED_SEQ_NO : |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we also assert that translog recovery is pending?
* Advances the max_seq_no_of_updates marker of the engine of this shard to at least the given sequence number. | ||
* A replica calls this method to advance the max_seq_no_of_updates marker of its engine to at least the max_seq_no_of_updates | ||
* value (piggybacked in a replication request) that it receives from its primary before executing that replication request. | ||
* The receiving value is at least the highest max_seq_no_of_updates of all index/delete operations in that replication request. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it's not clear here what "msu of all" means (it's clear in the google doc we have). Maybe write that value is at least as high as the msu on the primary was when any of the operations were processed on it?
Jenkins, run sample packaging tests please. |
As discussed on another channel this is not possible as we prevent promotion of an old shard to a primary if a replica exists on a node with a new version. This means that once a shard starts receiving new features from a new primary we can rely on those staying there. |
We start tracking max seq_no_of_updates on the primary in elastic#33842. This commit replicates that value from a primary to its replicas in replication requests or the translog phase of peer-recovery. With this change, we guarantee that the value of max seq_no_of_updates on a replica when any index/delete operation is performed at least the max_seq_no_of_updates on the primary when that operation was executed. Relates elastic#33656
We start tracking max seq_no_of_updates on the primary in #33842. This commit replicates that value from a primary to its replicas in replication requests or the translog phase of peer-recovery. With this change, we guarantee that the value of max seq_no_of_updates on a replica when any index/delete operation is performed at least the max_seq_no_of_updates on the primary when that operation was executed. Relates #33656
This PR is the first step to use seq_no to optimize indexing operations. The idea is to track the max seq_no of either update or delete ops on a primary, and transfer this information to replicas, and replicas use it to optimize indexing plan for index operations (with assigned seq_no). The max_seq_no_of_updates on primary is initialized once when a primary finishes its local recovery or peer recovery in relocation or being promoted. After that, the max_seq_no_of_updates is only advanced internally inside an engine when processing update or delete operations. Relates #33656
We start tracking max seq_no_of_updates on the primary in #33842. This commit replicates that value from a primary to its replicas in replication requests or the translog phase of peer-recovery. With this change, we guarantee that the value of max seq_no_of_updates on a replica when any index/delete operation is performed at least the max_seq_no_of_updates on the primary when that operation was executed. Relates #33656
testOutOfOrderDocsOnReplicaOldPrimary verifies out of order operations on an old primary engine (simulates a 5.x version). This engine should not initialize max_seq_no_of_updates. However, the test uses a helper "replicaEngine" which automatically bootstraps max_seq_no_of_updates. Relates #33842 Closes #34110
This PR is the first step to use seq_no to optimize indexing operations. The idea is to track the max seq_no of either update or delete ops on a primary, and transfer this information to replicas, and replicas use it to optimize indexing plan for index operations (with assigned seq_no). The max_seq_no_of_updates on primary is initialized once when a primary finishes its local recovery or peer recovery in relocation or being promoted. After that, the max_seq_no_of_updates is only advanced internally inside an engine when processing update or delete operations. Relates #33656
We start tracking max seq_no_of_updates on the primary in #33842. This commit replicates that value from a primary to its replicas in replication requests or the translog phase of peer-recovery. With this change, we guarantee that the value of max seq_no_of_updates on a replica when any index/delete operation is performed at least the max_seq_no_of_updates on the primary when that operation was executed. Relates #33656
Today we choose to initialize max_seq_no_of_updates on primaries only so we can deal with a situation where a primary is on an old node (before 6.5) which does not have MUS while replicas on new nodes (6.5+). However, this strategy is quite complex and can lead to bugs (for example #40249) since we have to assign a correct value (not too low) to MSU in all possible situations (before recovering from translog, restoring history on promotion, and handing off relocation). Fortunately, we don't have to deal with this BWC in 7.0+ since all nodes in the cluster should have MSU. This change simplifies the initialization of MSU by always assigning it a correct value in the constructor of Engine regardless of whether it's a replica or primary. Relates #33842
Today we choose to initialize max_seq_no_of_updates on primaries only so we can deal with a situation where a primary is on an old node (before 6.5) which does not have MUS while replicas on new nodes (6.5+). However, this strategy is quite complex and can lead to bugs (for example #40249) since we have to assign a correct value (not too low) to MSU in all possible situations (before recovering from translog, restoring history on promotion, and handing off relocation). Fortunately, we don't have to deal with this BWC in 7.0+ since all nodes in the cluster should have MSU. This change simplifies the initialization of MSU by always assigning it a correct value in the constructor of Engine regardless of whether it's a replica or primary. Relates #33842
Today we choose to initialize max_seq_no_of_updates on primaries only so we can deal with a situation where a primary is on an old node (before 6.5) which does not have MUS while replicas on new nodes (6.5+). However, this strategy is quite complex and can lead to bugs (for example elastic#40249) since we have to assign a correct value (not too low) to MSU in all possible situations (before recovering from translog, restoring history on promotion, and handing off relocation). Fortunately, we don't have to deal with this BWC in 7.0+ since all nodes in the cluster should have MSU. This change simplifies the initialization of MSU by always assigning it a correct value in the constructor of Engine regardless of whether it's a replica or primary. Relates elastic#33842
Today we choose to initialize max_seq_no_of_updates on primaries only so we can deal with a situation where a primary is on an old node (before 6.5) which does not have MUS while replicas on new nodes (6.5+). However, this strategy is quite complex and can lead to bugs (for example elastic#40249) since we have to assign a correct value (not too low) to MSU in all possible situations (before recovering from translog, restoring history on promotion, and handing off relocation). Fortunately, we don't have to deal with this BWC in 7.0+ since all nodes in the cluster should have MSU. This change simplifies the initialization of MSU by always assigning it a correct value in the constructor of Engine regardless of whether it's a replica or primary. Relates elastic#33842
This PR is the first step to use seq_no to optimize indexing operations.
The idea is to track the max seq_no of either update or delete ops on a
primary, and transfer this information to replicas, and replicas use it
to optimize indexing plan for index operations (with assigned seq_no).
The max_seq_no_of_updates on primary is initialized once when a primary
finishes its local recovery or peer recovery in relocation or being
promoted. After that, the max_seq_no_of_updates is only advanced internally
inside an engine when processing update or delete operations.
Relates #33656