Skip to content

Commit

Permalink
Register GetDiscoveredNodesAction with TransportService
Browse files Browse the repository at this point in the history
  • Loading branch information
DaveCTurner committed Nov 6, 2018
1 parent 0a5a8a8 commit 3ee0174
Show file tree
Hide file tree
Showing 2 changed files with 108 additions and 87 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,13 @@
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.cluster.coordination.Coordinator;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.Writeable.Reader;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ListenableFuture;
Expand All @@ -40,9 +42,10 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

public class TransportGetDiscoveredNodesAction extends TransportAction<GetDiscoveredNodesRequest, GetDiscoveredNodesResponse> {
public class TransportGetDiscoveredNodesAction extends HandledTransportAction<GetDiscoveredNodesRequest, GetDiscoveredNodesResponse> {

@Nullable // TODO make this not nullable
private final Coordinator coordinator;
Expand All @@ -51,7 +54,9 @@ public class TransportGetDiscoveredNodesAction extends TransportAction<GetDiscov
@Inject
public TransportGetDiscoveredNodesAction(Settings settings, ActionFilters actionFilters, TransportService transportService,
Discovery discovery) {
super(settings, GetDiscoveredNodesAction.NAME, actionFilters, transportService.getTaskManager());
super(settings, GetDiscoveredNodesAction.NAME, transportService, actionFilters,
(Reader<GetDiscoveredNodesRequest>) GetDiscoveredNodesRequest::new);

this.transportService = transportService;
if (discovery instanceof Coordinator) {
coordinator = (Coordinator) discovery;
Expand All @@ -71,6 +76,7 @@ protected void doExecute(Task task, GetDiscoveredNodesRequest request, ActionLis
if (localNode.isMasterNode() == false) {
throw new ElasticsearchException("this node is not master-eligible");
}
final AtomicBoolean listenerNotified = new AtomicBoolean();
final ListenableFuture<GetDiscoveredNodesResponse> listenableFuture = new ListenableFuture<>();
final ThreadPool threadPool = transportService.getThreadPool();
listenableFuture.addListener(listener,
Expand All @@ -84,7 +90,7 @@ public void accept(Iterable<DiscoveryNode> nodes) {
nodesSet.add(localNode);
nodes.forEach(nodesSet::add);
logger.trace("discovered {}", nodesSet);
if (nodesSet.size() >= request.getMinimumNodeCount()) {
if (nodesSet.size() >= request.getMinimumNodeCount() && listenerNotified.compareAndSet(false, true)) {
listenableFuture.onResponse(new GetDiscoveredNodesResponse(nodesSet));
}
}
Expand All @@ -101,7 +107,9 @@ public String toString() {
threadPool.schedule(request.getTimeout(), Names.GENERIC, new Runnable() {
@Override
public void run() {
listenableFuture.onFailure(new ElasticsearchTimeoutException("timed out while waiting for " + request));
if (listenerNotified.compareAndSet(false, true)) {
listenableFuture.onFailure(new ElasticsearchTimeoutException("timed out while waiting for " + request));
}
}

@Override
Expand Down
Loading

0 comments on commit 3ee0174

Please sign in to comment.