-
Notifications
You must be signed in to change notification settings - Fork 25k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Zen2: Add join validation #37203
Zen2: Add join validation #37203
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -69,12 +69,14 @@ | |
import org.elasticsearch.transport.TransportService; | ||
|
||
import java.util.ArrayList; | ||
import java.util.Collection; | ||
import java.util.HashSet; | ||
import java.util.List; | ||
import java.util.Optional; | ||
import java.util.Random; | ||
import java.util.Set; | ||
import java.util.concurrent.atomic.AtomicBoolean; | ||
import java.util.function.BiConsumer; | ||
import java.util.function.Supplier; | ||
import java.util.stream.Collectors; | ||
import java.util.stream.StreamSupport; | ||
|
@@ -117,6 +119,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery | |
private final LeaderChecker leaderChecker; | ||
private final FollowersChecker followersChecker; | ||
private final ClusterApplier clusterApplier; | ||
private final Collection<BiConsumer<DiscoveryNode, ClusterState>> onJoinValidators; | ||
@Nullable | ||
private Releasable electionScheduler; | ||
@Nullable | ||
|
@@ -139,13 +142,14 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery | |
public Coordinator(String nodeName, Settings settings, ClusterSettings clusterSettings, TransportService transportService, | ||
NamedWriteableRegistry namedWriteableRegistry, AllocationService allocationService, MasterService masterService, | ||
Supplier<CoordinationState.PersistedState> persistedStateSupplier, UnicastHostsProvider unicastHostsProvider, | ||
ClusterApplier clusterApplier, Random random) { | ||
ClusterApplier clusterApplier, Collection<BiConsumer<DiscoveryNode, ClusterState>> onJoinValidators, Random random) { | ||
super(settings); | ||
this.settings = settings; | ||
this.transportService = transportService; | ||
this.masterService = masterService; | ||
this.onJoinValidators = JoinTaskExecutor.addBuiltInJoinValidators(onJoinValidators); | ||
this.joinHelper = new JoinHelper(settings, allocationService, masterService, transportService, | ||
this::getCurrentTerm, this::handleJoinRequest, this::joinLeaderInTerm); | ||
this::getCurrentTerm, this::handleJoinRequest, this::joinLeaderInTerm, this.onJoinValidators); | ||
this.persistedStateSupplier = persistedStateSupplier; | ||
this.discoverySettings = new DiscoverySettings(settings, clusterSettings); | ||
this.lastKnownLeader = Optional.empty(); | ||
|
@@ -277,6 +281,11 @@ PublishWithJoinResponse handlePublishRequest(PublishRequest publishRequest) { | |
+ lastKnownLeader + ", rejecting"); | ||
} | ||
|
||
if (publishRequest.getAcceptedState().term() > coordinationState.get().getLastAcceptedState().term()) { | ||
// only do join validation if we have not accepted state from this master yet | ||
onJoinValidators.stream().forEach(a -> a.accept(getLocalNode(), publishRequest.getAcceptedState())); | ||
} | ||
|
||
ensureTermAtLeast(sourceNode, publishRequest.getAcceptedState().term()); | ||
final PublishResponse publishResponse = coordinationState.get().handlePublishRequest(publishRequest); | ||
|
||
|
@@ -389,6 +398,41 @@ private void handleJoinRequest(JoinRequest joinRequest, JoinHelper.JoinCallback | |
logger.trace("handleJoinRequest: as {}, handling {}", mode, joinRequest); | ||
transportService.connectToNode(joinRequest.getSourceNode()); | ||
|
||
final ClusterState stateForJoinValidation = getStateForMasterService(); | ||
|
||
if (stateForJoinValidation.nodes().isLocalNodeElectedMaster()) { | ||
// we do this in a couple of places including the cluster update thread. This one here is really just best effort | ||
// to ensure we fail as fast as possible. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this true? I see that we call There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. copied this from ZenDiscovery. It's obviously wrong. I've moved the comment to the call of |
||
onJoinValidators.stream().forEach(a -> a.accept(joinRequest.getSourceNode(), stateForJoinValidation)); | ||
ywelsch marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if (stateForJoinValidation.getBlocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK) == false) { | ||
JoinTaskExecutor.ensureMajorVersionBarrier(joinRequest.getSourceNode().getVersion(), | ||
stateForJoinValidation.getNodes().getMinNodeVersion()); | ||
} | ||
|
||
// validate the join on the joining node, will throw a failure if it fails the validation | ||
joinHelper.sendValidateJoinRequest(joinRequest.getSourceNode(), stateForJoinValidation, new ActionListener<Empty>() { | ||
@Override | ||
public void onResponse(Empty empty) { | ||
try { | ||
processJoinRequest(joinRequest, joinCallback); | ||
} catch (Exception e) { | ||
joinCallback.onFailure(e); | ||
} | ||
} | ||
|
||
@Override | ||
public void onFailure(Exception e) { | ||
logger.warn(() -> new ParameterizedMessage("failed to validate incoming join request from node [{}]", | ||
joinRequest.getSourceNode()), e); | ||
joinCallback.onFailure(new IllegalStateException("failure when sending a validation request to node", e)); | ||
} | ||
}); | ||
} else { | ||
processJoinRequest(joinRequest, joinCallback); | ||
} | ||
} | ||
|
||
private void processJoinRequest(JoinRequest joinRequest, JoinHelper.JoinCallback joinCallback) { | ||
final Optional<Join> optionalJoin = joinRequest.getOptionalJoin(); | ||
synchronized (mutex) { | ||
final CoordinationState coordState = coordinationState.get(); | ||
|
@@ -514,7 +558,7 @@ Mode getMode() { | |
} | ||
|
||
// visible for testing | ||
public DiscoveryNode getLocalNode() { | ||
DiscoveryNode getLocalNode() { | ||
return transportService.getLocalNode(); | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -131,7 +131,7 @@ public DiscoveryModule(Settings settings, ThreadPool threadPool, TransportServic | |
discoveryTypes.put(ZEN2_DISCOVERY_TYPE, () -> new Coordinator(NODE_NAME_SETTING.get(settings), settings, clusterSettings, | ||
transportService, namedWriteableRegistry, allocationService, masterService, | ||
() -> gatewayMetaState.getPersistedState(settings, (ClusterApplierService) clusterApplier), hostsProvider, clusterApplier, | ||
Randomness.get())); | ||
joinValidators, Randomness.get())); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We say There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. it's not necessary because it's wrapped again with |
||
discoveryTypes.put("single-node", () -> new SingleNodeDiscovery(settings, transportService, masterService, clusterApplier, | ||
gatewayMetaState)); | ||
for (DiscoveryPlugin plugin : plugins) { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Apparently the
.stream()
is unnecessary.