Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Zen2] Logging improvements in CoordinatorTests #33991

Merged
merged 3 commits into from
Sep 24, 2018
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -158,13 +158,21 @@ public void setInitialState(ClusterState initialState) {
*/
public Join handleStartJoin(StartJoinRequest startJoinRequest) {
if (startJoinRequest.getTerm() <= getCurrentTerm()) {
logger.debug("handleStartJoin: ignored as term provided [{}] not greater than current term [{}]",
startJoinRequest.getTerm(), getCurrentTerm());
logger.debug("handleStartJoin: ignoring [{}] as term provided is not greater than current term [{}]",
startJoinRequest, getCurrentTerm());
throw new CoordinationStateRejectedException("incoming term " + startJoinRequest.getTerm() +
" not greater than current term " + getCurrentTerm());
}

logger.debug("handleStartJoin: updating term from [{}] to [{}]", getCurrentTerm(), startJoinRequest.getTerm());
logger.debug("handleStartJoin: leaving term [{}] due to {}", getCurrentTerm(), startJoinRequest);

if (joinVotes.isEmpty() == false) {
if (electionWon == false) {
logger.debug("handleStartJoin: failed election, discarding {}", joinVotes);
} else if (startJoinRequest.getSourceNode().equals(localNode) == false) {
logger.debug("handleStartJoin: standing down as leader, discarding {}", joinVotes);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we also log here when bumping the term as leader? I think all the cases where there are existing votes are interesting.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok

}

persistedState.setCurrentTerm(startJoinRequest.getTerm());
assert getCurrentTerm() == startJoinRequest.getTerm();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.coordination.JoinHelper.InitialJoinAccumulator;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
Expand Down Expand Up @@ -99,7 +100,7 @@ public Coordinator(Settings settings, TransportService transportService, Allocat
this.persistedStateSupplier = persistedStateSupplier;
this.lastKnownLeader = Optional.empty();
this.lastJoin = Optional.empty();
this.joinAccumulator = joinHelper.new CandidateJoinAccumulator();
this.joinAccumulator = new InitialJoinAccumulator();
this.publishTimeout = PUBLISH_TIMEOUT_SETTING.get(settings);
this.electionSchedulerFactory = new ElectionSchedulerFactory(settings, random, transportService.getThreadPool());
this.preVoteCollector = new PreVoteCollector(settings, transportService, this::startElection, this::updateMaxTermSeen);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,6 @@ interface JoinAccumulator {
void handleJoinRequest(DiscoveryNode sender, JoinCallback joinCallback);

default void close(Mode newMode) {

}
}

Expand All @@ -220,6 +219,19 @@ public String toString() {
}
}

static class InitialJoinAccumulator implements JoinAccumulator {
@Override
public void handleJoinRequest(DiscoveryNode sender, JoinCallback joinCallback) {
assert false : "unexpected join from " + sender + " during initialisation";
joinCallback.onFailure(new CoordinationStateRejectedException("join target is not initialised yet"));
}

@Override
public String toString() {
return "InitialJoinAccumulator";
}
}

static class FollowerJoinAccumulator implements JoinAccumulator {
@Override
public void handleJoinRequest(DiscoveryNode sender, JoinCallback joinCallback) {
Expand Down Expand Up @@ -265,13 +277,14 @@ public void close(Mode newMode) {
});
masterService.submitStateUpdateTasks(stateUpdateSource, pendingAsTasks, ClusterStateTaskConfig.build(Priority.URGENT),
joinTaskExecutor);
} else if (newMode == Mode.FOLLOWER) {
} else {
assert newMode == Mode.FOLLOWER : newMode;
joinRequestAccumulator.values().forEach(joinCallback -> joinCallback.onFailure(
new CoordinationStateRejectedException("became follower")));
} else {
assert newMode == Mode.CANDIDATE;
assert joinRequestAccumulator.isEmpty() : joinRequestAccumulator.keySet();
}

// CandidateJoinAccumulator is only closed when becoming leader or follower, otherwise it accumulates all joins received
// regardless of term.
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ public void testCanUpdateClusterStateAfterStabilisation() {

final ClusterNode leader = cluster.getAnyLeader();
long finalValue = randomLong();

logger.info("--> submitting value [{}] to [{}]", finalValue, leader);
leader.submitValue(finalValue);
cluster.stabilise(); // TODO this should only need a short stabilisation

Expand All @@ -96,6 +98,7 @@ class Cluster {

final List<ClusterNode> clusterNodes;
final DeterministicTaskQueue deterministicTaskQueue = new DeterministicTaskQueue(
// TODO does ThreadPool need a node name any more?
Settings.builder().put(NODE_NAME_SETTING.getKey(), "deterministic-task-queue").build());
private final VotingConfiguration initialConfiguration;

Expand Down Expand Up @@ -267,17 +270,27 @@ boolean isLeader() {
}

void submitValue(final long value) {
masterService.submitStateUpdateTask("new value [" + value + "]", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
return setValue(currentState, value);
}
onNode(localNode, new Runnable() {
@Override
public void run() {
masterService.submitStateUpdateTask("new value [" + value + "]", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
return setValue(currentState, value);
}

@Override
public void onFailure(String source, Exception e) {
logger.debug(() -> new ParameterizedMessage("failed to publish: [{}]", source), e);
}
});
}

@Override
public void onFailure(String source, Exception e) {
logger.debug(() -> new ParameterizedMessage("failed to publish: [{}]", source), e);
public String toString() {
return "submitStateUpdateTask: new value [" + value + "]";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we need this toString() here? We run it right away and don't log it anywhere.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Left over from earlier when I was enqueueing it, good point.

}
});
}).run();
}
}

Expand Down