[ML] Ensure data frame analytics jobs don't run on a node that's too new #62749

Merged · 3 commits · Oct 2, 2020
StartDataFrameAnalyticsAction.java
@@ -145,6 +145,7 @@ public String toString() {
     public static class TaskParams implements PersistentTaskParams {

         public static final Version VERSION_INTRODUCED = Version.V_7_3_0;
+        public static final Version VERSION_DESTINATION_INDEX_MAPPINGS_CHANGED = Version.V_7_10_0;

         private static final ParseField PROGRESS_ON_START = new ParseField("progress_on_start");

@@ -184,14 +185,18 @@ private TaskParams(String id, String version, @Nullable List<PhaseProgress> prog
         public TaskParams(StreamInput in) throws IOException {
             this.id = in.readString();
             this.version = Version.readVersion(in);
-            progressOnStart = in.readList(PhaseProgress::new);
-            allowLazyStart = in.readBoolean();
+            this.progressOnStart = in.readList(PhaseProgress::new);
+            this.allowLazyStart = in.readBoolean();
         }

         public String getId() {
             return id;
         }

+        public Version getVersion() {
+            return version;
+        }
+
         public List<PhaseProgress> getProgressOnStart() {
             return progressOnStart;
         }
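The `writeTo` side of `TaskParams` is not part of this diff. Assuming it follows the standard `Writeable` pattern, it would mirror the stream constructor field for field — a sketch, not copied from the PR:

```java
@Override
public void writeTo(StreamOutput out) throws IOException {
    out.writeString(id);                  // mirrors in.readString()
    Version.writeVersion(version, out);   // mirrors Version.readVersion(in)
    out.writeList(progressOnStart);       // mirrors in.readList(PhaseProgress::new)
    out.writeBoolean(allowLazyStart);     // mirrors in.readBoolean()
}
```

Keeping the read and write order identical is what lets the value behind the new `getVersion()` accessor survive the wire round trip.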
StartDataFrameAnalyticsActionTaskParamsTests.java
@@ -6,7 +6,6 @@
  */
 package org.elasticsearch.xpack.core.ml.action;

-import org.elasticsearch.Version;
 import org.elasticsearch.common.io.stream.Writeable;
 import org.elasticsearch.common.xcontent.XContentParser;
 import org.elasticsearch.test.AbstractSerializingTestCase;
@@ -16,6 +15,8 @@
 import java.util.ArrayList;
 import java.util.List;

+import static org.elasticsearch.test.VersionUtils.randomVersion;
+
 public class StartDataFrameAnalyticsActionTaskParamsTests extends AbstractSerializingTestCase<StartDataFrameAnalyticsAction.TaskParams> {

     @Override
@@ -30,7 +31,11 @@ protected StartDataFrameAnalyticsAction.TaskParams createTestInstance() {
         for (int i = 0; i < phaseCount; i++) {
             progressOnStart.add(new PhaseProgress(randomAlphaOfLength(10), randomIntBetween(0, 100)));
         }
-        return new StartDataFrameAnalyticsAction.TaskParams(randomAlphaOfLength(10), Version.CURRENT, progressOnStart, randomBoolean());
+        return new StartDataFrameAnalyticsAction.TaskParams(
+            randomAlphaOfLength(10),
+            randomVersion(random()),
+            progressOnStart,
+            randomBoolean());
     }

     @Override
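Switching the test instance from `Version.CURRENT` to `randomVersion(random())` means `AbstractSerializingTestCase` now round-trips `TaskParams` with arbitrary released versions rather than only the current one — and the version field is exactly what the new node filter depends on. A simplified sketch of the round trip the base class performs (assumes an `ESTestCase` subclass so `randomVersion(random())` is in scope; the explicit streams are illustrative):

```java
// Round-trip a TaskParams through the wire format and check the version survives.
StartDataFrameAnalyticsAction.TaskParams original = new StartDataFrameAnalyticsAction.TaskParams(
    "my-job",                    // hypothetical job id
    randomVersion(random()),     // any released version, not just Version.CURRENT
    new ArrayList<>(),           // no progress recorded yet
    false);
BytesStreamOutput out = new BytesStreamOutput();
original.writeTo(out);
StartDataFrameAnalyticsAction.TaskParams copy =
    new StartDataFrameAnalyticsAction.TaskParams(out.bytes().streamInput());
assertEquals(original.getVersion(), copy.getVersion());
```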
TransportStartDataFrameAnalyticsAction.java
@@ -60,6 +60,7 @@
 import org.elasticsearch.xpack.core.ml.action.NodeAcknowledgedResponse;
 import org.elasticsearch.xpack.core.ml.action.PutDataFrameAnalyticsAction;
 import org.elasticsearch.xpack.core.ml.action.StartDataFrameAnalyticsAction;
+import org.elasticsearch.xpack.core.ml.action.StartDataFrameAnalyticsAction.TaskParams;
 import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
 import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState;
 import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsTaskState;
@@ -168,10 +169,10 @@ protected void masterOperation(Task task, StartDataFrameAnalyticsAction.Request
         }

         // Wait for analytics to be started
-        ActionListener<PersistentTasksCustomMetadata.PersistentTask<StartDataFrameAnalyticsAction.TaskParams>> waitForAnalyticsToStart =
-            new ActionListener<PersistentTasksCustomMetadata.PersistentTask<StartDataFrameAnalyticsAction.TaskParams>>() {
+        ActionListener<PersistentTasksCustomMetadata.PersistentTask<TaskParams>> waitForAnalyticsToStart =
+            new ActionListener<PersistentTasksCustomMetadata.PersistentTask<TaskParams>>() {
                 @Override
-                public void onResponse(PersistentTasksCustomMetadata.PersistentTask<StartDataFrameAnalyticsAction.TaskParams> task) {
+                public void onResponse(PersistentTasksCustomMetadata.PersistentTask<TaskParams> task) {
                     waitForAnalyticsStarted(task, request.getTimeout(), listener);
                 }

@@ -188,11 +189,17 @@ public void onFailure(Exception e) {
         // Start persistent task
         ActionListener<StartContext> memoryUsageHandledListener = ActionListener.wrap(
             startContext -> {
-                StartDataFrameAnalyticsAction.TaskParams taskParams = new StartDataFrameAnalyticsAction.TaskParams(
-                    request.getId(), startContext.config.getVersion(), startContext.progressOnStart,
-                    startContext.config.isAllowLazyStart());
-                persistentTasksService.sendStartRequest(MlTasks.dataFrameAnalyticsTaskId(request.getId()),
-                    MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME, taskParams, waitForAnalyticsToStart);
+                TaskParams taskParams =
+                    new TaskParams(
+                        request.getId(),
+                        startContext.config.getVersion(),
+                        startContext.progressOnStart,
+                        startContext.config.isAllowLazyStart());
+                persistentTasksService.sendStartRequest(
+                    MlTasks.dataFrameAnalyticsTaskId(request.getId()),
+                    MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME,
+                    taskParams,
+                    waitForAnalyticsToStart);
             },
             listener::onFailure
         );
@@ -430,7 +437,7 @@ private void checkDestIndexIsEmptyIfExists(ParentTaskAssigningClient parentTaskC
         );
     }

-    private void waitForAnalyticsStarted(PersistentTasksCustomMetadata.PersistentTask<StartDataFrameAnalyticsAction.TaskParams> task,
+    private void waitForAnalyticsStarted(PersistentTasksCustomMetadata.PersistentTask<TaskParams> task,
                                          TimeValue timeout, ActionListener<NodeAcknowledgedResponse> listener) {
         AnalyticsPredicate predicate = new AnalyticsPredicate();
         persistentTasksService.waitForPersistentTaskCondition(task.getId(), predicate, timeout,
@@ -557,7 +564,7 @@ public boolean test(PersistentTasksCustomMetadata.PersistentTask<?> persistentTa
     }

     private void cancelAnalyticsStart(
-        PersistentTasksCustomMetadata.PersistentTask<StartDataFrameAnalyticsAction.TaskParams> persistentTask, Exception exception,
+        PersistentTasksCustomMetadata.PersistentTask<TaskParams> persistentTask, Exception exception,
         ActionListener<NodeAcknowledgedResponse> listener) {
         persistentTasksService.sendRemoveRequest(persistentTask.getId(),
             new ActionListener<PersistentTasksCustomMetadata.PersistentTask<?>>() {
@@ -596,7 +603,7 @@ static List<String> verifyIndicesPrimaryShardsAreActive(ClusterState clusterStat
         return unavailableIndices;
     }

-    public static class TaskExecutor extends PersistentTasksExecutor<StartDataFrameAnalyticsAction.TaskParams> {
+    public static class TaskExecutor extends PersistentTasksExecutor<TaskParams> {

         private final Client client;
         private final ClusterService clusterService;
@@ -635,15 +642,14 @@ public TaskExecutor(Settings settings, Client client, ClusterService clusterServ
         @Override
         protected AllocatedPersistentTask createTask(
             long id, String type, String action, TaskId parentTaskId,
-            PersistentTasksCustomMetadata.PersistentTask<StartDataFrameAnalyticsAction.TaskParams> persistentTask,
+            PersistentTasksCustomMetadata.PersistentTask<TaskParams> persistentTask,
             Map<String, String> headers) {
             return new DataFrameAnalyticsTask(
                 id, type, action, parentTaskId, headers, client, clusterService, manager, auditor, persistentTask.getParams());
         }

         @Override
-        public PersistentTasksCustomMetadata.Assignment getAssignment(StartDataFrameAnalyticsAction.TaskParams params,
-                                                                      ClusterState clusterState) {
+        public PersistentTasksCustomMetadata.Assignment getAssignment(TaskParams params, ClusterState clusterState) {

             // If we are waiting for an upgrade to complete, we should not assign to a node
             if (MlMetadata.getMlMetadata(clusterState).isUpgradeMode()) {
@@ -680,17 +686,21 @@ public PersistentTasksCustomMetadata.Assignment getAssignment(StartDataFrameAnal
                 }
             }

-            JobNodeSelector jobNodeSelector = new JobNodeSelector(clusterState, id, MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME, memoryTracker,
-                params.isAllowLazyStart() ? Integer.MAX_VALUE : maxLazyMLNodes, node -> nodeFilter(node, id));
+            JobNodeSelector jobNodeSelector =
+                new JobNodeSelector(
+                    clusterState,
+                    id,
+                    MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME,
+                    memoryTracker,
+                    params.isAllowLazyStart() ? Integer.MAX_VALUE : maxLazyMLNodes,
+                    node -> nodeFilter(node, params));
             // Pass an effectively infinite value for max concurrent opening jobs, because data frame analytics jobs do
             // not have an "opening" state so would never be rejected for causing too many jobs in the "opening" state
-            return jobNodeSelector.selectNode(
-                maxOpenJobs, Integer.MAX_VALUE, maxMachineMemoryPercent, isMemoryTrackerRecentlyRefreshed);
+            return jobNodeSelector.selectNode(maxOpenJobs, Integer.MAX_VALUE, maxMachineMemoryPercent, isMemoryTrackerRecentlyRefreshed);
         }

         @Override
-        protected void nodeOperation(AllocatedPersistentTask task, StartDataFrameAnalyticsAction.TaskParams params,
-                                     PersistentTaskState state) {
+        protected void nodeOperation(AllocatedPersistentTask task, TaskParams params, PersistentTaskState state) {
             DataFrameAnalyticsTaskState analyticsTaskState = (DataFrameAnalyticsTaskState) state;
             DataFrameAnalyticsState analyticsState = analyticsTaskState == null ? null : analyticsTaskState.getState();
             logger.info("[{}] Starting data frame analytics from state [{}]", params.getId(), analyticsState);
@@ -731,12 +741,19 @@ private void executeTask(DataFrameAnalyticsTaskState analyticsTaskState, Allocat
             }
         }

-        public static String nodeFilter(DiscoveryNode node, String id) {
+        public static String nodeFilter(DiscoveryNode node, TaskParams params) {
+            String id = params.getId();

-            if (node.getVersion().before(StartDataFrameAnalyticsAction.TaskParams.VERSION_INTRODUCED)) {
+            if (node.getVersion().before(TaskParams.VERSION_INTRODUCED)) {
                 return "Not opening job [" + id + "] on node [" + JobNodeSelector.nodeNameAndVersion(node)
                     + "], because the data frame analytics requires a node of version ["
-                    + StartDataFrameAnalyticsAction.TaskParams.VERSION_INTRODUCED + "] or higher";
+                    + TaskParams.VERSION_INTRODUCED + "] or higher";
             }
+            if (node.getVersion().before(TaskParams.VERSION_DESTINATION_INDEX_MAPPINGS_CHANGED)
+                && params.getVersion().onOrAfter(TaskParams.VERSION_DESTINATION_INDEX_MAPPINGS_CHANGED)) {
+                return "Not opening job [" + id + "] on node [" + JobNodeSelector.nodeNameAndVersion(node)
+                    + "], because the data frame analytics created for version [" + params.getVersion() + "] requires a node of version "
+                    + "[" + TaskParams.VERSION_DESTINATION_INDEX_MAPPINGS_CHANGED + "] or higher";
+            }

> **Review comment (Member), on the check above:** I think this is ok. During a rolling upgrade, all nodes will eventually get to the latest version. And since we don't support upgrade rollbacks, this seems like a good check.

             return null;
@@ -754,6 +771,4 @@ void setMaxOpenJobs(int maxOpenJobs) {
             this.maxOpenJobs = maxOpenJobs;
         }
     }
-
-
 }
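The new check in `nodeFilter` is two-sided: a node is rejected only when it is older than `VERSION_DESTINATION_INDEX_MAPPINGS_CHANGED` *and* the job was created on or after that version, so pre-7.10 jobs stay assignable everywhere during a rolling upgrade. A minimal, self-contained sketch of the same decision logic, with plain ints standing in for `org.elasticsearch.Version` (an assumption made so the snippet runs without the Elasticsearch dependency; the real code compares `Version` objects with `before`/`onOrAfter`):

```java
// Standalone sketch of the two-sided gate in nodeFilter. Plain ints stand in
// for org.elasticsearch.Version (70900 = 7.9.0, 71000 = 7.10.0).
public class NodeFilterSketch {

    static final int VERSION_INTRODUCED = 70300;                         // 7.3.0
    static final int VERSION_DESTINATION_INDEX_MAPPINGS_CHANGED = 71000; // 7.10.0

    /** Returns a rejection reason, or null when the node may run the job. */
    static String nodeFilter(int nodeVersion, int jobVersion, String id) {
        if (nodeVersion < VERSION_INTRODUCED) {
            // The node predates data frame analytics entirely.
            return "job [" + id + "] requires a node on at least 7.3.0";
        }
        if (nodeVersion < VERSION_DESTINATION_INDEX_MAPPINGS_CHANGED
                && jobVersion >= VERSION_DESTINATION_INDEX_MAPPINGS_CHANGED) {
            // The job was created with the new destination index mappings;
            // an older node would fail to index its results.
            return "job [" + id + "] created for [" + jobVersion
                + "] requires a node on at least [" + VERSION_DESTINATION_INDEX_MAPPINGS_CHANGED + "]";
        }
        return null; // the node is acceptable
    }

    public static void main(String[] args) {
        System.out.println(nodeFilter(70900, 70900, "old-job")); // null: old job, old node is fine
        System.out.println(nodeFilter(70900, 71000, "new-job")); // rejected: job too new for node
        System.out.println(nodeFilter(71000, 70900, "old-job")); // null: old job on upgraded node is fine
    }
}
```

Because upgrade rollbacks are not supported (the reviewer's point above), the gate only ever needs to keep newer jobs off older nodes; the reverse situation cannot arise.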
DestinationIndex.java
@@ -28,6 +28,7 @@
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.index.mapper.NumberFieldMapper;
 import org.elasticsearch.xpack.core.ClientHelper;
+import org.elasticsearch.xpack.core.ml.action.StartDataFrameAnalyticsAction;
 import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
 import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsDest;
 import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
@@ -80,7 +81,8 @@ public final class DestinationIndex {
      * If the results mappings change in a way existing destination indices will fail to index
      * the results, this should be bumped accordingly.
      */
-    public static final Version MIN_COMPATIBLE_VERSION = Version.V_7_10_0;
+    public static final Version MIN_COMPATIBLE_VERSION =
+        StartDataFrameAnalyticsAction.TaskParams.VERSION_DESTINATION_INDEX_MAPPINGS_CHANGED;

     private DestinationIndex() {}

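Pointing `MIN_COMPATIBLE_VERSION` at `TaskParams.VERSION_DESTINATION_INDEX_MAPPINGS_CHANGED` gives the destination-index compatibility floor and the node-assignment gate a single source of truth, so a future mappings change only needs one constant bumped. A hypothetical illustration of the kind of check such a floor supports (`requiresRecreation` and `indexCreatedVersion` are made-up names for this sketch, not the PR's API):

```java
// Hypothetical helper, not part of the PR: an existing destination index that
// predates the current results mappings cannot hold new results and would
// need to be recreated.
static boolean requiresRecreation(Version indexCreatedVersion) {
    return indexCreatedVersion.before(DestinationIndex.MIN_COMPATIBLE_VERSION);
}
```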
JobNodeSelector.java
@@ -91,7 +91,6 @@ public PersistentTasksCustomMetadata.Assignment selectNode(int dynamicMaxOpenJob
         long maxAvailableMemory = Long.MIN_VALUE;
         DiscoveryNode minLoadedNodeByCount = null;
         DiscoveryNode minLoadedNodeByMemory = null;
-        PersistentTasksCustomMetadata persistentTasks = clusterState.getMetadata().custom(PersistentTasksCustomMetadata.TYPE);
         for (DiscoveryNode node : clusterState.getNodes()) {

             // First check conditions that would rule out the node regardless of what other tasks are assigned to it