From 0a5a8a8a2e0542463acfa64d7077fa4d74dbb354 Mon Sep 17 00:00:00 2001 From: David Turner <david.turner@elastic.co> Date: Tue, 6 Nov 2018 12:07:25 +0000 Subject: [PATCH] Register TransportBootstrapClusterAction with TransportService --- .../TransportBootstrapClusterAction.java | 5 +- .../TransportBootstrapClusterActionTests.java | 138 ++++++++++-------- 2 files changed, 83 insertions(+), 60 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/bootstrap/TransportBootstrapClusterAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/bootstrap/TransportBootstrapClusterAction.java index 31e395f4af0b7..a25f3ff2e1a90 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/bootstrap/TransportBootstrapClusterAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/bootstrap/TransportBootstrapClusterAction.java @@ -21,6 +21,7 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.action.support.TransportAction; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.cluster.coordination.Coordinator; @@ -32,7 +33,7 @@ import org.elasticsearch.tasks.Task; import org.elasticsearch.transport.TransportService; -public class TransportBootstrapClusterAction extends TransportAction<BootstrapClusterRequest, AcknowledgedResponse> { +public class TransportBootstrapClusterAction extends HandledTransportAction<BootstrapClusterRequest, AcknowledgedResponse> { @Nullable // TODO make this not nullable private final Coordinator coordinator; @@ -41,7 +42,7 @@ public class TransportBootstrapClusterAction extends TransportAction<BootstrapCl @Inject public TransportBootstrapClusterAction(Settings settings, ActionFilters actionFilters, TransportService transportService, Discovery discovery) { - super(settings, BootstrapClusterAction.NAME, actionFilters, transportService.getTaskManager()); + super(settings, BootstrapClusterAction.NAME, transportService, actionFilters, BootstrapClusterRequest::new); this.transportService = transportService; if (discovery instanceof Coordinator) { coordinator = (Coordinator) discovery; diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/bootstrap/TransportBootstrapClusterActionTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/bootstrap/TransportBootstrapClusterActionTests.java index 7a12488ea9600..91cbbd4611e00 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/bootstrap/TransportBootstrapClusterActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/bootstrap/TransportBootstrapClusterActionTests.java @@ -18,9 +18,7 @@ */ package org.elasticsearch.action.admin.cluster.bootstrap; -import org.elasticsearch.ElasticsearchException; import org.elasticsearch.Version; -import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.cluster.bootstrap.BootstrapConfiguration.NodeDescription; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.master.AcknowledgedResponse; @@ -33,17 +31,23 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode.Role; import org.elasticsearch.cluster.service.MasterService; +import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.discovery.Discovery; -import org.elasticsearch.tasks.Task; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.transport.MockTransport; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.threadpool.ThreadPool.Names; +import org.elasticsearch.transport.TransportException; +import org.elasticsearch.transport.TransportResponseHandler; import org.elasticsearch.transport.TransportService; -import java.util.concurrent.atomic.AtomicBoolean; +import java.io.IOException; +import java.util.Random; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import static java.util.Collections.emptyList; import static java.util.Collections.emptyMap; @@ -55,11 +59,14 @@ import static org.mockito.Mockito.verifyZeroInteractions; public class TransportBootstrapClusterActionTests extends ESTestCase { + + private final ActionFilters EMPTY_FILTERS = new ActionFilters(emptySet()); + private static BootstrapClusterRequest exampleRequest() { return new BootstrapClusterRequest(new BootstrapConfiguration(singletonList(new NodeDescription("id", "name")))); } - public void testHandlesNonstandardDiscoveryImplementation() { + public void testHandlesNonstandardDiscoveryImplementation() throws InterruptedException { final MockTransport transport = new MockTransport(); final ThreadPool threadPool = new TestThreadPool("test", Settings.EMPTY); final DiscoveryNode discoveryNode = new DiscoveryNode("local", buildNewFakeTransportAddress(), Version.CURRENT); @@ -69,29 +76,29 @@ public void testHandlesNonstandardDiscoveryImplementation() { final Discovery discovery = mock(Discovery.class); verifyZeroInteractions(discovery); - final TransportBootstrapClusterAction transportBootstrapClusterAction - = new TransportBootstrapClusterAction(Settings.EMPTY, mock(ActionFilters.class), transportService, discovery); + new TransportBootstrapClusterAction(Settings.EMPTY, EMPTY_FILTERS, transportService, discovery); // registers action + transportService.start(); + transportService.acceptIncomingRequests(); - final ActionListener<AcknowledgedResponse> listener = new ActionListener<AcknowledgedResponse>() { + final CountDownLatch countDownLatch = new CountDownLatch(1); + transportService.sendRequest(discoveryNode, BootstrapClusterAction.NAME, exampleRequest(), new ResponseHandler() { @Override - public void onResponse(AcknowledgedResponse acknowledgedResponse) { + public void handleResponse(AcknowledgedResponse response) { throw new AssertionError("should not be called"); } @Override - public void onFailure(Exception e) { - throw new AssertionError("should not be called"); + public void handleException(TransportException exp) { + assertThat(exp.getRootCause().getMessage(), equalTo("cluster bootstrapping is not supported by this discovery type")); + countDownLatch.countDown(); } - }; - - assertThat(expectThrows(IllegalStateException.class, - () -> transportBootstrapClusterAction.doExecute(mock(Task.class), exampleRequest(), listener)) - .getMessage(), equalTo("cluster bootstrapping is not supported by this discovery type")); + }); + assertTrue(countDownLatch.await(10, TimeUnit.SECONDS)); threadPool.shutdown(); } - public void testFailsOnNonMasterEligibleNodes() { + public void testFailsOnNonMasterEligibleNodes() throws InterruptedException { final DiscoveryNode discoveryNode = new DiscoveryNode("local", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT); @@ -99,8 +106,6 @@ public void testFailsOnNonMasterEligibleNodes() { final ThreadPool threadPool = new TestThreadPool("test", Settings.EMPTY); final TransportService transportService = transport.createTransportService(Settings.EMPTY, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, boundTransportAddress -> discoveryNode, null, emptySet()); - transportService.start(); - transportService.acceptIncomingRequests(); final ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); final ClusterState state = ClusterState.builder(new ClusterName("cluster")).build(); @@ -108,32 +113,32 @@ public void testFailsOnNonMasterEligibleNodes() { ESAllocationTestCase.createAllocationService(Settings.EMPTY), new MasterService("local", Settings.EMPTY, threadPool), () -> new InMemoryPersistedState(0, state), r -> emptyList(), - new NoOpClusterApplier(), random()); - coordinator.start(); + new NoOpClusterApplier(), new Random(random().nextLong())); - final TransportBootstrapClusterAction transportBootstrapClusterAction - = new TransportBootstrapClusterAction(Settings.EMPTY, mock(ActionFilters.class), transportService, coordinator); + new TransportBootstrapClusterAction(Settings.EMPTY, EMPTY_FILTERS, transportService, coordinator); // registers action + transportService.start(); + transportService.acceptIncomingRequests(); + coordinator.start(); - final ActionListener<AcknowledgedResponse> listener = new ActionListener<AcknowledgedResponse>() { + final CountDownLatch countDownLatch = new CountDownLatch(1); + transportService.sendRequest(discoveryNode, BootstrapClusterAction.NAME, exampleRequest(), new ResponseHandler() { @Override - public void onResponse(AcknowledgedResponse acknowledgedResponse) { + public void handleResponse(AcknowledgedResponse response) { throw new AssertionError("should not be called"); } @Override - public void onFailure(Exception e) { - throw new AssertionError("should not be called"); + public void handleException(TransportException exp) { + assertThat(exp.getRootCause().getMessage(), equalTo("this node is not master-eligible")); + countDownLatch.countDown(); } - }; - - assertThat(expectThrows(ElasticsearchException.class, - () -> transportBootstrapClusterAction.doExecute(mock(Task.class), exampleRequest(), listener)).getMessage(), - equalTo("this node is not master-eligible")); + }); + assertTrue(countDownLatch.await(10, TimeUnit.SECONDS)); threadPool.shutdown(); } - public void testSetsInitialConfiguration() { + public void testSetsInitialConfiguration() throws InterruptedException { final DiscoveryNode discoveryNode = new DiscoveryNode("local", buildNewFakeTransportAddress(), emptyMap(), singleton(Role.MASTER), Version.CURRENT); @@ -141,8 +146,6 @@ public void testSetsInitialConfiguration() { final ThreadPool threadPool = new TestThreadPool("test", Settings.EMPTY); final TransportService transportService = transport.createTransportService(Settings.EMPTY, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, boundTransportAddress -> discoveryNode, null, emptySet()); - transportService.start(); - transportService.acceptIncomingRequests(); final ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); final ClusterState state = ClusterState.builder(new ClusterName("cluster")).build(); @@ -150,52 +153,71 @@ public void testSetsInitialConfiguration() { ESAllocationTestCase.createAllocationService(Settings.EMPTY), new MasterService("local", Settings.EMPTY, threadPool), () -> new InMemoryPersistedState(0, state), r -> emptyList(), - new NoOpClusterApplier(), random()); + new NoOpClusterApplier(), new Random(random().nextLong())); + + new TransportBootstrapClusterAction(Settings.EMPTY, EMPTY_FILTERS, transportService, coordinator); // registers action + transportService.start(); + transportService.acceptIncomingRequests(); coordinator.start(); coordinator.startInitialJoin(); - final TransportBootstrapClusterAction transportBootstrapClusterAction - = new TransportBootstrapClusterAction(Settings.EMPTY, mock(ActionFilters.class), transportService, coordinator); - - final AtomicBoolean responseReceived = new AtomicBoolean(); - assertFalse(coordinator.isInitialConfigurationSet()); final BootstrapClusterRequest request = new BootstrapClusterRequest(new BootstrapConfiguration(singletonList(new NodeDescription(discoveryNode)))); - transportBootstrapClusterAction.doExecute(mock(Task.class), request, - new ActionListener<AcknowledgedResponse>() { + { + final CountDownLatch countDownLatch = new CountDownLatch(1); + transportService.sendRequest(discoveryNode, BootstrapClusterAction.NAME, request, new ResponseHandler() { @Override - public void onResponse(AcknowledgedResponse acknowledgedResponse) { - assertTrue(acknowledgedResponse.isAcknowledged()); - responseReceived.set(true); + public void handleResponse(AcknowledgedResponse response) { + assertTrue(response.isAcknowledged()); + countDownLatch.countDown(); } @Override - public void onFailure(Exception e) { - throw new AssertionError("should not be called"); + public void handleException(TransportException exp) { + throw new AssertionError("should not be called", exp); } }); - assertTrue(responseReceived.get()); + + assertTrue(countDownLatch.await(10, TimeUnit.SECONDS)); + } + assertTrue(coordinator.isInitialConfigurationSet()); - responseReceived.set(false); - transportBootstrapClusterAction.doExecute(mock(Task.class), request, - new ActionListener<AcknowledgedResponse>() { + { + final CountDownLatch countDownLatch = new CountDownLatch(1); + transportService.sendRequest(discoveryNode, BootstrapClusterAction.NAME, request, new ResponseHandler() { @Override - public void onResponse(AcknowledgedResponse acknowledgedResponse) { - assertFalse(acknowledgedResponse.isAcknowledged()); - responseReceived.set(true); + public void handleResponse(AcknowledgedResponse response) { + assertFalse(response.isAcknowledged()); + countDownLatch.countDown(); } @Override - public void onFailure(Exception e) { - throw new AssertionError("should not be called"); + public void handleException(TransportException exp) { + throw new AssertionError("should not be called", exp); } }); - assertTrue(responseReceived.get()); + + assertTrue(countDownLatch.await(10, TimeUnit.SECONDS)); + } threadPool.shutdown(); } + + private abstract class ResponseHandler implements TransportResponseHandler<AcknowledgedResponse> { + @Override + public String executor() { + return Names.SAME; + } + + @Override + public AcknowledgedResponse read(StreamInput in) throws IOException { + AcknowledgedResponse acknowledgedResponse = new AcknowledgedResponse(); + acknowledgedResponse.readFrom(in); + return acknowledgedResponse; + } + } }