Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rest High Level client: Add List Tasks #29546

Merged
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@

import org.apache.http.Header;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;

Expand Down Expand Up @@ -63,4 +65,26 @@ public void putSettingsAsync(ClusterUpdateSettingsRequest clusterUpdateSettingsR
restHighLevelClient.performRequestAsyncAndParseEntity(clusterUpdateSettingsRequest, Request::clusterPutSettings,
ClusterUpdateSettingsResponse::fromXContent, listener, emptySet(), headers);
}

/**
* Get current tasks using the Task Management API
* <p>
* See
* <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/tasks.html"> Task Management API on elastic.co</a>
*/
public ListTasksResponse listTasks(ListTasksRequest request, Header... headers) throws IOException {
return restHighLevelClient.performRequestAndParseEntity(request, Request::listTasks, ListTasksResponse::fromXContent,
emptySet(), headers);
}

/**
* Asynchronously get current tasks using the Task Management API
* <p>
* See
* <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/tasks.html"> Task Management API on elastic.co</a>
*/
public void listTasksAsync(ListTasksRequest request, ActionListener<ListTasksResponse> listener, Header... headers) {
restHighLevelClient.performRequestAsyncAndParseEntity(request, Request::listTasks, ListTasksResponse::fromXContent,
listener, emptySet(), headers);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.http.entity.ContentType;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest;
Expand Down Expand Up @@ -77,6 +78,7 @@
import org.elasticsearch.index.rankeval.RankEvalRequest;
import org.elasticsearch.rest.action.search.RestSearchAction;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
import org.elasticsearch.tasks.TaskId;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
Expand Down Expand Up @@ -578,6 +580,21 @@ static Request clusterPutSettings(ClusterUpdateSettingsRequest clusterUpdateSett
return new Request(HttpPut.METHOD_NAME, "/_cluster/settings", parameters.getParams(), entity);
}

static Request listTasks(ListTasksRequest request) {
if (request.getTaskId() != null && request.getTaskId().isSet()) {
throw new IllegalArgumentException("TaskId cannot be used for list tasks request");
}
Params params = Params.builder()
.withTimeout(request.getTimeout())
.withDetailed(request.getDetailed())
.withWaitForCompletion(request.getWaitForCompletion())
.withParentTaskId(request.getParentTaskId())
.withNodes(request.getNodes())
.withActions(request.getActions())
.putParam("group_by", "none");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For my understanding: is "group_by" fixed here because the current implementation of ListTasksRequests doesn't prrovide it (I had a quick look and couldn't find anything)? If so, should we support it? When I look at the docs it seems like group_by=parents is supported.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To parse the response we only need group_by=none. Grouping by parent or nodes will be done on the client side when you call getTaskGroups() or getPerNodeTasks() in ListTasksResponse.
But, by default API returns group_by=parent result, so I added here specific parameter.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for clearing this up for me, makes sense.

return new Request(HttpGet.METHOD_NAME, "/_tasks", params.getParams(), null);
}

