Skip to content

Commit

Permalink
Fix CancellableTasksIT (#55198)
Browse files Browse the repository at this point in the history
We need to ensure all ban parents are removed after each test; 
otherwise, the subsequent tests can fail because of the leftover.

Closes #55106
  • Loading branch information
dnhatn authored Apr 15, 2020
1 parent 0e6b4c4 commit da8f411
Showing 1 changed file with 18 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
*/
package org.elasticsearch.action.admin.cluster.node.tasks;

import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionFuture;
Expand Down Expand Up @@ -68,7 +67,7 @@
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

Expand All @@ -79,7 +78,6 @@
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.instanceOf;

@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/55106")
public class CancellableTasksIT extends ESIntegTestCase {

static int idGenerator = 0;
Expand Down Expand Up @@ -163,6 +161,15 @@ static void allowEntireRequest(TestRequest request) {
}
}

void ensureAllBansRemoved() throws Exception {
assertBusy(() -> {
for (String node : internalCluster().getNodeNames()) {
TaskManager taskManager = internalCluster().getInstance(TransportService.class, node).getTaskManager();
assertThat("node " + node, taskManager.getBannedTaskIds(), empty());
}
}, 30, TimeUnit.SECONDS);
}

public void testBanOnlyNodesWithOutstandingDescendantTasks() throws Exception {
if (randomBoolean()) {
internalCluster().startNodes(randomIntBetween(1, 3));
Expand Down Expand Up @@ -198,16 +205,11 @@ public void testBanOnlyNodesWithOutstandingDescendantTasks() throws Exception {
}
assertThat(taskManager.getBannedTaskIds(), equalTo(expectedBans));
}
});
}, 30, TimeUnit.SECONDS);
allowEntireRequest(rootRequest);
cancelFuture.actionGet();
waitForRootTask(rootTaskFuture);
assertBusy(() -> {
for (DiscoveryNode node : nodes) {
TaskManager taskManager = internalCluster().getInstance(TransportService.class, node.getName()).getTaskManager();
assertThat(taskManager.getBanCount(), equalTo(0));
}
});
ensureAllBansRemoved();
}

public void testCancelTaskMultipleTimes() throws Exception {
Expand All @@ -231,6 +233,7 @@ public void testCancelTaskMultipleTimes() throws Exception {
assertThat(cancelError.getNodeFailures(), hasSize(1));
final Throwable notFound = ExceptionsHelper.unwrap(cancelError.getNodeFailures().get(0), ResourceNotFoundException.class);
assertThat(notFound.getMessage(), equalTo("task [" + taskId + "] is not found"));
ensureAllBansRemoved();
}

public void testDoNotWaitForCompletion() throws Exception {
Expand All @@ -251,9 +254,11 @@ public void testDoNotWaitForCompletion() throws Exception {
}
allowEntireRequest(rootRequest);
waitForRootTask(mainTaskFuture);
cancelFuture.actionGet();
ensureAllBansRemoved();
}

public void testFailedToStartChildTaskAfterCancelled() {
public void testFailedToStartChildTaskAfterCancelled() throws Exception {
Set<DiscoveryNode> nodes = StreamSupport.stream(clusterService().state().nodes().spliterator(), false).collect(Collectors.toSet());
TestRequest rootRequest = generateTestRequest(nodes, 0, between(1, 3));
ActionFuture<TestResponse> rootTaskFuture = client().execute(TransportTestAction.ACTION, rootRequest);
Expand All @@ -270,6 +275,7 @@ public void testFailedToStartChildTaskAfterCancelled() {
assertThat(te.getCause().getMessage(), equalTo("The parent task was cancelled, shouldn't start any child tasks"));
allowEntireRequest(rootRequest);
waitForRootTask(rootTaskFuture);
ensureAllBansRemoved();
}

static TaskId getRootTaskId(TestRequest request) {
Expand Down Expand Up @@ -384,7 +390,6 @@ public void writeTo(StreamOutput out) throws IOException {

public static class TransportTestAction extends HandledTransportAction<TestRequest, TestResponse> {

static AtomicInteger counter = new AtomicInteger();
public static ActionType<TestResponse> ACTION = new ActionType<>("internal::test_action", TestResponse::new);
private final TransportService transportService;
private final NodeClient client;
Expand All @@ -407,7 +412,6 @@ protected void doExecute(Task task, TestRequest request, ActionListener<TestResp
if (((CancellableTask) task).isCancelled()) {
throw new TaskCancelledException("Task was cancelled while executing");
}
counter.incrementAndGet();
return new TestResponse();
}));
for (TestRequest subRequest : subRequests) {
Expand All @@ -423,7 +427,7 @@ protected void startSubTask(TaskId parentTaskId, TestRequest subRequest, ActionL
transportService.getThreadPool().generic().execute(new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
latchedListener.onFailure(e);
}

@Override
Expand Down

0 comments on commit da8f411

Please sign in to comment.