Publish cluster state to masters first #37673

Merged (1 commit) on Jan 22, 2019
FollowersChecker.java
@@ -135,7 +135,7 @@ public void setCurrentNodes(DiscoveryNodes discoveryNodes) {
followerCheckers.keySet().removeIf(isUnknownNode);
faultyNodes.removeIf(isUnknownNode);

-for (final DiscoveryNode discoveryNode : discoveryNodes) {
+discoveryNodes.mastersFirstStream().forEach(discoveryNode -> {
if (discoveryNode.equals(discoveryNodes.getLocalNode()) == false
&& followerCheckers.containsKey(discoveryNode) == false
&& faultyNodes.contains(discoveryNode) == false) {
@@ -144,7 +144,7 @@ public void setCurrentNodes(DiscoveryNodes discoveryNodes) {
followerCheckers.put(discoveryNode, followerChecker);
followerChecker.start();
}
-}
+});
}
}

Publication.java
@@ -58,7 +58,7 @@ public Publication(PublishRequest publishRequest, AckListener ackListener, LongS
startTime = currentTimeSupplier.getAsLong();
applyCommitRequest = Optional.empty();
publicationTargets = new ArrayList<>(publishRequest.getAcceptedState().getNodes().getNodes().size());
-publishRequest.getAcceptedState().getNodes().iterator().forEachRemaining(n -> publicationTargets.add(new PublicationTarget(n)));
+publishRequest.getAcceptedState().getNodes().mastersFirstStream().forEach(n -> publicationTargets.add(new PublicationTarget(n)));
}

public void start(Set<DiscoveryNode> faultyNodes) {
DiscoveryNodes.java
@@ -40,6 +40,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

/**
@@ -161,6 +162,14 @@ public ImmutableOpenMap<String, DiscoveryNode> getCoordinatingOnlyNodes() {
return nodes.build();
}

/**
* Returns a stream of all nodes, with master nodes at the front
*/
public Stream<DiscoveryNode> mastersFirstStream() {
return Stream.concat(StreamSupport.stream(masterNodes.spliterator(), false).map(cur -> cur.value),
StreamSupport.stream(this.spliterator(), false).filter(n -> n.isMasterNode() == false));
}

/**
* Get a node by its id
*
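For orientation, the sketch below (a hypothetical helper, not part of this PR; the class and method names are made up) shows how a caller can consume the new mastersFirstStream(), and how the same masters-first property can be expressed as a stable sort, which is the check the tests in this PR perform.

import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;

import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

// Hypothetical helper class, for illustration only; not part of this PR.
class MastersFirstSketch {

    // The ordering the PR relies on: every node appears exactly once, master-eligible nodes first.
    static List<DiscoveryNode> orderedTargets(DiscoveryNodes nodes) {
        return nodes.mastersFirstStream().collect(Collectors.toList());
    }

    // The same property expressed as a stable sort by "not master-eligible", so masters end up
    // in front. The order within each group may differ from mastersFirstStream(), which is why
    // the tests below assert only the masters-first property rather than a full ordering.
    static List<DiscoveryNode> mastersFirstBySort(DiscoveryNodes nodes) {
        return StreamSupport.stream(nodes.spliterator(), false)
            .sorted(Comparator.comparing((DiscoveryNode n) -> n.isMasterNode() == false))
            .collect(Collectors.toList());
    }
}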
FollowersCheckerTests.java
@@ -31,7 +31,9 @@
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.EqualsHashCodeTestUtils;
import org.elasticsearch.test.EqualsHashCodeTestUtils.CopyFunction;
import org.elasticsearch.test.transport.CapturingTransport;
import org.elasticsearch.test.transport.MockTransport;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.ThreadPool.Names;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.TransportException;
@@ -41,12 +43,21 @@
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static java.util.Collections.emptySet;
import static org.elasticsearch.cluster.coordination.FollowersChecker.FOLLOWER_CHECK_ACTION_NAME;
@@ -62,6 +73,7 @@
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.core.IsInstanceOf.instanceOf;
import static org.mockito.Mockito.mock;

public class FollowersCheckerTests extends ESTestCase {

@@ -536,6 +548,46 @@ public String executor() {
}
}

public void testPreferMasterNodes() {
List<DiscoveryNode> nodes = randomNodes(10);
DiscoveryNodes.Builder discoNodesBuilder = DiscoveryNodes.builder();
nodes.forEach(dn -> discoNodesBuilder.add(dn));
DiscoveryNodes discoveryNodes = discoNodesBuilder.localNodeId(nodes.get(0).getId()).build();
CapturingTransport capturingTransport = new CapturingTransport();
TransportService transportService = capturingTransport.createTransportService(Settings.EMPTY, mock(ThreadPool.class),
TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> nodes.get(0), null, emptySet());
final FollowersChecker followersChecker = new FollowersChecker(Settings.EMPTY, transportService, fcr -> {
assert false : fcr;
}, (node, reason) -> {
assert false : node;
});
followersChecker.setCurrentNodes(discoveryNodes);
List<DiscoveryNode> followerTargets = Stream.of(capturingTransport.getCapturedRequestsAndClear())
.map(cr -> cr.node).collect(Collectors.toList());
List<DiscoveryNode> sortedFollowerTargets = new ArrayList<>(followerTargets);
Collections.sort(sortedFollowerTargets, Comparator.comparing(n -> n.isMasterNode() == false));
assertEquals(sortedFollowerTargets, followerTargets);
}

private static List<DiscoveryNode> randomNodes(final int numNodes) {
List<DiscoveryNode> nodesList = new ArrayList<>();
for (int i = 0; i < numNodes; i++) {
Map<String, String> attributes = new HashMap<>();
if (frequently()) {
attributes.put("custom", randomBoolean() ? "match" : randomAlphaOfLengthBetween(3, 5));
}
final DiscoveryNode node = newNode(i, attributes,
new HashSet<>(randomSubsetOf(Arrays.asList(DiscoveryNode.Role.values()))));
nodesList.add(node);
}
return nodesList;
}

private static DiscoveryNode newNode(int nodeId, Map<String, String> attributes, Set<DiscoveryNode.Role> roles) {
return new DiscoveryNode("name_" + nodeId, "node_" + nodeId, buildNewFakeTransportAddress(), attributes, roles,
Version.CURRENT);
}

private static class ExpectsSuccess implements TransportResponseHandler<Empty> {
private final AtomicBoolean responseReceived = new AtomicBoolean();

PublicationTests.java
@@ -19,6 +19,7 @@

package org.elasticsearch.cluster.coordination;

import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.coordination.CoordinationMetaData.VotingConfiguration;
@@ -33,10 +34,13 @@
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportResponse;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -97,8 +101,8 @@ abstract class MockPublication extends Publication {

boolean committed;

-Map<DiscoveryNode, ActionListener<PublishWithJoinResponse>> pendingPublications = new HashMap<>();
-Map<DiscoveryNode, ActionListener<TransportResponse.Empty>> pendingCommits = new HashMap<>();
+Map<DiscoveryNode, ActionListener<PublishWithJoinResponse>> pendingPublications = new LinkedHashMap<>();
+Map<DiscoveryNode, ActionListener<TransportResponse.Empty>> pendingCommits = new LinkedHashMap<>();
Map<DiscoveryNode, Join> joins = new HashMap<>();
Set<DiscoveryNode> missingJoins = new HashSet<>();

@@ -372,6 +376,22 @@ public void testClusterStatePublishingFailsOrTimesOutBeforeCommit() throws Inter
tuple.v1().equals(n2) ? "dummy failure" : "non-failed nodes do not form a quorum")));
}

public void testPublishingToMastersFirst() {
VotingConfiguration singleNodeConfig = new VotingConfiguration(Sets.newHashSet(n1.getId()));
initializeCluster(singleNodeConfig);

DiscoveryNodes.Builder discoNodesBuilder = DiscoveryNodes.builder();
randomNodes(10).forEach(dn -> discoNodesBuilder.add(dn));
DiscoveryNodes discoveryNodes = discoNodesBuilder.add(n1).localNodeId(n1.getId()).build();
MockPublication publication = node1.publish(CoordinationStateTests.clusterState(1L, 2L,
discoveryNodes, singleNodeConfig, singleNodeConfig, 42L), null, Collections.emptySet());

List<DiscoveryNode> publicationTargets = new ArrayList<>(publication.pendingPublications.keySet());
List<DiscoveryNode> sortedPublicationTargets = new ArrayList<>(publicationTargets);
Collections.sort(sortedPublicationTargets, Comparator.comparing(n -> n.isMasterNode() == false));
assertEquals(sortedPublicationTargets, publicationTargets);
}

public void testClusterStatePublishingTimesOutAfterCommit() throws InterruptedException {
VotingConfiguration config = new VotingConfiguration(randomBoolean() ?
Sets.newHashSet(n1.getId(), n2.getId()) : Sets.newHashSet(n1.getId(), n2.getId(), n3.getId()));
@@ -428,6 +448,25 @@ public void testClusterStatePublishingTimesOutAfterCommit() throws InterruptedEx
assertEquals(discoNodes, ackListener.await(0L, TimeUnit.SECONDS));
}

private static List<DiscoveryNode> randomNodes(final int numNodes) {
List<DiscoveryNode> nodesList = new ArrayList<>();
for (int i = 0; i < numNodes; i++) {
Map<String, String> attributes = new HashMap<>();
if (frequently()) {
attributes.put("custom", randomBoolean() ? "match" : randomAlphaOfLengthBetween(3, 5));
}
final DiscoveryNode node = newNode(i, attributes,
new HashSet<>(randomSubsetOf(Arrays.asList(DiscoveryNode.Role.values()))));
nodesList.add(node);
}
return nodesList;
}

private static DiscoveryNode newNode(int nodeId, Map<String, String> attributes, Set<DiscoveryNode.Role> roles) {
return new DiscoveryNode("name_" + nodeId, "node_" + nodeId, buildNewFakeTransportAddress(), attributes, roles,
Version.CURRENT);
}

public static <T> Collector<T, ?, Stream<T>> shuffle() {
return Collectors.collectingAndThen(Collectors.toList(),
ts -> {
DiscoveryNodesTests.java
@@ -27,6 +27,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -149,6 +150,18 @@ public void testResolveNodesIds() {
assertThat(resolvedNodesIds, equalTo(expectedNodesIds));
}

public void testMastersFirst() {
final List<DiscoveryNode> inputNodes = randomNodes(10);
final DiscoveryNodes.Builder discoBuilder = DiscoveryNodes.builder();
inputNodes.forEach(discoBuilder::add);
final List<DiscoveryNode> returnedNodes = discoBuilder.build().mastersFirstStream().collect(Collectors.toList());
assertEquals(returnedNodes.size(), inputNodes.size());
assertEquals(new HashSet<>(returnedNodes), new HashSet<>(inputNodes));
final List<DiscoveryNode> sortedNodes = new ArrayList<>(returnedNodes);
[Inline review comment, Contributor]
Can we just sort the inputNodes and assert that this is what is returned?

[Reply, Contributor (PR author)]
No, because the returned nodes will not be in insertion order (as an ImmutableOpenMap is used by DiscoveryNodes) after sorting them by masterness.

Collections.sort(sortedNodes, Comparator.comparing(n -> n.isMasterNode() == false));
assertEquals(sortedNodes, returnedNodes);
}
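To make the point from the review thread above concrete, here is a small standalone sketch (plain Java, entirely hypothetical; strings stand in for DiscoveryNode instances) showing why asserting against a sorted copy of the input would over-specify the order, while sorting the returned list only checks the masters-first property:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Standalone illustration, not Elasticsearch code: names starting with "m" play the role of
// master-eligible nodes, names starting with "d" the role of non-master nodes.
public class InsertionOrderVsMapOrder {
    public static void main(String[] args) {
        // Order in which the nodes were added to the builder.
        List<String> inserted = Arrays.asList("d1", "m1", "d2", "m2");

        // A possible masters-first iteration over a map-backed structure: masters come first,
        // but neither group is necessarily in insertion order.
        List<String> returned = Arrays.asList("m2", "m1", "d2", "d1");

        // Stable-sorting the *input* bakes insertion order into each group ...
        List<String> sortedInput = new ArrayList<>(inserted);
        sortedInput.sort(Comparator.comparing((String n) -> n.startsWith("m") == false));
        System.out.println(sortedInput.equals(returned));    // false: [m1, m2, d1, d2] vs [m2, m1, d2, d1]

        // ... whereas stable-sorting the *returned* list asserts only that masters come first.
        List<String> sortedReturned = new ArrayList<>(returned);
        sortedReturned.sort(Comparator.comparing((String n) -> n.startsWith("m") == false));
        System.out.println(sortedReturned.equals(returned)); // true
    }
}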

public void testDeltas() {
Set<DiscoveryNode> nodesA = new HashSet<>();
nodesA.addAll(randomNodes(1 + randomInt(10)));