Track max seq_no of updates or deletes on primary #33842

Merged · 16 commits · Sep 22, 2018

Conversation


@dnhatn dnhatn commented Sep 19, 2018

This PR is the first step toward using seq_no to optimize indexing operations.
The idea is to track the max seq_no of either update or delete ops on a
primary, transfer this information to replicas, and have replicas use it
to optimize the indexing plan for index operations that carry an assigned seq_no.

The max_seq_no_of_updates on the primary is initialized once, when the primary
finishes its local recovery, finishes peer recovery during relocation, or is
promoted. After that, max_seq_no_of_updates is only advanced internally,
inside an engine, when processing update or delete operations.

Relates #33656
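As a rough illustration of the mechanism described above (class and method names here are hypothetical, not the actual Elasticsearch code), the marker can be kept as an `AtomicLong` that starts unassigned and only ever moves forward:

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of the max_seq_no_of_updates marker: it starts
// unassigned (-2), is initialized once after recovery or promotion, and
// afterwards only moves forward when updates or deletes are processed.
class MaxSeqNoOfUpdatesTracker {
    static final long UNASSIGNED_SEQ_NO = -2;

    private final AtomicLong maxSeqNoOfUpdatesOrDeletes =
        new AtomicLong(UNASSIGNED_SEQ_NO);

    // Monotonic advance: never lowers the marker.
    void advance(long seqNo) {
        maxSeqNoOfUpdatesOrDeletes.updateAndGet(curr -> Math.max(curr, seqNo));
    }

    long get() {
        return maxSeqNoOfUpdatesOrDeletes.get();
    }
}
```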

@dnhatn dnhatn added the >enhancement, :Distributed Indexing/Recovery, :Distributed Indexing/Engine, v7.0.0, and v6.5.0 labels on Sep 19, 2018
@elasticmachine
Collaborator

Pinging @elastic/es-distributed

@bleskes bleskes left a comment


This looks great. I left some minor comments and questions.

@@ -126,6 +127,9 @@
* inactive shards.
*/
protected volatile long lastWriteNanos = System.nanoTime();
// The maximum sequence number of update or delete operations that have been processed by this engine.
Contributor:
We need to define "updates" - this is a Lucene term. In ES we have index and delete at the engine level (and updates at the user level, which we don't mean here).

*/
public final void advanceMaxSeqNoOfUpdatesOrDeletes(long seqNo) {
maxSeqNoOfUpdatesOrDeletes.updateAndGet(curr -> Math.max(curr, seqNo));
assert maxSeqNoOfUpdatesOrDeletes.get() >= seqNo;
Contributor:
Ehm - this needs to be the first line, no? Also, please add a message with the numbers.

Member Author:
We may advance the MSU concurrently, so this assertion may not hold as a precondition.
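A small sketch of the point being made (names hypothetical): two threads may race, so `curr >= seqNo` is not a valid precondition, but after `updateAndGet` folds the caller's value into the maximum, the marker is guaranteed to be at least `seqNo`:

```java
import java.util.concurrent.atomic.AtomicLong;

// Under concurrent advances, the only safe assertion is the
// postcondition "marker >= the seqNo we just passed in".
class ConcurrentAdvance {
    private final AtomicLong msu = new AtomicLong(-2);

    void advance(long seqNo) {
        // updateAndGet retries its CAS loop until our max() is applied,
        // so the returned value is >= seqNo even if another thread
        // advanced the marker in between.
        long updated = msu.updateAndGet(curr -> Math.max(curr, seqNo));
        assert updated >= seqNo : "msu=" + updated + " < seqNo=" + seqNo;
    }

    long get() { return msu.get(); }
}
```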

@@ -922,11 +922,13 @@ protected IndexingStrategy indexingStrategyForOperation(final Index index) throw

protected final IndexingStrategy planIndexingAsPrimary(Index index) throws IOException {
assert index.origin() == Operation.Origin.PRIMARY : "planning as primary but origin isn't. got " + index.origin();
assert getMaxSeqNoOfUpdatesOrDeletes() != SequenceNumbers.UNASSIGNED_SEQ_NO : "max_seq_no_of_updates is not initialized";
Contributor:
I think this may cause a BWC issue - if the primary is on an old node, it's a problem. Maybe only assert if the index-created version is high enough?

Member Author:
We make sure that we always initialize max_seq_no_of_updates of the engine of a primary shard (in IndexShard). Please let me know if it's correct.

@@ -954,6 +956,10 @@ protected final IndexingStrategy planIndexingAsPrimary(Index index) throws IOExc
generateSeqNoForOperation(index),
index.versionType().updateVersion(currentVersion, index.version())
);
final boolean toAppend = plan.indexIntoLucene && plan.useLuceneUpdateDocument == false;
Contributor:
I wonder if we can do this once at the end of the method based on the plan? It doesn't really matter how we came to the decision, right?
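A toy reduction of this suggestion (field names borrowed from the diff, class shape hypothetical): whether an operation is a plain append can be derived from the finished plan alone, regardless of how the plan was reached:

```java
// Hypothetical sketch: derive "append vs update" once from the final
// plan rather than at each decision point in the planning method.
class IndexingPlanSketch {
    final boolean indexIntoLucene;
    final boolean useLuceneUpdateDocument;

    IndexingPlanSketch(boolean indexIntoLucene, boolean useLuceneUpdateDocument) {
        this.indexIntoLucene = indexIntoLucene;
        this.useLuceneUpdateDocument = useLuceneUpdateDocument;
    }

    // A plain append goes into Lucene without using updateDocument;
    // anything else must advance max_seq_no_of_updates.
    boolean isAppend() {
        return indexIntoLucene && useLuceneUpdateDocument == false;
    }
}
```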

@@ -1245,6 +1251,7 @@ protected boolean assertNonPrimaryOrigin(final Operation operation) {

protected final DeletionStrategy planDeletionAsPrimary(Delete delete) throws IOException {
assert delete.origin() == Operation.Origin.PRIMARY : "planning as primary but got " + delete.origin();
assert getMaxSeqNoOfUpdatesOrDeletes() != SequenceNumbers.UNASSIGNED_SEQ_NO : "max_seq_no_of_updates is not initialized";
Contributor:
Same comment about index version and BWC.

Contributor:
Commenting here as I can't comment on the relevant lines - can we add an assertion in the index and delete methods that if we update/delete Lucene, then the seq# of the delete/update is <= MSU?

Contributor:
I guess you need the replica logic to be in place for those assertions to be added.

Member Author:
Yes, we need to replicate msu first. I will add these assertions later.

Member Author:
We make sure that we always initialize max_seq_no_of_updates of the engine of a primary shard (in IndexShard). Please let me know if it's correct.

@@ -1947,6 +1949,7 @@ public void activateWithPrimaryContext(final ReplicationTracker.PrimaryContext p
getLocalCheckpoint() == primaryContext.getCheckpointStates().get(routingEntry().allocationId().getId()).getLocalCheckpoint();
synchronized (mutex) {
replicationTracker.activateWithPrimaryContext(primaryContext); // make changes to primaryMode flag only under mutex
advanceMaxSeqNoOfUpdatesOrDeletes(seqNoStats().getMaxSeqNo());
Contributor:
Same question as for the other promotion: why is this needed?

@@ -126,6 +127,9 @@
* inactive shards.
*/
protected volatile long lastWriteNanos = System.nanoTime();
// The maximum sequence number of update or delete operations that have been processed by this engine.
// This value starts unassigned (-2) and will be initialized from outside.
private final AtomicLong maxSeqNoOfUpdatesOrDeletes = new AtomicLong(SequenceNumbers.UNASSIGNED_SEQ_NO);
Contributor:
Can we also clearly document that -2 means the optimization should be disabled?

@@ -1324,6 +1325,7 @@ public void openEngineAndRecoverFromTranslog() throws IOException {
};
innerOpenEngineAndTranslog();
getEngine().recoverFromTranslog(translogRecoveryRunner, Long.MAX_VALUE);
advanceMaxSeqNoOfUpdatesOrDeletes(seqNoStats().getMaxSeqNo());
Contributor:
I'm a bit doubtful about this one. I'm thinking that we should set it before starting to recover (based on the max seq no in the translog / engine) - that would give us the invariant that, if the index is new enough, the MSU is always set when indexing.

Member Author:
Yeah, I understand your point, but there is a reason that I have to put this initialization after we recover from translog.

  1. We don't restore LocalCheckpointTracker when opening an engine but only bootstrap local_checkpoint and max_seq_no from the safe commit.

  2. If we replay the translog with max_seq_no_of_updates set to the max_seq_no, we may duplicate operations whose seq_no falls between the max_seq_no from the safe commit and the max_seq_no from the translog.

We can use the "isRetry" flag of index operations to solve this, but I am not sure because I think isRetry is attached to autoGeneratedIdTimestamp.

}
} else {
Engine.DeleteResult result = engine.delete(new Engine.Delete(doc.type(), doc.id(), newUid(doc.id()), primaryTerm.get()));
liveDocIds.remove(doc.id());
Contributor:
If this doesn't remove anything, the MSU doesn't advance, right? I think this is caught by the max in the assertion, but I think we'd better be explicit.

Member Author:
I prefer to always advance MSU for delete operations for simplicity. However, if we prefer something else, I am happy to change.

assertThat("delete operations on primary must advance max_seq_no_of_updates",
engine.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(Math.max(currentMaxSeqNoOfUpdates, result.getSeqNo())));
}
} else {
Contributor:
Crazy pants for mixing primary and replica ops :) I'm fine with it for as long as we can get away with it.

@dnhatn dnhatn requested a review from bleskes September 19, 2018 18:09

dnhatn commented Sep 20, 2018

@bleskes Could you please give it another go? Thank you!


dnhatn commented Sep 20, 2018

@bleskes I think we still have a BWC issue with 3 copies:

  1. The primary is on the new version, replica-1 on an old version, and replica-2 on the new version.
  2. The primary crashes and replica-1 (on the old version) is promoted.
  3. Replica-2 (on the new version) has max_seq_no_of_updates initialized but won't receive the new max_seq_no_of_updates from the promoted primary (on the old version).


@bleskes bleskes left a comment


LGTM. Thanks Nhat!


@Override
public void initializeMaxSeqNoOfUpdatesOrDeletes() {
assert getMaxSeqNoOfUpdatesOrDeletes() == SequenceNumbers.UNASSIGNED_SEQ_NO :
Contributor:
Can we also assert that translog recovery is pending?

* Advances the max_seq_no_of_updates marker of the engine of this shard to at least the given sequence number.
* A replica calls this method to advance the max_seq_no_of_updates marker of its engine to at least the max_seq_no_of_updates
* value (piggybacked in a replication request) that it receives from its primary before executing that replication request.
* The received value is at least the highest max_seq_no_of_updates of all index/delete operations in that replication request.
Contributor:
It's not clear here what "msu of all" means (it's clear in the Google doc we have). Maybe write that the value is at least as high as the MSU on the primary was when any of the operations were processed on it?
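The contract in the Javadoc above could be sketched as follows (hypothetical names, not the actual IndexShard code): before executing a replication request, the replica raises its marker to the value piggybacked by the primary, so every update/delete in the request sees an MSU at least as high as the primary's was:

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical replica-side sketch of the documented contract.
class ReplicaMsuSketch {
    private final AtomicLong msu = new AtomicLong(-2);

    // msuFromPrimary is piggybacked in the replication request and is at
    // least as high as the primary's msu when any operation in the
    // request was processed there.
    void beforeExecuting(long msuFromPrimary) {
        msu.updateAndGet(curr -> Math.max(curr, msuFromPrimary));
    }

    long maxSeqNoOfUpdatesOrDeletes() { return msu.get(); }
}
```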


dnhatn commented Sep 21, 2018

Jenkins, run sample packaging tests please.


bleskes commented Sep 21, 2018

@bleskes I think we still have a BWC issue with 3 copies:

  1. The primary is on the new version, replica-1 on an old version, and replica-2 on the new version.
  2. The primary crashes and replica-1 (on the old version) is promoted.
  3. Replica-2 (on the new version) has max_seq_no_of_updates initialized but won't receive the new max_seq_no_of_updates from the promoted primary (on the old version).

As discussed on another channel, this is not possible: we prevent promotion of an old-version shard to primary if a replica exists on a node with a new version. This means that once a shard starts receiving new features from a new primary, we can rely on those staying there.

dnhatn added a commit to dnhatn/elasticsearch that referenced this pull request Sep 22, 2018
We start tracking max seq_no_of_updates on the primary in elastic#33842. This
commit replicates that value from a primary to its replicas in
replication requests or the translog phase of peer-recovery.

With this change, we guarantee that the value of max seq_no_of_updates
on a replica, when any index/delete operation is performed, is at least the
max_seq_no_of_updates on the primary when that operation was executed.

Relates elastic#33656
dnhatn added a commit that referenced this pull request Sep 25, 2018
dnhatn added a commit that referenced this pull request Sep 27, 2018
dnhatn added a commit that referenced this pull request Sep 27, 2018
dnhatn added a commit that referenced this pull request Sep 27, 2018
dnhatn added a commit that referenced this pull request Sep 27, 2018
testOutOfOrderDocsOnReplicaOldPrimary verifies out-of-order operations
on an old primary engine (simulating a 5.x version). This engine should
not initialize max_seq_no_of_updates. However, the test uses a helper
"replicaEngine" which automatically bootstraps max_seq_no_of_updates.

Relates #33842
Closes #34110
kcm pushed a commit that referenced this pull request Oct 30, 2018
kcm pushed a commit that referenced this pull request Oct 30, 2018
kcm pushed a commit that referenced this pull request Oct 30, 2018
dnhatn added a commit that referenced this pull request Apr 30, 2019
Today we choose to initialize max_seq_no_of_updates on primaries only so
we can deal with a situation where a primary is on an old node (before
6.5) which does not have MSU while replicas are on new nodes (6.5+).
However, this strategy is quite complex and can lead to bugs (for
example #40249) since we have to assign a correct value (not too low) to
MSU in all possible situations (before recovering from translog,
restoring history on promotion, and handing off relocation).

Fortunately, we don't have to deal with this BWC in 7.0+ since all nodes
in the cluster should have MSU. This change simplifies the
initialization of MSU by always assigning it a correct value in the
constructor of Engine regardless of whether it's a replica or primary.

Relates #33842
dnhatn added a commit that referenced this pull request Apr 30, 2019
akhil10x5 pushed a commit to akhil10x5/elasticsearch that referenced this pull request May 2, 2019
gurkankaymak pushed a commit to gurkankaymak/elasticsearch that referenced this pull request May 27, 2019
Labels: >enhancement, >non-issue, :Distributed Indexing/Engine, :Distributed Indexing/Recovery, v6.5.0, v7.0.0-beta1

6 participants