From 1938ac01130631326bbc9f1d2ff050089a679990 Mon Sep 17 00:00:00 2001 From: Benjamin Trent Date: Wed, 30 Jan 2019 09:48:46 -0600 Subject: [PATCH] ML: better handle task state race condition --- .../action/TransportSetUpgradeModeAction.java | 28 +++++++++++++++++-- 1 file changed, 26 insertions(+), 2 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportSetUpgradeModeAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportSetUpgradeModeAction.java index 6ec77a34c55b6..edc31f1e896b7 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportSetUpgradeModeAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportSetUpgradeModeAction.java @@ -5,8 +5,10 @@ */ package org.elasticsearch.xpack.ml.action; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.ElasticsearchTimeoutException; +import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.master.AcknowledgedResponse; @@ -33,11 +35,13 @@ import org.elasticsearch.xpack.core.ml.action.SetUpgradeModeAction; import org.elasticsearch.xpack.ml.utils.TypedChainTaskExecutor; +import java.util.Comparator; import java.util.List; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; +import static org.elasticsearch.ExceptionsHelper.rethrowAndSuppress; import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN; import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin; import static org.elasticsearch.xpack.core.ml.MlTasks.AWAITING_UPGRADE; @@ -119,9 +123,20 @@ protected void masterOperation(SetUpgradeModeAction.Request request, ClusterStat .cluster() .prepareListTasks() .setActions(DATAFEED_TASK_NAME + "[c]", JOB_TASK_NAME + "[c]") + // There is a chance that we failed un-allocating a task due to allocation_id being changed + // This call will timeout in that case and return an error .setWaitForCompletion(true) .setTimeout(request.timeout()).execute(ActionListener.wrap( - r -> wrappedListener.onResponse(new AcknowledgedResponse(true)), + r -> { + try { + // Handle potential node timeouts, + // these should be considered failures as tasks as still potentially executing + rethrowAndSuppress(r.getNodeFailures()); + wrappedListener.onResponse(new AcknowledgedResponse(true)); + } catch (ElasticsearchException ex) { + wrappedListener.onFailure(ex); + } + }, wrappedListener::onFailure)); }, wrappedListener::onFailure @@ -243,10 +258,19 @@ private void unassignPersistentTasks(PersistentTasksCustomMetaData tasksCustomMe .stream() .filter(persistentTask -> (persistentTask.getTaskName().equals(MlTasks.JOB_TASK_NAME) || persistentTask.getTaskName().equals(MlTasks.DATAFEED_TASK_NAME))) + // We want to always have the same ordering of which tasks we un-allocate first. + // However, the order in which the distributed tasks handle the un-allocation event is not guaranteed. + .sorted(Comparator.comparing(PersistentTask::getTaskName)) .collect(Collectors.toList()); TypedChainTaskExecutor> chainTaskExecutor = - new TypedChainTaskExecutor<>(client.threadPool().executor(executor()), r -> true, ex -> true); + new TypedChainTaskExecutor<>(client.threadPool().executor(executor()), + r -> true, + // Another process could modify tasks and thus we cannot find them via the allocation_id and name + // If the task was removed from the node, all is well + // We handle the case of allocation_id changing later in this transport class by timing out waiting for task completion + // Consequently, if the exception is ResourceNotFoundException, continue execution; circuit break otherwise. + ex -> ex instanceof ResourceNotFoundException == false); for (PersistentTask task : datafeedAndJobTasks) { chainTaskExecutor.add(