diff --git a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksClusterService.java b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksClusterService.java index 7c395365c1b88..9e064c3d20924 100644 --- a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksClusterService.java +++ b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksClusterService.java @@ -19,7 +19,6 @@ package org.elasticsearch.persistent; -import org.apache.logging.log4j.Logger; import org.elasticsearch.ResourceAlreadyExistsException; import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; @@ -33,9 +32,9 @@ import org.elasticsearch.common.Nullable; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.tasks.Task; import org.elasticsearch.persistent.PersistentTasksCustomMetaData.Assignment; import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask; +import org.elasticsearch.tasks.Task; import java.util.Objects; @@ -52,29 +51,31 @@ public PersistentTasksClusterService(Settings settings, PersistentTasksExecutorR this.clusterService = clusterService; clusterService.addListener(this); this.registry = registry; - } /** * Creates a new persistent task on master node * - * @param action the action name - * @param params params - * @param listener the listener that will be called when task is started + * @param taskId the task's id + * @param taskName the task's name + * @param taskParams the task's parameters + * @param listener the listener that will be called when task is started */ - public void createPersistentTask(String taskId, String action, @Nullable Params params, + public void createPersistentTask(String taskId, String taskName, @Nullable Params taskParams, ActionListener> listener) { clusterService.submitStateUpdateTask("create persistent task", new ClusterStateUpdateTask() { @Override - public ClusterState execute(ClusterState currentState) throws Exception { + public ClusterState execute(ClusterState currentState) { PersistentTasksCustomMetaData.Builder builder = builder(currentState); if (builder.hasTask(taskId)) { throw new ResourceAlreadyExistsException("task with id {" + taskId + "} already exist"); } - validate(action, currentState, params); - final Assignment assignment; - assignment = getAssignement(action, currentState, params); - return update(currentState, builder.addTask(taskId, action, params, assignment)); + + PersistentTasksExecutor taskExecutor = registry.getPersistentTaskExecutorSafe(taskName); + taskExecutor.validate(taskParams, currentState); + + Assignment assignment = createAssignment(taskName, taskParams, currentState); + return update(currentState, builder.addTask(taskId, taskName, taskParams, assignment)); } @Override @@ -95,7 +96,6 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS }); } - /** * Restarts a record about a running persistent task from cluster state * @@ -114,7 +114,7 @@ public void completePersistentTask(String id, long allocationId, Exception failu } clusterService.submitStateUpdateTask(source, new ClusterStateUpdateTask() { @Override - public ClusterState execute(ClusterState currentState) throws Exception { + public ClusterState execute(ClusterState currentState) { PersistentTasksCustomMetaData.Builder tasksInProgress = builder(currentState); if (tasksInProgress.hasTask(id, allocationId)) { tasksInProgress.removeTask(id); @@ -185,7 +185,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS public void updatePersistentTaskStatus(String id, long allocationId, Task.Status status, ActionListener> listener) { clusterService.submitStateUpdateTask("update task status", new ClusterStateUpdateTask() { @Override - public ClusterState execute(ClusterState currentState) throws Exception { + public ClusterState execute(ClusterState currentState) { PersistentTasksCustomMetaData.Builder tasksInProgress = builder(currentState); if (tasksInProgress.hasTask(id, allocationId)) { return update(currentState, tasksInProgress.updateTaskStatus(id, status)); @@ -211,93 +211,85 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS }); } - private Assignment getAssignement(String taskName, ClusterState currentState, - @Nullable Params params) { - PersistentTasksExecutor persistentTasksExecutor = registry.getPersistentTaskExecutorSafe(taskName); - return persistentTasksExecutor.getAssignment(params, currentState); - } + /** + * Creates a new {@link Assignment} for the given persistent task. + * + * @param taskName the task's name + * @param taskParams the task's parameters + * @param currentState the current {@link ClusterState} - private void validate(String taskName, ClusterState currentState, @Nullable Params params) { + * @return a new {@link Assignment} + */ + private Assignment createAssignment(final String taskName, + final @Nullable Params taskParams, + final ClusterState currentState) { PersistentTasksExecutor persistentTasksExecutor = registry.getPersistentTaskExecutorSafe(taskName); - persistentTasksExecutor.validate(params, currentState); + return persistentTasksExecutor.getAssignment(taskParams, currentState); } @Override public void clusterChanged(ClusterChangedEvent event) { if (event.localNodeMaster()) { - logger.trace("checking task reassignment for cluster state {}", event.state().getVersion()); - if (reassignmentRequired(event, this::getAssignement)) { - logger.trace("task reassignment is needed"); - reassignTasks(); - } else { - logger.trace("task reassignment is not needed"); + if (shouldReassignPersistentTasks(event)) { + logger.trace("checking task reassignment for cluster state {}", event.state().getVersion()); + clusterService.submitStateUpdateTask("reassign persistent tasks", new ClusterStateUpdateTask() { + @Override + public ClusterState execute(ClusterState currentState) { + return reassignTasks(currentState); + } + + @Override + public void onFailure(String source, Exception e) { + logger.warn("failed to reassign persistent tasks", e); + } + }); } } } - interface ExecutorNodeDecider { - Assignment getAssignment(String action, ClusterState currentState, Params params); - } + /** + * Returns true if the cluster state change(s) require to reassign some persistent tasks. It can happen in the following + * situations: a node left or is added, the routing table changed, the master node changed or the persistent tasks changed. + */ + boolean shouldReassignPersistentTasks(final ClusterChangedEvent event) { + final PersistentTasksCustomMetaData tasks = event.state().getMetaData().custom(PersistentTasksCustomMetaData.TYPE); + if (tasks == null) { + return false; + } - static boolean reassignmentRequired(ClusterChangedEvent event, ExecutorNodeDecider decider) { - PersistentTasksCustomMetaData tasks = event.state().getMetaData().custom(PersistentTasksCustomMetaData.TYPE); - PersistentTasksCustomMetaData prevTasks = event.previousState().getMetaData().custom(PersistentTasksCustomMetaData.TYPE); - if (tasks != null && (Objects.equals(tasks, prevTasks) == false || - event.nodesChanged() || - event.routingTableChanged() || - event.previousState().nodes().isLocalNodeElectedMaster() == false)) { - // We need to check if removed nodes were running any of the tasks and reassign them - boolean reassignmentRequired = false; - for (PersistentTask taskInProgress : tasks.tasks()) { - if (taskInProgress.needsReassignment(event.state().nodes())) { - // there is an unassigned task or task with a disappeared node - we need to try assigning it - if (Objects.equals(taskInProgress.getAssignment(), - decider.getAssignment(taskInProgress.getTaskName(), event.state(), taskInProgress.getParams())) == false) { - // it looks like a assignment for at least one task is possible - let's trigger reassignment - reassignmentRequired = true; - break; - } + boolean masterChanged = event.previousState().nodes().isLocalNodeElectedMaster() == false; + if (persistentTasksChanged(event) || event.nodesChanged() || event.routingTableChanged() || masterChanged) { + for (PersistentTask task : tasks.tasks()) { + if (needsReassignment(task.getAssignment(), event.state().nodes())) { + Assignment assignment = createAssignment(task.getTaskName(), task.getParams(), event.state()); + if (Objects.equals(assignment, task.getAssignment()) == false) { + return true; + } } } - return reassignmentRequired; } return false; } /** - * Evaluates the cluster state and tries to assign tasks to nodes + * Evaluates the cluster state and tries to assign tasks to nodes. + * + * @param currentState the cluster state to analyze + * @return an updated version of the cluster state */ - public void reassignTasks() { - clusterService.submitStateUpdateTask("reassign persistent tasks", new ClusterStateUpdateTask() { - @Override - public ClusterState execute(ClusterState currentState) throws Exception { - return reassignTasks(currentState, logger, PersistentTasksClusterService.this::getAssignement); - } - - @Override - public void onFailure(String source, Exception e) { - logger.warn("Unsuccessful persistent task reassignment", e); - } - - @Override - public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - - } - }); - } - - static ClusterState reassignTasks(ClusterState currentState, Logger logger, ExecutorNodeDecider decider) { - PersistentTasksCustomMetaData tasks = currentState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); + ClusterState reassignTasks(final ClusterState currentState) { ClusterState clusterState = currentState; - DiscoveryNodes nodes = currentState.nodes(); + + final PersistentTasksCustomMetaData tasks = currentState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); if (tasks != null) { logger.trace("reassigning {} persistent tasks", tasks.tasks().size()); + final DiscoveryNodes nodes = currentState.nodes(); + // We need to check if removed nodes were running any of the tasks and reassign them for (PersistentTask task : tasks.tasks()) { - if (task.needsReassignment(nodes)) { - // there is an unassigned task - we need to try assigning it - Assignment assignment = decider.getAssignment(task.getTaskName(), clusterState, task.getParams()); + if (needsReassignment(task.getAssignment(), nodes)) { + Assignment assignment = createAssignment(task.getTaskName(), task.getParams(), clusterState); if (Objects.equals(assignment, task.getAssignment()) == false) { logger.trace("reassigning task {} from node {} to node {}", task.getId(), task.getAssignment().getExecutorNode(), assignment.getExecutorNode()); @@ -313,6 +305,17 @@ static ClusterState reassignTasks(ClusterState currentState, Logger logger, Exec return clusterState; } + /** Returns true if the persistent tasks are not equal between the previous and the current cluster state **/ + static boolean persistentTasksChanged(final ClusterChangedEvent event) { + String type = PersistentTasksCustomMetaData.TYPE; + return Objects.equals(event.state().metaData().custom(type), event.previousState().metaData().custom(type)) == false; + } + + /** Returns true if the task is not assigned or is assigned to a non-existing node */ + public static boolean needsReassignment(final Assignment assignment, final DiscoveryNodes nodes) { + return (assignment.isAssigned() == false || nodes.nodeExists(assignment.getExecutorNode()) == false); + } + private static PersistentTasksCustomMetaData.Builder builder(ClusterState currentState) { return PersistentTasksCustomMetaData.builder(currentState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE)); } diff --git a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksCustomMetaData.java b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksCustomMetaData.java index ee45eb8ffad28..6611ff7f2a3cc 100644 --- a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksCustomMetaData.java +++ b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksCustomMetaData.java @@ -145,7 +145,6 @@ private TaskDescriptionBuilder setStatus(Status status) { } } - public Collection> tasks() { return this.tasks.values(); } @@ -165,12 +164,6 @@ public Collection> findTasks(String taskName, Predicate> predicate) { - return this.tasks().stream() - .filter(p -> taskName.equals(p.getTaskName())) - .anyMatch(predicate); - } - @Override public boolean equals(Object o) { if (this == o) return true; @@ -279,7 +272,6 @@ public static class PersistentTask

implements Wr @Nullable private final Long allocationIdOnLastStatusUpdate; - public PersistentTask(String id, String taskName, P params, long allocationId, Assignment assignment) { this(id, allocationId, taskName, params, null, assignment, null); } @@ -395,13 +387,6 @@ public boolean isAssigned() { return assignment.isAssigned(); } - /** - * Returns true if the tasks is not stopped and unassigned or assigned to a non-existing node. - */ - public boolean needsReassignment(DiscoveryNodes nodes) { - return (assignment.isAssigned() == false || nodes.nodeExists(assignment.getExecutorNode()) == false); - } - @Nullable public Status getStatus() { return status; @@ -522,16 +507,14 @@ public static NamedDiff readDiffFrom(StreamInput in) throws IOE return readDiffFrom(MetaData.Custom.class, TYPE, in); } - public long getLastAllocationId() { - return lastAllocationId; - } - @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.field("last_allocation_id", lastAllocationId); builder.startArray("tasks"); - for (PersistentTask entry : tasks.values()) { - entry.toXContent(builder, params); + { + for (PersistentTask entry : tasks.values()) { + entry.toXContent(builder, params); + } } builder.endArray(); return builder; diff --git a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksExecutor.java b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksExecutor.java index ed61ad5805391..0a1e2095934ef 100644 --- a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksExecutor.java +++ b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksExecutor.java @@ -95,9 +95,7 @@ protected DiscoveryNode selectLeastLoadedNode(ClusterState clusterState, Predica *

* Throws an exception if the supplied params cannot be executed on the cluster in the current state. */ - public void validate(Params params, ClusterState clusterState) { - - } + public void validate(Params params, ClusterState clusterState) {} /** * Creates a AllocatedPersistentTask for communicating with task manager diff --git a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksClusterServiceTests.java b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksClusterServiceTests.java index 1169ff91e1308..e470c5028aa8f 100644 --- a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksClusterServiceTests.java +++ b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksClusterServiceTests.java @@ -29,31 +29,42 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.RoutingTable; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.test.VersionUtils; import org.elasticsearch.persistent.PersistentTasksCustomMetaData.Assignment; import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask; import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestParams; import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestPersistentTasksExecutor; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.VersionUtils; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashSet; import java.util.List; +import java.util.function.BiFunction; import static java.util.Collections.emptyMap; +import static java.util.Collections.singleton; +import static org.elasticsearch.persistent.PersistentTasksClusterService.needsReassignment; +import static org.elasticsearch.persistent.PersistentTasksClusterService.persistentTasksChanged; import static org.elasticsearch.persistent.PersistentTasksExecutor.NO_NODE_FOUND; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; +import static org.mockito.Mockito.mock; public class PersistentTasksClusterServiceTests extends ESTestCase { public void testReassignmentRequired() { + final PersistentTasksClusterService service = createService((params, clusterState) -> + "never_assign".equals(((TestParams) params).getTestParam()) ? NO_NODE_FOUND : randomNodeAssignment(clusterState.nodes()) + ); + int numberOfIterations = randomIntBetween(1, 30); ClusterState clusterState = initialState(); for (int i = 0; i < numberOfIterations; i++) { @@ -66,17 +77,7 @@ public void testReassignmentRequired() { clusterState = insignificantChange(clusterState); } ClusterChangedEvent event = new ClusterChangedEvent("test", clusterState, previousState); - assertThat(dumpEvent(event), PersistentTasksClusterService.reassignmentRequired(event, - new PersistentTasksClusterService.ExecutorNodeDecider() { - @Override - public Assignment getAssignment( - String action, ClusterState currentState, Params params) { - if ("never_assign".equals(((TestParams) params).getTestParam())) { - return NO_NODE_FOUND; - } - return randomNodeAssignment(currentState.nodes()); - } - }), equalTo(significant)); + assertThat(dumpEvent(event), service.shouldReassignPersistentTasks(event), equalTo(significant)); } } @@ -175,6 +176,115 @@ public void testReassignTasks() { } } + public void testPersistentTasksChangedNoTasks() { + DiscoveryNodes nodes = DiscoveryNodes.builder() + .add(new DiscoveryNode("_node_1", buildNewFakeTransportAddress(), Version.CURRENT)) + .build(); + + ClusterState previous = ClusterState.builder(new ClusterName("_name")) + .nodes(nodes) + .build(); + ClusterState current = ClusterState.builder(new ClusterName("_name")) + .nodes(nodes) + .build(); + + assertFalse("persistent tasks unchanged (no tasks)", + persistentTasksChanged(new ClusterChangedEvent("test", current, previous))); + } + + public void testPersistentTasksChangedTaskAdded() { + DiscoveryNodes nodes = DiscoveryNodes.builder() + .add(new DiscoveryNode("_node_1", buildNewFakeTransportAddress(), Version.CURRENT)) + .build(); + + ClusterState previous = ClusterState.builder(new ClusterName("_name")) + .nodes(nodes) + .build(); + + PersistentTasksCustomMetaData tasks = PersistentTasksCustomMetaData.builder() + .addTask("_task_1", "test", null, new Assignment(null, "_reason")) + .build(); + + ClusterState current = ClusterState.builder(new ClusterName("_name")) + .nodes(nodes) + .metaData(MetaData.builder().putCustom(PersistentTasksCustomMetaData.TYPE, tasks)) + .build(); + + assertTrue("persistent tasks changed (task added)", + persistentTasksChanged(new ClusterChangedEvent("test", current, previous))); + } + + public void testPersistentTasksChangedTaskRemoved() { + DiscoveryNodes nodes = DiscoveryNodes.builder() + .add(new DiscoveryNode("_node_1", buildNewFakeTransportAddress(), Version.CURRENT)) + .add(new DiscoveryNode("_node_2", buildNewFakeTransportAddress(), Version.CURRENT)) + .build(); + + PersistentTasksCustomMetaData previousTasks = PersistentTasksCustomMetaData.builder() + .addTask("_task_1", "test", null, new Assignment("_node_1", "_reason")) + .addTask("_task_2", "test", null, new Assignment("_node_1", "_reason")) + .addTask("_task_3", "test", null, new Assignment("_node_2", "_reason")) + .build(); + + ClusterState previous = ClusterState.builder(new ClusterName("_name")) + .nodes(nodes) + .metaData(MetaData.builder().putCustom(PersistentTasksCustomMetaData.TYPE, previousTasks)) + .build(); + + PersistentTasksCustomMetaData currentTasks = PersistentTasksCustomMetaData.builder() + .addTask("_task_1", "test", null, new Assignment("_node_1", "_reason")) + .addTask("_task_3", "test", null, new Assignment("_node_2", "_reason")) + .build(); + + ClusterState current = ClusterState.builder(new ClusterName("_name")) + .nodes(nodes) + .metaData(MetaData.builder().putCustom(PersistentTasksCustomMetaData.TYPE, currentTasks)) + .build(); + + assertTrue("persistent tasks changed (task removed)", + persistentTasksChanged(new ClusterChangedEvent("test", current, previous))); + } + + public void testPersistentTasksAssigned() { + DiscoveryNodes nodes = DiscoveryNodes.builder() + .add(new DiscoveryNode("_node_1", buildNewFakeTransportAddress(), Version.CURRENT)) + .add(new DiscoveryNode("_node_2", buildNewFakeTransportAddress(), Version.CURRENT)) + .build(); + + PersistentTasksCustomMetaData previousTasks = PersistentTasksCustomMetaData.builder() + .addTask("_task_1", "test", null, new Assignment("_node_1", "")) + .addTask("_task_2", "test", null, new Assignment(null, "unassigned")) + .build(); + + ClusterState previous = ClusterState.builder(new ClusterName("_name")) + .nodes(nodes) + .metaData(MetaData.builder().putCustom(PersistentTasksCustomMetaData.TYPE, previousTasks)) + .build(); + + PersistentTasksCustomMetaData currentTasks = PersistentTasksCustomMetaData.builder() + .addTask("_task_1", "test", null, new Assignment("_node_1", "")) + .addTask("_task_2", "test", null, new Assignment("_node_2", "")) + .build(); + + ClusterState current = ClusterState.builder(new ClusterName("_name")) + .nodes(nodes) + .metaData(MetaData.builder().putCustom(PersistentTasksCustomMetaData.TYPE, currentTasks)) + .build(); + + assertTrue("persistent tasks changed (task assigned)", + persistentTasksChanged(new ClusterChangedEvent("test", current, previous))); + } + + public void testNeedsReassignment() { + DiscoveryNodes nodes = DiscoveryNodes.builder() + .add(new DiscoveryNode("_node_1", buildNewFakeTransportAddress(), Version.CURRENT)) + .add(new DiscoveryNode("_node_2", buildNewFakeTransportAddress(), Version.CURRENT)) + .build(); + + assertTrue(needsReassignment(new Assignment(null, "unassigned"), nodes)); + assertTrue(needsReassignment(new Assignment("_node_left", "assigned to a node that left"), nodes)); + assertFalse(needsReassignment(new Assignment("_node_1", "assigned"), nodes)); + } private void addTestNodes(DiscoveryNodes.Builder nodes, int nonLocalNodesCount) { for (int i = 0; i < nonLocalNodesCount; i++) { @@ -183,29 +293,25 @@ private void addTestNodes(DiscoveryNodes.Builder nodes, int nonLocalNodesCount) } private ClusterState reassign(ClusterState clusterState) { - return PersistentTasksClusterService.reassignTasks(clusterState, logger, - new PersistentTasksClusterService.ExecutorNodeDecider() { - @Override - public Assignment getAssignment( - String action, ClusterState currentState, Params params) { - TestParams testParams = (TestParams) params; - switch (testParams.getTestParam()) { - case "assign_me": - return randomNodeAssignment(currentState.nodes()); - case "dont_assign_me": - return NO_NODE_FOUND; - case "fail_me_if_called": - fail("the decision decider shouldn't be called on this task"); - return null; - case "assign_one": - return assignOnlyOneTaskAtATime(currentState); - default: - fail("unknown param " + testParams.getTestParam()); - } - return NO_NODE_FOUND; - } - }); + PersistentTasksClusterService service = createService((params, currentState) -> { + TestParams testParams = (TestParams) params; + switch (testParams.getTestParam()) { + case "assign_me": + return randomNodeAssignment(currentState.nodes()); + case "dont_assign_me": + return NO_NODE_FOUND; + case "fail_me_if_called": + fail("the decision decider shouldn't be called on this task"); + return null; + case "assign_one": + return assignOnlyOneTaskAtATime(currentState); + default: + fail("unknown param " + testParams.getTestParam()); + } + return NO_NODE_FOUND; + }); + return service.reassignTasks(clusterState); } private Assignment assignOnlyOneTaskAtATime(ClusterState clusterState) { @@ -450,4 +556,21 @@ private void changeRoutingTable(MetaData.Builder metaData, RoutingTable.Builder metaData.put(indexMetaData, false); routingTable.addAsNew(indexMetaData); } + + /** Creates a PersistentTasksClusterService with a single PersistentTasksExecutor implemented by a BiFunction **/ + static

PersistentTasksClusterService createService(final BiFunction fn) { + PersistentTasksExecutorRegistry registry = new PersistentTasksExecutorRegistry(Settings.EMPTY, + singleton(new PersistentTasksExecutor

(Settings.EMPTY, TestPersistentTasksExecutor.NAME, null) { + @Override + public Assignment getAssignment(P params, ClusterState clusterState) { + return fn.apply(params, clusterState); + } + + @Override + protected void nodeOperation(AllocatedPersistentTask task, P params, Task.Status status) { + throw new UnsupportedOperationException(); + } + })); + return new PersistentTasksClusterService(Settings.EMPTY, registry, mock(ClusterService.class)); + } }