Skip to content

Commit

Permalink
Move PeerFinder machinery to discovery package
Browse files Browse the repository at this point in the history
  • Loading branch information
DaveCTurner committed Aug 6, 2018
1 parent 3608505 commit 2013f10
Show file tree
Hide file tree
Showing 7 changed files with 121 additions and 84 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.elasticsearch.cluster.InternalClusterInfoService;
import org.elasticsearch.cluster.NodeConnectionsService;
import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
import org.elasticsearch.cluster.coordination.PeerFinder;
import org.elasticsearch.cluster.metadata.IndexGraveyard;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.routing.OperationRouting;
Expand All @@ -55,6 +54,7 @@
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.discovery.DiscoveryModule;
import org.elasticsearch.discovery.DiscoverySettings;
import org.elasticsearch.discovery.PeerFinder;
import org.elasticsearch.discovery.zen.ElectMasterService;
import org.elasticsearch.discovery.zen.FaultDetection;
import org.elasticsearch.discovery.zen.SettingsBasedHostsProvider;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,3 @@ public interface ConfiguredHostsResolver {
*/
void resolveConfiguredHosts(Consumer<List<TransportAddress>> consumer);
}

Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@
* under the License.
*/

package org.elasticsearch.cluster.coordination;
package org.elasticsearch.discovery;

import com.carrotsearch.hppc.cursors.ObjectCursor;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.coordination.FutureExecutor;
import org.elasticsearch.cluster.coordination.PeersResponse;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.Nullable;
Expand All @@ -33,7 +35,6 @@
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.discovery.ConfiguredHostsResolver;
import org.elasticsearch.threadpool.ThreadPool.Names;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportResponseHandler;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
* under the License.
*/

package org.elasticsearch.cluster.coordination;
package org.elasticsearch.discovery;

import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.StreamInput;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,6 @@
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.EqualsHashCodeTestUtils;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;

