diff --git a/core/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/cancel/CancelTasksRequest.java b/core/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/cancel/CancelTasksRequest.java index 978e647b9991a..1d1249e15510c 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/cancel/CancelTasksRequest.java +++ b/core/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/cancel/CancelTasksRequest.java @@ -36,14 +36,6 @@ public class CancelTasksRequest extends BaseTasksRequest { private String reason = DEFAULT_REASON; - /** - * Cancel tasks on the specified nodes. If none are passed, all cancellable tasks on - * all nodes will be cancelled. - */ - public CancelTasksRequest(String... nodesIds) { - super(nodesIds); - } - @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); @@ -54,7 +46,6 @@ public void readFrom(StreamInput in) throws IOException { public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); out.writeString(reason); - } @Override diff --git a/core/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/cancel/TransportCancelTasksAction.java b/core/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/cancel/TransportCancelTasksAction.java index 73e382c4cd685..b07e540d79260 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/cancel/TransportCancelTasksAction.java +++ b/core/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/cancel/TransportCancelTasksAction.java @@ -24,7 +24,6 @@ import org.elasticsearch.action.TaskOperationFailure; import org.elasticsearch.action.admin.cluster.node.tasks.list.TaskInfo; import org.elasticsearch.action.support.ActionFilters; -import org.elasticsearch.action.support.tasks.BaseTasksRequest; import org.elasticsearch.action.support.tasks.TransportTasksAction; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterService; @@ -36,6 +35,7 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.tasks.CancellableTask; +import org.elasticsearch.tasks.TaskId; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.EmptyTransportResponseHandler; import org.elasticsearch.transport.TransportChannel; @@ -84,9 +84,9 @@ protected TaskInfo readTaskResponse(StreamInput in) throws IOException { } protected void processTasks(CancelTasksRequest request, Consumer operation) { - if (request.taskId() != BaseTasksRequest.ALL_TASKS) { + if (request.taskId().isSet() == false) { // we are only checking one task, we can optimize it - CancellableTask task = taskManager.getCancellableTask(request.taskId()); + CancellableTask task = taskManager.getCancellableTask(request.taskId().getId()); if (task != null) { if (request.match(task)) { operation.accept(task); @@ -94,7 +94,7 @@ protected void processTasks(CancelTasksRequest request, Consumer nodes, BanLock banLock) { - sendSetBanRequest(nodes, new BanParentTaskRequest(clusterService.localNode().getId(), task.getId(), reason), banLock); + sendSetBanRequest(nodes, + BanParentTaskRequest.createSetBanParentTaskRequest(new TaskId(clusterService.localNode().getId(), task.getId()), reason), + banLock); } private void removeBanOnNodes(CancellableTask task, Set nodes) { - sendRemoveBanRequest(nodes, new BanParentTaskRequest(clusterService.localNode().getId(), task.getId())); + sendRemoveBanRequest(nodes, + BanParentTaskRequest.createRemoveBanParentTaskRequest(new TaskId(clusterService.localNode().getId(), task.getId()))); } private void sendSetBanRequest(Set nodes, BanParentTaskRequest request, BanLock banLock) { @@ -148,8 +151,8 @@ private void sendSetBanRequest(Set nodes, BanParentTaskRequest request, DiscoveryNode discoveryNode = clusterState.getNodes().get(node); if (discoveryNode != null) { // Check if node still in the cluster - logger.debug("Sending ban for tasks with the parent [{}:{}] to the node [{}], ban [{}]", request.parentNodeId, request - .parentTaskId, node, request.ban); + logger.debug("Sending ban for tasks with the parent [{}] to the node [{}], ban [{}]", request.parentTaskId, node, + request.ban); transportService.sendRequest(discoveryNode, BAN_PARENT_ACTION_NAME, request, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) { @Override @@ -164,8 +167,8 @@ public void handleException(TransportException exp) { }); } else { banLock.onBanSet(); - logger.debug("Cannot send ban for tasks with the parent [{}:{}] to the node [{}] - the node no longer in the cluster", - request.parentNodeId, request.parentTaskId, node); + logger.debug("Cannot send ban for tasks with the parent [{}] to the node [{}] - the node no longer in the cluster", + request.parentTaskId, node); } } } @@ -176,13 +179,12 @@ private void sendRemoveBanRequest(Set nodes, BanParentTaskRequest reques DiscoveryNode discoveryNode = clusterState.getNodes().get(node); if (discoveryNode != null) { // Check if node still in the cluster - logger.debug("Sending remove ban for tasks with the parent [{}:{}] to the node [{}]", request.parentNodeId, - request.parentTaskId, node); + logger.debug("Sending remove ban for tasks with the parent [{}] to the node [{}]", request.parentTaskId, node); transportService.sendRequest(discoveryNode, BAN_PARENT_ACTION_NAME, request, EmptyTransportResponseHandler .INSTANCE_SAME); } else { - logger.debug("Cannot send remove ban request for tasks with the parent [{}:{}] to the node [{}] - the node no longer in " + - "the cluster", request.parentNodeId, request.parentTaskId, node); + logger.debug("Cannot send remove ban request for tasks with the parent [{}] to the node [{}] - the node no longer in " + + "the cluster", request.parentTaskId, node); } } } @@ -218,23 +220,27 @@ public void finish() { private static class BanParentTaskRequest extends TransportRequest { - private String parentNodeId; - - private long parentTaskId; + private TaskId parentTaskId; private boolean ban; private String reason; - BanParentTaskRequest(String parentNodeId, long parentTaskId, String reason) { - this.parentNodeId = parentNodeId; + static BanParentTaskRequest createSetBanParentTaskRequest(TaskId parentTaskId, String reason) { + return new BanParentTaskRequest(parentTaskId, reason); + } + + static BanParentTaskRequest createRemoveBanParentTaskRequest(TaskId parentTaskId) { + return new BanParentTaskRequest(parentTaskId); + } + + private BanParentTaskRequest(TaskId parentTaskId, String reason) { this.parentTaskId = parentTaskId; this.ban = true; this.reason = reason; } - BanParentTaskRequest(String parentNodeId, long parentTaskId) { - this.parentNodeId = parentNodeId; + private BanParentTaskRequest(TaskId parentTaskId) { this.parentTaskId = parentTaskId; this.ban = false; } @@ -245,8 +251,7 @@ public BanParentTaskRequest() { @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); - parentNodeId = in.readString(); - parentTaskId = in.readLong(); + parentTaskId = new TaskId(in); ban = in.readBoolean(); if (ban) { reason = in.readString(); @@ -256,8 +261,7 @@ public void readFrom(StreamInput in) throws IOException { @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); - out.writeString(parentNodeId); - out.writeLong(parentTaskId); + parentTaskId.writeTo(out); out.writeBoolean(ban); if (ban) { out.writeString(reason); @@ -269,13 +273,13 @@ class BanParentRequestHandler implements TransportRequestHandler { private boolean detailed = false; - /** - * Get information from nodes based on the nodes ids specified. If none are passed, information - * for all nodes will be returned. - */ - public ListTasksRequest(String... nodesIds) { - super(nodesIds); - } - /** * Should the detailed task information be returned. */ @@ -48,7 +40,7 @@ public boolean detailed() { } /** - * Should the node settings be returned. + * Should the detailed task information be returned. */ public ListTasksRequest detailed(boolean detailed) { this.detailed = detailed; diff --git a/core/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/list/ListTasksResponse.java b/core/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/list/ListTasksResponse.java index 1a14a52715032..3ad4299e38faa 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/list/ListTasksResponse.java +++ b/core/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/list/ListTasksResponse.java @@ -138,11 +138,13 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws } builder.endObject(); } - builder.startArray("tasks"); + builder.startObject("tasks"); for(TaskInfo task : entry.getValue()) { + builder.startObject(task.getTaskId().toString(), XContentBuilder.FieldCaseConversion.NONE); task.toXContent(builder, params); + builder.endObject(); } - builder.endArray(); + builder.endObject(); builder.endObject(); } builder.endObject(); diff --git a/core/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/list/TaskInfo.java b/core/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/list/TaskInfo.java index 33ed991407773..d71c576093eb0 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/list/TaskInfo.java +++ b/core/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/list/TaskInfo.java @@ -26,6 +26,7 @@ import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskId; import java.io.IOException; @@ -41,7 +42,7 @@ public class TaskInfo implements Writeable, ToXContent { private final DiscoveryNode node; - private final long id; + private final TaskId taskId; private final String type; @@ -51,28 +52,21 @@ public class TaskInfo implements Writeable, ToXContent { private final Task.Status status; - private final String parentNode; + private final TaskId parentTaskId; - private final long parentId; - - public TaskInfo(DiscoveryNode node, long id, String type, String action, String description, Task.Status status) { - this(node, id, type, action, description, status, null, -1L); - } - - public TaskInfo(DiscoveryNode node, long id, String type, String action, String description, Task.Status status, String parentNode, long parentId) { + public TaskInfo(DiscoveryNode node, long id, String type, String action, String description, Task.Status status, TaskId parentTaskId) { this.node = node; - this.id = id; + this.taskId = new TaskId(node.getId(), id); this.type = type; this.action = action; this.description = description; this.status = status; - this.parentNode = parentNode; - this.parentId = parentId; + this.parentTaskId = parentTaskId; } public TaskInfo(StreamInput in) throws IOException { node = DiscoveryNode.readNode(in); - id = in.readLong(); + taskId = new TaskId(node.getId(), in.readLong()); type = in.readString(); action = in.readString(); description = in.readOptionalString(); @@ -81,8 +75,11 @@ public TaskInfo(StreamInput in) throws IOException { } else { status = null; } - parentNode = in.readOptionalString(); - parentId = in.readLong(); + parentTaskId = new TaskId(in); + } + + public TaskId getTaskId() { + return taskId; } public DiscoveryNode getNode() { @@ -90,7 +87,7 @@ public DiscoveryNode getNode() { } public long getId() { - return id; + return taskId.getId(); } public String getType() { @@ -113,12 +110,8 @@ public Task.Status getStatus() { return status; } - public String getParentNode() { - return parentNode; - } - - public long getParentId() { - return parentId; + public TaskId getParentTaskId() { + return parentTaskId; } @Override @@ -129,7 +122,7 @@ public TaskInfo readFrom(StreamInput in) throws IOException { @Override public void writeTo(StreamOutput out) throws IOException { node.writeTo(out); - out.writeLong(id); + out.writeLong(taskId.getId()); out.writeString(type); out.writeString(action); out.writeOptionalString(description); @@ -139,15 +132,13 @@ public void writeTo(StreamOutput out) throws IOException { } else { out.writeBoolean(false); } - out.writeOptionalString(parentNode); - out.writeLong(parentId); + parentTaskId.writeTo(out); } @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - builder.startObject(); builder.field("node", node.getId()); - builder.field("id", id); + builder.field("id", taskId.getId()); builder.field("type", type); builder.field("action", action); if (status != null) { @@ -156,11 +147,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws if (description != null) { builder.field("description", description); } - if (parentNode != null) { - builder.field("parent_node", parentNode); - builder.field("parent_id", parentId); + if (parentTaskId.isSet() == false) { + builder.field("parent_task_id", parentTaskId.toString()); } - builder.endObject(); return builder; } } diff --git a/core/src/main/java/org/elasticsearch/action/support/ChildTaskActionRequest.java b/core/src/main/java/org/elasticsearch/action/support/ChildTaskActionRequest.java index 4e3800f7232fa..a1b749fcc1769 100644 --- a/core/src/main/java/org/elasticsearch/action/support/ChildTaskActionRequest.java +++ b/core/src/main/java/org/elasticsearch/action/support/ChildTaskActionRequest.java @@ -23,6 +23,7 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskId; import java.io.IOException; @@ -31,40 +32,35 @@ */ public abstract class ChildTaskActionRequest> extends ActionRequest { - private String parentTaskNode; - - private long parentTaskId; + private TaskId parentTaskId = TaskId.EMPTY_TASK_ID; protected ChildTaskActionRequest() { } public void setParentTask(String parentTaskNode, long parentTaskId) { - this.parentTaskNode = parentTaskNode; - this.parentTaskId = parentTaskId; + this.parentTaskId = new TaskId(parentTaskNode, parentTaskId); } @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); - parentTaskNode = in.readOptionalString(); - parentTaskId = in.readLong(); + parentTaskId = new TaskId(in); } @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); - out.writeOptionalString(parentTaskNode); - out.writeLong(parentTaskId); + parentTaskId.writeTo(out); } @Override public final Task createTask(long id, String type, String action) { - return createTask(id, type, action, parentTaskNode, parentTaskId); + return createTask(id, type, action, parentTaskId); } - public Task createTask(long id, String type, String action, String parentTaskNode, long parentTaskId) { - return new Task(id, type, action, getDescription(), parentTaskNode, parentTaskId); + public Task createTask(long id, String type, String action, TaskId parentTaskId) { + return new Task(id, type, action, getDescription(), parentTaskId); } } diff --git a/core/src/main/java/org/elasticsearch/action/support/ChildTaskRequest.java b/core/src/main/java/org/elasticsearch/action/support/ChildTaskRequest.java index 361ba6013f993..2a84327a82029 100644 --- a/core/src/main/java/org/elasticsearch/action/support/ChildTaskRequest.java +++ b/core/src/main/java/org/elasticsearch/action/support/ChildTaskRequest.java @@ -22,6 +22,7 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskId; import org.elasticsearch.transport.TransportRequest; import java.io.IOException; @@ -31,38 +32,33 @@ */ public class ChildTaskRequest extends TransportRequest { - private String parentTaskNode; - - private long parentTaskId; + private TaskId parentTaskId = TaskId.EMPTY_TASK_ID; protected ChildTaskRequest() { } public void setParentTask(String parentTaskNode, long parentTaskId) { - this.parentTaskNode = parentTaskNode; - this.parentTaskId = parentTaskId; + this.parentTaskId = new TaskId(parentTaskNode, parentTaskId); } @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); - parentTaskNode = in.readOptionalString(); - parentTaskId = in.readLong(); + parentTaskId = new TaskId(in); } @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); - out.writeOptionalString(parentTaskNode); - out.writeLong(parentTaskId); + parentTaskId.writeTo(out); } @Override public final Task createTask(long id, String type, String action) { - return createTask(id, type, action, parentTaskNode, parentTaskId); + return createTask(id, type, action, parentTaskId); } - public Task createTask(long id, String type, String action, String parentTaskNode, long parentTaskId) { - return new Task(id, type, action, getDescription(), parentTaskNode, parentTaskId); + public Task createTask(long id, String type, String action, TaskId parentTaskId) { + return new Task(id, type, action, getDescription(), parentTaskId); } } diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java b/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java index 337ded6fb56f1..6283e69a02eb9 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java @@ -30,6 +30,7 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskId; import java.io.IOException; import java.util.concurrent.TimeUnit; @@ -186,8 +187,8 @@ public void writeTo(StreamOutput out) throws IOException { } @Override - public Task createTask(long id, String type, String action, String parentTaskNode, long parentTaskId) { - return new ReplicationTask(id, type, action, getDescription(), parentTaskNode, parentTaskId); + public Task createTask(long id, String type, String action, TaskId parentTaskId) { + return new ReplicationTask(id, type, action, getDescription(), parentTaskId); } /** diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationTask.java b/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationTask.java index 2cd6a67eec5dd..da3fce74fa8ab 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationTask.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationTask.java @@ -19,11 +19,11 @@ package org.elasticsearch.action.support.replication; -import org.elasticsearch.common.inject.Provider; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskId; import java.io.IOException; @@ -35,8 +35,8 @@ public class ReplicationTask extends Task { private volatile String phase = "starting"; - public ReplicationTask(long id, String type, String action, String description, String parentNode, long parentId) { - super(id, type, action, description, parentNode, parentId); + public ReplicationTask(long id, String type, String action, String description, TaskId parentTaskId) { + super(id, type, action, description, parentTaskId); } /** diff --git a/core/src/main/java/org/elasticsearch/action/support/tasks/BaseTasksRequest.java b/core/src/main/java/org/elasticsearch/action/support/tasks/BaseTasksRequest.java index 9650eaee194c1..f7da48a667bdc 100644 --- a/core/src/main/java/org/elasticsearch/action/support/tasks/BaseTasksRequest.java +++ b/core/src/main/java/org/elasticsearch/action/support/tasks/BaseTasksRequest.java @@ -27,9 +27,12 @@ import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskId; import java.io.IOException; +import static org.elasticsearch.action.ValidateActions.addValidationError; + /** * A base class for task requests */ @@ -47,26 +50,21 @@ public class BaseTasksRequest> extends private String[] actions = ALL_ACTIONS; - private String parentNode; - - private long parentTaskId = ALL_TASKS; + private TaskId parentTaskId = TaskId.EMPTY_TASK_ID; - private long taskId = ALL_TASKS; + private TaskId taskId = TaskId.EMPTY_TASK_ID; public BaseTasksRequest() { } @Override public ActionRequestValidationException validate() { - return null; - } - - /** - * Get information about tasks from nodes based on the nodes ids specified. - * If none are passed, information for all nodes will be returned. - */ - public BaseTasksRequest(String... nodesIds) { - this.nodesIds = nodesIds; + ActionRequestValidationException validationException = null; + if (taskId.isSet() == false && nodesIds.length > 0) { + validationException = addValidationError("task id cannot be used together with node ids", + validationException); + } + return validationException; } /** @@ -100,39 +98,26 @@ public final Request nodesIds(String... nodesIds) { * * By default tasks with any ids are returned. */ - public long taskId() { + public TaskId taskId() { return taskId; } @SuppressWarnings("unchecked") - public final Request taskId(long taskId) { + public final Request taskId(TaskId taskId) { this.taskId = taskId; return (Request) this; } - /** - * Returns the parent node id that tasks should be filtered by - */ - public String parentNode() { - return parentNode; - } - - @SuppressWarnings("unchecked") - public Request parentNode(String parentNode) { - this.parentNode = parentNode; - return (Request) this; - } - /** * Returns the parent task id that tasks should be filtered by */ - public long parentTaskId() { + public TaskId parentTaskId() { return parentTaskId; } @SuppressWarnings("unchecked") - public Request parentTaskId(long parentTaskId) { + public Request parentTaskId(TaskId parentTaskId) { this.parentTaskId = parentTaskId; return (Request) this; } @@ -157,11 +142,10 @@ public final Request timeout(String timeout) { @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); + taskId = new TaskId(in); + parentTaskId = new TaskId(in); nodesIds = in.readStringArray(); - taskId = in.readLong(); actions = in.readStringArray(); - parentNode = in.readOptionalString(); - parentTaskId = in.readLong(); if (in.readBoolean()) { timeout = TimeValue.readTimeValue(in); } @@ -170,11 +154,10 @@ public void readFrom(StreamInput in) throws IOException { @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); + taskId.writeTo(out); + parentTaskId.writeTo(out); out.writeStringArrayNullable(nodesIds); - out.writeLong(taskId); out.writeStringArrayNullable(actions); - out.writeOptionalString(parentNode); - out.writeLong(parentTaskId); out.writeOptionalStreamable(timeout); } @@ -182,18 +165,13 @@ public boolean match(Task task) { if (actions() != null && actions().length > 0 && Regex.simpleMatch(actions(), task.getAction()) == false) { return false; } - if (taskId() != ALL_TASKS) { - if(taskId() != task.getId()) { - return false; - } - } - if (parentNode() != null) { - if (parentNode().equals(task.getParentNode()) == false) { + if (taskId().isSet() == false) { + if(taskId().getId() != task.getId()) { return false; } } - if (parentTaskId() != ALL_TASKS) { - if (parentTaskId() != task.getParentId()) { + if (parentTaskId.isSet() == false) { + if (parentTaskId.equals(task.getParentTaskId()) == false) { return false; } } diff --git a/core/src/main/java/org/elasticsearch/action/support/tasks/TransportTasksAction.java b/core/src/main/java/org/elasticsearch/action/support/tasks/TransportTasksAction.java index 77915f7d0c9aa..54d48143577b6 100644 --- a/core/src/main/java/org/elasticsearch/action/support/tasks/TransportTasksAction.java +++ b/core/src/main/java/org/elasticsearch/action/support/tasks/TransportTasksAction.java @@ -124,13 +124,17 @@ protected String[] filterNodeIds(DiscoveryNodes nodes, String[] nodesIds) { } protected String[] resolveNodes(TasksRequest request, ClusterState clusterState) { - return clusterState.nodes().resolveNodesIds(request.nodesIds()); + if (request.taskId().isSet()) { + return clusterState.nodes().resolveNodesIds(request.nodesIds()); + } else { + return new String[]{request.taskId().getNodeId()}; + } } protected void processTasks(TasksRequest request, Consumer operation) { - if (request.taskId() != BaseTasksRequest.ALL_TASKS) { + if (request.taskId().isSet() == false) { // we are only checking one task, we can optimize it - Task task = taskManager.getTask(request.taskId()); + Task task = taskManager.getTask(request.taskId().getId()); if (task != null) { if (request.match(task)) { operation.accept((OperationTask) task); @@ -143,13 +147,14 @@ protected void processTasks(TasksRequest request, Consumer operat } else { for (Task task : taskManager.getTasks().values()) { if (request.match(task)) { - operation.accept((OperationTask)task); + operation.accept((OperationTask) task); } } } } - protected abstract TasksResponse newResponse(TasksRequest request, List tasks, List taskOperationFailures, List failedNodeExceptions); + protected abstract TasksResponse newResponse(TasksRequest request, List tasks, List + taskOperationFailures, List failedNodeExceptions); @SuppressWarnings("unchecked") protected TasksResponse newResponse(TasksRequest request, AtomicReferenceArray responses) { @@ -232,34 +237,36 @@ private void start() { onFailure(idx, nodeId, new NoSuchNodeException(nodeId)); } else if (!clusterService.localNode().shouldConnectTo(node) && !clusterService.localNode().equals(node)) { // the check "!clusterService.localNode().equals(node)" is to maintain backward comp. where before - // we allowed to connect from "local" client node to itself, certain tests rely on it, if we remove it, we need to fix + // we allowed to connect from "local" client node to itself, certain tests rely on it, if we remove it, we + // need to fix // those (and they randomize the client node usage, so tricky to find when) onFailure(idx, nodeId, new NodeShouldNotConnectException(clusterService.localNode(), node)); } else { NodeTaskRequest nodeRequest = new NodeTaskRequest(request); nodeRequest.setParentTask(clusterService.localNode().id(), task.getId()); taskManager.registerChildTask(task, node.getId()); - transportService.sendRequest(node, transportNodeAction, nodeRequest, builder.build(), new BaseTransportResponseHandler() { - @Override - public NodeTasksResponse newInstance() { - return new NodeTasksResponse(); - } - - @Override - public void handleResponse(NodeTasksResponse response) { - onOperation(idx, response); - } - - @Override - public void handleException(TransportException exp) { - onFailure(idx, node.id(), exp); - } - - @Override - public String executor() { - return ThreadPool.Names.SAME; - } - }); + transportService.sendRequest(node, transportNodeAction, nodeRequest, builder.build(), + new BaseTransportResponseHandler() { + @Override + public NodeTasksResponse newInstance() { + return new NodeTasksResponse(); + } + + @Override + public void handleResponse(NodeTasksResponse response) { + onOperation(idx, response); + } + + @Override + public void handleException(TransportException exp) { + onFailure(idx, node.id(), exp); + } + + @Override + public String executor() { + return ThreadPool.Names.SAME; + } + }); } } catch (Throwable t) { onFailure(idx, nodeId, t); diff --git a/core/src/main/java/org/elasticsearch/client/ClusterAdminClient.java b/core/src/main/java/org/elasticsearch/client/ClusterAdminClient.java index ffd36790cfa9b..d7c76906f91fe 100644 --- a/core/src/main/java/org/elasticsearch/client/ClusterAdminClient.java +++ b/core/src/main/java/org/elasticsearch/client/ClusterAdminClient.java @@ -272,7 +272,7 @@ public interface ClusterAdminClient extends ElasticsearchClient { * * @param request The nodes tasks request * @return The result future - * @see org.elasticsearch.client.Requests#listTasksRequest(String...) + * @see org.elasticsearch.client.Requests#listTasksRequest() */ ActionFuture listTasks(ListTasksRequest request); @@ -281,7 +281,7 @@ public interface ClusterAdminClient extends ElasticsearchClient { * * @param request The nodes tasks request * @param listener A listener to be notified with a result - * @see org.elasticsearch.client.Requests#listTasksRequest(String...) + * @see org.elasticsearch.client.Requests#listTasksRequest() */ void listTasks(ListTasksRequest request, ActionListener listener); @@ -295,7 +295,7 @@ public interface ClusterAdminClient extends ElasticsearchClient { * * @param request The nodes tasks request * @return The result future - * @see org.elasticsearch.client.Requests#cancelTasksRequest(String...) + * @see org.elasticsearch.client.Requests#cancelTasksRequest() */ ActionFuture cancelTasks(CancelTasksRequest request); @@ -304,7 +304,7 @@ public interface ClusterAdminClient extends ElasticsearchClient { * * @param request The nodes tasks request * @param listener A cancelener to be notified with a result - * @see org.elasticsearch.client.Requests#cancelTasksRequest(String...) + * @see org.elasticsearch.client.Requests#cancelTasksRequest() */ void cancelTasks(CancelTasksRequest request, ActionListener listener); diff --git a/core/src/main/java/org/elasticsearch/client/Requests.java b/core/src/main/java/org/elasticsearch/client/Requests.java index c3dd77a3e44a0..3cf4f3dc6cbfc 100644 --- a/core/src/main/java/org/elasticsearch/client/Requests.java +++ b/core/src/main/java/org/elasticsearch/client/Requests.java @@ -419,23 +419,11 @@ public static ListTasksRequest listTasksRequest() { /** * Creates a nodes tasks request against one or more nodes. Pass null or an empty array for all nodes. * - * @param nodesIds The nodes ids to get the tasks for - * @return The nodes tasks request - * @see org.elasticsearch.client.ClusterAdminClient#listTasks(ListTasksRequest) - */ - public static ListTasksRequest listTasksRequest(String... nodesIds) { - return new ListTasksRequest(nodesIds); - } - - /** - * Creates a nodes tasks request against one or more nodes. Pass null or an empty array for all nodes. - * - * @param nodesIds The nodes ids to cancel the tasks on * @return The nodes tasks request * @see org.elasticsearch.client.ClusterAdminClient#cancelTasks(CancelTasksRequest) */ - public static CancelTasksRequest cancelTasksRequest(String... nodesIds) { - return new CancelTasksRequest(nodesIds); + public static CancelTasksRequest cancelTasksRequest() { + return new CancelTasksRequest(); } /** diff --git a/core/src/main/java/org/elasticsearch/rest/action/admin/cluster/node/tasks/RestCancelTasksAction.java b/core/src/main/java/org/elasticsearch/rest/action/admin/cluster/node/tasks/RestCancelTasksAction.java index 81736d9eb7aef..99cdc16253a7b 100644 --- a/core/src/main/java/org/elasticsearch/rest/action/admin/cluster/node/tasks/RestCancelTasksAction.java +++ b/core/src/main/java/org/elasticsearch/rest/action/admin/cluster/node/tasks/RestCancelTasksAction.java @@ -30,6 +30,7 @@ import org.elasticsearch.rest.RestController; import org.elasticsearch.rest.RestRequest; import org.elasticsearch.rest.action.support.RestToXContentListener; +import org.elasticsearch.tasks.TaskId; import static org.elasticsearch.rest.RestRequest.Method.POST; @@ -40,22 +41,20 @@ public class RestCancelTasksAction extends BaseRestHandler { public RestCancelTasksAction(Settings settings, RestController controller, Client client) { super(settings, client); controller.registerHandler(POST, "/_tasks/_cancel", this); - controller.registerHandler(POST, "/_tasks/{nodeId}/_cancel", this); - controller.registerHandler(POST, "/_tasks/{nodeId}/{taskId}/_cancel", this); + controller.registerHandler(POST, "/_tasks/{taskId}/_cancel", this); } @Override public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) { String[] nodesIds = Strings.splitStringByCommaToArray(request.param("nodeId")); - long taskId = request.paramAsLong("taskId", ListTasksRequest.ALL_TASKS); + TaskId taskId = new TaskId(request.param("taskId")); String[] actions = Strings.splitStringByCommaToArray(request.param("actions")); - String parentNode = request.param("parent_node"); - long parentTaskId = request.paramAsLong("parent_task", ListTasksRequest.ALL_TASKS); + TaskId parentTaskId = new TaskId(request.param("parent_task_id")); - CancelTasksRequest cancelTasksRequest = new CancelTasksRequest(nodesIds); + CancelTasksRequest cancelTasksRequest = new CancelTasksRequest(); cancelTasksRequest.taskId(taskId); + cancelTasksRequest.nodesIds(nodesIds); cancelTasksRequest.actions(actions); - cancelTasksRequest.parentNode(parentNode); cancelTasksRequest.parentTaskId(parentTaskId); client.admin().cluster().cancelTasks(cancelTasksRequest, new RestToXContentListener<>(channel)); } diff --git a/core/src/main/java/org/elasticsearch/rest/action/admin/cluster/node/tasks/RestListTasksAction.java b/core/src/main/java/org/elasticsearch/rest/action/admin/cluster/node/tasks/RestListTasksAction.java index 4c9213945fa41..992267fa8a51c 100644 --- a/core/src/main/java/org/elasticsearch/rest/action/admin/cluster/node/tasks/RestListTasksAction.java +++ b/core/src/main/java/org/elasticsearch/rest/action/admin/cluster/node/tasks/RestListTasksAction.java @@ -29,6 +29,7 @@ import org.elasticsearch.rest.RestController; import org.elasticsearch.rest.RestRequest; import org.elasticsearch.rest.action.support.RestToXContentListener; +import org.elasticsearch.tasks.TaskId; import static org.elasticsearch.rest.RestRequest.Method.GET; @@ -39,24 +40,22 @@ public class RestListTasksAction extends BaseRestHandler { public RestListTasksAction(Settings settings, RestController controller, Client client) { super(settings, client); controller.registerHandler(GET, "/_tasks", this); - controller.registerHandler(GET, "/_tasks/{nodeId}", this); - controller.registerHandler(GET, "/_tasks/{nodeId}/{taskId}", this); + controller.registerHandler(GET, "/_tasks/{taskId}", this); } @Override public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) { boolean detailed = request.paramAsBoolean("detailed", false); - String[] nodesIds = Strings.splitStringByCommaToArray(request.param("nodeId")); - long taskId = request.paramAsLong("taskId", ListTasksRequest.ALL_TASKS); + String[] nodesIds = Strings.splitStringByCommaToArray(request.param("node_id")); + TaskId taskId = new TaskId(request.param("taskId")); String[] actions = Strings.splitStringByCommaToArray(request.param("actions")); - String parentNode = request.param("parent_node"); - long parentTaskId = request.paramAsLong("parent_task", ListTasksRequest.ALL_TASKS); + TaskId parentTaskId = new TaskId(request.param("parent_task_id")); - ListTasksRequest listTasksRequest = new ListTasksRequest(nodesIds); + ListTasksRequest listTasksRequest = new ListTasksRequest(); listTasksRequest.taskId(taskId); + listTasksRequest.nodesIds(nodesIds); listTasksRequest.detailed(detailed); listTasksRequest.actions(actions); - listTasksRequest.parentNode(parentNode); listTasksRequest.parentTaskId(parentTaskId); client.admin().cluster().listTasks(listTasksRequest, new RestToXContentListener<>(channel)); } diff --git a/core/src/main/java/org/elasticsearch/tasks/CancellableTask.java b/core/src/main/java/org/elasticsearch/tasks/CancellableTask.java index 3297977cb3a96..8916a8be7cbd6 100644 --- a/core/src/main/java/org/elasticsearch/tasks/CancellableTask.java +++ b/core/src/main/java/org/elasticsearch/tasks/CancellableTask.java @@ -32,8 +32,8 @@ public CancellableTask(long id, String type, String action, String description) super(id, type, action, description); } - public CancellableTask(long id, String type, String action, String description, String parentNode, long parentId) { - super(id, type, action, description, parentNode, parentId); + public CancellableTask(long id, String type, String action, String description, TaskId parentTaskId) { + super(id, type, action, description, parentTaskId); } /** diff --git a/core/src/main/java/org/elasticsearch/tasks/Task.java b/core/src/main/java/org/elasticsearch/tasks/Task.java index 621166c9ccc22..5aa034b799773 100644 --- a/core/src/main/java/org/elasticsearch/tasks/Task.java +++ b/core/src/main/java/org/elasticsearch/tasks/Task.java @@ -30,8 +30,6 @@ */ public class Task { - public static final long NO_PARENT_ID = 0; - private final long id; private final String type; @@ -40,22 +38,18 @@ public class Task { private final String description; - private final String parentNode; - - private final long parentId; - + private final TaskId parentTask; public Task(long id, String type, String action, String description) { - this(id, type, action, description, null, NO_PARENT_ID); + this(id, type, action, description, TaskId.EMPTY_TASK_ID); } - public Task(long id, String type, String action, String description, String parentNode, long parentId) { + public Task(long id, String type, String action, String description, TaskId parentTask) { this.id = id; this.type = type; this.action = action; this.description = description; - this.parentNode = parentNode; - this.parentId = parentId; + this.parentTask = parentTask; } /** @@ -75,7 +69,7 @@ public TaskInfo taskInfo(DiscoveryNode node, boolean detailed) { description = getDescription(); status = getStatus(); } - return new TaskInfo(node, getId(), getType(), getAction(), description, status, parentNode, parentId); + return new TaskInfo(node, getId(), getType(), getAction(), description, status, parentTask); } /** @@ -106,18 +100,11 @@ public String getDescription() { return description; } - /** - * Returns the parent node of the task or null if the task doesn't have any parent tasks - */ - public String getParentNode() { - return parentNode; - } - /** * Returns id of the parent task or NO_PARENT_ID if the task doesn't have any parent tasks */ - public long getParentId() { - return parentId; + public TaskId getParentTaskId() { + return parentTask; } /** diff --git a/core/src/main/java/org/elasticsearch/tasks/TaskId.java b/core/src/main/java/org/elasticsearch/tasks/TaskId.java new file mode 100644 index 0000000000000..5c5ad36cc17bf --- /dev/null +++ b/core/src/main/java/org/elasticsearch/tasks/TaskId.java @@ -0,0 +1,118 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.tasks; + +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; + +import java.io.IOException; + +/** + * Task id that consists of node id and id of the task on the node + */ +public final class TaskId implements Writeable { + + public final static TaskId EMPTY_TASK_ID = new TaskId("", -1L); + + private final String nodeId; + private final long id; + + public TaskId(String nodeId, long id) { + this.nodeId = nodeId; + this.id = id; + } + + public TaskId(String taskId) { + if (Strings.hasLength(taskId) && "unset".equals(taskId) == false) { + String[] s = Strings.split(taskId, ":"); + if (s == null || s.length != 2) { + throw new IllegalArgumentException("malformed task id " + taskId); + } + this.nodeId = s[0]; + try { + this.id = Long.parseLong(s[1]); + } catch (NumberFormatException ex) { + throw new IllegalArgumentException("malformed task id " + taskId, ex); + } + } else { + nodeId = ""; + id = -1L; + } + } + + public TaskId(StreamInput in) throws IOException { + nodeId = in.readString(); + id = in.readLong(); + } + + public String getNodeId() { + return nodeId; + } + + public long getId() { + return id; + } + + public boolean isSet() { + return id == -1L; + } + + @Override + public String toString() { + if (isSet()) { + return "unset"; + } else { + return nodeId + ":" + id; + } + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(nodeId); + out.writeLong(id); + } + + @Override + public TaskId readFrom(StreamInput in) throws IOException { + return new TaskId(in); + } + + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + TaskId taskId = (TaskId) o; + + if (id != taskId.id) return false; + return nodeId.equals(taskId.nodeId); + + } + + @Override + public int hashCode() { + int result = nodeId.hashCode(); + result = 31 * result + (int) (id ^ (id >>> 32)); + return result; + } +} diff --git a/core/src/main/java/org/elasticsearch/tasks/TaskManager.java b/core/src/main/java/org/elasticsearch/tasks/TaskManager.java index f30330ec28f7c..0c785573c9942 100644 --- a/core/src/main/java/org/elasticsearch/tasks/TaskManager.java +++ b/core/src/main/java/org/elasticsearch/tasks/TaskManager.java @@ -22,7 +22,6 @@ import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.cluster.node.DiscoveryNodes; -import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; @@ -51,7 +50,7 @@ public class TaskManager extends AbstractComponent implements ClusterStateListen private final AtomicLong taskIdGenerator = new AtomicLong(); - private final Map, String> banedParents = new ConcurrentHashMap<>(); + private final Map banedParents = new ConcurrentHashMap<>(); public TaskManager(Settings settings) { super(settings); @@ -77,8 +76,8 @@ public Task register(String type, String action, TransportRequest request) { CancellableTaskHolder oldHolder = cancellableTasks.put(task.getId(), holder); assert oldHolder == null; // Check if this task was banned before we start it - if (task.getParentNode() != null && banedParents.isEmpty() == false) { - String reason = banedParents.get(new Tuple<>(task.getParentNode(), task.getParentId())); + if (task.getParentTaskId().isSet() == false && banedParents.isEmpty() == false) { + String reason = banedParents.get(task.getParentTaskId()); if (reason != null) { try { holder.cancel(reason); @@ -191,22 +190,21 @@ public int getBanCount() { *

* This method is called when a parent task that has children is cancelled. */ - public void setBan(String parentNode, long parentId, String reason) { - logger.trace("setting ban for the parent task {}:{} {}", parentNode, parentId, reason); + public void setBan(TaskId parentTaskId, String reason) { + logger.trace("setting ban for the parent task {} {}", parentTaskId, reason); // Set the ban first, so the newly created tasks cannot be registered - Tuple ban = new Tuple<>(parentNode, parentId); synchronized (banedParents) { - if (lastDiscoveryNodes.nodeExists(parentNode)) { + if (lastDiscoveryNodes.nodeExists(parentTaskId.getNodeId())) { // Only set the ban if the node is the part of the cluster - banedParents.put(ban, reason); + banedParents.put(parentTaskId, reason); } } // Now go through already running tasks and cancel them for (Map.Entry taskEntry : cancellableTasks.entrySet()) { CancellableTaskHolder holder = taskEntry.getValue(); - if (holder.hasParent(parentNode, parentId)) { + if (holder.hasParent(parentTaskId)) { holder.cancel(reason); } } @@ -217,9 +215,9 @@ public void setBan(String parentNode, long parentId, String reason) { *

* This method is called when a previously banned task finally cancelled */ - public void removeBan(String parentNode, long parentId) { - logger.trace("removing ban for the parent task {}:{} {}", parentNode, parentId); - banedParents.remove(new Tuple<>(parentNode, parentId)); + public void removeBan(TaskId parentTaskId) { + logger.trace("removing ban for the parent task {}", parentTaskId); + banedParents.remove(parentTaskId); } @Override @@ -228,14 +226,12 @@ public void clusterChanged(ClusterChangedEvent event) { synchronized (banedParents) { lastDiscoveryNodes = event.state().getNodes(); // Remove all bans that were registered by nodes that are no longer in the cluster state - Iterator> banIterator = banedParents.keySet().iterator(); + Iterator banIterator = banedParents.keySet().iterator(); while (banIterator.hasNext()) { - Tuple nodeAndTaskId = banIterator.next(); - String nodeId = nodeAndTaskId.v1(); - Long taskId = nodeAndTaskId.v2(); - if (lastDiscoveryNodes.nodeExists(nodeId) == false) { - logger.debug("Removing ban for the parent [{}:{}] on the node [{}], reason: the parent node is gone", nodeId, - taskId, event.state().getNodes().localNode()); + TaskId taskId = banIterator.next(); + if (lastDiscoveryNodes.nodeExists(taskId.getNodeId()) == false) { + logger.debug("Removing ban for the parent [{}] on the node [{}], reason: the parent node is gone", taskId, + event.state().getNodes().localNode()); banIterator.remove(); } } @@ -244,10 +240,10 @@ public void clusterChanged(ClusterChangedEvent event) { for (Map.Entry taskEntry : cancellableTasks.entrySet()) { CancellableTaskHolder holder = taskEntry.getValue(); CancellableTask task = holder.getTask(); - String parent = task.getParentNode(); - if (parent != null && lastDiscoveryNodes.nodeExists(parent) == false) { + TaskId parentTaskId = task.getParentTaskId(); + if (parentTaskId.isSet() == false && lastDiscoveryNodes.nodeExists(parentTaskId.getNodeId()) == false) { if (task.cancelOnParentLeaving()) { - holder.cancel("Coordinating node [" + parent + "] left the cluster"); + holder.cancel("Coordinating node [" + parentTaskId.getNodeId() + "] left the cluster"); } } } @@ -340,8 +336,8 @@ public void finish() { } - public boolean hasParent(String parentNode, long parentId) { - return parentId == task.getParentId() && parentNode.equals(task.getParentNode()); + public boolean hasParent(TaskId parentTaskId) { + return task.getParentTaskId().equals(parentTaskId); } public CancellableTask getTask() { diff --git a/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/CancellableTasksTests.java b/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/CancellableTasksTests.java index 7a4912e3a7d4f..5109ab979cf09 100644 --- a/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/CancellableTasksTests.java +++ b/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/CancellableTasksTests.java @@ -36,6 +36,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.tasks.CancellableTask; import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskId; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; @@ -87,8 +88,8 @@ public String getDescription() { } @Override - public Task createTask(long id, String type, String action, String parentTaskNode, long parentTaskId) { - return new CancellableTask(id, type, action, getDescription(), parentTaskNode, parentTaskId); + public Task createTask(long id, String type, String action, TaskId parentTaskId) { + return new CancellableTask(id, type, action, getDescription(), parentTaskId); } } @@ -235,9 +236,9 @@ public void onFailure(Throwable e) { }); // Cancel main task - CancelTasksRequest request = new CancelTasksRequest(testNodes[0].discoveryNode.getId()); + CancelTasksRequest request = new CancelTasksRequest(); request.reason("Testing Cancellation"); - request.taskId(mainTask.getId()); + request.taskId(new TaskId(testNodes[0].discoveryNode.getId(), mainTask.getId())); // And send the cancellation request to a random node CancelTasksResponse response = testNodes[randomIntBetween(0, testNodes.length - 1)].transportCancelTasksAction.execute(request) .get(); @@ -269,7 +270,8 @@ public void onFailure(Throwable e) { // Make sure that tasks are no longer running ListTasksResponse listTasksResponse = testNodes[randomIntBetween(0, testNodes.length - 1)] - .transportListTasksAction.execute(new ListTasksRequest(testNodes[0].discoveryNode.getId()).taskId(mainTask.getId())).get(); + .transportListTasksAction.execute(new ListTasksRequest().taskId( + new TaskId(testNodes[0].discoveryNode.getId(), mainTask.getId()))).get(); assertEquals(0, listTasksResponse.getTasks().size()); // Make sure that there are no leftover bans, the ban removal is async, so we might return from the cancellation @@ -311,7 +313,7 @@ public void onFailure(Throwable e) { // Make sure that tasks are running ListTasksResponse listTasksResponse = testNodes[randomIntBetween(0, testNodes.length - 1)] - .transportListTasksAction.execute(new ListTasksRequest().parentNode(mainNode).taskId(mainTask.getId())).get(); + .transportListTasksAction.execute(new ListTasksRequest().parentTaskId(new TaskId(mainNode, mainTask.getId()))).get(); assertThat(listTasksResponse.getTasks().size(), greaterThanOrEqualTo(blockOnNodes.size())); // Simulate the coordinating node leaving the cluster @@ -328,9 +330,9 @@ public void onFailure(Throwable e) { if (simulateBanBeforeLeaving) { logger.info("--> Simulate issuing cancel request on the node that is about to leave the cluster"); // Simulate issuing cancel request on the node that is about to leave the cluster - CancelTasksRequest request = new CancelTasksRequest(testNodes[0].discoveryNode.getId()); + CancelTasksRequest request = new CancelTasksRequest(); request.reason("Testing Cancellation"); - request.taskId(mainTask.getId()); + request.taskId(new TaskId(testNodes[0].discoveryNode.getId(), mainTask.getId())); // And send the cancellation request to a random node CancelTasksResponse response = testNodes[0].transportCancelTasksAction.execute(request).get(); logger.info("--> Done simulating issuing cancel request on the node that is about to leave the cluster"); @@ -354,7 +356,7 @@ public void onFailure(Throwable e) { // Make sure that tasks are no longer running try { ListTasksResponse listTasksResponse1 = testNodes[randomIntBetween(1, testNodes.length - 1)] - .transportListTasksAction.execute(new ListTasksRequest().parentNode(mainNode).taskId(mainTask.getId())).get(); + .transportListTasksAction.execute(new ListTasksRequest().taskId(new TaskId(mainNode, mainTask.getId()))).get(); assertEquals(0, listTasksResponse1.getTasks().size()); } catch (InterruptedException ex) { Thread.currentThread().interrupt(); diff --git a/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TasksIT.java b/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TasksIT.java index d35704a93530c..eaa3caf908497 100644 --- a/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TasksIT.java +++ b/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TasksIT.java @@ -110,7 +110,7 @@ public void testMasterNodeOperationTasks() { List tasks = findEvents(ClusterHealthAction.NAME, Tuple::v1); // Verify that one of these tasks is a parent of another task - if (tasks.get(0).getParentNode() == null) { + if (tasks.get(0).getParentTaskId().isSet()) { assertParentTask(Collections.singletonList(tasks.get(1)), tasks.get(0)); } else { assertParentTask(Collections.singletonList(tasks.get(0)), tasks.get(1)); @@ -227,7 +227,9 @@ public void testTransportBroadcastReplicationTasks() { } else { // A [s][r] level task should have a corresponding [s] level task on the a different node (where primary is located) sTask = findEvents(RefreshAction.NAME + "[s]", - event -> event.v1() && taskInfo.getParentNode().equals(event.v2().getNode().getId()) && taskInfo.getDescription().equals(event.v2().getDescription())); + event -> event.v1() && taskInfo.getParentTaskId().getNodeId().equals(event.v2().getNode().getId()) && taskInfo + .getDescription() + .equals(event.v2().getDescription())); } // There should be only one parent task assertEquals(1, sTask.size()); @@ -393,9 +395,10 @@ private List findEvents(String actionMasks, Function tasks, TaskInfo parentTask) { for (TaskInfo task : tasks) { - assertNotNull(task.getParentNode()); - assertEquals(parentTask.getNode().getId(), task.getParentNode()); - assertEquals(parentTask.getId(), task.getParentId()); + assertFalse(task.getParentTaskId().isSet()); + assertEquals(parentTask.getNode().getId(), task.getParentTaskId().getNodeId()); + assertTrue(Strings.hasLength(task.getParentTaskId().getNodeId())); + assertEquals(parentTask.getId(), task.getParentTaskId().getId()); } } } diff --git a/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TestTaskPlugin.java b/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TestTaskPlugin.java index 5b3736de79372..0d4372a51eb3e 100644 --- a/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TestTaskPlugin.java +++ b/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TestTaskPlugin.java @@ -47,6 +47,7 @@ import org.elasticsearch.plugins.Plugin; import org.elasticsearch.tasks.CancellableTask; import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskId; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; @@ -84,8 +85,8 @@ static class TestTask extends CancellableTask { private volatile boolean blocked = true; - public TestTask(long id, String type, String action, String description, String parentNode, long parentId) { - super(id, type, action, description, parentNode, parentId); + public TestTask(long id, String type, String action, String description, TaskId parentTaskId) { + super(id, type, action, description, parentTaskId); } public boolean isBlocked() { @@ -172,8 +173,8 @@ public String getDescription() { } @Override - public Task createTask(long id, String type, String action, String parentTaskNode, long parentTaskId) { - return new TestTask(id, type, action, this.getDescription(), parentTaskNode, parentTaskId); + public Task createTask(long id, String type, String action, TaskId parentTaskId) { + return new TestTask(id, type, action, this.getDescription(), parentTaskId); } } diff --git a/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TransportTasksActionTests.java b/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TransportTasksActionTests.java index 19fd017c3cc88..b4464dc9f58d5 100644 --- a/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TransportTasksActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TransportTasksActionTests.java @@ -43,13 +43,11 @@ import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskId; import org.elasticsearch.test.tasks.MockTaskManager; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; -import org.elasticsearch.transport.local.LocalTransport; -import org.junit.After; -import org.junit.AfterClass; -import org.junit.BeforeClass; + import java.io.IOException; import java.util.ArrayList; @@ -103,9 +101,9 @@ public String getDescription() { } @Override - public Task createTask(long id, String type, String action, String parentTaskNode, long parentTaskId) { + public Task createTask(long id, String type, String action, TaskId parentTaskId) { if (enableTaskManager) { - return super.createTask(id, type, action, parentTaskNode, parentTaskId); + return super.createTask(id, type, action, parentTaskId); } else { return null; } @@ -313,7 +311,7 @@ protected NodeResponse nodeOperation(NodeRequest request) { } Task task = actions[0].execute(request, listener); logger.info("Awaiting for all actions to start"); - actionLatch.await(); + assertTrue(actionLatch.await(10, TimeUnit.SECONDS)); logger.info("Done waiting for all actions to start"); return task; } @@ -426,14 +424,13 @@ public void testFindChildTasks() throws Exception { // Find tasks with common parent listTasksRequest = new ListTasksRequest(); - listTasksRequest.parentNode(parentNode); - listTasksRequest.parentTaskId(parentTaskId); + listTasksRequest.parentTaskId(new TaskId(parentNode, parentTaskId)); response = testNode.transportListTasksAction.execute(listTasksRequest).get(); assertEquals(testNodes.length, response.getTasks().size()); for (TaskInfo task : response.getTasks()) { assertEquals("testAction[n]", task.getAction()); - assertEquals(parentNode, task.getParentNode()); - assertEquals(parentTaskId, task.getParentId()); + assertEquals(parentNode, task.getParentTaskId().getNodeId()); + assertEquals(parentTaskId, task.getParentTaskId().getId()); } // Release all tasks and wait for response @@ -514,7 +511,8 @@ public void onFailure(Throwable e) { String actionName = "testAction"; // only pick the main action // Try to cancel main task using action name - CancelTasksRequest request = new CancelTasksRequest(testNodes[0].discoveryNode.getId()); + CancelTasksRequest request = new CancelTasksRequest(); + request.nodesIds(testNodes[0].discoveryNode.getId()); request.reason("Testing Cancellation"); request.actions(actionName); CancelTasksResponse response = testNodes[randomIntBetween(0, testNodes.length - 1)].transportCancelTasksAction.execute(request) @@ -527,9 +525,9 @@ public void onFailure(Throwable e) { // Try to cancel main task using id - request = new CancelTasksRequest(testNodes[0].discoveryNode.getId()); + request = new CancelTasksRequest(); request.reason("Testing Cancellation"); - request.taskId(task.getId()); + request.taskId(new TaskId(testNodes[0].discoveryNode.getId(), task.getId())); response = testNodes[randomIntBetween(0, testNodes.length - 1)].transportCancelTasksAction.execute(request).get(); // Shouldn't match any tasks since testAction doesn't support cancellation @@ -601,7 +599,7 @@ public void testTaskLevelActionFailures() throws ExecutionException, Interrupted @Override protected TestTaskResponse taskOperation(TestTasksRequest request, Task task) { logger.info("Task action on node " + node); - if (failTaskOnNode == node && task.getParentNode() != null) { + if (failTaskOnNode == node && task.getParentTaskId().isSet() == false) { logger.info("Failing on node " + node); throw new RuntimeException("Task level failure"); } diff --git a/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java b/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java index 1e320b8c3d1a8..3fc33477746b5 100644 --- a/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java @@ -1015,7 +1015,7 @@ public void close() { * half the time. */ private ReplicationTask maybeTask() { - return random().nextBoolean() ? new ReplicationTask(0, null, null, null, null, 0) : null; + return random().nextBoolean() ? new ReplicationTask(0, null, null, null, null) : null; } /** diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/tasks.cancel.json b/rest-api-spec/src/main/resources/rest-api-spec/api/tasks.cancel.json index f36157144e870..14ac986280036 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/api/tasks.cancel.json +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/tasks.cancel.json @@ -4,18 +4,18 @@ "methods": ["POST"], "url": { "path": "/_tasks", - "paths": ["/_tasks/_cancel", "/_tasks/{node_id}/_cancel", "/_tasks/{node_id}/{task_id}/_cancel"], + "paths": ["/_tasks/_cancel", "/_tasks/{task_id}/_cancel"], "parts": { - "node_id": { - "type": "list", - "description": "A comma-separated list of node IDs or names to limit the request; use `_local` to cancel only tasks on the node you're connecting to, leave empty to cancel tasks on all nodes" - }, "task_id": { "type": "number", "description": "Cancel the task with specified id" } }, "params": { + "node_id": { + "type": "list", + "description": "A comma-separated list of node IDs or names to limit the returned information; use `_local` to return information from the node you're connecting to, leave empty to get information from all nodes" + }, "actions": { "type": "list", "description": "A comma-separated list of actions that should be cancelled. Leave empty to cancel all." diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/tasks.list.json b/rest-api-spec/src/main/resources/rest-api-spec/api/tasks.list.json index f44fa92f853b6..7e8683b3475f5 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/api/tasks.list.json +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/tasks.list.json @@ -4,18 +4,18 @@ "methods": ["GET"], "url": { "path": "/_tasks", - "paths": ["/_tasks", "/_tasks/{node_id}", "/_tasks/{node_id}/{task_id}"], + "paths": ["/_tasks", "/_tasks/{task_id}"], "parts": { - "node_id": { - "type": "list", - "description": "A comma-separated list of node IDs or names to limit the returned information; use `_local` to return information from the node you're connecting to, leave empty to get information from all nodes" - }, "task_id": { "type": "number", "description": "Return the task with specified id" } }, "params": { + "node_id": { + "type": "list", + "description": "A comma-separated list of node IDs or names to limit the returned information; use `_local` to return information from the node you're connecting to, leave empty to get information from all nodes" + }, "actions": { "type": "list", "description": "A comma-separated list of actions that should be returned. Leave empty to return all." diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/tasks.cancel/10_basic.yaml b/rest-api-spec/src/main/resources/rest-api-spec/test/tasks.cancel/10_basic.yaml index 6d8d7a9a205fa..d65ee04211b9d 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/tasks.cancel/10_basic.yaml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/tasks.cancel/10_basic.yaml @@ -2,7 +2,6 @@ "tasks_cancel test": - do: tasks.cancel: - node_id: _local - task_id: 1 + actions: "unknown_action" - length: { nodes: 0 }