[ML] Delete job document #34595

Merged 3 commits on Oct 19, 2018

AnomalyDetectorsIndex.java
@@ -5,9 +5,6 @@
*/
package org.elasticsearch.xpack.core.ml.job.persistence;

import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.xpack.core.ml.MlMetadata;

/**
* Methods for handling index naming related functions
*/
@@ -40,15 +37,6 @@ public static String resultsWriteAlias(String jobId) {
return AnomalyDetectorsIndexFields.RESULTS_INDEX_PREFIX + ".write-" + jobId;
}

/**
* Retrieves the currently defined physical index from the job state
* @param jobId Job Id
* @return The index name
*/
public static String getPhysicalIndexFromState(ClusterState state, String jobId) {
return MlMetadata.getMlMetadata(state).getJobs().get(jobId).getResultsIndexName();
}

/**
* The name of the default index where a job's state is stored
* @return The index name
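
The helper removed above read a job's physical results index out of MlMetadata in cluster state. After this change the index name comes from the job configuration document instead, which is how TransportDeleteJobAction (next file) resolves it before deleting physical storage. A minimal sketch of that lookup in isolation, reusing the PR's asynchronous JobConfigProvider.getJob call; the resolveResultsIndex wrapper and its two callback parameters are illustrative only:

    // Sketch: resolve the physical results index from the job config document
    // rather than from MlMetadata in cluster state. JobConfigProvider.getJob
    // supplies a Job.Builder to its listener, exactly as used in
    // TransportDeleteJobAction below; the wrapper method itself is hypothetical.
    private void resolveResultsIndex(String jobId, Consumer<String> indexNameHandler, Consumer<Exception> failureHandler) {
        jobConfigProvider.getJob(jobId, ActionListener.wrap(
                jobBuilder -> {
                    // e.g. ".ml-anomalies-shared" or ".ml-anomalies-custom-<name>"
                    String indexName = jobBuilder.build().getResultsIndexName();
                    indexNameHandler.accept(indexName);
                },
                failureHandler
        ));
    }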

TransportDeleteJobAction.java
@@ -24,14 +24,11 @@
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.ParentTaskAssigningClient;
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.AliasMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.CheckedConsumer;
import org.elasticsearch.common.Nullable;
@@ -52,30 +49,36 @@
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.ml.MlMetadata;
import org.elasticsearch.xpack.core.ml.MlTasks;
import org.elasticsearch.xpack.core.ml.action.DeleteJobAction;
import org.elasticsearch.xpack.core.ml.action.GetModelSnapshotsAction;
import org.elasticsearch.xpack.core.ml.action.KillProcessAction;
import org.elasticsearch.xpack.core.ml.action.util.PageParams;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.config.JobState;
import org.elasticsearch.xpack.core.ml.job.config.JobTaskState;
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndexFields;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.CategorizerState;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.Quantiles;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider;
import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider;
import org.elasticsearch.xpack.ml.job.persistence.JobDataDeleter;
import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider;
import org.elasticsearch.xpack.ml.notifications.Auditor;
import org.elasticsearch.xpack.ml.utils.MlIndicesUtils;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
@@ -89,6 +92,8 @@ public class TransportDeleteJobAction extends TransportMasterNodeAction<DeleteJo
private final PersistentTasksService persistentTasksService;
private final Auditor auditor;
private final JobResultsProvider jobResultsProvider;
private final JobConfigProvider jobConfigProvider;
private final DatafeedConfigProvider datafeedConfigProvider;

/**
* A map of task listeners by job_id.
@@ -102,13 +107,16 @@ public class TransportDeleteJobAction extends TransportMasterNodeAction<DeleteJo
public TransportDeleteJobAction(Settings settings, TransportService transportService, ClusterService clusterService,
ThreadPool threadPool, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, PersistentTasksService persistentTasksService,
Client client, Auditor auditor, JobResultsProvider jobResultsProvider) {
Client client, Auditor auditor, JobResultsProvider jobResultsProvider,
JobConfigProvider jobConfigProvider, DatafeedConfigProvider datafeedConfigProvider) {
super(settings, DeleteJobAction.NAME, transportService, clusterService, threadPool, actionFilters,
indexNameExpressionResolver, DeleteJobAction.Request::new);
this.client = client;
this.persistentTasksService = persistentTasksService;
this.auditor = auditor;
this.jobResultsProvider = jobResultsProvider;
this.jobConfigProvider = jobConfigProvider;
this.datafeedConfigProvider = datafeedConfigProvider;
this.listenersByJobId = new HashMap<>();
}

@@ -137,6 +145,10 @@ protected void masterOperation(Task task, DeleteJobAction.Request request, Clust
ActionListener<AcknowledgedResponse> listener) {
logger.debug("Deleting job '{}'", request.getJobId());

if (request.isForce() == false) {
checkJobIsNotOpen(request.getJobId(), state);
}

TaskId taskId = new TaskId(clusterService.localNode().getId(), task.getId());
ParentTaskAssigningClient parentTaskClient = new ParentTaskAssigningClient(client, taskId);

@@ -175,7 +187,7 @@ protected void masterOperation(Task task, DeleteJobAction.Request request, Clust
finalListener.onFailure(e);
});

markJobAsDeleting(request.getJobId(), markAsDeletingListener, request.isForce());
markJobAsDeletingIfNotUsed(request.getJobId(), markAsDeletingListener);
}

private void notifyListeners(String jobId, @Nullable AcknowledgedResponse ack, @Nullable Exception error) {
@@ -211,33 +223,15 @@ private void normalDeleteJob(ParentTaskAssigningClient parentTaskClient, DeleteJ
}
};

// Step 3. When the physical storage has been deleted, remove from Cluster State
// Step 3. When the physical storage has been deleted, delete the job config document
// -------
CheckedConsumer<Boolean, Exception> deleteJobStateHandler = response -> clusterService.submitStateUpdateTask(
"delete-job-" + jobId,
new AckedClusterStateUpdateTask<Boolean>(request, ActionListener.wrap(apiResponseHandler, listener::onFailure)) {

@Override
protected Boolean newResponse(boolean acknowledged) {
return acknowledged && response;
}

@Override
public ClusterState execute(ClusterState currentState) {
MlMetadata currentMlMetadata = MlMetadata.getMlMetadata(currentState);
if (currentMlMetadata.getJobs().containsKey(jobId) == false) {
// We wouldn't have got here if the job never existed so
// the Job must have been deleted by another action.
// Don't error in this case
return currentState;
}

MlMetadata.Builder builder = new MlMetadata.Builder(currentMlMetadata);
builder.deleteJob(jobId, currentState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE));
return buildNewClusterState(currentState, builder);
}
});

// Don't report an error if the document has already been deleted
CheckedConsumer<Boolean, Exception> deleteJobStateHandler = response -> jobConfigProvider.deleteJob(jobId, false,
ActionListener.wrap(
deleteResponse -> apiResponseHandler.accept(Boolean.TRUE),
listener::onFailure
)
);

// Step 2. Remove the job from any calendars
CheckedConsumer<Boolean, Exception> removeFromCalendarsHandler = response -> jobResultsProvider.removeJobFromCalendars(jobId,
@@ -251,26 +245,26 @@ public ClusterState execute(ClusterState currentState) {
private void deleteJobDocuments(ParentTaskAssigningClient parentTaskClient, String jobId,
CheckedConsumer<Boolean, Exception> finishedHandler, Consumer<Exception> failureHandler) {

final String indexName = AnomalyDetectorsIndex.getPhysicalIndexFromState(clusterService.state(), jobId);
final String indexPattern = indexName + "-*";
AtomicReference<String> indexName = new AtomicReference<>();

final ActionListener<AcknowledgedResponse> completionHandler = ActionListener.wrap(
response -> finishedHandler.accept(response.isAcknowledged()),
failureHandler);

// Step 7. If we did not drop the index and after DBQ state done, we delete the aliases
// Step 8. If we did not drop the index and after DBQ state done, we delete the aliases
ActionListener<BulkByScrollResponse> dbqHandler = ActionListener.wrap(
bulkByScrollResponse -> {
if (bulkByScrollResponse == null) { // no action was taken by DBQ, assume Index was deleted
completionHandler.onResponse(new AcknowledgedResponse(true));
} else {
if (bulkByScrollResponse.isTimedOut()) {
logger.warn("[{}] DeleteByQuery for indices [{}, {}] timed out.", jobId, indexName, indexPattern);
logger.warn("[{}] DeleteByQuery for indices [{}, {}] timed out.", jobId, indexName.get(),
indexName.get() + "-*");
}
if (!bulkByScrollResponse.getBulkFailures().isEmpty()) {
logger.warn("[{}] {} failures and {} conflicts encountered while running DeleteByQuery on indices [{}, {}].",
jobId, bulkByScrollResponse.getBulkFailures().size(), bulkByScrollResponse.getVersionConflicts(),
indexName, indexPattern);
indexName.get(), indexName.get() + "-*");
for (BulkItemResponse.Failure failure : bulkByScrollResponse.getBulkFailures()) {
logger.warn("DBQ failure: " + failure);
}
@@ -280,12 +274,13 @@ private void deleteJobDocuments(ParentTaskAssigningClient parentTaskClient, Stri
},
failureHandler);

// Step 6. If we did not delete the index, we run a delete by query
// Step 7. If we did not delete the index, we run a delete by query
ActionListener<Boolean> deleteByQueryExecutor = ActionListener.wrap(
response -> {
if (response) {
logger.info("Running DBQ on [" + indexName + "," + indexPattern + "] for job [" + jobId + "]");
DeleteByQueryRequest request = new DeleteByQueryRequest(indexName, indexPattern);
String indexPattern = indexName.get() + "-*";
logger.info("Running DBQ on [" + indexName.get() + "," + indexPattern + "] for job [" + jobId + "]");
DeleteByQueryRequest request = new DeleteByQueryRequest(indexName.get(), indexPattern);
ConstantScoreQueryBuilder query =
new ConstantScoreQueryBuilder(new TermQueryBuilder(Job.ID.getPreferredName(), jobId));
request.setQuery(query);
@@ -301,15 +296,15 @@ private void deleteJobDocuments(ParentTaskAssigningClient parentTaskClient, Stri
},
failureHandler);

// Step 5. If we have any hits, that means we are NOT the only job on this index, and should not delete it
// Step 6. If we have any hits, that means we are NOT the only job on this index, and should not delete it
// if we do not have any hits, we can drop the index and then skip the DBQ and alias deletion
ActionListener<SearchResponse> customIndexSearchHandler = ActionListener.wrap(
searchResponse -> {
if (searchResponse == null || searchResponse.getHits().totalHits > 0) {
deleteByQueryExecutor.onResponse(true); // We need to run DBQ and alias deletion
} else {
logger.info("Running DELETE Index on [" + indexName + "] for job [" + jobId + "]");
DeleteIndexRequest request = new DeleteIndexRequest(indexName);
logger.info("Running DELETE Index on [" + indexName.get() + "] for job [" + jobId + "]");
DeleteIndexRequest request = new DeleteIndexRequest(indexName.get());
request.indicesOptions(IndicesOptions.lenientExpandOpen());
// If we have deleted the index, then we don't need to delete the aliases or run the DBQ
executeAsyncWithOrigin(
@@ -331,9 +326,11 @@ private void deleteJobDocuments(ParentTaskAssigningClient parentTaskClient, Stri
}
);

// Step 4. Determine if we are on a shared index by looking at `.ml-anomalies-shared` or the custom index's aliases
ActionListener<Boolean> deleteCategorizerStateHandler = ActionListener.wrap(
response -> {
// Step 5. Determine if we are on a shared index by looking at `.ml-anomalies-shared` or the custom index's aliases
ActionListener<Job.Builder> getJobHandler = ActionListener.wrap(
builder -> {
Job job = builder.build();
indexName.set(job.getResultsIndexName());
                    if (indexName.get().equals(AnomalyDetectorsIndexFields.RESULTS_INDEX_PREFIX +
AnomalyDetectorsIndexFields.RESULTS_INDEX_DEFAULT)) {
//don't bother searching the index any further, we are on the default shared
@@ -344,14 +341,22 @@ private void deleteJobDocuments(ParentTaskAssigningClient parentTaskClient, Stri
.query(QueryBuilders.boolQuery().filter(
QueryBuilders.boolQuery().mustNot(QueryBuilders.termQuery(Job.ID.getPreferredName(), jobId))));

SearchRequest searchRequest = new SearchRequest(indexName);
SearchRequest searchRequest = new SearchRequest(indexName.get());
searchRequest.source(source);
executeAsyncWithOrigin(parentTaskClient, ML_ORIGIN, SearchAction.INSTANCE, searchRequest, customIndexSearchHandler);
}
},
failureHandler
);

// Step 4. Get the job as the result index name is required
ActionListener<Boolean> deleteCategorizerStateHandler = ActionListener.wrap(
response -> {
jobConfigProvider.getJob(jobId, getJobHandler);
},
failureHandler
);

// Step 3. Delete quantiles done, delete the categorizer state
ActionListener<Boolean> deleteQuantilesHandler = ActionListener.wrap(
response -> deleteCategorizerState(parentTaskClient, jobId, 1, deleteCategorizerStateHandler),
@@ -554,36 +559,28 @@ public void onFailure(Exception e) {
}
}

private void markJobAsDeleting(String jobId, ActionListener<Boolean> listener, boolean force) {
clusterService.submitStateUpdateTask("mark-job-as-deleted", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
PersistentTasksCustomMetaData tasks = currentState.metaData().custom(PersistentTasksCustomMetaData.TYPE);
MlMetadata.Builder builder = new MlMetadata.Builder(MlMetadata.getMlMetadata(currentState));
builder.markJobAsDeleting(jobId, tasks, force);
return buildNewClusterState(currentState, builder);
}

@Override
public void onFailure(String source, Exception e) {
listener.onFailure(e);
}

@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
logger.debug("Job [" + jobId + "] is successfully marked as deleted");
listener.onResponse(true);
}
        });
    }

    private void checkJobIsNotOpen(String jobId, ClusterState state) {
PersistentTasksCustomMetaData tasks = state.metaData().custom(PersistentTasksCustomMetaData.TYPE);
PersistentTasksCustomMetaData.PersistentTask<?> jobTask = MlTasks.getJobTask(jobId, tasks);
if (jobTask != null) {
JobTaskState jobTaskState = (JobTaskState) jobTask.getState();
throw ExceptionsHelper.conflictStatusException("Cannot delete job [" + jobId + "] because the job is "
+ ((jobTaskState == null) ? JobState.OPENING : jobTaskState.getState()));
}
}

    static boolean jobIsDeletedFromState(String jobId, ClusterState clusterState) {
        return !MlMetadata.getMlMetadata(clusterState).getJobs().containsKey(jobId);
    }

    private static ClusterState buildNewClusterState(ClusterState currentState, MlMetadata.Builder builder) {
        ClusterState.Builder newState = ClusterState.builder(currentState);
        newState.metaData(MetaData.builder(currentState.getMetaData()).putCustom(MlMetadata.TYPE, builder.build()).build());
        return newState.build();
    }

    private void markJobAsDeletingIfNotUsed(String jobId, ActionListener<Boolean> listener) {

        datafeedConfigProvider.findDatafeedsForJobIds(Collections.singletonList(jobId), ActionListener.wrap(
                datafeedIds -> {
                    if (datafeedIds.isEmpty() == false) {
                        listener.onFailure(ExceptionsHelper.conflictStatusException("Cannot delete job [" + jobId + "] because datafeed ["
                                + datafeedIds.iterator().next() + "] refers to it"));
                        return;
                    }
                    jobConfigProvider.markJobAsDeleting(jobId, listener);
                },
                listener::onFailure
        ));
    }
}
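
To show where the force flag checked in masterOperation and checkJobIsNotOpen comes from, here is a short sketch of driving this transport action from other server-side code. DeleteJobAction.Request and AcknowledgedResponse are the types handled in this file; the client and logger variables and the setForce setter name are assumptions for illustration:

    // Sketch: issuing the delete handled by TransportDeleteJobAction.
    // With force == false the new checkJobIsNotOpen() guard rejects the request
    // while a persistent job task still exists; force == true skips that check
    // and takes the force-delete path.
    DeleteJobAction.Request deleteRequest = new DeleteJobAction.Request("my-job");   // hypothetical job id
    deleteRequest.setForce(false);   // assumed setter paired with the isForce() getter used above
    client.execute(DeleteJobAction.INSTANCE, deleteRequest, ActionListener.wrap(
            response -> logger.info("[my-job] delete acknowledged: {}", response.isAcknowledged()),
            e -> logger.error("[my-job] delete failed", e)
    ));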