From 4def6fc2aa992ee35a297e45ff50128a808672ca Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Thu, 16 Aug 2018 16:09:42 +0200 Subject: [PATCH 1/3] move JoinTaskExecutor and its dependencies out of ZenDiscovery --- .../coordination/JoinTaskExecutor.java | 234 ++++++++++++++++++ .../discovery/zen/MembershipAction.java | 60 ----- .../discovery/zen/NodeJoinController.java | 170 +------------ .../discovery/zen/ZenDiscovery.java | 7 +- .../discovery/zen/MembershipActionTests.java | 23 +- .../indices/cluster/ClusterStateChanges.java | 10 +- 6 files changed, 265 insertions(+), 239 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/cluster/coordination/JoinTaskExecutor.java diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinTaskExecutor.java b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinTaskExecutor.java new file mode 100644 index 0000000000000..ade841dd4d949 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinTaskExecutor.java @@ -0,0 +1,234 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.cluster.coordination; + +import org.apache.logging.log4j.Logger; +import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateTaskExecutor; +import org.elasticsearch.cluster.NotMasterException; +import org.elasticsearch.cluster.block.ClusterBlocks; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.routing.allocation.AllocationService; +import org.elasticsearch.common.transport.TransportAddress; +import org.elasticsearch.discovery.DiscoverySettings; + +import java.util.Collections; +import java.util.List; + +import static org.elasticsearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK; + +public class JoinTaskExecutor implements ClusterStateTaskExecutor { + + private final AllocationService allocationService; + + private final Logger logger; + + public JoinTaskExecutor(AllocationService allocationService, Logger logger) { + this.allocationService = allocationService; + this.logger = logger; + } + + @Override + public ClusterTasksResult execute(ClusterState currentState, List joiningNodes) throws Exception { + final ClusterTasksResult.Builder results = ClusterTasksResult.builder(); + + final DiscoveryNodes currentNodes = currentState.nodes(); + boolean nodesChanged = false; + ClusterState.Builder newState; + + if (joiningNodes.size() == 1 && joiningNodes.get(0).equals(FINISH_ELECTION_TASK)) { + return results.successes(joiningNodes).build(currentState); + } else if (currentNodes.getMasterNode() == null && joiningNodes.contains(BECOME_MASTER_TASK)) { + assert joiningNodes.contains(FINISH_ELECTION_TASK) : "becoming a master but election is not finished " + joiningNodes; + // use these joins to try and become the master. + // Note that we don't have to do any validation of the amount of joining nodes - the commit + // during the cluster state publishing guarantees that we have enough + newState = becomeMasterAndTrimConflictingNodes(currentState, joiningNodes); + nodesChanged = true; + } else if (currentNodes.isLocalNodeElectedMaster() == false) { + logger.trace("processing node joins, but we are not the master. current master: {}", currentNodes.getMasterNode()); + throw new NotMasterException("Node [" + currentNodes.getLocalNode() + "] not master for join request"); + } else { + newState = ClusterState.builder(currentState); + } + + DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder(newState.nodes()); + + assert nodesBuilder.isLocalNodeElectedMaster(); + + Version minClusterNodeVersion = newState.nodes().getMinNodeVersion(); + Version maxClusterNodeVersion = newState.nodes().getMaxNodeVersion(); + // we only enforce major version transitions on a fully formed clusters + final boolean enforceMajorVersion = currentState.getBlocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK) == false; + // processing any joins + for (final DiscoveryNode node : joiningNodes) { + if (node.equals(BECOME_MASTER_TASK) || node.equals(FINISH_ELECTION_TASK)) { + // noop + } else if (currentNodes.nodeExists(node)) { + logger.debug("received a join request for an existing node [{}]", node); + } else { + try { + if (enforceMajorVersion) { + ensureMajorVersionBarrier(node.getVersion(), minClusterNodeVersion); + } + ensureNodesCompatibility(node.getVersion(), minClusterNodeVersion, maxClusterNodeVersion); + // we do this validation quite late to prevent race conditions between nodes joining and importing dangling indices + // we have to reject nodes that don't support all indices we have in this cluster + ensureIndexCompatibility(node.getVersion(), currentState.getMetaData()); + nodesBuilder.add(node); + nodesChanged = true; + minClusterNodeVersion = Version.min(minClusterNodeVersion, node.getVersion()); + maxClusterNodeVersion = Version.max(maxClusterNodeVersion, node.getVersion()); + } catch (IllegalArgumentException | IllegalStateException e) { + results.failure(node, e); + continue; + } + } + results.success(node); + } + if (nodesChanged) { + newState.nodes(nodesBuilder); + return results.build(allocationService.reroute(newState.build(), "node_join")); + } else { + // we must return a new cluster state instance to force publishing. This is important + // for the joining node to finalize its join and set us as a master + return results.build(newState.build()); + } + } + + private ClusterState.Builder becomeMasterAndTrimConflictingNodes(ClusterState currentState, List joiningNodes) { + assert currentState.nodes().getMasterNodeId() == null : currentState; + DiscoveryNodes currentNodes = currentState.nodes(); + DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder(currentNodes); + nodesBuilder.masterNodeId(currentState.nodes().getLocalNodeId()); + + for (final DiscoveryNode joiningNode : joiningNodes) { + final DiscoveryNode nodeWithSameId = nodesBuilder.get(joiningNode.getId()); + if (nodeWithSameId != null && nodeWithSameId.equals(joiningNode) == false) { + logger.debug("removing existing node [{}], which conflicts with incoming join from [{}]", nodeWithSameId, joiningNode); + nodesBuilder.remove(nodeWithSameId.getId()); + } + final DiscoveryNode nodeWithSameAddress = currentNodes.findByAddress(joiningNode.getAddress()); + if (nodeWithSameAddress != null && nodeWithSameAddress.equals(joiningNode) == false) { + logger.debug("removing existing node [{}], which conflicts with incoming join from [{}]", nodeWithSameAddress, + joiningNode); + nodesBuilder.remove(nodeWithSameAddress.getId()); + } + } + + + // now trim any left over dead nodes - either left there when the previous master stepped down + // or removed by us above + ClusterState tmpState = ClusterState.builder(currentState).nodes(nodesBuilder).blocks(ClusterBlocks.builder() + .blocks(currentState.blocks()) + .removeGlobalBlock(DiscoverySettings.NO_MASTER_BLOCK_ID)).build(); + return ClusterState.builder(allocationService.deassociateDeadNodes(tmpState, false, + "removed dead nodes on election")); + } + + @Override + public boolean runOnlyOnMaster() { + // we validate that we are allowed to change the cluster state during cluster state processing + return false; + } + + /** + * a task indicated that the current node should become master, if no current master is known + */ + public static final DiscoveryNode BECOME_MASTER_TASK = new DiscoveryNode("_BECOME_MASTER_TASK_", + new TransportAddress(TransportAddress.META_ADDRESS, 0), + Collections.emptyMap(), Collections.emptySet(), Version.CURRENT) { + @Override + public String toString() { + return ""; // this is not really task , so don't log anything about it... + } + }; + + /** + * a task that is used to signal the election is stopped and we should process pending joins. + * it may be use in combination with {@link JoinTaskExecutor#BECOME_MASTER_TASK} + */ + public static final DiscoveryNode FINISH_ELECTION_TASK = new DiscoveryNode("_FINISH_ELECTION_", + new TransportAddress(TransportAddress.META_ADDRESS, 0), Collections.emptyMap(), Collections.emptySet(), Version.CURRENT) { + @Override + public String toString() { + return ""; // this is not really task , so don't log anything about it... + } + }; + + /** + * Ensures that all indices are compatible with the given node version. This will ensure that all indices in the given metadata + * will not be created with a newer version of elasticsearch as well as that all indices are newer or equal to the minimum index + * compatibility version. + * @see Version#minimumIndexCompatibilityVersion() + * @throws IllegalStateException if any index is incompatible with the given version + */ + public static void ensureIndexCompatibility(final Version nodeVersion, MetaData metaData) { + Version supportedIndexVersion = nodeVersion.minimumIndexCompatibilityVersion(); + // we ensure that all indices in the cluster we join are compatible with us no matter if they are + // closed or not we can't read mappings of these indices so we need to reject the join... + for (IndexMetaData idxMetaData : metaData) { + if (idxMetaData.getCreationVersion().after(nodeVersion)) { + throw new IllegalStateException("index " + idxMetaData.getIndex() + " version not supported: " + + idxMetaData.getCreationVersion() + " the node version is: " + nodeVersion); + } + if (idxMetaData.getCreationVersion().before(supportedIndexVersion)) { + throw new IllegalStateException("index " + idxMetaData.getIndex() + " version not supported: " + + idxMetaData.getCreationVersion() + " minimum compatible index version is: " + supportedIndexVersion); + } + } + } + + /** ensures that the joining node has a version that's compatible with all current nodes*/ + public static void ensureNodesCompatibility(final Version joiningNodeVersion, DiscoveryNodes currentNodes) { + final Version minNodeVersion = currentNodes.getMinNodeVersion(); + final Version maxNodeVersion = currentNodes.getMaxNodeVersion(); + ensureNodesCompatibility(joiningNodeVersion, minNodeVersion, maxNodeVersion); + } + + /** ensures that the joining node has a version that's compatible with a given version range */ + public static void ensureNodesCompatibility(Version joiningNodeVersion, Version minClusterNodeVersion, Version maxClusterNodeVersion) { + assert minClusterNodeVersion.onOrBefore(maxClusterNodeVersion) : minClusterNodeVersion + " > " + maxClusterNodeVersion; + if (joiningNodeVersion.isCompatible(maxClusterNodeVersion) == false) { + throw new IllegalStateException("node version [" + joiningNodeVersion + "] is not supported. " + + "The cluster contains nodes with version [" + maxClusterNodeVersion + "], which is incompatible."); + } + if (joiningNodeVersion.isCompatible(minClusterNodeVersion) == false) { + throw new IllegalStateException("node version [" + joiningNodeVersion + "] is not supported." + + "The cluster contains nodes with version [" + minClusterNodeVersion + "], which is incompatible."); + } + } + + /** + * ensures that the joining node's major version is equal or higher to the minClusterNodeVersion. This is needed + * to ensure that if the master is already fully operating under the new major version, it doesn't go back to mixed + * version mode + **/ + public static void ensureMajorVersionBarrier(Version joiningNodeVersion, Version minClusterNodeVersion) { + final byte clusterMajor = minClusterNodeVersion.major; + if (joiningNodeVersion.major < clusterMajor) { + throw new IllegalStateException("node version [" + joiningNodeVersion + "] is not supported. " + + "All nodes in the cluster are of a higher major [" + clusterMajor + "]."); + } + } +} diff --git a/server/src/main/java/org/elasticsearch/discovery/zen/MembershipAction.java b/server/src/main/java/org/elasticsearch/discovery/zen/MembershipAction.java index e8bafea66d3a4..e4e542876de4b 100644 --- a/server/src/main/java/org/elasticsearch/discovery/zen/MembershipAction.java +++ b/server/src/main/java/org/elasticsearch/discovery/zen/MembershipAction.java @@ -19,12 +19,8 @@ package org.elasticsearch.discovery.zen; -import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.metadata.IndexMetaData; -import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -199,62 +195,6 @@ public void messageReceived(ValidateJoinRequest request, TransportChannel channe } } - /** - * Ensures that all indices are compatible with the given node version. This will ensure that all indices in the given metadata - * will not be created with a newer version of elasticsearch as well as that all indices are newer or equal to the minimum index - * compatibility version. - * @see Version#minimumIndexCompatibilityVersion() - * @throws IllegalStateException if any index is incompatible with the given version - */ - static void ensureIndexCompatibility(final Version nodeVersion, MetaData metaData) { - Version supportedIndexVersion = nodeVersion.minimumIndexCompatibilityVersion(); - // we ensure that all indices in the cluster we join are compatible with us no matter if they are - // closed or not we can't read mappings of these indices so we need to reject the join... - for (IndexMetaData idxMetaData : metaData) { - if (idxMetaData.getCreationVersion().after(nodeVersion)) { - throw new IllegalStateException("index " + idxMetaData.getIndex() + " version not supported: " - + idxMetaData.getCreationVersion() + " the node version is: " + nodeVersion); - } - if (idxMetaData.getCreationVersion().before(supportedIndexVersion)) { - throw new IllegalStateException("index " + idxMetaData.getIndex() + " version not supported: " - + idxMetaData.getCreationVersion() + " minimum compatible index version is: " + supportedIndexVersion); - } - } - } - - /** ensures that the joining node has a version that's compatible with all current nodes*/ - static void ensureNodesCompatibility(final Version joiningNodeVersion, DiscoveryNodes currentNodes) { - final Version minNodeVersion = currentNodes.getMinNodeVersion(); - final Version maxNodeVersion = currentNodes.getMaxNodeVersion(); - ensureNodesCompatibility(joiningNodeVersion, minNodeVersion, maxNodeVersion); - } - - /** ensures that the joining node has a version that's compatible with a given version range */ - static void ensureNodesCompatibility(Version joiningNodeVersion, Version minClusterNodeVersion, Version maxClusterNodeVersion) { - assert minClusterNodeVersion.onOrBefore(maxClusterNodeVersion) : minClusterNodeVersion + " > " + maxClusterNodeVersion; - if (joiningNodeVersion.isCompatible(maxClusterNodeVersion) == false) { - throw new IllegalStateException("node version [" + joiningNodeVersion + "] is not supported. " + - "The cluster contains nodes with version [" + maxClusterNodeVersion + "], which is incompatible."); - } - if (joiningNodeVersion.isCompatible(minClusterNodeVersion) == false) { - throw new IllegalStateException("node version [" + joiningNodeVersion + "] is not supported." + - "The cluster contains nodes with version [" + minClusterNodeVersion + "], which is incompatible."); - } - } - - /** - * ensures that the joining node's major version is equal or higher to the minClusterNodeVersion. This is needed - * to ensure that if the master is already fully operating under the new major version, it doesn't go back to mixed - * version mode - **/ - static void ensureMajorVersionBarrier(Version joiningNodeVersion, Version minClusterNodeVersion) { - final byte clusterMajor = minClusterNodeVersion.major; - if (joiningNodeVersion.major < clusterMajor) { - throw new IllegalStateException("node version [" + joiningNodeVersion + "] is not supported. " + - "All nodes in the cluster are of a higher major [" + clusterMajor + "]."); - } - } - public static class LeaveRequest extends TransportRequest { private DiscoveryNode node; diff --git a/server/src/main/java/org/elasticsearch/discovery/zen/NodeJoinController.java b/server/src/main/java/org/elasticsearch/discovery/zen/NodeJoinController.java index 5cceba237e544..f6c25b1cdfef3 100644 --- a/server/src/main/java/org/elasticsearch/discovery/zen/NodeJoinController.java +++ b/server/src/main/java/org/elasticsearch/discovery/zen/NodeJoinController.java @@ -21,24 +21,19 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.store.AlreadyClosedException; -import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateTaskConfig; -import org.elasticsearch.cluster.ClusterStateTaskExecutor; import org.elasticsearch.cluster.ClusterStateTaskListener; import org.elasticsearch.cluster.NotMasterException; -import org.elasticsearch.cluster.block.ClusterBlocks; +import org.elasticsearch.cluster.coordination.JoinTaskExecutor; import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.service.MasterService; import org.elasticsearch.common.Priority; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.discovery.DiscoverySettings; import java.util.ArrayList; import java.util.Collections; @@ -49,8 +44,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import static org.elasticsearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK; - /** * This class processes incoming join request (passed zia {@link ZenDiscovery}). Incoming nodes * are directly added to the cluster state or are accumulated during master election. @@ -69,7 +62,12 @@ public NodeJoinController(MasterService masterService, AllocationService allocat Settings settings) { super(settings); this.masterService = masterService; - joinTaskExecutor = new JoinTaskExecutor(allocationService, electMaster, logger); + joinTaskExecutor = new JoinTaskExecutor(allocationService, logger) { + @Override + public void clusterStatePublished(ClusterChangedEvent event) { + electMaster.logMinimumMasterNodesWarningIfNecessary(event.previousState(), event.state()); + } + }; } /** @@ -276,8 +274,8 @@ assert isEnoughPendingJoins(getPendingMasterJoinsCount()) : "becoming a master b Map tasks = getPendingAsTasks(); final String source = "zen-disco-elected-as-master ([" + tasks.size() + "] nodes joined)"; - tasks.put(BECOME_MASTER_TASK, (source1, e) -> {}); // noop listener, the election finished listener determines result - tasks.put(FINISH_ELECTION_TASK, electionFinishedListener); + tasks.put(JoinTaskExecutor.BECOME_MASTER_TASK, (source1, e) -> {}); // noop listener, the election finished listener determines result + tasks.put(JoinTaskExecutor.FINISH_ELECTION_TASK, electionFinishedListener); masterService.submitStateUpdateTasks(source, tasks, ClusterStateTaskConfig.build(Priority.URGENT), joinTaskExecutor); } @@ -285,7 +283,7 @@ public synchronized void closeAndProcessPending(String reason) { innerClose(); Map tasks = getPendingAsTasks(); final String source = "zen-disco-election-stop [" + reason + "]"; - tasks.put(FINISH_ELECTION_TASK, electionFinishedListener); + tasks.put(JoinTaskExecutor.FINISH_ELECTION_TASK, electionFinishedListener); masterService.submitStateUpdateTasks(source, tasks, ClusterStateTaskConfig.build(Priority.URGENT), joinTaskExecutor); } @@ -377,152 +375,4 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS } } - /** - * a task indicated that the current node should become master, if no current master is known - */ - public static final DiscoveryNode BECOME_MASTER_TASK = new DiscoveryNode("_BECOME_MASTER_TASK_", - new TransportAddress(TransportAddress.META_ADDRESS, 0), - Collections.emptyMap(), Collections.emptySet(), Version.CURRENT) { - @Override - public String toString() { - return ""; // this is not really task , so don't log anything about it... - } - }; - - /** - * a task that is used to signal the election is stopped and we should process pending joins. - * it may be use in combination with {@link #BECOME_MASTER_TASK} - */ - public static final DiscoveryNode FINISH_ELECTION_TASK = new DiscoveryNode("_FINISH_ELECTION_", - new TransportAddress(TransportAddress.META_ADDRESS, 0), Collections.emptyMap(), Collections.emptySet(), Version.CURRENT) { - @Override - public String toString() { - return ""; // this is not really task , so don't log anything about it... - } - }; - - // visible for testing - public static class JoinTaskExecutor implements ClusterStateTaskExecutor { - - private final AllocationService allocationService; - - private final ElectMasterService electMasterService; - - private final Logger logger; - - public JoinTaskExecutor(AllocationService allocationService, ElectMasterService electMasterService, Logger logger) { - this.allocationService = allocationService; - this.electMasterService = electMasterService; - this.logger = logger; - } - - @Override - public ClusterTasksResult execute(ClusterState currentState, List joiningNodes) throws Exception { - final ClusterTasksResult.Builder results = ClusterTasksResult.builder(); - - final DiscoveryNodes currentNodes = currentState.nodes(); - boolean nodesChanged = false; - ClusterState.Builder newState; - - if (joiningNodes.size() == 1 && joiningNodes.get(0).equals(FINISH_ELECTION_TASK)) { - return results.successes(joiningNodes).build(currentState); - } else if (currentNodes.getMasterNode() == null && joiningNodes.contains(BECOME_MASTER_TASK)) { - assert joiningNodes.contains(FINISH_ELECTION_TASK) : "becoming a master but election is not finished " + joiningNodes; - // use these joins to try and become the master. - // Note that we don't have to do any validation of the amount of joining nodes - the commit - // during the cluster state publishing guarantees that we have enough - newState = becomeMasterAndTrimConflictingNodes(currentState, joiningNodes); - nodesChanged = true; - } else if (currentNodes.isLocalNodeElectedMaster() == false) { - logger.trace("processing node joins, but we are not the master. current master: {}", currentNodes.getMasterNode()); - throw new NotMasterException("Node [" + currentNodes.getLocalNode() + "] not master for join request"); - } else { - newState = ClusterState.builder(currentState); - } - - DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder(newState.nodes()); - - assert nodesBuilder.isLocalNodeElectedMaster(); - - Version minClusterNodeVersion = newState.nodes().getMinNodeVersion(); - Version maxClusterNodeVersion = newState.nodes().getMaxNodeVersion(); - // we only enforce major version transitions on a fully formed clusters - final boolean enforceMajorVersion = currentState.getBlocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK) == false; - // processing any joins - for (final DiscoveryNode node : joiningNodes) { - if (node.equals(BECOME_MASTER_TASK) || node.equals(FINISH_ELECTION_TASK)) { - // noop - } else if (currentNodes.nodeExists(node)) { - logger.debug("received a join request for an existing node [{}]", node); - } else { - try { - if (enforceMajorVersion) { - MembershipAction.ensureMajorVersionBarrier(node.getVersion(), minClusterNodeVersion); - } - MembershipAction.ensureNodesCompatibility(node.getVersion(), minClusterNodeVersion, maxClusterNodeVersion); - // we do this validation quite late to prevent race conditions between nodes joining and importing dangling indices - // we have to reject nodes that don't support all indices we have in this cluster - MembershipAction.ensureIndexCompatibility(node.getVersion(), currentState.getMetaData()); - nodesBuilder.add(node); - nodesChanged = true; - minClusterNodeVersion = Version.min(minClusterNodeVersion, node.getVersion()); - maxClusterNodeVersion = Version.max(maxClusterNodeVersion, node.getVersion()); - } catch (IllegalArgumentException | IllegalStateException e) { - results.failure(node, e); - continue; - } - } - results.success(node); - } - if (nodesChanged) { - newState.nodes(nodesBuilder); - return results.build(allocationService.reroute(newState.build(), "node_join")); - } else { - // we must return a new cluster state instance to force publishing. This is important - // for the joining node to finalize its join and set us as a master - return results.build(newState.build()); - } - } - - private ClusterState.Builder becomeMasterAndTrimConflictingNodes(ClusterState currentState, List joiningNodes) { - assert currentState.nodes().getMasterNodeId() == null : currentState; - DiscoveryNodes currentNodes = currentState.nodes(); - DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder(currentNodes); - nodesBuilder.masterNodeId(currentState.nodes().getLocalNodeId()); - - for (final DiscoveryNode joiningNode : joiningNodes) { - final DiscoveryNode nodeWithSameId = nodesBuilder.get(joiningNode.getId()); - if (nodeWithSameId != null && nodeWithSameId.equals(joiningNode) == false) { - logger.debug("removing existing node [{}], which conflicts with incoming join from [{}]", nodeWithSameId, joiningNode); - nodesBuilder.remove(nodeWithSameId.getId()); - } - final DiscoveryNode nodeWithSameAddress = currentNodes.findByAddress(joiningNode.getAddress()); - if (nodeWithSameAddress != null && nodeWithSameAddress.equals(joiningNode) == false) { - logger.debug("removing existing node [{}], which conflicts with incoming join from [{}]", nodeWithSameAddress, - joiningNode); - nodesBuilder.remove(nodeWithSameAddress.getId()); - } - } - - - // now trim any left over dead nodes - either left there when the previous master stepped down - // or removed by us above - ClusterState tmpState = ClusterState.builder(currentState).nodes(nodesBuilder).blocks(ClusterBlocks.builder() - .blocks(currentState.blocks()) - .removeGlobalBlock(DiscoverySettings.NO_MASTER_BLOCK_ID)).build(); - return ClusterState.builder(allocationService.deassociateDeadNodes(tmpState, false, - "removed dead nodes on election")); - } - - @Override - public boolean runOnlyOnMaster() { - // we validate that we are allowed to change the cluster state during cluster state processing - return false; - } - - @Override - public void clusterStatePublished(ClusterChangedEvent event) { - electMasterService.logMinimumMasterNodesWarningIfNecessary(event.previousState(), event.state()); - } - } } diff --git a/server/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java b/server/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java index 02415f8824d85..d042f0b35de4e 100644 --- a/server/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java +++ b/server/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java @@ -32,6 +32,7 @@ import org.elasticsearch.cluster.ClusterStateTaskListener; import org.elasticsearch.cluster.NotMasterException; import org.elasticsearch.cluster.block.ClusterBlocks; +import org.elasticsearch.cluster.coordination.JoinTaskExecutor; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.allocation.AllocationService; @@ -231,8 +232,8 @@ static Collection> addBuiltInJoinValidato Collection> onJoinValidators) { Collection> validators = new ArrayList<>(); validators.add((node, state) -> { - MembershipAction.ensureNodesCompatibility(node.getVersion(), state.getNodes()); - MembershipAction.ensureIndexCompatibility(node.getVersion(), state.getMetaData()); + JoinTaskExecutor.ensureNodesCompatibility(node.getVersion(), state.getNodes()); + JoinTaskExecutor.ensureIndexCompatibility(node.getVersion(), state.getMetaData()); }); validators.addAll(onJoinValidators); return Collections.unmodifiableCollection(validators); @@ -858,7 +859,7 @@ void handleJoinRequest(final DiscoveryNode node, final ClusterState state, final // to ensure we fail as fast as possible. onJoinValidators.stream().forEach(a -> a.accept(node, state)); if (state.getBlocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK) == false) { - MembershipAction.ensureMajorVersionBarrier(node.getVersion(), state.getNodes().getMinNodeVersion()); + JoinTaskExecutor.ensureMajorVersionBarrier(node.getVersion(), state.getNodes().getMinNodeVersion()); } // try and connect to the node, if it fails, we can raise an exception back to the client... transportService.connectToNode(node); diff --git a/server/src/test/java/org/elasticsearch/discovery/zen/MembershipActionTests.java b/server/src/test/java/org/elasticsearch/discovery/zen/MembershipActionTests.java index 2f4be2fcd5394..4e96add0c53b4 100644 --- a/server/src/test/java/org/elasticsearch/discovery/zen/MembershipActionTests.java +++ b/server/src/test/java/org/elasticsearch/discovery/zen/MembershipActionTests.java @@ -19,6 +19,7 @@ package org.elasticsearch.discovery.zen; import org.elasticsearch.Version; +import org.elasticsearch.cluster.coordination.JoinTaskExecutor; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; @@ -49,10 +50,10 @@ public void testPreventJoinClusterWithNewerIndices() { .numberOfReplicas(1).build(); metaBuilder.put(indexMetaData, false); MetaData metaData = metaBuilder.build(); - MembershipAction.ensureIndexCompatibility(Version.CURRENT, metaData); + JoinTaskExecutor.ensureIndexCompatibility(Version.CURRENT, metaData); expectThrows(IllegalStateException.class, () -> - MembershipAction.ensureIndexCompatibility(VersionUtils.getPreviousVersion(Version.CURRENT), + JoinTaskExecutor.ensureIndexCompatibility(VersionUtils.getPreviousVersion(Version.CURRENT), metaData)); } @@ -67,7 +68,7 @@ public void testPreventJoinClusterWithUnsupportedIndices() { metaBuilder.put(indexMetaData, false); MetaData metaData = metaBuilder.build(); expectThrows(IllegalStateException.class, () -> - MembershipAction.ensureIndexCompatibility(Version.CURRENT, + JoinTaskExecutor.ensureIndexCompatibility(Version.CURRENT, metaData)); } @@ -84,9 +85,9 @@ public void testPreventJoinClusterWithUnsupportedNodeVersions() { final Version tooLow = getPreviousVersion(maxNodeVersion.minimumCompatibilityVersion()); expectThrows(IllegalStateException.class, () -> { if (randomBoolean()) { - MembershipAction.ensureNodesCompatibility(tooLow, nodes); + JoinTaskExecutor.ensureNodesCompatibility(tooLow, nodes); } else { - MembershipAction.ensureNodesCompatibility(tooLow, minNodeVersion, maxNodeVersion); + JoinTaskExecutor.ensureNodesCompatibility(tooLow, minNodeVersion, maxNodeVersion); } }); } @@ -95,16 +96,16 @@ public void testPreventJoinClusterWithUnsupportedNodeVersions() { Version tooHigh = incompatibleFutureVersion(minNodeVersion); expectThrows(IllegalStateException.class, () -> { if (randomBoolean()) { - MembershipAction.ensureNodesCompatibility(tooHigh, nodes); + JoinTaskExecutor.ensureNodesCompatibility(tooHigh, nodes); } else { - MembershipAction.ensureNodesCompatibility(tooHigh, minNodeVersion, maxNodeVersion); + JoinTaskExecutor.ensureNodesCompatibility(tooHigh, minNodeVersion, maxNodeVersion); } }); } if (minNodeVersion.onOrAfter(Version.V_6_0_0_alpha1)) { Version oldMajor = randomFrom(allVersions().stream().filter(v -> v.major < 6).collect(Collectors.toList())); - expectThrows(IllegalStateException.class, () -> MembershipAction.ensureMajorVersionBarrier(oldMajor, minNodeVersion)); + expectThrows(IllegalStateException.class, () -> JoinTaskExecutor.ensureMajorVersionBarrier(oldMajor, minNodeVersion)); } final Version minGoodVersion = maxNodeVersion.major == minNodeVersion.major ? @@ -114,9 +115,9 @@ public void testPreventJoinClusterWithUnsupportedNodeVersions() { final Version justGood = randomVersionBetween(random(), minGoodVersion, maxCompatibleVersion(minNodeVersion)); if (randomBoolean()) { - MembershipAction.ensureNodesCompatibility(justGood, nodes); + JoinTaskExecutor.ensureNodesCompatibility(justGood, nodes); } else { - MembershipAction.ensureNodesCompatibility(justGood, minNodeVersion, maxNodeVersion); + JoinTaskExecutor.ensureNodesCompatibility(justGood, minNodeVersion, maxNodeVersion); } } @@ -136,7 +137,7 @@ public void testSuccess() { .numberOfReplicas(1).build(); metaBuilder.put(indexMetaData, false); MetaData metaData = metaBuilder.build(); - MembershipAction.ensureIndexCompatibility(Version.CURRENT, + JoinTaskExecutor.ensureIndexCompatibility(Version.CURRENT, metaData); } } diff --git a/server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java b/server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java index 1ea15321d5a0b..52afd5687e728 100644 --- a/server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java +++ b/server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java @@ -48,6 +48,7 @@ import org.elasticsearch.cluster.action.shard.ShardStateAction; import org.elasticsearch.cluster.action.shard.ShardStateAction.StartedShardEntry; import org.elasticsearch.cluster.action.shard.ShardStateAction.FailedShardEntry; +import org.elasticsearch.cluster.coordination.JoinTaskExecutor; import org.elasticsearch.cluster.metadata.AliasValidator; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; @@ -73,7 +74,6 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.discovery.zen.ElectMasterService; -import org.elasticsearch.discovery.zen.NodeJoinController; import org.elasticsearch.discovery.zen.ZenDiscovery; import org.elasticsearch.env.Environment; import org.elasticsearch.env.TestEnvironment; @@ -121,7 +121,7 @@ public class ClusterStateChanges extends AbstractComponent { private final TransportCreateIndexAction transportCreateIndexAction; private final ZenDiscovery.NodeRemovalClusterStateTaskExecutor nodeRemovalExecutor; - private final NodeJoinController.JoinTaskExecutor joinTaskExecutor; + private final JoinTaskExecutor joinTaskExecutor; public ClusterStateChanges(NamedXContentRegistry xContentRegistry, ThreadPool threadPool) { super(Settings.builder().put(PATH_HOME_SETTING.getKey(), "dummy").build()); @@ -201,7 +201,7 @@ allocationService, new AliasValidator(settings), environment, ElectMasterService electMasterService = new ElectMasterService(settings); nodeRemovalExecutor = new ZenDiscovery.NodeRemovalClusterStateTaskExecutor(allocationService, electMasterService, s -> { throw new AssertionError("rejoin not implemented"); }, logger); - joinTaskExecutor = new NodeJoinController.JoinTaskExecutor(allocationService, electMasterService, logger); + joinTaskExecutor = new JoinTaskExecutor(allocationService, logger); } public ClusterState createIndex(ClusterState state, CreateIndexRequest request) { @@ -234,8 +234,8 @@ public ClusterState addNodes(ClusterState clusterState, List node public ClusterState joinNodesAndBecomeMaster(ClusterState clusterState, List nodes) { List joinNodes = new ArrayList<>(); - joinNodes.add(NodeJoinController.BECOME_MASTER_TASK); - joinNodes.add(NodeJoinController.FINISH_ELECTION_TASK); + joinNodes.add(JoinTaskExecutor.BECOME_MASTER_TASK); + joinNodes.add(JoinTaskExecutor.FINISH_ELECTION_TASK); joinNodes.addAll(nodes); return runTasks(joinTaskExecutor, clusterState, joinNodes); From 7826d1fd61b304aa2bb1780c86b10e7c0c861ac9 Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Thu, 16 Aug 2018 16:27:31 +0200 Subject: [PATCH 2/3] Use proper identity for JoinController tasks --- .../coordination/JoinTaskExecutor.java | 93 +++++++++++-------- .../discovery/zen/NodeJoinController.java | 13 +-- .../indices/cluster/ClusterStateChanges.java | 8 +- 3 files changed, 66 insertions(+), 48 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinTaskExecutor.java b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinTaskExecutor.java index ade841dd4d949..6d2486da69542 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinTaskExecutor.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinTaskExecutor.java @@ -29,34 +29,56 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.allocation.AllocationService; -import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.discovery.DiscoverySettings; -import java.util.Collections; import java.util.List; import static org.elasticsearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK; -public class JoinTaskExecutor implements ClusterStateTaskExecutor { +public class JoinTaskExecutor implements ClusterStateTaskExecutor { private final AllocationService allocationService; private final Logger logger; + public static class Task { + + private final DiscoveryNode node; + private final String reason; + + public Task(DiscoveryNode node, String reason) { + this.node = node; + this.reason = reason; + } + + public DiscoveryNode node() { + return node; + } + + public String reason() { + return reason; + } + + @Override + public String toString() { + return node != null ? node + " " + reason : reason; + } + } + public JoinTaskExecutor(AllocationService allocationService, Logger logger) { this.allocationService = allocationService; this.logger = logger; } @Override - public ClusterTasksResult execute(ClusterState currentState, List joiningNodes) throws Exception { - final ClusterTasksResult.Builder results = ClusterTasksResult.builder(); + public ClusterTasksResult execute(ClusterState currentState, List joiningNodes) throws Exception { + final ClusterTasksResult.Builder results = ClusterTasksResult.builder(); final DiscoveryNodes currentNodes = currentState.nodes(); boolean nodesChanged = false; ClusterState.Builder newState; - if (joiningNodes.size() == 1 && joiningNodes.get(0).equals(FINISH_ELECTION_TASK)) { + if (joiningNodes.size() == 1 && joiningNodes.get(0).equals(FINISH_ELECTION_TASK)) { return results.successes(joiningNodes).build(currentState); } else if (currentNodes.getMasterNode() == null && joiningNodes.contains(BECOME_MASTER_TASK)) { assert joiningNodes.contains(FINISH_ELECTION_TASK) : "becoming a master but election is not finished " + joiningNodes; @@ -81,12 +103,13 @@ public ClusterTasksResult execute(ClusterState currentState, List // we only enforce major version transitions on a fully formed clusters final boolean enforceMajorVersion = currentState.getBlocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK) == false; // processing any joins - for (final DiscoveryNode node : joiningNodes) { - if (node.equals(BECOME_MASTER_TASK) || node.equals(FINISH_ELECTION_TASK)) { + for (final Task joinTask : joiningNodes) { + if (joinTask.equals(BECOME_MASTER_TASK) || joinTask.equals(FINISH_ELECTION_TASK)) { // noop - } else if (currentNodes.nodeExists(node)) { - logger.debug("received a join request for an existing node [{}]", node); + } else if (currentNodes.nodeExists(joinTask.node())) { + logger.debug("received a join request for an existing node [{}]", joinTask.node()); } else { + final DiscoveryNode node = joinTask.node(); try { if (enforceMajorVersion) { ensureMajorVersionBarrier(node.getVersion(), minClusterNodeVersion); @@ -100,11 +123,11 @@ public ClusterTasksResult execute(ClusterState currentState, List minClusterNodeVersion = Version.min(minClusterNodeVersion, node.getVersion()); maxClusterNodeVersion = Version.max(maxClusterNodeVersion, node.getVersion()); } catch (IllegalArgumentException | IllegalStateException e) { - results.failure(node, e); + results.failure(joinTask, e); continue; } } - results.success(node); + results.success(joinTask); } if (nodesChanged) { newState.nodes(nodesBuilder); @@ -116,23 +139,28 @@ public ClusterTasksResult execute(ClusterState currentState, List } } - private ClusterState.Builder becomeMasterAndTrimConflictingNodes(ClusterState currentState, List joiningNodes) { + private ClusterState.Builder becomeMasterAndTrimConflictingNodes(ClusterState currentState, List joiningNodes) { assert currentState.nodes().getMasterNodeId() == null : currentState; DiscoveryNodes currentNodes = currentState.nodes(); DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder(currentNodes); nodesBuilder.masterNodeId(currentState.nodes().getLocalNodeId()); - for (final DiscoveryNode joiningNode : joiningNodes) { - final DiscoveryNode nodeWithSameId = nodesBuilder.get(joiningNode.getId()); - if (nodeWithSameId != null && nodeWithSameId.equals(joiningNode) == false) { - logger.debug("removing existing node [{}], which conflicts with incoming join from [{}]", nodeWithSameId, joiningNode); - nodesBuilder.remove(nodeWithSameId.getId()); - } - final DiscoveryNode nodeWithSameAddress = currentNodes.findByAddress(joiningNode.getAddress()); - if (nodeWithSameAddress != null && nodeWithSameAddress.equals(joiningNode) == false) { - logger.debug("removing existing node [{}], which conflicts with incoming join from [{}]", nodeWithSameAddress, - joiningNode); - nodesBuilder.remove(nodeWithSameAddress.getId()); + for (final Task joinTask : joiningNodes) { + if (joinTask.equals(BECOME_MASTER_TASK) || joinTask.equals(FINISH_ELECTION_TASK)) { + // noop + } else { + final DiscoveryNode joiningNode = joinTask.node(); + final DiscoveryNode nodeWithSameId = nodesBuilder.get(joiningNode.getId()); + if (nodeWithSameId != null && nodeWithSameId.equals(joiningNode) == false) { + logger.debug("removing existing node [{}], which conflicts with incoming join from [{}]", nodeWithSameId, joiningNode); + nodesBuilder.remove(nodeWithSameId.getId()); + } + final DiscoveryNode nodeWithSameAddress = currentNodes.findByAddress(joiningNode.getAddress()); + if (nodeWithSameAddress != null && nodeWithSameAddress.equals(joiningNode) == false) { + logger.debug("removing existing node [{}], which conflicts with incoming join from [{}]", nodeWithSameAddress, + joiningNode); + nodesBuilder.remove(nodeWithSameAddress.getId()); + } } } @@ -155,26 +183,13 @@ public boolean runOnlyOnMaster() { /** * a task indicated that the current node should become master, if no current master is known */ - public static final DiscoveryNode BECOME_MASTER_TASK = new DiscoveryNode("_BECOME_MASTER_TASK_", - new TransportAddress(TransportAddress.META_ADDRESS, 0), - Collections.emptyMap(), Collections.emptySet(), Version.CURRENT) { - @Override - public String toString() { - return ""; // this is not really task , so don't log anything about it... - } - }; + public static final Task BECOME_MASTER_TASK = new Task(null, "_BECOME_MASTER_TASK_"); /** * a task that is used to signal the election is stopped and we should process pending joins. * it may be use in combination with {@link JoinTaskExecutor#BECOME_MASTER_TASK} */ - public static final DiscoveryNode FINISH_ELECTION_TASK = new DiscoveryNode("_FINISH_ELECTION_", - new TransportAddress(TransportAddress.META_ADDRESS, 0), Collections.emptyMap(), Collections.emptySet(), Version.CURRENT) { - @Override - public String toString() { - return ""; // this is not really task , so don't log anything about it... - } - }; + public static final Task FINISH_ELECTION_TASK = new Task(null, "_FINISH_ELECTION_"); /** * Ensures that all indices are compatible with the given node version. This will ensure that all indices in the given metadata diff --git a/server/src/main/java/org/elasticsearch/discovery/zen/NodeJoinController.java b/server/src/main/java/org/elasticsearch/discovery/zen/NodeJoinController.java index f6c25b1cdfef3..86cf3bcefa117 100644 --- a/server/src/main/java/org/elasticsearch/discovery/zen/NodeJoinController.java +++ b/server/src/main/java/org/elasticsearch/discovery/zen/NodeJoinController.java @@ -174,7 +174,7 @@ public synchronized void handleJoinRequest(final DiscoveryNode node, final Membe checkPendingJoinsAndElectIfNeeded(); } else { masterService.submitStateUpdateTask("zen-disco-node-join", - node, ClusterStateTaskConfig.build(Priority.URGENT), + new JoinTaskExecutor.Task(node, "no election context"), ClusterStateTaskConfig.build(Priority.URGENT), joinTaskExecutor, new JoinTaskListener(callback, logger)); } } @@ -248,9 +248,10 @@ public synchronized boolean isEnoughPendingJoins(int pendingMasterJoins) { return hasEnough; } - private Map getPendingAsTasks() { - Map tasks = new HashMap<>(); - joinRequestAccumulator.entrySet().stream().forEach(e -> tasks.put(e.getKey(), new JoinTaskListener(e.getValue(), logger))); + private Map getPendingAsTasks(String reason) { + Map tasks = new HashMap<>(); + joinRequestAccumulator.entrySet().stream().forEach(e -> tasks.put( + new JoinTaskExecutor.Task(e.getKey(), reason), new JoinTaskListener(e.getValue(), logger))); return tasks; } @@ -271,7 +272,7 @@ assert isEnoughPendingJoins(getPendingMasterJoinsCount()) : "becoming a master b innerClose(); - Map tasks = getPendingAsTasks(); + Map tasks = getPendingAsTasks("become master"); final String source = "zen-disco-elected-as-master ([" + tasks.size() + "] nodes joined)"; tasks.put(JoinTaskExecutor.BECOME_MASTER_TASK, (source1, e) -> {}); // noop listener, the election finished listener determines result @@ -281,7 +282,7 @@ assert isEnoughPendingJoins(getPendingMasterJoinsCount()) : "becoming a master b public synchronized void closeAndProcessPending(String reason) { innerClose(); - Map tasks = getPendingAsTasks(); + Map tasks = getPendingAsTasks(reason); final String source = "zen-disco-election-stop [" + reason + "]"; tasks.put(JoinTaskExecutor.FINISH_ELECTION_TASK, electionFinishedListener); masterService.submitStateUpdateTasks(source, tasks, ClusterStateTaskConfig.build(Priority.URGENT), joinTaskExecutor); diff --git a/server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java b/server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java index 52afd5687e728..a113a929648eb 100644 --- a/server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java +++ b/server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java @@ -229,14 +229,16 @@ public ClusterState reroute(ClusterState state, ClusterRerouteRequest request) { } public ClusterState addNodes(ClusterState clusterState, List nodes) { - return runTasks(joinTaskExecutor, clusterState, nodes); + return runTasks(joinTaskExecutor, clusterState, nodes.stream().map(node -> new JoinTaskExecutor.Task(node, "dummy reason")) + .collect(Collectors.toList())); } public ClusterState joinNodesAndBecomeMaster(ClusterState clusterState, List nodes) { - List joinNodes = new ArrayList<>(); + List joinNodes = new ArrayList<>(); joinNodes.add(JoinTaskExecutor.BECOME_MASTER_TASK); joinNodes.add(JoinTaskExecutor.FINISH_ELECTION_TASK); - joinNodes.addAll(nodes); + joinNodes.addAll(nodes.stream().map(node -> new JoinTaskExecutor.Task(node, "dummy reason")) + .collect(Collectors.toList())); return runTasks(joinTaskExecutor, clusterState, joinNodes); } From b07eae8f2b08f305585c9fdf4ba5a9ae46e9929d Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Thu, 16 Aug 2018 17:59:45 +0200 Subject: [PATCH 3/3] checkstyle --- .../org/elasticsearch/discovery/zen/NodeJoinController.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/discovery/zen/NodeJoinController.java b/server/src/main/java/org/elasticsearch/discovery/zen/NodeJoinController.java index 86cf3bcefa117..913008b5c6d20 100644 --- a/server/src/main/java/org/elasticsearch/discovery/zen/NodeJoinController.java +++ b/server/src/main/java/org/elasticsearch/discovery/zen/NodeJoinController.java @@ -275,7 +275,8 @@ assert isEnoughPendingJoins(getPendingMasterJoinsCount()) : "becoming a master b Map tasks = getPendingAsTasks("become master"); final String source = "zen-disco-elected-as-master ([" + tasks.size() + "] nodes joined)"; - tasks.put(JoinTaskExecutor.BECOME_MASTER_TASK, (source1, e) -> {}); // noop listener, the election finished listener determines result + // noop listener, the election finished listener determines result + tasks.put(JoinTaskExecutor.BECOME_MASTER_TASK, (source1, e) -> {}); tasks.put(JoinTaskExecutor.FINISH_ELECTION_TASK, electionFinishedListener); masterService.submitStateUpdateTasks(source, tasks, ClusterStateTaskConfig.build(Priority.URGENT), joinTaskExecutor); }