static Request rollover(RolloverRequest rolloverRequest) throws IOException {
Params params = Params.builder();
params.withTimeout(rolloverRequest.timeout());
Expand Down Expand Up @@ -843,6 +860,41 @@ Params withPreserveExisting(boolean preserveExisting) {
return this;
}

Params withDetailed(boolean detailed) {
if (detailed) {
return putParam("detailed", Boolean.TRUE.toString());
}
return this;
}

Params withWaitForCompletion(boolean waitForCompletion) {
if (waitForCompletion) {
return putParam("wait_for_completion", Boolean.TRUE.toString());
}
return this;
}

Params withNodes(String[] nodes) {
if (nodes != null && nodes.length > 0) {
return putParam("nodes", String.join(",", nodes));
}
return this;
}

Params withActions(String[] actions) {
if (actions != null && actions.length > 0) {
return putParam("actions", String.join(",", actions));
}
return this;
}

Params withParentTaskId(TaskId parentTaskId) {
if (parentTaskId != null && parentTaskId.isSet()) {
return putParam("parent_task_id", parentTaskId.toString());
}
return this;
}

Map<String, String> getParams() {
return Collections.unmodifiableMap(params);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
package org.elasticsearch.client;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
import org.elasticsearch.action.admin.cluster.node.tasks.list.TaskGroup;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
Expand All @@ -29,13 +32,16 @@
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.tasks.TaskInfo;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import static java.util.Collections.emptyList;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;

Expand Down Expand Up @@ -105,4 +111,29 @@ public void testClusterUpdateSettingNonExistent() {
assertThat(exception.getMessage(), equalTo(
"Elasticsearch exception [type=illegal_argument_exception, reason=transient setting [" + setting + "], not recognized]"));
}

public void testListTasks() throws IOException {
ListTasksRequest request = new ListTasksRequest();
ListTasksResponse response = execute(request, highLevelClient().cluster()::listTasks, highLevelClient().cluster()::listTasksAsync);

assertThat(response, notNullValue());
assertThat(response.getNodeFailures(), equalTo(emptyList()));
assertThat(response.getTaskFailures(), equalTo(emptyList()));
// It's possible that there are other tasks except 'cluster:monitor/tasks/lists[n]' and 'action":"cluster:monitor/tasks/lists'
assertThat(response.getTasks().size(), greaterThanOrEqualTo(2));
boolean listTasksFound = false;
for (TaskGroup taskGroup : response.getTaskGroups()) {
TaskInfo parent = taskGroup.getTaskInfo();
if ("cluster:monitor/tasks/lists".equals(parent.getAction())) {
assertThat(taskGroup.getChildTasks().size(), equalTo(1));
TaskGroup childGroup = taskGroup.getChildTasks().iterator().next();
assertThat(childGroup.getChildTasks().isEmpty(), equalTo(true));
TaskInfo child = childGroup.getTaskInfo();
assertThat(child.getAction(), equalTo("cluster:monitor/tasks/lists[n]"));
assertThat(child.getParentTaskId(), equalTo(parent.getTaskId()));
listTasksFound = true;
}
}
assertTrue("List tasks were not found", listTasksFound);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.http.util.EntityUtils;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest.AliasActions;
Expand Down Expand Up @@ -100,6 +101,7 @@
import org.elasticsearch.search.rescore.QueryRescorerBuilder;
import org.elasticsearch.search.suggest.SuggestBuilder;
import org.elasticsearch.search.suggest.completion.CompletionSuggestionBuilder;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.RandomObjects;

Expand Down Expand Up @@ -128,6 +130,7 @@
import static org.elasticsearch.search.RandomSearchRequestGenerator.randomSearchRequest;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertToXContentEquivalent;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;

public class RequestTests extends ESTestCase {
Expand Down Expand Up @@ -1364,6 +1367,66 @@ public void testIndexPutSettings() throws IOException {
assertEquals(expectedParams, request.getParameters());
}

public void testListTasks() {
{
ListTasksRequest request = new ListTasksRequest();
Map<String, String> expectedParams = new HashMap<>();
if (randomBoolean()) {
request.setDetailed(randomBoolean());
if (request.getDetailed()) {
expectedParams.put("detailed", "true");
}
}
if (randomBoolean()) {
request.setWaitForCompletion(randomBoolean());
if (request.getWaitForCompletion()) {
expectedParams.put("wait_for_completion", "true");
}
}
if (randomBoolean()) {
String timeout = randomTimeValue();
request.setTimeout(timeout);
expectedParams.put("timeout", timeout);
}
if (randomBoolean()) {
if (randomBoolean()) {
TaskId taskId = new TaskId(randomAlphaOfLength(5), randomNonNegativeLong());
request.setParentTaskId(taskId);
expectedParams.put("parent_task_id", taskId.toString());
} else {
request.setParentTask(TaskId.EMPTY_TASK_ID);
}
}
if (randomBoolean()) {
String[] nodes = generateRandomStringArray(10, 8, false);
request.setNodes(nodes);
if (nodes.length > 0) {
expectedParams.put("nodes", String.join(",", nodes));
}
}
if (randomBoolean()) {
String[] actions = generateRandomStringArray(10, 8, false);
request.setActions(actions);
if (actions.length > 0) {
expectedParams.put("actions", String.join(",", actions));
}
}
expectedParams.put("group_by", "none");
Request httpRequest = Request.listTasks(request);
assertThat(httpRequest, notNullValue());
assertThat(httpRequest.getMethod(), equalTo(HttpGet.METHOD_NAME));
assertThat(httpRequest.getEntity(), nullValue());
assertThat(httpRequest.getEndpoint(), equalTo("/_tasks"));
assertThat(httpRequest.getParameters(), equalTo(expectedParams));
}
{
ListTasksRequest request = new ListTasksRequest();
request.setTaskId(new TaskId(randomAlphaOfLength(5), randomNonNegativeLong()));
IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, () -> Request.listTasks(request));
assertEquals("TaskId cannot be used for list tasks request", exception.getMessage());
}
}

private static void assertToXContentBody(ToXContent expectedBody, HttpEntity actualEntity) throws IOException {
BytesReference expectedBytes = XContentHelper.toXContent(expectedBody, REQUEST_BODY_CONTENT_TYPE, false);
assertEquals(XContentType.JSON.mediaTypeWithoutParameters(), actualEntity.getContentType().getValue());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,14 @@

package org.elasticsearch.client.documentation;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.LatchedActionListener;
import org.elasticsearch.action.TaskOperationFailure;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
import org.elasticsearch.action.admin.cluster.node.tasks.list.TaskGroup;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
import org.elasticsearch.client.ESRestHighLevelClientTestCase;
Expand All @@ -31,14 +37,20 @@
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.tasks.TaskInfo;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import static java.util.Collections.emptyList;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.notNullValue;

/**
* This class is used to generate the Java Cluster API documentation.
Expand Down Expand Up @@ -177,4 +189,87 @@ public void onFailure(Exception e) {
assertTrue(latch.await(30L, TimeUnit.SECONDS));
}
}

public void testListTasks() throws IOException {
RestHighLevelClient client = highLevelClient();
{
// tag::list-tasks-request
ListTasksRequest request = new ListTasksRequest();
// end::list-tasks-request

// tag::list-tasks-request-filter
request.setActions("cluster:*"); // <1>
request.setNodes("nodeId1", "nodeId2"); // <2>
request.setParentTaskId(new TaskId("parentTaskId", 42)); // <3>
// end::list-tasks-request-filter

// tag::list-tasks-request-detailed
request.setDetailed(true); // <1>
// end::list-tasks-request-detailed

// tag::list-tasks-request-wait-completion
request.setWaitForCompletion(true); // <1>
request.setTimeout(TimeValue.timeValueSeconds(50)); // <2>
request.setTimeout("50s"); // <3>
// end::list-tasks-request-wait-completion
}

ListTasksRequest request = new ListTasksRequest();

// tag::list-tasks-execute
ListTasksResponse response = client.cluster().listTasks(request);
// end::list-tasks-execute

assertThat(response, notNullValue());

// tag::list-tasks-response-tasks
List<TaskInfo> tasks = response.getTasks(); // <1>
// end::list-tasks-response-tasks

// tag::list-tasks-response-calc
Map<String, List<TaskInfo>> perNodeTasks = response.getPerNodeTasks(); // <1>
List<TaskGroup> groups = response.getTaskGroups(); // <2>
// end::list-tasks-response-calc

// tag::list-tasks-response-failures
List<ElasticsearchException> nodeFailures = response.getNodeFailures(); // <1>
List<TaskOperationFailure> taskFailures = response.getTaskFailures(); // <2>
// end::list-tasks-response-failures

assertThat(response.getNodeFailures(), equalTo(emptyList()));
assertThat(response.getTaskFailures(), equalTo(emptyList()));
assertThat(response.getTasks().size(), greaterThanOrEqualTo(2));
}

public void testListTasksAsync() throws Exception {
RestHighLevelClient client = highLevelClient();
{
ListTasksRequest request = new ListTasksRequest();

// tag::list-tasks-execute-listener
ActionListener<ListTasksResponse> listener =
new ActionListener<ListTasksResponse>() {
@Override
public void onResponse(ListTasksResponse response) {
// <1>
}

@Override
public void onFailure(Exception e) {
// <2>
}
};
// end::list-tasks-execute-listener

// Replace the empty listener by a blocking listener in test
final CountDownLatch latch = new CountDownLatch(1);
listener = new LatchedActionListener<>(listener, latch);

// tag::list-tasks-execute-async
client.cluster().listTasksAsync(request, listener); // <1>
// end::list-tasks-execute-async

assertTrue(latch.await(30L, TimeUnit.SECONDS));
}
}
}
Loading