Skip to content

Commit

Permalink
Small code cleanups and refactorings in persistent tasks
Browse files Browse the repository at this point in the history
This commit consists of small code cleanups and refactorings in the
persistent tasks framework. Most changes are in
PersistentTasksClusterService where some methods have been renamed
or merged together, documentation has been added, unused code removed
in order to improve readability of the code.
  • Loading branch information
tlrx committed Mar 16, 2018
1 parent 708c068 commit 3903f42
Show file tree
Hide file tree
Showing 4 changed files with 242 additions and 135 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

package org.elasticsearch.persistent;

import org.apache.logging.log4j.Logger;
import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
Expand All @@ -33,9 +32,9 @@
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.Assignment;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask;
import org.elasticsearch.tasks.Task;

import java.util.Objects;

Expand All @@ -52,29 +51,31 @@ public PersistentTasksClusterService(Settings settings, PersistentTasksExecutorR
this.clusterService = clusterService;
clusterService.addListener(this);
this.registry = registry;

}

/**
* Creates a new persistent task on master node
*
* @param action the action name
* @param params params
* @param listener the listener that will be called when task is started
* @param taskId the task's id
* @param taskName the task's name
* @param taskParams the task's parameters
* @param listener the listener that will be called when task is started
*/
public <Params extends PersistentTaskParams> void createPersistentTask(String taskId, String action, @Nullable Params params,
public <Params extends PersistentTaskParams> void createPersistentTask(String taskId, String taskName, @Nullable Params taskParams,
ActionListener<PersistentTask<?>> listener) {
clusterService.submitStateUpdateTask("create persistent task", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
public ClusterState execute(ClusterState currentState) {
PersistentTasksCustomMetaData.Builder builder = builder(currentState);
if (builder.hasTask(taskId)) {
throw new ResourceAlreadyExistsException("task with id {" + taskId + "} already exist");
}
validate(action, currentState, params);
final Assignment assignment;
assignment = getAssignement(action, currentState, params);
return update(currentState, builder.addTask(taskId, action, params, assignment));

PersistentTasksExecutor<Params> taskExecutor = registry.getPersistentTaskExecutorSafe(taskName);
taskExecutor.validate(taskParams, currentState);

Assignment assignment = createAssignment(taskName, taskParams, currentState);
return update(currentState, builder.addTask(taskId, taskName, taskParams, assignment));
}

@Override
Expand All @@ -95,7 +96,6 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
});
}


/**
* Restarts a record about a running persistent task from cluster state
*
Expand All @@ -114,7 +114,7 @@ public void completePersistentTask(String id, long allocationId, Exception failu
}
clusterService.submitStateUpdateTask(source, new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
public ClusterState execute(ClusterState currentState) {
PersistentTasksCustomMetaData.Builder tasksInProgress = builder(currentState);
if (tasksInProgress.hasTask(id, allocationId)) {
tasksInProgress.removeTask(id);
Expand Down Expand Up @@ -185,7 +185,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
public void updatePersistentTaskStatus(String id, long allocationId, Task.Status status, ActionListener<PersistentTask<?>> listener) {
clusterService.submitStateUpdateTask("update task status", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
public ClusterState execute(ClusterState currentState) {
PersistentTasksCustomMetaData.Builder tasksInProgress = builder(currentState);
if (tasksInProgress.hasTask(id, allocationId)) {
return update(currentState, tasksInProgress.updateTaskStatus(id, status));
Expand All @@ -211,93 +211,85 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
});
}

private <Params extends PersistentTaskParams> Assignment getAssignement(String taskName, ClusterState currentState,
@Nullable Params params) {
PersistentTasksExecutor<Params> persistentTasksExecutor = registry.getPersistentTaskExecutorSafe(taskName);
return persistentTasksExecutor.getAssignment(params, currentState);
}
/**
* Creates a new {@link Assignment} for the given persistent task.
*
* @param taskName the task's name
* @param taskParams the task's parameters
* @param currentState the current {@link ClusterState}
private <Params extends PersistentTaskParams> void validate(String taskName, ClusterState currentState, @Nullable Params params) {
* @return a new {@link Assignment}
*/
private <Params extends PersistentTaskParams> Assignment createAssignment(final String taskName,
final @Nullable Params taskParams,
final ClusterState currentState) {
PersistentTasksExecutor<Params> persistentTasksExecutor = registry.getPersistentTaskExecutorSafe(taskName);
persistentTasksExecutor.validate(params, currentState);
return persistentTasksExecutor.getAssignment(taskParams, currentState);
}

