Zen2: Add Cluster State Applier #34257
Pinging @elastic/es-distributed
Build failed when checking out 6.x. I'll merge the latest zen2 branch into this. All the testing etc. looks good otherwise.
We discussed this in another channel and decided it'd be good to remove currentPublication and come up with a better way to make the assertions that it helps us make.
server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java
@@ -433,7 +485,8 @@ public void invariant() {

 final Set<DiscoveryNode> knownFollowers = followersChecker.getKnownFollowers();
 final Set<DiscoveryNode> lastPublishedNodes = new HashSet<>();
-if (becomingMaster == false || publicationInProgress()) {
+if (becomingMaster == false ||
+    (publicationInProgress() && getCurrentTerm() == currentPublication.get().publishedState().term())) {
Hmm this is a surprising addition. How can we have a publication in progress whose term doesn't match our current term? That seems bad. I also had issues with this assertion in the vicinity of becoming a master, perhaps we should rethink this?
I've rewritten the assertions around currentPublication (see b77e5ab). I think we still need currentPublication for the term bumping though.
I left a few more requests, and there's an assertion that doesn't work correctly.
Also to get around the lack of term bumping I added this (plus imports and making things package-visible):
diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java
index b714f9a9530..42d321ea303 100644
--- a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java
+++ b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java
@@ -597,6 +597,23 @@ public class CoordinatorTests extends ESTestCase {
             runUntil(stabilisationEndTime);
+            // TODO remove when term-bumping is enabled
+            final long maxTerm = clusterNodes.stream().map(n -> n.coordinator.getCurrentTerm()).max(Long::compare).orElse(0L);
+            final long maxLeaderTerm = clusterNodes.stream().filter(n -> n.coordinator.getMode() == Mode.LEADER)
+                .map(n -> n.coordinator.getCurrentTerm()).max(Long::compare).orElse(0L);
+
+            if (maxLeaderTerm < maxTerm) {
+                logger.info("--> forcing a term bump, maxTerm={}, maxLeaderTerm={}", maxTerm, maxLeaderTerm);
+                final ClusterNode leader = getAnyLeader();
+                synchronized (leader.coordinator.mutex) {
+                    leader.coordinator.ensureTermAtLeast(leader.localNode, maxTerm + 1);
+                }
+                leader.coordinator.startElection();
+                final long termBumpEndTime = stabilisationEndTime + DEFAULT_ELECTION_DELAY;
+                logger.info("--> re-stabilising after term bump until [{}ms]", termBumpEndTime);
+                runUntil(termBumpEndTime);
+            }
+
             assertUniqueLeaderAndExpectedModes();
         }
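The decision the workaround makes can be looked at in isolation: a bump is forced only when some node holds a higher term than every leader, and the target is one past the highest term seen. Here is a minimal sketch of just that check, assuming a hypothetical `NodeView` type in place of the test's `ClusterNode` (the names `NodeView`, `TermBumpSketch`, `needsTermBump` and `bumpedTerm` are illustrative and are not part of the PR):

```java
import java.util.List;

// Hypothetical, simplified view of a node; not the ClusterNode used in CoordinatorTests.
final class NodeView {
    enum Mode { CANDIDATE, LEADER, FOLLOWER }

    final Mode mode;
    final long term;

    NodeView(Mode mode, long term) {
        this.mode = mode;
        this.term = term;
    }
}

final class TermBumpSketch {
    // Highest term seen on any node (0 if there are no nodes).
    static long maxTerm(List<NodeView> nodes) {
        return nodes.stream().mapToLong(n -> n.term).max().orElse(0L);
    }

    // Highest term among nodes that are currently in leader mode (0 if there is none).
    static long maxLeaderTerm(List<NodeView> nodes) {
        return nodes.stream()
            .filter(n -> n.mode == NodeView.Mode.LEADER)
            .mapToLong(n -> n.term)
            .max()
            .orElse(0L);
    }

    // A bump is needed when some node is in a higher term than every leader.
    static boolean needsTermBump(List<NodeView> nodes) {
        return maxLeaderTerm(nodes) < maxTerm(nodes);
    }

    // The term the leader is pushed to: one past the highest term in the cluster.
    static long bumpedTerm(List<NodeView> nodes) {
        return maxTerm(nodes) + 1;
    }
}
```

The sketch uses `mapToLong(...).max()` rather than `map(...).max(Long::compare)` only to avoid boxing; both forms compute the same maximum.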
server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java
private final ListenableFuture<Void> localNodeAckEvent;
private final AckListener ackListener;
private final ActionListener<Void> publishListener;
private boolean completed;
I think this has the same semantics as isCompleted in the base Publication class. Could we just reuse that?
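One way to read this suggestion, as a rough sketch only: the subclass drops its own flag and asks the base class whether completion has happened. The classes below are hypothetical stand-ins (`BasePublicationSketch`, `CoordinatorPublicationSketch`); whether the real `Publication` exposes its completion state to subclasses in this form is an assumption.

```java
// Hypothetical base class: completes at most once and remembers that it did.
abstract class BasePublicationSketch {
    private boolean completed;

    final void complete(boolean committed) {
        if (completed) {
            return; // later completion attempts are ignored
        }
        completed = true;
        onCompletion(committed);
    }

    final boolean isCompleted() {
        return completed;
    }

    protected abstract void onCompletion(boolean committed);
}

// Hypothetical subclass: no "completed" field of its own.
final class CoordinatorPublicationSketch extends BasePublicationSketch {
    int completions; // counts how often the completion callback ran

    @Override
    protected void onCompletion(boolean committed) {
        completions++;
    }

    // Derived from the base class state instead of a duplicated flag.
    boolean isActive() {
        return isCompleted() == false;
    }
}
```

Keeping a single flag means the two notions of "completed" cannot drift apart.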
}, EsExecutors.newDirectExecutorService());
}

boolean isActive() {
Perhaps isIncomplete to align with the "completed" nomenclature?
if (becomingMaster && activePublicationInProgress() == false) {
    // cluster state update task to become master is submitted to MasterService, but publication has not started yet
    assert followersChecker.getKnownFollowers().isEmpty() : followersChecker.getKnownFollowers();
This assertion fails with
./gradlew :server:test -Dtests.class=org.elasticsearch.cluster.coordination.CoordinatorTests -Dtests.method=testFollowerDisconnectionDetectedQuickly -Dtests.seed=396E71ACBA99321E:9198D7E2D2FC9784 -Dtests.jvm.argline=-Dhppc.bitmixer=DETERMINISTIC
A publication that times out before being accepted locally leaves becomingMaster true (the last-accepted term is wrong) and activePublicationInProgress() == false, but by this point the followersChecker is active.
// TODO instead assert that knownFollowers is updated appropriately at the end of each publication
assert lastPublishedNodes.remove(getLocalNode()); // followersChecker excludes local node
assert lastPublishedNodes.equals(followersChecker.getKnownFollowers()) :
    lastPublishedNodes + " != " + followersChecker.getKnownFollowers();
This assertion fails:
:server:test -Dtests.class=org.elasticsearch.cluster.coordination.CoordinatorTests -Dtests.method=testUnresponsiveLeaderDetectedEventually -Dtests.seed=B39FA830E397231C:E88D9536D29A794 -Dtests.jvm.argline=-Dhppc.bitmixer=DETERMINISTIC
(I have some local modifications so may not reproduce exactly)
The reason is the same as in https://github.com/elastic/elasticsearch/pull/34257/files#r222583570: a publication is committed but times out before being accepted locally. Here it's a commit that's adding a node after the cluster formed, so the known followers are updated at the start of the publication but the last-accepted state doesn't reflect this.
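The mismatch described here can be reproduced in a toy model. This is a sketch under simplifying assumptions and is not Elasticsearch code: `PublicationTimeoutSketch` and all of its members are hypothetical. The model updates the known followers when a publication starts and updates the last-accepted node set only on local acceptance, so skipping the accept step stands in for the timeout.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Toy model, not Elasticsearch code: every name here is hypothetical.
final class PublicationTimeoutSketch {
    final String localNode = "local";
    final Set<String> knownFollowers = new HashSet<>(); // what a followers checker would track
    Set<String> lastAcceptedNodes = new HashSet<>();    // nodes in the last-accepted state

    static Set<String> nodes(String... names) {
        return new HashSet<>(Arrays.asList(names));
    }

    // Followers are updated as soon as the publication starts.
    void startPublication(Set<String> publishedNodes) {
        knownFollowers.clear();
        knownFollowers.addAll(publishedNodes);
        knownFollowers.remove(localNode); // the local node is not its own follower
    }

    // The last-accepted state only changes once the local node accepts the publication.
    void acceptLocally(Set<String> publishedNodes) {
        lastAcceptedNodes = new HashSet<>(publishedNodes);
    }

    // The comparison made by the failing assertion, expressed on the toy state.
    boolean followersMatchLastAccepted() {
        Set<String> expected = new HashSet<>(lastAcceptedNodes);
        expected.remove(localNode);
        return expected.equals(knownFollowers);
    }
}
```

Starting a second publication that adds a node and never calling `acceptLocally` for it leaves `followersMatchLastAccepted()` false, which is the shape of the failure reported above.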
After some back and forth, I think I've finally fixed this in 84a69c1
LGTM, and to a few hundred iterations of the tests too. I left a few optional nits.
@@ -385,18 +420,28 @@ DiscoveryNode getLocalNode() {
 }

 // package-visible for testing
-boolean publicationInProgress() {
+boolean activePublicationInProgress() {
Think this is just publicationInProgress() again now.
++
@@ -461,6 +516,7 @@ public void invariant() {
 assert leaderChecker.currentNodeIsMaster() == false;
 assert leaderCheckScheduler != null;
 assert followersChecker.getKnownFollowers().isEmpty();
+assert currentPublication.map(Publication::isCommitted).orElse(true);
Nice. I think this is a strong enough link between currentPublication and mode.
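The `map(...).orElse(true)` form in the assertion above reads as "either there is no current publication, or the current one is committed". Here is a small, self-contained sketch of that `Optional` idiom, using a hypothetical `FakePublication` in place of the real `Publication` class:

```java
import java.util.Optional;

// Hypothetical stand-in for Publication; only the committed flag matters here.
final class FakePublication {
    private final boolean committed;

    FakePublication(boolean committed) {
        this.committed = committed;
    }

    boolean isCommitted() {
        return committed;
    }
}

final class PublicationInvariantSketch {
    // True when there is no publication at all, or when the publication is committed.
    static boolean noUncommittedPublication(Optional<FakePublication> currentPublication) {
        return currentPublication.map(FakePublication::isCommitted).orElse(true);
    }
}
```

An empty `Optional` satisfies the check because `orElse(true)` supplies the default, so the invariant only rejects a publication that is present and not yet committed.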
@elasticmachine retest this please
From experience with elastic#34257, here are a few things that help with analysing logs from test runs. Also we prevent trying to stabilise a cluster with raised delay variability, because lowering the delay variability requires time to allow all the extra-varied-scheduled tasks to work their way out of the system.
Adds the cluster state applier to Coordinator, and adds tests for cluster state acking.