From 9b1a6b21c0ecf34d9a14b98d89b8aa7cca2bff58 Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Sat, 1 Dec 2018 11:11:55 +0100 Subject: [PATCH 1/4] Add join validation --- .../cluster/coordination/Coordinator.java | 50 ++++++++- .../cluster/coordination/JoinHelper.java | 43 ++++++- .../coordination/JoinTaskExecutor.java | 15 +++ .../PublicationTransportHandler.java | 28 +++-- .../discovery/DiscoveryModule.java | 2 +- .../discovery/zen/MembershipAction.java | 6 +- .../discovery/zen/ZenDiscovery.java | 14 +-- .../TransportBootstrapClusterActionTests.java | 3 +- ...ransportGetDiscoveredNodesActionTests.java | 2 +- .../coordination/CoordinatorTests.java | 106 +++++++++++++++--- .../cluster/coordination/JoinHelperTests.java | 3 +- .../cluster/coordination/NodeJoinTests.java | 1 + .../discovery/zen/ZenDiscoveryUnitTests.java | 3 +- .../test/discovery/TestZenDiscovery.java | 2 +- 14 files changed, 230 insertions(+), 48 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index 5f131f83b270f..f5a3185f1600d 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -69,12 +69,14 @@ import org.elasticsearch.transport.TransportService; import java.util.ArrayList; +import java.util.Collection; import java.util.HashSet; import java.util.List; import java.util.Optional; import java.util.Random; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.BiConsumer; import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.StreamSupport; @@ -117,6 +119,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery private final LeaderChecker leaderChecker; private final FollowersChecker followersChecker; private final ClusterApplier clusterApplier; + private final Collection> onJoinValidators; @Nullable private Releasable electionScheduler; @Nullable @@ -139,13 +142,14 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery public Coordinator(String nodeName, Settings settings, ClusterSettings clusterSettings, TransportService transportService, NamedWriteableRegistry namedWriteableRegistry, AllocationService allocationService, MasterService masterService, Supplier persistedStateSupplier, UnicastHostsProvider unicastHostsProvider, - ClusterApplier clusterApplier, Random random) { + ClusterApplier clusterApplier, Collection> onJoinValidators, Random random) { super(settings); this.settings = settings; this.transportService = transportService; this.masterService = masterService; + this.onJoinValidators = JoinTaskExecutor.addBuiltInJoinValidators(onJoinValidators); this.joinHelper = new JoinHelper(settings, allocationService, masterService, transportService, - this::getCurrentTerm, this::handleJoinRequest, this::joinLeaderInTerm); + this::getCurrentTerm, this::handleJoinRequest, this::joinLeaderInTerm, this.onJoinValidators); this.persistedStateSupplier = persistedStateSupplier; this.discoverySettings = new DiscoverySettings(settings, clusterSettings); this.lastKnownLeader = Optional.empty(); @@ -277,6 +281,11 @@ PublishWithJoinResponse handlePublishRequest(PublishRequest publishRequest) { + lastKnownLeader + ", rejecting"); } + if (publishRequest.getAcceptedState().term() > coordinationState.get().getLastAcceptedState().term()) { + // only do join validation if we have not accepted state from this master yet + onJoinValidators.stream().forEach(a -> a.accept(getLocalNode(), publishRequest.getAcceptedState())); + } + ensureTermAtLeast(sourceNode, publishRequest.getAcceptedState().term()); final PublishResponse publishResponse = coordinationState.get().handlePublishRequest(publishRequest); @@ -389,6 +398,41 @@ private void handleJoinRequest(JoinRequest joinRequest, JoinHelper.JoinCallback logger.trace("handleJoinRequest: as {}, handling {}", mode, joinRequest); transportService.connectToNode(joinRequest.getSourceNode()); + final ClusterState stateForJoinValidation = getStateForMasterService(); + + if (stateForJoinValidation.nodes().isLocalNodeElectedMaster()) { + // we do this in a couple of places including the cluster update thread. This one here is really just best effort + // to ensure we fail as fast as possible. + onJoinValidators.stream().forEach(a -> a.accept(joinRequest.getSourceNode(), stateForJoinValidation)); + if (stateForJoinValidation.getBlocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK) == false) { + JoinTaskExecutor.ensureMajorVersionBarrier(joinRequest.getSourceNode().getVersion(), + stateForJoinValidation.getNodes().getMinNodeVersion()); + } + + // validate the join on the joining node, will throw a failure if it fails the validation + joinHelper.sendValidateJoinRequest(joinRequest.getSourceNode(), stateForJoinValidation, new ActionListener() { + @Override + public void onResponse(Empty empty) { + try { + processJoinRequest(joinRequest, joinCallback); + } catch (Exception e) { + joinCallback.onFailure(e); + } + } + + @Override + public void onFailure(Exception e) { + logger.warn(() -> new ParameterizedMessage("failed to validate incoming join request from node [{}]", + joinRequest.getSourceNode()), e); + joinCallback.onFailure(new IllegalStateException("failure when sending a validation request to node", e)); + } + }); + } else { + processJoinRequest(joinRequest, joinCallback); + } + } + + private void processJoinRequest(JoinRequest joinRequest, JoinHelper.JoinCallback joinCallback) { final Optional optionalJoin = joinRequest.getOptionalJoin(); synchronized (mutex) { final CoordinationState coordState = coordinationState.get(); @@ -514,7 +558,7 @@ Mode getMode() { } // visible for testing - public DiscoveryNode getLocalNode() { + DiscoveryNode getLocalNode() { return transportService.getLocalNode(); } diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java index 0154919de74ff..d1a99b4edd0f5 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java @@ -21,6 +21,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateTaskConfig; import org.elasticsearch.cluster.ClusterStateTaskListener; @@ -40,15 +41,18 @@ import org.elasticsearch.discovery.zen.ZenDiscovery; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool.Names; +import org.elasticsearch.transport.EmptyTransportResponseHandler; import org.elasticsearch.transport.TransportChannel; import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.transport.TransportRequestOptions; +import org.elasticsearch.transport.TransportResponse; import org.elasticsearch.transport.TransportResponse.Empty; import org.elasticsearch.transport.TransportResponseHandler; import org.elasticsearch.transport.TransportService; import java.io.IOException; +import java.util.Collection; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; @@ -64,6 +68,7 @@ public class JoinHelper { private static final Logger logger = LogManager.getLogger(JoinHelper.class); public static final String JOIN_ACTION_NAME = "internal:cluster/coordination/join"; + public static final String VALIDATE_JOIN_ACTION_NAME = "internal:cluster/coordination/join/validate"; public static final String START_JOIN_ACTION_NAME = "internal:cluster/coordination/start_join"; // the timeout for each join attempt @@ -80,7 +85,8 @@ public class JoinHelper { public JoinHelper(Settings settings, AllocationService allocationService, MasterService masterService, TransportService transportService, LongSupplier currentTermSupplier, - BiConsumer joinHandler, Function joinLeaderInTerm) { + BiConsumer joinHandler, Function joinLeaderInTerm, + Collection> joinValidators) { this.masterService = masterService; this.transportService = transportService; this.joinTimeout = JOIN_TIMEOUT_SETTING.get(settings); @@ -123,9 +129,19 @@ public ClusterTasksResult execute(ClusterState currentSta channel.sendResponse(Empty.INSTANCE); }); + transportService.registerRequestHandler(VALIDATE_JOIN_ACTION_NAME, + () -> new MembershipAction.ValidateJoinRequest(), ThreadPool.Names.GENERIC, + (request, channel, task) -> { + joinValidators.stream().forEach(action -> action.accept(transportService.getLocalNode(), request.getState())); + channel.sendResponse(Empty.INSTANCE); + }); + transportService.registerRequestHandler(MembershipAction.DISCOVERY_JOIN_VALIDATE_ACTION_NAME, () -> new MembershipAction.ValidateJoinRequest(), ThreadPool.Names.GENERIC, - (request, channel, task) -> channel.sendResponse(Empty.INSTANCE)); // TODO: implement join validation + (request, channel, task) -> { + joinValidators.stream().forEach(action -> action.accept(transportService.getLocalNode(), request.getState())); + channel.sendResponse(Empty.INSTANCE); + }); transportService.registerRequestHandler( ZenDiscovery.DISCOVERY_REJOIN_ACTION_NAME, ZenDiscovery.RejoinClusterRequest::new, ThreadPool.Names.SAME, @@ -244,6 +260,29 @@ public String executor() { }); } + public void sendValidateJoinRequest(DiscoveryNode node, ClusterState state, ActionListener listener) { + final String actionName; + if (Coordinator.isZen1Node(node)) { + actionName = MembershipAction.DISCOVERY_JOIN_VALIDATE_ACTION_NAME; + } else { + actionName = VALIDATE_JOIN_ACTION_NAME; + } + transportService.sendRequest(node, actionName, + new MembershipAction.ValidateJoinRequest(state), + TransportRequestOptions.builder().withTimeout(joinTimeout).build(), + new EmptyTransportResponseHandler(ThreadPool.Names.GENERIC) { + @Override + public void handleResponse(TransportResponse.Empty response) { + listener.onResponse(response); + } + + @Override + public void handleException(TransportException exp) { + listener.onFailure(exp); + } + }); + } + public interface JoinCallback { void onSuccess(); diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinTaskExecutor.java b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinTaskExecutor.java index 9544cf15a0c4e..c4c76d8a8fe74 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinTaskExecutor.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinTaskExecutor.java @@ -31,7 +31,11 @@ import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.discovery.DiscoverySettings; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; import java.util.List; +import java.util.function.BiConsumer; import static org.elasticsearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK; @@ -259,4 +263,15 @@ public static void ensureMajorVersionBarrier(Version joiningNodeVersion, Version "All nodes in the cluster are of a higher major [" + clusterMajor + "]."); } } + + public static Collection> addBuiltInJoinValidators( + Collection> onJoinValidators) { + final Collection> validators = new ArrayList<>(); + validators.add((node, state) -> { + ensureNodesCompatibility(node.getVersion(), state.getNodes()); + ensureIndexCompatibility(node.getVersion(), state.getMetaData()); + }); + validators.addAll(onJoinValidators); + return Collections.unmodifiableCollection(validators); + } } diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/PublicationTransportHandler.java b/server/src/main/java/org/elasticsearch/cluster/coordination/PublicationTransportHandler.java index 87ce488345db0..b0b91cd0980f2 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/PublicationTransportHandler.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/PublicationTransportHandler.java @@ -389,7 +389,13 @@ private PublishWithJoinResponse handleIncomingPublishRequest(BytesTransportReque in.setVersion(request.version()); // If true we received full cluster state - otherwise diffs if (in.readBoolean()) { - final ClusterState incomingState = ClusterState.readFrom(in, transportService.getLocalNode()); + final ClusterState incomingState; + try { + incomingState = ClusterState.readFrom(in, transportService.getLocalNode()); + } catch (Exception e){ + logger.warn("unexpected error while deserializing an incoming cluster state", e); + throw e; + } fullClusterStateReceivedCount.incrementAndGet(); logger.debug("received full cluster state version [{}] with size [{}]", incomingState.version(), request.bytes().length()); @@ -400,10 +406,20 @@ private PublishWithJoinResponse handleIncomingPublishRequest(BytesTransportReque final ClusterState lastSeen = lastSeenClusterState.get(); if (lastSeen == null) { logger.debug("received diff for but don't have any local cluster state - requesting full state"); + incompatibleClusterStateDiffReceivedCount.incrementAndGet(); throw new IncompatibleClusterStateVersionException("have no local cluster state"); } else { - Diff diff = ClusterState.readDiffFrom(in, lastSeen.nodes().getLocalNode()); - final ClusterState incomingState = diff.apply(lastSeen); // might throw IncompatibleClusterStateVersionException + final ClusterState incomingState; + try { + Diff diff = ClusterState.readDiffFrom(in, lastSeen.nodes().getLocalNode()); + incomingState = diff.apply(lastSeen); // might throw IncompatibleClusterStateVersionException + } catch (IncompatibleClusterStateVersionException e) { + incompatibleClusterStateDiffReceivedCount.incrementAndGet(); + throw e; + } catch (Exception e){ + logger.warn("unexpected error while deserializing an incoming cluster state", e); + throw e; + } compatibleClusterStateDiffReceivedCount.incrementAndGet(); logger.debug("received diff cluster state version [{}] with uuid [{}], diff size [{}]", incomingState.version(), incomingState.stateUUID(), request.bytes().length()); @@ -412,12 +428,6 @@ private PublishWithJoinResponse handleIncomingPublishRequest(BytesTransportReque return response; } } - } catch (IncompatibleClusterStateVersionException e) { - incompatibleClusterStateDiffReceivedCount.incrementAndGet(); - throw e; - } catch (Exception e) { - logger.warn("unexpected error while deserializing an incoming cluster state", e); - throw e; } finally { IOUtils.close(in); } diff --git a/server/src/main/java/org/elasticsearch/discovery/DiscoveryModule.java b/server/src/main/java/org/elasticsearch/discovery/DiscoveryModule.java index 1572548b1b1fc..aa7d62db3e1df 100644 --- a/server/src/main/java/org/elasticsearch/discovery/DiscoveryModule.java +++ b/server/src/main/java/org/elasticsearch/discovery/DiscoveryModule.java @@ -131,7 +131,7 @@ public DiscoveryModule(Settings settings, ThreadPool threadPool, TransportServic discoveryTypes.put(ZEN2_DISCOVERY_TYPE, () -> new Coordinator(NODE_NAME_SETTING.get(settings), settings, clusterSettings, transportService, namedWriteableRegistry, allocationService, masterService, () -> gatewayMetaState.getPersistedState(settings, (ClusterApplierService) clusterApplier), hostsProvider, clusterApplier, - Randomness.get())); + joinValidators, Randomness.get())); discoveryTypes.put("single-node", () -> new SingleNodeDiscovery(settings, transportService, masterService, clusterApplier, gatewayMetaState)); for (DiscoveryPlugin plugin : plugins) { diff --git a/server/src/main/java/org/elasticsearch/discovery/zen/MembershipAction.java b/server/src/main/java/org/elasticsearch/discovery/zen/MembershipAction.java index a136d28305252..550b25083fb96 100644 --- a/server/src/main/java/org/elasticsearch/discovery/zen/MembershipAction.java +++ b/server/src/main/java/org/elasticsearch/discovery/zen/MembershipAction.java @@ -164,7 +164,7 @@ public static class ValidateJoinRequest extends TransportRequest { public ValidateJoinRequest() {} - ValidateJoinRequest(ClusterState state) { + public ValidateJoinRequest(ClusterState state) { this.state = state; } @@ -179,6 +179,10 @@ public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); this.state.writeTo(out); } + + public ClusterState getState() { + return state; + } } static class ValidateJoinRequestRequestHandler implements TransportRequestHandler { diff --git a/server/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java b/server/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java index 0657f9e80278a..05d0bfa27188a 100644 --- a/server/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java +++ b/server/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java @@ -73,7 +73,6 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; import java.util.List; import java.util.Locale; import java.util.Set; @@ -163,7 +162,7 @@ public ZenDiscovery(Settings settings, ThreadPool threadPool, TransportService t ClusterSettings clusterSettings, UnicastHostsProvider hostsProvider, AllocationService allocationService, Collection> onJoinValidators, GatewayMetaState gatewayMetaState) { super(settings); - this.onJoinValidators = addBuiltInJoinValidators(onJoinValidators); + this.onJoinValidators = JoinTaskExecutor.addBuiltInJoinValidators(onJoinValidators); this.masterService = masterService; this.clusterApplier = clusterApplier; this.transportService = transportService; @@ -235,17 +234,6 @@ public ZenDiscovery(Settings settings, ThreadPool threadPool, TransportService t } } - static Collection> addBuiltInJoinValidators( - Collection> onJoinValidators) { - Collection> validators = new ArrayList<>(); - validators.add((node, state) -> { - JoinTaskExecutor.ensureNodesCompatibility(node.getVersion(), state.getNodes()); - JoinTaskExecutor.ensureIndexCompatibility(node.getVersion(), state.getMetaData()); - }); - validators.addAll(onJoinValidators); - return Collections.unmodifiableCollection(validators); - } - // protected to allow overriding in tests protected ZenPing newZenPing(Settings settings, ThreadPool threadPool, TransportService transportService, UnicastHostsProvider hostsProvider) { diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/bootstrap/TransportBootstrapClusterActionTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/bootstrap/TransportBootstrapClusterActionTests.java index cf814482f6da0..31486a52bd08f 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/bootstrap/TransportBootstrapClusterActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/bootstrap/TransportBootstrapClusterActionTests.java @@ -47,6 +47,7 @@ import org.junit.BeforeClass; import java.io.IOException; +import java.util.Collections; import java.util.Random; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -96,7 +97,7 @@ public void setupTest() { ESAllocationTestCase.createAllocationService(Settings.EMPTY), new MasterService("local", Settings.EMPTY, threadPool), () -> new InMemoryPersistedState(0, ClusterState.builder(new ClusterName("cluster")).build()), r -> emptyList(), - new NoOpClusterApplier(), new Random(random().nextLong())); + new NoOpClusterApplier(), Collections.emptyList(), new Random(random().nextLong())); } public void testHandlesNonstandardDiscoveryImplementation() throws InterruptedException { diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/bootstrap/TransportGetDiscoveredNodesActionTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/bootstrap/TransportGetDiscoveredNodesActionTests.java index add52a1eedc96..498cc157b720e 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/bootstrap/TransportGetDiscoveredNodesActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/bootstrap/TransportGetDiscoveredNodesActionTests.java @@ -132,7 +132,7 @@ protected void onSendRequest(long requestId, String action, TransportRequest req ESAllocationTestCase.createAllocationService(settings), new MasterService("local", settings, threadPool), () -> new InMemoryPersistedState(0, ClusterState.builder(new ClusterName(clusterName)).build()), r -> emptyList(), - new NoOpClusterApplier(), new Random(random().nextLong())); + new NoOpClusterApplier(), Collections.emptyList(), new Random(random().nextLong())); } public void testHandlesNonstandardDiscoveryImplementation() throws InterruptedException { diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java index 17bf36b411b52..7ed68073b6979 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java @@ -62,6 +62,7 @@ import java.io.UncheckedIOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.EnumSet; import java.util.HashMap; @@ -71,6 +72,7 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Predicate; @@ -954,6 +956,67 @@ private void testAppliesNoMasterBlock(String noMasterBlockSetting, ClusterBlock // TODO reboot the leader and verify that the same block is applied when it restarts } + public void testNodeCantJoinIfJoinValidationFailsOnMaster() { + final Cluster cluster = new Cluster(randomIntBetween(1, 3)); + cluster.runRandomly(); + cluster.stabilise(); + + // check that if node join validation fails on master, the nodes can't join + List addedNodes = cluster.addNodes(randomIntBetween(1, 2)); + final Set validatedNodes = new HashSet<>(); + cluster.getAnyLeader().extraJoinValidators.add((discoveryNode, clusterState) -> { + validatedNodes.add(discoveryNode); + throw new IllegalArgumentException("join validation failed"); + }); + final long previousClusterStateVersion = cluster.getAnyLeader().getLastAppliedClusterState().version(); + cluster.runFor(10000, "failing join validation"); + assertEquals(validatedNodes, addedNodes.stream().map(ClusterNode::getLocalNode).collect(Collectors.toSet())); + assertTrue(addedNodes.stream().allMatch(ClusterNode::isCandidate)); + final long newClusterStateVersion = cluster.getAnyLeader().getLastAppliedClusterState().version(); + assertEquals(previousClusterStateVersion, newClusterStateVersion); + + cluster.getAnyLeader().extraJoinValidators.clear(); + cluster.stabilise(); + } + + public void testNodeCantJoinIfJoinValidationFailsOnJoiningNode() { + final Cluster cluster = new Cluster(randomIntBetween(1, 3)); + cluster.runRandomly(); + cluster.stabilise(); + + // check that if node join validation fails on joining node, the nodes can't join + List addedNodes = cluster.addNodes(randomIntBetween(1, 2)); + final Set validatedNodes = new HashSet<>(); + addedNodes.stream().forEach(cn -> cn.extraJoinValidators.add((discoveryNode, clusterState) -> { + validatedNodes.add(discoveryNode); + throw new IllegalArgumentException("join validation failed"); + })); + final long previousClusterStateVersion = cluster.getAnyLeader().getLastAppliedClusterState().version(); + cluster.runFor(10000, "failing join validation"); + assertEquals(validatedNodes, addedNodes.stream().map(ClusterNode::getLocalNode).collect(Collectors.toSet())); + assertTrue(addedNodes.stream().allMatch(ClusterNode::isCandidate)); + final long newClusterStateVersion = cluster.getAnyLeader().getLastAppliedClusterState().version(); + assertEquals(previousClusterStateVersion, newClusterStateVersion); + + addedNodes.stream().forEach(cn -> cn.extraJoinValidators.clear()); + cluster.stabilise(); + } + + public void testClusterCantFormWithFailingJoinValidation() { + final Cluster cluster = new Cluster(randomIntBetween(1, 5)); + // fail join validation on a majority of nodes in the initial configuration + randomValueOtherThanMany(nodes -> + cluster.initialConfiguration.hasQuorum( + nodes.stream().map(ClusterNode::getLocalNode).map(DiscoveryNode::getId).collect(Collectors.toSet())) == false, + () -> randomSubsetOf(cluster.clusterNodes)) + .forEach(cn -> cn.extraJoinValidators.add((discoveryNode, clusterState) -> { + throw new IllegalArgumentException("join validation failed"); + })); + cluster.bootstrapIfNecessary(); + cluster.runFor(10000, "failing join validation"); + assertTrue(cluster.clusterNodes.stream().allMatch(cn -> cn.getLastAppliedClusterState().version() == 0)); + } + private static long defaultMillis(Setting setting) { return setting.get(Settings.EMPTY).millis() + Cluster.DEFAULT_DELAY_VARIABILITY; } @@ -1041,8 +1104,8 @@ class Cluster { initialNodeCount, masterEligibleNodeIds, initialConfiguration); } - void addNodesAndStabilise(int newNodesCount) { - addNodes(newNodesCount); + List addNodesAndStabilise(int newNodesCount) { + final List addedNodes = addNodes(newNodesCount); stabilise( // The first pinging discovers the master defaultMillis(DISCOVERY_FIND_PEERS_INTERVAL_SETTING) @@ -1052,16 +1115,20 @@ void addNodesAndStabilise(int newNodesCount) { // followup reconfiguration + newNodesCount * 2 * DEFAULT_CLUSTER_STATE_UPDATE_DELAY); // TODO Investigate whether 4 publications is sufficient due to batching? A bound linear in the number of nodes isn't great. + return addedNodes; } - void addNodes(int newNodesCount) { + List addNodes(int newNodesCount) { logger.info("--> adding {} nodes", newNodesCount); final int nodeSizeAtStart = clusterNodes.size(); + final List addedNodes = new ArrayList<>(); for (int i = 0; i < newNodesCount; i++) { final ClusterNode clusterNode = new ClusterNode(nodeSizeAtStart + i, true); - clusterNodes.add(clusterNode); + addedNodes.add(clusterNode); } + clusterNodes.addAll(addedNodes); + return addedNodes; } int size() { @@ -1199,15 +1266,7 @@ void stabilise(long stabilisationDurationMillis) { deterministicTaskQueue.getExecutionDelayVariabilityMillis(), lessThanOrEqualTo(DEFAULT_DELAY_VARIABILITY)); assertFalse("stabilisation requires stable storage", disruptStorage); - if (clusterNodes.stream().allMatch(ClusterNode::isNotUsefullyBootstrapped)) { - assertThat("setting initial configuration may fail with disconnected nodes", disconnectedNodes, empty()); - assertThat("setting initial configuration may fail with blackholed nodes", blackholedNodes, empty()); - runFor(defaultMillis(DISCOVERY_FIND_PEERS_INTERVAL_SETTING) * 2, "discovery prior to setting initial configuration"); - final ClusterNode bootstrapNode = getAnyMasterEligibleNode(); - bootstrapNode.applyInitialConfiguration(); - } else { - logger.info("setting initial configuration not required"); - } + bootstrapIfNecessary(); runFor(stabilisationDurationMillis, "stabilising"); @@ -1273,6 +1332,18 @@ void stabilise(long stabilisationDurationMillis) { leader.improveConfiguration(lastAcceptedState), sameInstance(lastAcceptedState)); } + void bootstrapIfNecessary() { + if (clusterNodes.stream().allMatch(ClusterNode::isNotUsefullyBootstrapped)) { + assertThat("setting initial configuration may fail with disconnected nodes", disconnectedNodes, empty()); + assertThat("setting initial configuration may fail with blackholed nodes", blackholedNodes, empty()); + runFor(defaultMillis(DISCOVERY_FIND_PEERS_INTERVAL_SETTING) * 2, "discovery prior to setting initial configuration"); + final ClusterNode bootstrapNode = getAnyMasterEligibleNode(); + bootstrapNode.applyInitialConfiguration(); + } else { + logger.info("setting initial configuration not required"); + } + } + void runFor(long runDurationMillis, String description) { final long endTime = deterministicTaskQueue.getCurrentTimeMillis() + runDurationMillis; logger.info("--> runFor({}ms) running until [{}ms]: {}", runDurationMillis, endTime, description); @@ -1401,6 +1472,7 @@ class ClusterNode { private AckedFakeThreadPoolMasterService masterService; private TransportService transportService; private DisruptableMockTransport mockTransport; + private List> extraJoinValidators = new ArrayList<>(); private ClusterStateApplyResponse clusterStateApplyResponse = ClusterStateApplyResponse.SUCCEED; ClusterNode(int nodeIndex, boolean masterEligible) { @@ -1475,9 +1547,11 @@ protected void onBlackholedDuringSend(long requestId, String action, DiscoveryNo transportService = mockTransport.createTransportService( settings, deterministicTaskQueue.getThreadPool(runnable -> onNode(localNode, runnable)), NOOP_TRANSPORT_INTERCEPTOR, a -> localNode, null, emptySet()); + final Collection> onJoinValidators = + Collections.singletonList((dn, cs) -> extraJoinValidators.forEach(validator -> validator.accept(dn, cs))); coordinator = new Coordinator("test_node", settings, clusterSettings, transportService, writableRegistry(), ESAllocationTestCase.createAllocationService(Settings.EMPTY), masterService, this::getPersistedState, - Cluster.this::provideUnicastHosts, clusterApplier, Randomness.get()); + Cluster.this::provideUnicastHosts, clusterApplier, onJoinValidators, Randomness.get()); masterService.setClusterStatePublisher(coordinator); transportService.start(); @@ -1503,6 +1577,10 @@ boolean isLeader() { return coordinator.getMode() == LEADER; } + boolean isCandidate() { + return coordinator.getMode() == CANDIDATE; + } + ClusterState improveConfiguration(ClusterState currentState) { synchronized (coordinator.mutex) { return coordinator.improveConfiguration(currentState); diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/JoinHelperTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/JoinHelperTests.java index 6e7965f896f4c..ef843717fb469 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/JoinHelperTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/JoinHelperTests.java @@ -44,7 +44,8 @@ public void testJoinDeduplication() { deterministicTaskQueue.getThreadPool(), TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> localNode, null, Collections.emptySet()); JoinHelper joinHelper = new JoinHelper(Settings.EMPTY, null, null, transportService, () -> 0L, - (joinRequest, joinCallback) -> { throw new AssertionError(); }, startJoinRequest -> { throw new AssertionError(); }); + (joinRequest, joinCallback) -> { throw new AssertionError(); }, startJoinRequest -> { throw new AssertionError(); }, + Collections.emptyList()); transportService.start(); DiscoveryNode node1 = new DiscoveryNode("node1", buildNewFakeTransportAddress(), Version.CURRENT); diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinTests.java index 214174d2a52fa..1068af67c9880 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinTests.java @@ -173,6 +173,7 @@ transportService, writableRegistry(), masterService, () -> new InMemoryPersistedState(term, initialState), r -> emptyList(), new NoOpClusterApplier(), + Collections.emptyList(), random); transportService.start(); transportService.acceptIncomingRequests(); diff --git a/server/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryUnitTests.java b/server/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryUnitTests.java index 99155799fb199..c9a2f7dc58388 100644 --- a/server/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryUnitTests.java +++ b/server/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryUnitTests.java @@ -28,6 +28,7 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateTaskExecutor; import org.elasticsearch.cluster.ESAllocationTestCase; +import org.elasticsearch.cluster.coordination.JoinTaskExecutor; import org.elasticsearch.cluster.coordination.NodeRemovalClusterStateTaskExecutor; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; @@ -388,7 +389,7 @@ public void testValidateOnUnsupportedIndexVersionCreated() throws Exception { final DiscoveryNode localNode = new DiscoveryNode("other_node", buildNewFakeTransportAddress(), emptyMap(), EnumSet.allOf(DiscoveryNode.Role.class), Version.CURRENT); MembershipAction.ValidateJoinRequestRequestHandler request = new MembershipAction.ValidateJoinRequestRequestHandler - (() -> localNode, ZenDiscovery.addBuiltInJoinValidators(Collections.emptyList())); + (() -> localNode, JoinTaskExecutor.addBuiltInJoinValidators(Collections.emptyList())); final boolean incompatible = randomBoolean(); IndexMetaData indexMetaData = IndexMetaData.builder("test").settings(Settings.builder() .put(SETTING_VERSION_CREATED, diff --git a/test/framework/src/main/java/org/elasticsearch/test/discovery/TestZenDiscovery.java b/test/framework/src/main/java/org/elasticsearch/test/discovery/TestZenDiscovery.java index 783dc6325c4a2..53be34c0b40c8 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/discovery/TestZenDiscovery.java +++ b/test/framework/src/main/java/org/elasticsearch/test/discovery/TestZenDiscovery.java @@ -81,7 +81,7 @@ public Map> getDiscoveryTypes(ThreadPool threadPool, return new Coordinator("test_node", fixedSettings, clusterSettings, transportService, namedWriteableRegistry, allocationService, masterService, () -> gatewayMetaState.getPersistedState(settings, (ClusterApplierService) clusterApplier), hostsProvider, - clusterApplier, new Random(Randomness.get().nextLong())); + clusterApplier, Collections.emptyList(), new Random(Randomness.get().nextLong())); } else { return new TestZenDiscovery(fixedSettings, threadPool, transportService, namedWriteableRegistry, masterService, clusterApplier, clusterSettings, hostsProvider, allocationService, gatewayMetaState); From b8babfb18b0cbaaa1818f21f5df278a3ad56cfbd Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Mon, 7 Jan 2019 21:31:04 +0100 Subject: [PATCH 2/4] fix test --- .../org/elasticsearch/cluster/coordination/NodeJoinTests.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinTests.java index 1068af67c9880..108b9e6dd7f1e 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinTests.java @@ -157,6 +157,8 @@ protected void onSendRequest(long requestId, String action, TransportRequest req if (action.equals(HANDSHAKE_ACTION_NAME)) { handleResponse(requestId, new TransportService.HandshakeResponse(destination, initialState.getClusterName(), destination.getVersion())); + } else if (action.equals(JoinHelper.VALIDATE_JOIN_ACTION_NAME)) { + handleResponse(requestId, new TransportResponse.Empty()); } else { super.onSendRequest(requestId, action, request, destination); } From 6404e8f604d98dee274c82f1e48cba0b5b3fe5d9 Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Wed, 9 Jan 2019 12:33:02 +0100 Subject: [PATCH 3/4] minor stuff --- .../elasticsearch/cluster/coordination/Coordinator.java | 4 ++-- .../elasticsearch/cluster/coordination/JoinHelper.java | 8 ++++---- .../java/org/elasticsearch/discovery/DiscoveryModule.java | 2 +- .../cluster/coordination/CoordinatorTests.java | 6 +++--- 4 files changed, 10 insertions(+), 10 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index f5a3185f1600d..107546b6be94e 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -283,7 +283,7 @@ PublishWithJoinResponse handlePublishRequest(PublishRequest publishRequest) { if (publishRequest.getAcceptedState().term() > coordinationState.get().getLastAcceptedState().term()) { // only do join validation if we have not accepted state from this master yet - onJoinValidators.stream().forEach(a -> a.accept(getLocalNode(), publishRequest.getAcceptedState())); + onJoinValidators.forEach(a -> a.accept(getLocalNode(), publishRequest.getAcceptedState())); } ensureTermAtLeast(sourceNode, publishRequest.getAcceptedState().term()); @@ -403,7 +403,7 @@ private void handleJoinRequest(JoinRequest joinRequest, JoinHelper.JoinCallback if (stateForJoinValidation.nodes().isLocalNodeElectedMaster()) { // we do this in a couple of places including the cluster update thread. This one here is really just best effort // to ensure we fail as fast as possible. - onJoinValidators.stream().forEach(a -> a.accept(joinRequest.getSourceNode(), stateForJoinValidation)); + onJoinValidators.forEach(a -> a.accept(joinRequest.getSourceNode(), stateForJoinValidation)); if (stateForJoinValidation.getBlocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK) == false) { JoinTaskExecutor.ensureMajorVersionBarrier(joinRequest.getSourceNode().getVersion(), stateForJoinValidation.getNodes().getMinNodeVersion()); diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java index d1a99b4edd0f5..8c41d7b2eaa52 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java @@ -130,16 +130,16 @@ public ClusterTasksResult execute(ClusterState currentSta }); transportService.registerRequestHandler(VALIDATE_JOIN_ACTION_NAME, - () -> new MembershipAction.ValidateJoinRequest(), ThreadPool.Names.GENERIC, + MembershipAction.ValidateJoinRequest::new, ThreadPool.Names.GENERIC, (request, channel, task) -> { - joinValidators.stream().forEach(action -> action.accept(transportService.getLocalNode(), request.getState())); + joinValidators.forEach(action -> action.accept(transportService.getLocalNode(), request.getState())); channel.sendResponse(Empty.INSTANCE); }); transportService.registerRequestHandler(MembershipAction.DISCOVERY_JOIN_VALIDATE_ACTION_NAME, - () -> new MembershipAction.ValidateJoinRequest(), ThreadPool.Names.GENERIC, + MembershipAction.ValidateJoinRequest::new, ThreadPool.Names.GENERIC, (request, channel, task) -> { - joinValidators.stream().forEach(action -> action.accept(transportService.getLocalNode(), request.getState())); + joinValidators.forEach(action -> action.accept(transportService.getLocalNode(), request.getState())); channel.sendResponse(Empty.INSTANCE); }); diff --git a/server/src/main/java/org/elasticsearch/discovery/DiscoveryModule.java b/server/src/main/java/org/elasticsearch/discovery/DiscoveryModule.java index aa7d62db3e1df..042eb9daa0d9d 100644 --- a/server/src/main/java/org/elasticsearch/discovery/DiscoveryModule.java +++ b/server/src/main/java/org/elasticsearch/discovery/DiscoveryModule.java @@ -127,7 +127,7 @@ public DiscoveryModule(Settings settings, ThreadPool threadPool, TransportServic Map> discoveryTypes = new HashMap<>(); discoveryTypes.put(ZEN_DISCOVERY_TYPE, () -> new ZenDiscovery(settings, threadPool, transportService, namedWriteableRegistry, masterService, clusterApplier, - clusterSettings, hostsProvider, allocationService, Collections.unmodifiableCollection(joinValidators), gatewayMetaState)); + clusterSettings, hostsProvider, allocationService, joinValidators, gatewayMetaState)); discoveryTypes.put(ZEN2_DISCOVERY_TYPE, () -> new Coordinator(NODE_NAME_SETTING.get(settings), settings, clusterSettings, transportService, namedWriteableRegistry, allocationService, masterService, () -> gatewayMetaState.getPersistedState(settings, (ClusterApplierService) clusterApplier), hostsProvider, clusterApplier, diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java index 7ed68073b6979..794d2a8456d59 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java @@ -956,7 +956,7 @@ private void testAppliesNoMasterBlock(String noMasterBlockSetting, ClusterBlock // TODO reboot the leader and verify that the same block is applied when it restarts } - public void testNodeCantJoinIfJoinValidationFailsOnMaster() { + public void testNodeCannotJoinIfJoinValidationFailsOnMaster() { final Cluster cluster = new Cluster(randomIntBetween(1, 3)); cluster.runRandomly(); cluster.stabilise(); @@ -979,7 +979,7 @@ public void testNodeCantJoinIfJoinValidationFailsOnMaster() { cluster.stabilise(); } - public void testNodeCantJoinIfJoinValidationFailsOnJoiningNode() { + public void testNodeCannotJoinIfJoinValidationFailsOnJoiningNode() { final Cluster cluster = new Cluster(randomIntBetween(1, 3)); cluster.runRandomly(); cluster.stabilise(); @@ -1002,7 +1002,7 @@ public void testNodeCantJoinIfJoinValidationFailsOnJoiningNode() { cluster.stabilise(); } - public void testClusterCantFormWithFailingJoinValidation() { + public void testClusterCannotFormWithFailingJoinValidation() { final Cluster cluster = new Cluster(randomIntBetween(1, 5)); // fail join validation on a majority of nodes in the initial configuration randomValueOtherThanMany(nodes -> From b0abb8ed195dcdccbb35377df85d3b7eec3775d1 Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Wed, 9 Jan 2019 12:35:42 +0100 Subject: [PATCH 4/4] move comment --- .../org/elasticsearch/cluster/coordination/Coordinator.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index 107546b6be94e..4dd6df9df4a7f 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -401,10 +401,10 @@ private void handleJoinRequest(JoinRequest joinRequest, JoinHelper.JoinCallback final ClusterState stateForJoinValidation = getStateForMasterService(); if (stateForJoinValidation.nodes().isLocalNodeElectedMaster()) { - // we do this in a couple of places including the cluster update thread. This one here is really just best effort - // to ensure we fail as fast as possible. onJoinValidators.forEach(a -> a.accept(joinRequest.getSourceNode(), stateForJoinValidation)); if (stateForJoinValidation.getBlocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK) == false) { + // we do this in a couple of places including the cluster update thread. This one here is really just best effort + // to ensure we fail as fast as possible. JoinTaskExecutor.ensureMajorVersionBarrier(joinRequest.getSourceNode().getVersion(), stateForJoinValidation.getNodes().getMinNodeVersion()); }