Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ML] Introduce a "starting" datafeed state for lazy jobs #53918

Merged
merged 5 commits into from
Mar 24, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions docs/reference/ml/ml-shared.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -1388,7 +1388,10 @@ tag::state-datafeed[]
The status of the {dfeed}, which can be one of the following values:
+
--
* `starting`: The {dfeed} has been requested to start but has not yet started.
* `started`: The {dfeed} is actively receiving data.
* `stopping`: The {dfeed} has been requested to stop gracefully and is
completing its final action.
Comment on lines +1393 to +1394
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The stopping state is not introduced by this PR - it has existed for a couple of years. But I thought I'd add it while I was modifying the list.

(This PR exposes the starting state externally for the first time, so it's understandable it wasn't previously documented.)

* `stopped`: The {dfeed} is stopped and will not receive data until it is
re-started.
--
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,14 +132,18 @@ public static JobState getJobStateModifiedForReassignments(@Nullable PersistentT

public static DatafeedState getDatafeedState(String datafeedId, @Nullable PersistentTasksCustomMetaData tasks) {
PersistentTasksCustomMetaData.PersistentTask<?> task = getDatafeedTask(datafeedId, tasks);
// TODO: report (task != null && task.getState() == null) as STARTING in version 8, and fix side effects
if (task != null && task.getState() != null) {
return (DatafeedState) task.getState();
} else {
if (task == null) {
// If we haven't started a datafeed then there will be no persistent task,
// which is the same as if the datafeed was't started
return DatafeedState.STOPPED;
}
DatafeedState taskState = (DatafeedState) task.getState();
if (taskState == null) {
// If we haven't set a state yet then the task has never been assigned, so
// report that it's starting
return DatafeedState.STARTING;
}
return taskState;
}

public static DataFrameAnalyticsState getDataFrameAnalyticsState(String analyticsId, @Nullable PersistentTasksCustomMetaData tasks) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ public void testGetDatefeedState() {
tasksBuilder.addTask(MlTasks.datafeedTaskId("foo"), MlTasks.DATAFEED_TASK_NAME,
new StartDatafeedAction.DatafeedParams("foo", 0L),
new PersistentTasksCustomMetaData.Assignment("bar", "test assignment"));
assertEquals(DatafeedState.STOPPED, MlTasks.getDatafeedState("foo", tasksBuilder.build()));
// A task with no state means the datafeed is starting
assertEquals(DatafeedState.STARTING, MlTasks.getDatafeedState("foo", tasksBuilder.build()));

tasksBuilder.updateTaskState(MlTasks.datafeedTaskId("foo"), DatafeedState.STARTED);
assertEquals(DatafeedState.STARTED, MlTasks.getDatafeedState("foo", tasksBuilder.build()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -537,7 +537,7 @@ void closeJob(String reason) {
* Important: the methods of this class must NOT throw exceptions. If they did then the callers
* of endpoints waiting for a condition tested by this predicate would never get a response.
*/
private class JobPredicate implements Predicate<PersistentTasksCustomMetaData.PersistentTask<?>> {
private static class JobPredicate implements Predicate<PersistentTasksCustomMetaData.PersistentTask<?>> {

private volatile boolean opened;
private volatile Exception exception;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ public void onResponse(PersistentTasksCustomMetaData.PersistentTask<StartDatafee
// what would have happened if the error had been detected in the "fast fail" validation
cancelDatafeedStart(persistentTask, predicate.exception, listener);
} else {
listener.onResponse(new AcknowledgedResponse(true));
listener.onResponse(new AcknowledgedResponse(predicate.started));
}
}

Expand Down Expand Up @@ -477,8 +477,9 @@ public void isolate() {
* Important: the methods of this class must NOT throw exceptions. If they did then the callers
* of endpoints waiting for a condition tested by this predicate would never get a response.
*/
private class DatafeedPredicate implements Predicate<PersistentTasksCustomMetaData.PersistentTask<?>> {
private static class DatafeedPredicate implements Predicate<PersistentTasksCustomMetaData.PersistentTask<?>> {

private volatile boolean started;
private volatile Exception exception;

@Override
Expand All @@ -487,15 +488,21 @@ public boolean test(PersistentTasksCustomMetaData.PersistentTask<?> persistentTa
return false;
}
PersistentTasksCustomMetaData.Assignment assignment = persistentTask.getAssignment();
if (assignment != null && assignment.equals(PersistentTasksCustomMetaData.INITIAL_ASSIGNMENT) == false &&
assignment.isAssigned() == false) {
// Assignment has failed despite passing our "fast fail" validation
exception = new ElasticsearchStatusException("Could not start datafeed, allocation explanation [" +
if (assignment != null) {
// This means we are awaiting the datafeed's job to be assigned to a node
if (assignment.equals(DatafeedNodeSelector.AWAITING_JOB_ASSIGNMENT)) {
return true;
}
if (assignment.equals(PersistentTasksCustomMetaData.INITIAL_ASSIGNMENT) == false && assignment.isAssigned() == false) {
// Assignment has failed despite passing our "fast fail" validation
exception = new ElasticsearchStatusException("Could not start datafeed, allocation explanation [" +
assignment.getExplanation() + "]", RestStatus.TOO_MANY_REQUESTS);
return true;
return true;
}
}
DatafeedState datafeedState = (DatafeedState) persistentTask.getState();
return datafeedState == DatafeedState.STARTED;
started = datafeedState == DatafeedState.STARTED;
return started;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -93,14 +93,8 @@ private static void addDatafeedTaskIdAccordingToState(String datafeedId,
List<String> stoppingDatafeedIds,
List<String> notStoppedDatafeedIds) {
switch (datafeedState) {
// Treat STARTING like STARTED for stop API behaviour.
case STARTING:
// The STARTING state is not used anywhere at present, so this should never happen.
// At present datafeeds that have a persistent task that hasn't yet been assigned
// a state are reported as STOPPED (which is not great). It could be considered a
// breaking change to introduce the STARTING state though, so let's aim to do it in
// version 8. Also consider treating STARTING like STARTED for stop API behaviour.
notStoppedDatafeedIds.add(datafeedId);
break;
case STARTED:
startedDatafeedIds.add(datafeedId);
notStoppedDatafeedIds.add(datafeedId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ public class DatafeedNodeSelector {

private static final Logger LOGGER = LogManager.getLogger(DatafeedNodeSelector.class);

public static final PersistentTasksCustomMetaData.Assignment AWAITING_JOB_ASSIGNMENT =
new PersistentTasksCustomMetaData.Assignment(null, "datafeed awaiting job assignment.");

private final String datafeedId;
private final String jobId;
private final List<String> datafeedIndices;
Expand Down Expand Up @@ -76,9 +79,14 @@ public PersistentTasksCustomMetaData.Assignment selectNode() {

AssignmentFailure assignmentFailure = checkAssignment();
if (assignmentFailure == null) {
return new PersistentTasksCustomMetaData.Assignment(jobTask.getExecutorNode(), "");
String jobNode = jobTask.getExecutorNode();
if (jobNode == null) {
return AWAITING_JOB_ASSIGNMENT;
}
return new PersistentTasksCustomMetaData.Assignment(jobNode, "");
}
LOGGER.debug(assignmentFailure.reason);
assert assignmentFailure.reason.isEmpty() == false;
return new PersistentTasksCustomMetaData.Assignment(null, assignmentFailure.reason);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.elasticsearch.xpack.core.ml.action.PutDatafeedAction;
import org.elasticsearch.xpack.core.ml.action.PutJobAction;
import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction;
import org.elasticsearch.xpack.core.ml.action.StopDatafeedAction;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState;
import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig;
Expand Down Expand Up @@ -424,6 +425,60 @@ public void testMlStateAndResultsIndicesNotAvailable() throws Exception {
assertBusy(() -> assertJobTask(jobId, JobState.OPENED, true));
}

public void testCloseUnassignedLazyJobAndDatafeed() throws Exception {
internalCluster().ensureAtLeastNumDataNodes(3);
ensureStableCluster(3);

String jobId = "test-lazy-stop";
String datafeedId = jobId + "-datafeed";
// Assume the test machine won't have space to assign a 2TB job
Job.Builder job = createJob(jobId, new ByteSizeValue(2, ByteSizeUnit.TB), true);
PutJobAction.Request putJobRequest = new PutJobAction.Request(job);
client().execute(PutJobAction.INSTANCE, putJobRequest).actionGet();

client().admin().indices().prepareCreate("data").setMapping("time", "type=date").get();

DatafeedConfig config = createDatafeed(datafeedId, jobId, Collections.singletonList("data"));
PutDatafeedAction.Request putDatafeedRequest = new PutDatafeedAction.Request(config);
client().execute(PutDatafeedAction.INSTANCE, putDatafeedRequest).actionGet();

ensureYellow(); // at least the primary shards of the indices a job uses should be started
OpenJobAction.Request openJobRequest = new OpenJobAction.Request(jobId);
client().execute(OpenJobAction.INSTANCE, openJobRequest).actionGet();

// Job state should be opening because it won't fit anyway, but is allowed to open lazily
GetJobsStatsAction.Request jobStatsRequest = new GetJobsStatsAction.Request(jobId);
GetJobsStatsAction.Response jobStatsResponse = client().execute(GetJobsStatsAction.INSTANCE, jobStatsRequest).actionGet();
assertEquals(JobState.OPENING, jobStatsResponse.getResponse().results().get(0).getState());

StartDatafeedAction.Request startDataFeedRequest = new StartDatafeedAction.Request(config.getId(), 0L);
client().execute(StartDatafeedAction.INSTANCE, startDataFeedRequest).actionGet();

// Datafeed state should be starting while it waits for job assignment
GetDatafeedsStatsAction.Request datafeedStatsRequest = new GetDatafeedsStatsAction.Request(datafeedId);
GetDatafeedsStatsAction.Response datafeedStatsResponse =
client().execute(GetDatafeedsStatsAction.INSTANCE, datafeedStatsRequest).actionGet();
assertEquals(DatafeedState.STARTING, datafeedStatsResponse.getResponse().results().get(0).getDatafeedState());

// A starting datafeed can be stopped normally or by force
StopDatafeedAction.Request stopDatafeedRequest = new StopDatafeedAction.Request(datafeedId);
stopDatafeedRequest.setForce(randomBoolean());
StopDatafeedAction.Response stopDatafeedResponse = client().execute(StopDatafeedAction.INSTANCE, stopDatafeedRequest).actionGet();
assertTrue(stopDatafeedResponse.isStopped());

datafeedStatsResponse = client().execute(GetDatafeedsStatsAction.INSTANCE, datafeedStatsRequest).actionGet();
assertEquals(DatafeedState.STOPPED, datafeedStatsResponse.getResponse().results().get(0).getDatafeedState());

// An opening job can also be stopped normally or by force
CloseJobAction.Request closeJobRequest = new CloseJobAction.Request(jobId);
closeJobRequest.setForce(randomBoolean());
CloseJobAction.Response closeJobResponse = client().execute(CloseJobAction.INSTANCE, closeJobRequest).actionGet();
assertTrue(closeJobResponse.isClosed());

jobStatsResponse = client().execute(GetJobsStatsAction.INSTANCE, jobStatsRequest).actionGet();
assertEquals(JobState.CLOSED, jobStatsResponse.getResponse().results().get(0).getState());
}

private void assertJobTask(String jobId, JobState expectedState, boolean hasExecutorNode) {
ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
PersistentTasksCustomMetaData tasks = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,10 @@ protected Job.Builder createJob(String id) {
}

protected Job.Builder createJob(String id, ByteSizeValue modelMemoryLimit) {
return createJob(id, modelMemoryLimit, false);
}

protected Job.Builder createJob(String id, ByteSizeValue modelMemoryLimit, boolean allowLazyOpen) {
DataDescription.Builder dataDescription = new DataDescription.Builder();
dataDescription.setFormat(DataDescription.DataFormat.XCONTENT);
dataDescription.setTimeFormat(DataDescription.EPOCH_MS);
Expand All @@ -141,6 +145,7 @@ protected Job.Builder createJob(String id, ByteSizeValue modelMemoryLimit) {
}
builder.setAnalysisConfig(analysisConfig);
builder.setDataDescription(dataDescription);
builder.setAllowLazyOpen(allowLazyOpen);
return builder;
}

Expand Down