From 3903f42b10ecb90e7fad3237cebe1e12a9c6e859 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Fri, 16 Mar 2018 13:42:06 +0100 Subject: [PATCH] Small code cleanups and refactorings in persistent tasks This commit consists of small code cleanups and refactorings in the persistent tasks framework. Most changes are in PersistentTasksClusterService where some methods have been renamed or merged together, documentation has been added, unused code removed in order to improve readability of the code. --- .../PersistentTasksClusterService.java | 155 +++++++------- .../PersistentTasksCustomMetaData.java | 25 +-- .../persistent/PersistentTasksExecutor.java | 4 +- .../PersistentTasksClusterServiceTests.java | 193 ++++++++++++++---- 4 files changed, 242 insertions(+), 135 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksClusterService.java b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksClusterService.java index 7c395365c1b88..83bd1f4ca5b2a 100644 --- a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksClusterService.java +++ b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksClusterService.java @@ -19,7 +19,6 @@ package org.elasticsearch.persistent; -import org.apache.logging.log4j.Logger; import org.elasticsearch.ResourceAlreadyExistsException; import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; @@ -33,9 +32,9 @@ import org.elasticsearch.common.Nullable; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.tasks.Task; import org.elasticsearch.persistent.PersistentTasksCustomMetaData.Assignment; import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask; +import org.elasticsearch.tasks.Task; import java.util.Objects; @@ -52,29 +51,31 @@ public PersistentTasksClusterService(Settings settings, PersistentTasksExecutorR this.clusterService = clusterService; clusterService.addListener(this); this.registry = registry; - } /** * Creates a new persistent task on master node * - * @param action the action name - * @param params params - * @param listener the listener that will be called when task is started + * @param taskId the task's id + * @param taskName the task's name + * @param taskParams the task's parameters + * @param listener the listener that will be called when task is started */ - public void createPersistentTask(String taskId, String action, @Nullable Params params, + public void createPersistentTask(String taskId, String taskName, @Nullable Params taskParams, ActionListener> listener) { clusterService.submitStateUpdateTask("create persistent task", new ClusterStateUpdateTask() { @Override - public ClusterState execute(ClusterState currentState) throws Exception { + public ClusterState execute(ClusterState currentState) { PersistentTasksCustomMetaData.Builder builder = builder(currentState); if (builder.hasTask(taskId)) { throw new ResourceAlreadyExistsException("task with id {" + taskId + "} already exist"); } - validate(action, currentState, params); - final Assignment assignment; - assignment = getAssignement(action, currentState, params); - return update(currentState, builder.addTask(taskId, action, params, assignment)); + + PersistentTasksExecutor taskExecutor = registry.getPersistentTaskExecutorSafe(taskName); + taskExecutor.validate(taskParams, currentState); + + Assignment assignment = createAssignment(taskName, taskParams, currentState); + return update(currentState, builder.addTask(taskId, taskName, taskParams, assignment)); } @Override @@ -95,7 +96,6 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS }); } - /** * Restarts a record about a running persistent task from cluster state * @@ -114,7 +114,7 @@ public void completePersistentTask(String id, long allocationId, Exception failu } clusterService.submitStateUpdateTask(source, new ClusterStateUpdateTask() { @Override - public ClusterState execute(ClusterState currentState) throws Exception { + public ClusterState execute(ClusterState currentState) { PersistentTasksCustomMetaData.Builder tasksInProgress = builder(currentState); if (tasksInProgress.hasTask(id, allocationId)) { tasksInProgress.removeTask(id); @@ -185,7 +185,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS public void updatePersistentTaskStatus(String id, long allocationId, Task.Status status, ActionListener> listener) { clusterService.submitStateUpdateTask("update task status", new ClusterStateUpdateTask() { @Override - public ClusterState execute(ClusterState currentState) throws Exception { + public ClusterState execute(ClusterState currentState) { PersistentTasksCustomMetaData.Builder tasksInProgress = builder(currentState); if (tasksInProgress.hasTask(id, allocationId)) { return update(currentState, tasksInProgress.updateTaskStatus(id, status)); @@ -211,93 +211,85 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS }); } - private Assignment getAssignement(String taskName, ClusterState currentState, - @Nullable Params params) { - PersistentTasksExecutor persistentTasksExecutor = registry.getPersistentTaskExecutorSafe(taskName); - return persistentTasksExecutor.getAssignment(params, currentState); - } + /** + * Creates a new {@link Assignment} for the given persistent task. + * + * @param taskName the task's name + * @param taskParams the task's parameters + * @param currentState the current {@link ClusterState} - private void validate(String taskName, ClusterState currentState, @Nullable Params params) { + * @return a new {@link Assignment} + */ + private Assignment createAssignment(final String taskName, + final @Nullable Params taskParams, + final ClusterState currentState) { PersistentTasksExecutor persistentTasksExecutor = registry.getPersistentTaskExecutorSafe(taskName); - persistentTasksExecutor.validate(params, currentState); + return persistentTasksExecutor.getAssignment(taskParams, currentState); } @Override public void clusterChanged(ClusterChangedEvent event) { if (event.localNodeMaster()) { - logger.trace("checking task reassignment for cluster state {}", event.state().getVersion()); - if (reassignmentRequired(event, this::getAssignement)) { - logger.trace("task reassignment is needed"); - reassignTasks(); - } else { - logger.trace("task reassignment is not needed"); + if (shouldReassignPersistentTasks(event)) { + logger.trace("checking task reassignment for cluster state {}", event.state().getVersion()); + clusterService.submitStateUpdateTask("reassign persistent tasks", new ClusterStateUpdateTask() { + @Override + public ClusterState execute(ClusterState currentState) { + return reassignTasks(currentState); + } + + @Override + public void onFailure(String source, Exception e) { + logger.warn("failed to reassign persistent tasks", e); + } + }); } } } - interface ExecutorNodeDecider { - Assignment getAssignment(String action, ClusterState currentState, Params params); - } + /** + * Returns true if the cluster state change(s) require to reassign some persistent tasks. It can happen in the following + * situations: a node left or is added, the routing table changed, the master node changed or the persistent tasks changed. + */ + boolean shouldReassignPersistentTasks(final ClusterChangedEvent event) { + final PersistentTasksCustomMetaData tasks = event.state().getMetaData().custom(PersistentTasksCustomMetaData.TYPE); + if (tasks == null) { + return false; + } - static boolean reassignmentRequired(ClusterChangedEvent event, ExecutorNodeDecider decider) { - PersistentTasksCustomMetaData tasks = event.state().getMetaData().custom(PersistentTasksCustomMetaData.TYPE); - PersistentTasksCustomMetaData prevTasks = event.previousState().getMetaData().custom(PersistentTasksCustomMetaData.TYPE); - if (tasks != null && (Objects.equals(tasks, prevTasks) == false || - event.nodesChanged() || - event.routingTableChanged() || - event.previousState().nodes().isLocalNodeElectedMaster() == false)) { - // We need to check if removed nodes were running any of the tasks and reassign them - boolean reassignmentRequired = false; - for (PersistentTask taskInProgress : tasks.tasks()) { - if (taskInProgress.needsReassignment(event.state().nodes())) { - // there is an unassigned task or task with a disappeared node - we need to try assigning it - if (Objects.equals(taskInProgress.getAssignment(), - decider.getAssignment(taskInProgress.getTaskName(), event.state(), taskInProgress.getParams())) == false) { - // it looks like a assignment for at least one task is possible - let's trigger reassignment - reassignmentRequired = true; - break; - } + boolean masterChanged = event.previousState().nodes().isLocalNodeElectedMaster() == false; + if (persistentTasksChanged(event) || event.nodesChanged() || event.routingTableChanged() || masterChanged) { + for (PersistentTask task : tasks.tasks()) { + if (needsReassignment(task.getAssignment(), event.state().nodes())) { + Assignment assignment = createAssignment(task.getTaskName(), task.getParams(), event.state()); + if (Objects.equals(assignment, task.getAssignment()) == false) { + return true; + } } } - return reassignmentRequired; } return false; } /** - * Evaluates the cluster state and tries to assign tasks to nodes + * Evaluates the cluster state and tries to assign tasks to nodes. + * + * @param currentState the cluster state to analyze + * @return an updated version of the cluster state */ - public void reassignTasks() { - clusterService.submitStateUpdateTask("reassign persistent tasks", new ClusterStateUpdateTask() { - @Override - public ClusterState execute(ClusterState currentState) throws Exception { - return reassignTasks(currentState, logger, PersistentTasksClusterService.this::getAssignement); - } - - @Override - public void onFailure(String source, Exception e) { - logger.warn("Unsuccessful persistent task reassignment", e); - } - - @Override - public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - - } - }); - } - - static ClusterState reassignTasks(ClusterState currentState, Logger logger, ExecutorNodeDecider decider) { - PersistentTasksCustomMetaData tasks = currentState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); + ClusterState reassignTasks(final ClusterState currentState) { ClusterState clusterState = currentState; - DiscoveryNodes nodes = currentState.nodes(); + + final PersistentTasksCustomMetaData tasks = currentState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); if (tasks != null) { logger.trace("reassigning {} persistent tasks", tasks.tasks().size()); + final DiscoveryNodes nodes = currentState.nodes(); + // We need to check if removed nodes were running any of the tasks and reassign them for (PersistentTask task : tasks.tasks()) { - if (task.needsReassignment(nodes)) { - // there is an unassigned task - we need to try assigning it - Assignment assignment = decider.getAssignment(task.getTaskName(), clusterState, task.getParams()); + if (needsReassignment(task.getAssignment(), nodes)) { + Assignment assignment = createAssignment(task.getTaskName(), task.getParams(), clusterState); if (Objects.equals(assignment, task.getAssignment()) == false) { logger.trace("reassigning task {} from node {} to node {}", task.getId(), task.getAssignment().getExecutorNode(), assignment.getExecutorNode()); @@ -313,6 +305,17 @@ static ClusterState reassignTasks(ClusterState currentState, Logger logger, Exec return clusterState; } + /** Returns true if the persistent tasks are not equal between the previous and the current cluster state **/ + static boolean persistentTasksChanged(final ClusterChangedEvent event) { + String type = PersistentTasksCustomMetaData.TYPE; + return Objects.equals(event.state().metaData().custom(type), event.previousState().metaData().custom(type)) == false; + } + + /** Returns true if the task is not assigned or is assigned to a non-existing node */ + static boolean needsReassignment(final Assignment assignment, final DiscoveryNodes nodes) { + return (assignment.isAssigned() == false || nodes.nodeExists(assignment.getExecutorNode()) == false); + } + private static PersistentTasksCustomMetaData.Builder builder(ClusterState currentState) { return PersistentTasksCustomMetaData.builder(currentState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE)); } diff --git a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksCustomMetaData.java b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksCustomMetaData.java index ee45eb8ffad28..6611ff7f2a3cc 100644 --- a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksCustomMetaData.java +++ b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksCustomMetaData.java @@ -145,7 +145,6 @@ private TaskDescriptionBuilder setStatus(Status status) { } } - public Collection> tasks() { return this.tasks.values(); } @@ -165,12 +164,6 @@ public Collection> findTasks(String taskName, Predicate> predicate) { - return this.tasks().stream() - .filter(p -> taskName.equals(p.getTaskName())) - .anyMatch(predicate); - } - @Override public boolean equals(Object o) { if (this == o) return true; @@ -279,7 +272,6 @@ public static class PersistentTask

implements Wr @Nullable private final Long allocationIdOnLastStatusUpdate; - public PersistentTask(String id, String taskName, P params, long allocationId, Assignment assignment) { this(id, allocationId, taskName, params, null, assignment, null); } @@ -395,13 +387,6 @@ public boolean isAssigned() { return assignment.isAssigned(); } - /** - * Returns true if the tasks is not stopped and unassigned or assigned to a non-existing node. - */ - public boolean needsReassignment(DiscoveryNodes nodes) { - return (assignment.isAssigned() == false || nodes.nodeExists(assignment.getExecutorNode()) == false); - } - @Nullable public Status getStatus() { return status; @@ -522,16 +507,14 @@ public static NamedDiff readDiffFrom(StreamInput in) throws IOE return readDiffFrom(MetaData.Custom.class, TYPE, in); } - public long getLastAllocationId() { - return lastAllocationId; - } - @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.field("last_allocation_id", lastAllocationId); builder.startArray("tasks"); - for (PersistentTask entry : tasks.values()) { - entry.toXContent(builder, params); + { + for (PersistentTask entry : tasks.values()) { + entry.toXContent(builder, params); + } } builder.endArray(); return builder; diff --git a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksExecutor.java b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksExecutor.java index ed61ad5805391..0a1e2095934ef 100644 --- a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksExecutor.java +++ b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksExecutor.java @@ -95,9 +95,7 @@ protected DiscoveryNode selectLeastLoadedNode(ClusterState clusterState, Predica *

* Throws an exception if the supplied params cannot be executed on the cluster in the current state. */ - public void validate(Params params, ClusterState clusterState) { - - } + public void validate(Params params, ClusterState clusterState) {} /** * Creates a AllocatedPersistentTask for communicating with task manager diff --git a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksClusterServiceTests.java b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksClusterServiceTests.java index 1169ff91e1308..e470c5028aa8f 100644 --- a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksClusterServiceTests.java +++ b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksClusterServiceTests.java @@ -29,31 +29,42 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.RoutingTable; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.test.VersionUtils; import org.elasticsearch.persistent.PersistentTasksCustomMetaData.Assignment; import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask; import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestParams; import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestPersistentTasksExecutor; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.VersionUtils; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashSet; import java.util.List; +import java.util.function.BiFunction; import static java.util.Collections.emptyMap; +import static java.util.Collections.singleton; +import static org.elasticsearch.persistent.PersistentTasksClusterService.needsReassignment; +import static org.elasticsearch.persistent.PersistentTasksClusterService.persistentTasksChanged; import static org.elasticsearch.persistent.PersistentTasksExecutor.NO_NODE_FOUND; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; +import static org.mockito.Mockito.mock; public class PersistentTasksClusterServiceTests extends ESTestCase { public void testReassignmentRequired() { + final PersistentTasksClusterService service = createService((params, clusterState) -> + "never_assign".equals(((TestParams) params).getTestParam()) ? NO_NODE_FOUND : randomNodeAssignment(clusterState.nodes()) + ); + int numberOfIterations = randomIntBetween(1, 30); ClusterState clusterState = initialState(); for (int i = 0; i < numberOfIterations; i++) { @@ -66,17 +77,7 @@ public void testReassignmentRequired() { clusterState = insignificantChange(clusterState); } ClusterChangedEvent event = new ClusterChangedEvent("test", clusterState, previousState); - assertThat(dumpEvent(event), PersistentTasksClusterService.reassignmentRequired(event, - new PersistentTasksClusterService.ExecutorNodeDecider() { - @Override - public Assignment getAssignment( - String action, ClusterState currentState, Params params) { - if ("never_assign".equals(((TestParams) params).getTestParam())) { - return NO_NODE_FOUND; - } - return randomNodeAssignment(currentState.nodes()); - } - }), equalTo(significant)); + assertThat(dumpEvent(event), service.shouldReassignPersistentTasks(event), equalTo(significant)); } } @@ -175,6 +176,115 @@ public void testReassignTasks() { } } + public void testPersistentTasksChangedNoTasks() { + DiscoveryNodes nodes = DiscoveryNodes.builder() + .add(new DiscoveryNode("_node_1", buildNewFakeTransportAddress(), Version.CURRENT)) + .build(); + + ClusterState previous = ClusterState.builder(new ClusterName("_name")) + .nodes(nodes) + .build(); + ClusterState current = ClusterState.builder(new ClusterName("_name")) + .nodes(nodes) + .build(); + + assertFalse("persistent tasks unchanged (no tasks)", + persistentTasksChanged(new ClusterChangedEvent("test", current, previous))); + } + + public void testPersistentTasksChangedTaskAdded() { + DiscoveryNodes nodes = DiscoveryNodes.builder() + .add(new DiscoveryNode("_node_1", buildNewFakeTransportAddress(), Version.CURRENT)) + .build(); + + ClusterState previous = ClusterState.builder(new ClusterName("_name")) + .nodes(nodes) + .build(); + + PersistentTasksCustomMetaData tasks = PersistentTasksCustomMetaData.builder() + .addTask("_task_1", "test", null, new Assignment(null, "_reason")) + .build(); + + ClusterState current = ClusterState.builder(new ClusterName("_name")) + .nodes(nodes) + .metaData(MetaData.builder().putCustom(PersistentTasksCustomMetaData.TYPE, tasks)) + .build(); + + assertTrue("persistent tasks changed (task added)", + persistentTasksChanged(new ClusterChangedEvent("test", current, previous))); + } + + public void testPersistentTasksChangedTaskRemoved() { + DiscoveryNodes nodes = DiscoveryNodes.builder() + .add(new DiscoveryNode("_node_1", buildNewFakeTransportAddress(), Version.CURRENT)) + .add(new DiscoveryNode("_node_2", buildNewFakeTransportAddress(), Version.CURRENT)) + .build(); + + PersistentTasksCustomMetaData previousTasks = PersistentTasksCustomMetaData.builder() + .addTask("_task_1", "test", null, new Assignment("_node_1", "_reason")) + .addTask("_task_2", "test", null, new Assignment("_node_1", "_reason")) + .addTask("_task_3", "test", null, new Assignment("_node_2", "_reason")) + .build(); + + ClusterState previous = ClusterState.builder(new ClusterName("_name")) + .nodes(nodes) + .metaData(MetaData.builder().putCustom(PersistentTasksCustomMetaData.TYPE, previousTasks)) + .build(); + + PersistentTasksCustomMetaData currentTasks = PersistentTasksCustomMetaData.builder() + .addTask("_task_1", "test", null, new Assignment("_node_1", "_reason")) + .addTask("_task_3", "test", null, new Assignment("_node_2", "_reason")) + .build(); + + ClusterState current = ClusterState.builder(new ClusterName("_name")) + .nodes(nodes) + .metaData(MetaData.builder().putCustom(PersistentTasksCustomMetaData.TYPE, currentTasks)) + .build(); + + assertTrue("persistent tasks changed (task removed)", + persistentTasksChanged(new ClusterChangedEvent("test", current, previous))); + } + + public void testPersistentTasksAssigned() { + DiscoveryNodes nodes = DiscoveryNodes.builder() + .add(new DiscoveryNode("_node_1", buildNewFakeTransportAddress(), Version.CURRENT)) + .add(new DiscoveryNode("_node_2", buildNewFakeTransportAddress(), Version.CURRENT)) + .build(); + + PersistentTasksCustomMetaData previousTasks = PersistentTasksCustomMetaData.builder() + .addTask("_task_1", "test", null, new Assignment("_node_1", "")) + .addTask("_task_2", "test", null, new Assignment(null, "unassigned")) + .build(); + + ClusterState previous = ClusterState.builder(new ClusterName("_name")) + .nodes(nodes) + .metaData(MetaData.builder().putCustom(PersistentTasksCustomMetaData.TYPE, previousTasks)) + .build(); + + PersistentTasksCustomMetaData currentTasks = PersistentTasksCustomMetaData.builder() + .addTask("_task_1", "test", null, new Assignment("_node_1", "")) + .addTask("_task_2", "test", null, new Assignment("_node_2", "")) + .build(); + + ClusterState current = ClusterState.builder(new ClusterName("_name")) + .nodes(nodes) + .metaData(MetaData.builder().putCustom(PersistentTasksCustomMetaData.TYPE, currentTasks)) + .build(); + + assertTrue("persistent tasks changed (task assigned)", + persistentTasksChanged(new ClusterChangedEvent("test", current, previous))); + } + + public void testNeedsReassignment() { + DiscoveryNodes nodes = DiscoveryNodes.builder() + .add(new DiscoveryNode("_node_1", buildNewFakeTransportAddress(), Version.CURRENT)) + .add(new DiscoveryNode("_node_2", buildNewFakeTransportAddress(), Version.CURRENT)) + .build(); + + assertTrue(needsReassignment(new Assignment(null, "unassigned"), nodes)); + assertTrue(needsReassignment(new Assignment("_node_left", "assigned to a node that left"), nodes)); + assertFalse(needsReassignment(new Assignment("_node_1", "assigned"), nodes)); + } private void addTestNodes(DiscoveryNodes.Builder nodes, int nonLocalNodesCount) { for (int i = 0; i < nonLocalNodesCount; i++) { @@ -183,29 +293,25 @@ private void addTestNodes(DiscoveryNodes.Builder nodes, int nonLocalNodesCount) } private ClusterState reassign(ClusterState clusterState) { - return PersistentTasksClusterService.reassignTasks(clusterState, logger, - new PersistentTasksClusterService.ExecutorNodeDecider() { - @Override - public Assignment getAssignment( - String action, ClusterState currentState, Params params) { - TestParams testParams = (TestParams) params; - switch (testParams.getTestParam()) { - case "assign_me": - return randomNodeAssignment(currentState.nodes()); - case "dont_assign_me": - return NO_NODE_FOUND; - case "fail_me_if_called": - fail("the decision decider shouldn't be called on this task"); - return null; - case "assign_one": - return assignOnlyOneTaskAtATime(currentState); - default: - fail("unknown param " + testParams.getTestParam()); - } - return NO_NODE_FOUND; - } - }); + PersistentTasksClusterService service = createService((params, currentState) -> { + TestParams testParams = (TestParams) params; + switch (testParams.getTestParam()) { + case "assign_me": + return randomNodeAssignment(currentState.nodes()); + case "dont_assign_me": + return NO_NODE_FOUND; + case "fail_me_if_called": + fail("the decision decider shouldn't be called on this task"); + return null; + case "assign_one": + return assignOnlyOneTaskAtATime(currentState); + default: + fail("unknown param " + testParams.getTestParam()); + } + return NO_NODE_FOUND; + }); + return service.reassignTasks(clusterState); } private Assignment assignOnlyOneTaskAtATime(ClusterState clusterState) { @@ -450,4 +556,21 @@ private void changeRoutingTable(MetaData.Builder metaData, RoutingTable.Builder metaData.put(indexMetaData, false); routingTable.addAsNew(indexMetaData); } + + /** Creates a PersistentTasksClusterService with a single PersistentTasksExecutor implemented by a BiFunction **/ + static

PersistentTasksClusterService createService(final BiFunction fn) { + PersistentTasksExecutorRegistry registry = new PersistentTasksExecutorRegistry(Settings.EMPTY, + singleton(new PersistentTasksExecutor

(Settings.EMPTY, TestPersistentTasksExecutor.NAME, null) { + @Override + public Assignment getAssignment(P params, ClusterState clusterState) { + return fn.apply(params, clusterState); + } + + @Override + protected void nodeOperation(AllocatedPersistentTask task, P params, Task.Status status) { + throw new UnsupportedOperationException(); + } + })); + return new PersistentTasksClusterService(Settings.EMPTY, registry, mock(ClusterService.class)); + } }