public class MessagesTests extends ESTestCase {

private DiscoveryNode createNode(String id) {
Expand Down Expand Up @@ -155,69 +145,5 @@ public ClusterState randomClusterState() {
new ClusterState.VotingConfiguration(Sets.newHashSet(generateRandomStringArray(10, 10, false))),
randomLong());
}

private List<DiscoveryNode> modifyDiscoveryNodesList(Collection<DiscoveryNode> originalNodes, boolean allowEmpty) {
final List<DiscoveryNode> discoveryNodes = new ArrayList<>(originalNodes);
if (discoveryNodes.isEmpty() == false && randomBoolean() && (allowEmpty || discoveryNodes.size() > 1)) {
discoveryNodes.remove(randomIntBetween(0, discoveryNodes.size() - 1));
} else if (discoveryNodes.isEmpty() == false && randomBoolean()) {
discoveryNodes.set(randomIntBetween(0, discoveryNodes.size() - 1), createNode(randomAlphaOfLength(10)));
} else {
discoveryNodes.add(createNode(randomAlphaOfLength(10)));
}
return discoveryNodes;
}

public void testPeersRequestEqualsHashCodeSerialization() {
final PeersRequest initialPeersRequest = new PeersRequest(createNode(randomAlphaOfLength(10)),
Arrays.stream(generateRandomStringArray(10, 10, false)).map(this::createNode).collect(Collectors.toList()));

EqualsHashCodeTestUtils.checkEqualsAndHashCode(initialPeersRequest,
publishRequest -> copyWriteable(publishRequest, writableRegistry(), PeersRequest::new),
in -> {
final List<DiscoveryNode> discoveryNodes = new ArrayList<>(in.getKnownPeers());
if (randomBoolean()) {
return new PeersRequest(createNode(randomAlphaOfLength(10)), discoveryNodes);
} else {
return new PeersRequest(in.getSourceNode(), modifyDiscoveryNodesList(in.getKnownPeers(), true));
}
});
}

public void testPeersResponseEqualsHashCodeSerialization() {
final long initialTerm = randomNonNegativeLong();
final PeersResponse initialPeersResponse;

if (randomBoolean()) {
initialPeersResponse = new PeersResponse(Optional.of(createNode(randomAlphaOfLength(10))), emptyList(), initialTerm);
} else {
initialPeersResponse = new PeersResponse(Optional.empty(),
Arrays.stream(generateRandomStringArray(10, 10, false, false)).map(this::createNode).collect(Collectors.toList()),
initialTerm);
}

EqualsHashCodeTestUtils.checkEqualsAndHashCode(initialPeersResponse,
publishResponse -> copyWriteable(publishResponse, writableRegistry(), PeersResponse::new),
in -> {
final long term = in.getTerm();
if (randomBoolean()) {
return new PeersResponse(in.getMasterNode(), in.getKnownPeers(),
randomValueOtherThan(term, ESTestCase::randomNonNegativeLong));
} else {
if (in.getMasterNode().isPresent()) {
if (randomBoolean()) {
return new PeersResponse(Optional.of(createNode(randomAlphaOfLength(10))), in.getKnownPeers(), term);
} else {
return new PeersResponse(Optional.empty(), singletonList(createNode(randomAlphaOfLength(10))), term);
}
} else {
if (randomBoolean()) {
return new PeersResponse(Optional.of(createNode(randomAlphaOfLength(10))), emptyList(), term);
} else {
return new PeersResponse(in.getMasterNode(), modifyDiscoveryNodesList(in.getKnownPeers(), false), term);
}
}
}
});
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.discovery;

import org.elasticsearch.Version;
import org.elasticsearch.cluster.coordination.PeersResponse;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.EqualsHashCodeTestUtils;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;

public class PeerFinderMessagesTests extends ESTestCase {
private DiscoveryNode createNode(String id) {
return new DiscoveryNode(id, buildNewFakeTransportAddress(), Version.CURRENT);
}

public void testPeersRequestEqualsHashCodeSerialization() {
final PeersRequest initialPeersRequest = new PeersRequest(createNode(randomAlphaOfLength(10)),
Arrays.stream(generateRandomStringArray(10, 10, false)).map(this::createNode).collect(Collectors.toList()));

EqualsHashCodeTestUtils.checkEqualsAndHashCode(initialPeersRequest,
publishRequest -> copyWriteable(publishRequest, writableRegistry(), PeersRequest::new),
in -> {
final List<DiscoveryNode> discoveryNodes = new ArrayList<>(in.getKnownPeers());
if (randomBoolean()) {
return new PeersRequest(createNode(randomAlphaOfLength(10)), discoveryNodes);
} else {
return new PeersRequest(in.getSourceNode(), modifyDiscoveryNodesList(in.getKnownPeers(), true));
}
});
}

public void testPeersResponseEqualsHashCodeSerialization() {
final long initialTerm = randomNonNegativeLong();
final PeersResponse initialPeersResponse;

if (randomBoolean()) {
initialPeersResponse = new PeersResponse(Optional.of(createNode(randomAlphaOfLength(10))), emptyList(), initialTerm);
} else {
initialPeersResponse = new PeersResponse(Optional.empty(),
Arrays.stream(generateRandomStringArray(10, 10, false, false)).map(this::createNode).collect(Collectors.toList()),
initialTerm);
}

EqualsHashCodeTestUtils.checkEqualsAndHashCode(initialPeersResponse,
publishResponse -> copyWriteable(publishResponse, writableRegistry(), PeersResponse::new),
in -> {
final long term = in.getTerm();
if (randomBoolean()) {
return new PeersResponse(in.getMasterNode(), in.getKnownPeers(),
randomValueOtherThan(term, ESTestCase::randomNonNegativeLong));
} else {
if (in.getMasterNode().isPresent()) {
if (randomBoolean()) {
return new PeersResponse(Optional.of(createNode(randomAlphaOfLength(10))), in.getKnownPeers(), term);
} else {
return new PeersResponse(Optional.empty(), singletonList(createNode(randomAlphaOfLength(10))), term);
}
} else {
if (randomBoolean()) {
return new PeersResponse(Optional.of(createNode(randomAlphaOfLength(10))), emptyList(), term);
} else {
return new PeersResponse(in.getMasterNode(), modifyDiscoveryNodesList(in.getKnownPeers(), false), term);
}
}
}
});
}


private List<DiscoveryNode> modifyDiscoveryNodesList(Collection<DiscoveryNode> originalNodes, boolean allowEmpty) {
final List<DiscoveryNode> discoveryNodes = new ArrayList<>(originalNodes);
if (discoveryNodes.isEmpty() == false && randomBoolean() && (allowEmpty || discoveryNodes.size() > 1)) {
discoveryNodes.remove(randomIntBetween(0, discoveryNodes.size() - 1));
} else if (discoveryNodes.isEmpty() == false && randomBoolean()) {
discoveryNodes.set(randomIntBetween(0, discoveryNodes.size() - 1), createNode(randomAlphaOfLength(10)));
} else {
discoveryNodes.add(createNode(randomAlphaOfLength(10)));
}
return discoveryNodes;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,20 @@
* under the License.
*/

package org.elasticsearch.cluster.coordination;
package org.elasticsearch.discovery;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.coordination.PeerFinder.TransportAddressConnector;
import org.elasticsearch.cluster.coordination.DeterministicTaskQueue;
import org.elasticsearch.cluster.coordination.FutureExecutor;
import org.elasticsearch.cluster.coordination.PeersResponse;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.node.DiscoveryNodes.Builder;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.discovery.PeerFinder.TransportAddressConnector;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.transport.CapturingTransport;
import org.elasticsearch.test.transport.CapturingTransport.CapturedRequest;
Expand Down Expand Up @@ -58,7 +61,7 @@
import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static java.util.Collections.singletonList;
import static org.elasticsearch.cluster.coordination.PeerFinder.REQUEST_PEERS_ACTION_NAME;
import static org.elasticsearch.discovery.PeerFinder.REQUEST_PEERS_ACTION_NAME;
import static org.elasticsearch.node.Node.NODE_NAME_SETTING;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.empty;
Expand Down Expand Up @@ -436,7 +439,7 @@ public void testReceivesRequestsFromTransportService() {

final AtomicBoolean responseReceived = new AtomicBoolean();

transportService.sendRequest(localNode, PeerFinder.REQUEST_PEERS_ACTION_NAME, new PeersRequest(sourceNode, Collections.emptyList()),
transportService.sendRequest(localNode, REQUEST_PEERS_ACTION_NAME, new PeersRequest(sourceNode, Collections.emptyList()),
new TransportResponseHandler<PeersResponse>() {
@Override
public void handleResponse(PeersResponse response) {
Expand Down

0 comments on commit 2013f10

Please sign in to comment.