Skip to content

Commit

Permalink
Register TransportBootstrapClusterAction with TransportService
Browse files Browse the repository at this point in the history
  • Loading branch information
DaveCTurner committed Nov 6, 2018
1 parent 1b62782 commit 0a5a8a8
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.cluster.coordination.Coordinator;
Expand All @@ -32,7 +33,7 @@
import org.elasticsearch.tasks.Task;
import org.elasticsearch.transport.TransportService;

public class TransportBootstrapClusterAction extends TransportAction<BootstrapClusterRequest, AcknowledgedResponse> {
public class TransportBootstrapClusterAction extends HandledTransportAction<BootstrapClusterRequest, AcknowledgedResponse> {

@Nullable // TODO make this not nullable
private final Coordinator coordinator;
Expand All @@ -41,7 +42,7 @@ public class TransportBootstrapClusterAction extends TransportAction<BootstrapCl
@Inject
public TransportBootstrapClusterAction(Settings settings, ActionFilters actionFilters, TransportService transportService,
Discovery discovery) {
super(settings, BootstrapClusterAction.NAME, actionFilters, transportService.getTaskManager());
super(settings, BootstrapClusterAction.NAME, transportService, actionFilters, BootstrapClusterRequest::new);
this.transportService = transportService;
if (discovery instanceof Coordinator) {
coordinator = (Coordinator) discovery;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@
*/
package org.elasticsearch.action.admin.cluster.bootstrap;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.bootstrap.BootstrapConfiguration.NodeDescription;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
Expand All @@ -33,17 +31,23 @@
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNode.Role;
import org.elasticsearch.cluster.service.MasterService;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.transport.MockTransport;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.ThreadPool.Names;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;

import java.util.concurrent.atomic.AtomicBoolean;
import java.io.IOException;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import static java.util.Collections.emptyList;
import static java.util.Collections.emptyMap;
Expand All @@ -55,11 +59,14 @@
import static org.mockito.Mockito.verifyZeroInteractions;

public class TransportBootstrapClusterActionTests extends ESTestCase {

private final ActionFilters EMPTY_FILTERS = new ActionFilters(emptySet());

private static BootstrapClusterRequest exampleRequest() {
return new BootstrapClusterRequest(new BootstrapConfiguration(singletonList(new NodeDescription("id", "name"))));
}

public void testHandlesNonstandardDiscoveryImplementation() {
public void testHandlesNonstandardDiscoveryImplementation() throws InterruptedException {
final MockTransport transport = new MockTransport();
final ThreadPool threadPool = new TestThreadPool("test", Settings.EMPTY);
final DiscoveryNode discoveryNode = new DiscoveryNode("local", buildNewFakeTransportAddress(), Version.CURRENT);
Expand All @@ -69,133 +76,148 @@ public void testHandlesNonstandardDiscoveryImplementation() {
final Discovery discovery = mock(Discovery.class);
verifyZeroInteractions(discovery);

final TransportBootstrapClusterAction transportBootstrapClusterAction
= new TransportBootstrapClusterAction(Settings.EMPTY, mock(ActionFilters.class), transportService, discovery);
new TransportBootstrapClusterAction(Settings.EMPTY, EMPTY_FILTERS, transportService, discovery); // registers action
transportService.start();
transportService.acceptIncomingRequests();

final ActionListener<AcknowledgedResponse> listener = new ActionListener<AcknowledgedResponse>() {
final CountDownLatch countDownLatch = new CountDownLatch(1);
transportService.sendRequest(discoveryNode, BootstrapClusterAction.NAME, exampleRequest(), new ResponseHandler() {
@Override
public void onResponse(AcknowledgedResponse acknowledgedResponse) {
public void handleResponse(AcknowledgedResponse response) {
throw new AssertionError("should not be called");
}

@Override
public void onFailure(Exception e) {
throw new AssertionError("should not be called");
public void handleException(TransportException exp) {
assertThat(exp.getRootCause().getMessage(), equalTo("cluster bootstrapping is not supported by this discovery type"));
countDownLatch.countDown();
}
};

assertThat(expectThrows(IllegalStateException.class,
() -> transportBootstrapClusterAction.doExecute(mock(Task.class), exampleRequest(), listener))
.getMessage(), equalTo("cluster bootstrapping is not supported by this discovery type"));
});

assertTrue(countDownLatch.await(10, TimeUnit.SECONDS));
threadPool.shutdown();
}

public void testFailsOnNonMasterEligibleNodes() {
public void testFailsOnNonMasterEligibleNodes() throws InterruptedException {
final DiscoveryNode discoveryNode
= new DiscoveryNode("local", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT);

final MockTransport transport = new MockTransport();
final ThreadPool threadPool = new TestThreadPool("test", Settings.EMPTY);
final TransportService transportService = transport.createTransportService(Settings.EMPTY, threadPool,
TransportService.NOOP_TRANSPORT_INTERCEPTOR, boundTransportAddress -> discoveryNode, null, emptySet());
transportService.start();
transportService.acceptIncomingRequests();

final ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
final ClusterState state = ClusterState.builder(new ClusterName("cluster")).build();
final Coordinator coordinator = new Coordinator("local", Settings.EMPTY, clusterSettings, transportService,
ESAllocationTestCase.createAllocationService(Settings.EMPTY),
new MasterService("local", Settings.EMPTY, threadPool),
() -> new InMemoryPersistedState(0, state), r -> emptyList(),
new NoOpClusterApplier(), random());
coordinator.start();
new NoOpClusterApplier(), new Random(random().nextLong()));

final TransportBootstrapClusterAction transportBootstrapClusterAction
= new TransportBootstrapClusterAction(Settings.EMPTY, mock(ActionFilters.class), transportService, coordinator);
new TransportBootstrapClusterAction(Settings.EMPTY, EMPTY_FILTERS, transportService, coordinator); // registers action
transportService.start();
transportService.acceptIncomingRequests();
coordinator.start();

final ActionListener<AcknowledgedResponse> listener = new ActionListener<AcknowledgedResponse>() {
final CountDownLatch countDownLatch = new CountDownLatch(1);
transportService.sendRequest(discoveryNode, BootstrapClusterAction.NAME, exampleRequest(), new ResponseHandler() {
@Override
public void onResponse(AcknowledgedResponse acknowledgedResponse) {
public void handleResponse(AcknowledgedResponse response) {
throw new AssertionError("should not be called");
}

@Override
public void onFailure(Exception e) {
throw new AssertionError("should not be called");
public void handleException(TransportException exp) {
assertThat(exp.getRootCause().getMessage(), equalTo("this node is not master-eligible"));
countDownLatch.countDown();
}
};

assertThat(expectThrows(ElasticsearchException.class,
() -> transportBootstrapClusterAction.doExecute(mock(Task.class), exampleRequest(), listener)).getMessage(),
equalTo("this node is not master-eligible"));
});

assertTrue(countDownLatch.await(10, TimeUnit.SECONDS));
threadPool.shutdown();
}

public void testSetsInitialConfiguration() {
public void testSetsInitialConfiguration() throws InterruptedException {
final DiscoveryNode discoveryNode
= new DiscoveryNode("local", buildNewFakeTransportAddress(), emptyMap(), singleton(Role.MASTER), Version.CURRENT);

final MockTransport transport = new MockTransport();
final ThreadPool threadPool = new TestThreadPool("test", Settings.EMPTY);
final TransportService transportService = transport.createTransportService(Settings.EMPTY, threadPool,
TransportService.NOOP_TRANSPORT_INTERCEPTOR, boundTransportAddress -> discoveryNode, null, emptySet());
transportService.start();
transportService.acceptIncomingRequests();

final ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
final ClusterState state = ClusterState.builder(new ClusterName("cluster")).build();
final Coordinator coordinator = new Coordinator("local", Settings.EMPTY, clusterSettings, transportService,
ESAllocationTestCase.createAllocationService(Settings.EMPTY),
new MasterService("local", Settings.EMPTY, threadPool),
() -> new InMemoryPersistedState(0, state), r -> emptyList(),
new NoOpClusterApplier(), random());
new NoOpClusterApplier(), new Random(random().nextLong()));

new TransportBootstrapClusterAction(Settings.EMPTY, EMPTY_FILTERS, transportService, coordinator); // registers action
transportService.start();
transportService.acceptIncomingRequests();
coordinator.start();
coordinator.startInitialJoin();

final TransportBootstrapClusterAction transportBootstrapClusterAction
= new TransportBootstrapClusterAction(Settings.EMPTY, mock(ActionFilters.class), transportService, coordinator);

final AtomicBoolean responseReceived = new AtomicBoolean();

assertFalse(coordinator.isInitialConfigurationSet());

final BootstrapClusterRequest request
= new BootstrapClusterRequest(new BootstrapConfiguration(singletonList(new NodeDescription(discoveryNode))));

transportBootstrapClusterAction.doExecute(mock(Task.class), request,
new ActionListener<AcknowledgedResponse>() {
{
final CountDownLatch countDownLatch = new CountDownLatch(1);
transportService.sendRequest(discoveryNode, BootstrapClusterAction.NAME, request, new ResponseHandler() {
@Override
public void onResponse(AcknowledgedResponse acknowledgedResponse) {
assertTrue(acknowledgedResponse.isAcknowledged());
responseReceived.set(true);
public void handleResponse(AcknowledgedResponse response) {
assertTrue(response.isAcknowledged());
countDownLatch.countDown();
}

@Override
public void onFailure(Exception e) {
throw new AssertionError("should not be called");
public void handleException(TransportException exp) {
throw new AssertionError("should not be called", exp);
}
});
assertTrue(responseReceived.get());

assertTrue(countDownLatch.await(10, TimeUnit.SECONDS));
}

assertTrue(coordinator.isInitialConfigurationSet());

responseReceived.set(false);
transportBootstrapClusterAction.doExecute(mock(Task.class), request,
new ActionListener<AcknowledgedResponse>() {
{
final CountDownLatch countDownLatch = new CountDownLatch(1);
transportService.sendRequest(discoveryNode, BootstrapClusterAction.NAME, request, new ResponseHandler() {
@Override
public void onResponse(AcknowledgedResponse acknowledgedResponse) {
assertFalse(acknowledgedResponse.isAcknowledged());
responseReceived.set(true);
public void handleResponse(AcknowledgedResponse response) {
assertFalse(response.isAcknowledged());
countDownLatch.countDown();
}

@Override
public void onFailure(Exception e) {
throw new AssertionError("should not be called");
public void handleException(TransportException exp) {
throw new AssertionError("should not be called", exp);
}
});
assertTrue(responseReceived.get());

assertTrue(countDownLatch.await(10, TimeUnit.SECONDS));
}

threadPool.shutdown();
}

private abstract class ResponseHandler implements TransportResponseHandler<AcknowledgedResponse> {
@Override
public String executor() {
return Names.SAME;
}

@Override
public AcknowledgedResponse read(StreamInput in) throws IOException {
AcknowledgedResponse acknowledgedResponse = new AcknowledgedResponse();
acknowledgedResponse.readFrom(in);
return acknowledgedResponse;
}
}
}

0 comments on commit 0a5a8a8

Please sign in to comment.