@Override
public void clusterChanged(ClusterChangedEvent event) {
if (event.localNodeMaster()) {
logger.trace("checking task reassignment for cluster state {}", event.state().getVersion());
if (reassignmentRequired(event, this::getAssignement)) {
logger.trace("task reassignment is needed");
reassignTasks();
} else {
logger.trace("task reassignment is not needed");
if (shouldReassignPersistentTasks(event)) {
logger.trace("checking task reassignment for cluster state {}", event.state().getVersion());
clusterService.submitStateUpdateTask("reassign persistent tasks", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
return reassignTasks(currentState);
}

@Override
public void onFailure(String source, Exception e) {
logger.warn("failed to reassign persistent tasks", e);
}
});
}
}
}

interface ExecutorNodeDecider {
<Params extends PersistentTaskParams> Assignment getAssignment(String action, ClusterState currentState, Params params);
}
/**
* Returns true if the cluster state change(s) require to reassign some persistent tasks. It can happen in the following
* situations: a node left or is added, the routing table changed, the master node changed or the persistent tasks changed.
*/
boolean shouldReassignPersistentTasks(final ClusterChangedEvent event) {
final PersistentTasksCustomMetaData tasks = event.state().getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
if (tasks == null) {
return false;
}

static boolean reassignmentRequired(ClusterChangedEvent event, ExecutorNodeDecider decider) {
PersistentTasksCustomMetaData tasks = event.state().getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
PersistentTasksCustomMetaData prevTasks = event.previousState().getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
if (tasks != null && (Objects.equals(tasks, prevTasks) == false ||
event.nodesChanged() ||
event.routingTableChanged() ||
event.previousState().nodes().isLocalNodeElectedMaster() == false)) {
// We need to check if removed nodes were running any of the tasks and reassign them
boolean reassignmentRequired = false;
for (PersistentTask<?> taskInProgress : tasks.tasks()) {
if (taskInProgress.needsReassignment(event.state().nodes())) {
// there is an unassigned task or task with a disappeared node - we need to try assigning it
if (Objects.equals(taskInProgress.getAssignment(),
decider.getAssignment(taskInProgress.getTaskName(), event.state(), taskInProgress.getParams())) == false) {
// it looks like a assignment for at least one task is possible - let's trigger reassignment
reassignmentRequired = true;
break;
}
boolean masterChanged = event.previousState().nodes().isLocalNodeElectedMaster() == false;

if (persistentTasksChanged(event) || event.nodesChanged() || event.routingTableChanged() || masterChanged) {
for (PersistentTask<?> task : tasks.tasks()) {
if (needsReassignment(task.getAssignment(), event.state().nodes())) {
Assignment assignment = createAssignment(task.getTaskName(), task.getParams(), event.state());
if (Objects.equals(assignment, task.getAssignment()) == false) {
return true;
}
}
}
return reassignmentRequired;
}
return false;
}

/**
* Evaluates the cluster state and tries to assign tasks to nodes
* Evaluates the cluster state and tries to assign tasks to nodes.
*
* @param currentState the cluster state to analyze
* @return an updated version of the cluster state
*/
public void reassignTasks() {
clusterService.submitStateUpdateTask("reassign persistent tasks", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
return reassignTasks(currentState, logger, PersistentTasksClusterService.this::getAssignement);
}

@Override
public void onFailure(String source, Exception e) {
logger.warn("Unsuccessful persistent task reassignment", e);
}

@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {

}
});
}

static ClusterState reassignTasks(ClusterState currentState, Logger logger, ExecutorNodeDecider decider) {
PersistentTasksCustomMetaData tasks = currentState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
ClusterState reassignTasks(final ClusterState currentState) {
ClusterState clusterState = currentState;
DiscoveryNodes nodes = currentState.nodes();

final PersistentTasksCustomMetaData tasks = currentState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
if (tasks != null) {
logger.trace("reassigning {} persistent tasks", tasks.tasks().size());
final DiscoveryNodes nodes = currentState.nodes();

// We need to check if removed nodes were running any of the tasks and reassign them
for (PersistentTask<?> task : tasks.tasks()) {
if (task.needsReassignment(nodes)) {
// there is an unassigned task - we need to try assigning it
Assignment assignment = decider.getAssignment(task.getTaskName(), clusterState, task.getParams());
if (needsReassignment(task.getAssignment(), nodes)) {
Assignment assignment = createAssignment(task.getTaskName(), task.getParams(), clusterState);
if (Objects.equals(assignment, task.getAssignment()) == false) {
logger.trace("reassigning task {} from node {} to node {}", task.getId(),
task.getAssignment().getExecutorNode(), assignment.getExecutorNode());
Expand All @@ -313,6 +305,17 @@ static ClusterState reassignTasks(ClusterState currentState, Logger logger, Exec
return clusterState;
}

/** Returns true if the persistent tasks are not equal between the previous and the current cluster state **/
static boolean persistentTasksChanged(final ClusterChangedEvent event) {
String type = PersistentTasksCustomMetaData.TYPE;
return Objects.equals(event.state().metaData().custom(type), event.previousState().metaData().custom(type)) == false;
}

/** Returns true if the task is not assigned or is assigned to a non-existing node */
static boolean needsReassignment(final Assignment assignment, final DiscoveryNodes nodes) {
return (assignment.isAssigned() == false || nodes.nodeExists(assignment.getExecutorNode()) == false);
}

private static PersistentTasksCustomMetaData.Builder builder(ClusterState currentState) {
return PersistentTasksCustomMetaData.builder(currentState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,6 @@ private TaskDescriptionBuilder setStatus(Status status) {
}
}


public Collection<PersistentTask<?>> tasks() {
return this.tasks.values();
}
Expand All @@ -165,12 +164,6 @@ public Collection<PersistentTask<?>> findTasks(String taskName, Predicate<Persis
.collect(Collectors.toList());
}

public boolean tasksExist(String taskName, Predicate<PersistentTask<?>> predicate) {
return this.tasks().stream()
.filter(p -> taskName.equals(p.getTaskName()))
.anyMatch(predicate);
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
Expand Down Expand Up @@ -279,7 +272,6 @@ public static class PersistentTask<P extends PersistentTaskParams> implements Wr
@Nullable
private final Long allocationIdOnLastStatusUpdate;


public PersistentTask(String id, String taskName, P params, long allocationId, Assignment assignment) {
this(id, allocationId, taskName, params, null, assignment, null);
}
Expand Down Expand Up @@ -395,13 +387,6 @@ public boolean isAssigned() {
return assignment.isAssigned();
}

/**
* Returns true if the tasks is not stopped and unassigned or assigned to a non-existing node.
*/
public boolean needsReassignment(DiscoveryNodes nodes) {
return (assignment.isAssigned() == false || nodes.nodeExists(assignment.getExecutorNode()) == false);
}

@Nullable
public Status getStatus() {
return status;
Expand Down Expand Up @@ -522,16 +507,14 @@ public static NamedDiff<MetaData.Custom> readDiffFrom(StreamInput in) throws IOE
return readDiffFrom(MetaData.Custom.class, TYPE, in);
}

public long getLastAllocationId() {
return lastAllocationId;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.field("last_allocation_id", lastAllocationId);
builder.startArray("tasks");
for (PersistentTask<?> entry : tasks.values()) {
entry.toXContent(builder, params);
{
for (PersistentTask<?> entry : tasks.values()) {
entry.toXContent(builder, params);
}
}
builder.endArray();
return builder;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,7 @@ protected DiscoveryNode selectLeastLoadedNode(ClusterState clusterState, Predica
* <p>
* Throws an exception if the supplied params cannot be executed on the cluster in the current state.
*/
public void validate(Params params, ClusterState clusterState) {

}
public void validate(Params params, ClusterState clusterState) {}

/**
* Creates a AllocatedPersistentTask for communicating with task manager
Expand Down
Loading

0 comments on commit 3903f42

Please sign in to comment.