From cb3970a666a01bc19cb34cec801aadfce5fbccef Mon Sep 17 00:00:00 2001 From: David Kyle Date: Thu, 18 Oct 2018 14:09:05 +0100 Subject: [PATCH 1/3] Delete job config from index --- .../persistence/AnomalyDetectorsIndex.java | 12 -- .../ml/action/TransportDeleteJobAction.java | 146 +++++++++--------- .../ml/job/persistence/JobConfigProvider.java | 53 +++++-- .../action/TransportDeleteJobActionTests.java | 37 ----- .../action/TransportOpenJobActionTests.java | 2 +- .../ml/integration/JobConfigProviderIT.java | 26 +++- 6 files changed, 132 insertions(+), 144 deletions(-) delete mode 100644 x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportDeleteJobActionTests.java diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/AnomalyDetectorsIndex.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/AnomalyDetectorsIndex.java index 6cf4aee2a9672..b7b104e35cdec 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/AnomalyDetectorsIndex.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/AnomalyDetectorsIndex.java @@ -5,9 +5,6 @@ */ package org.elasticsearch.xpack.core.ml.job.persistence; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.xpack.core.ml.MlMetadata; - /** * Methods for handling index naming related functions */ @@ -40,15 +37,6 @@ public static String resultsWriteAlias(String jobId) { return AnomalyDetectorsIndexFields.RESULTS_INDEX_PREFIX + ".write-" + jobId; } - /** - * Retrieves the currently defined physical index from the job state - * @param jobId Job Id - * @return The index name - */ - public static String getPhysicalIndexFromState(ClusterState state, String jobId) { - return MlMetadata.getMlMetadata(state).getJobs().get(jobId).getResultsIndexName(); - } - /** * The name of the default index where a job's state is stored * @return The index name diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteJobAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteJobAction.java index 89f42d622411f..9e9c9fbcea5b1 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteJobAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteJobAction.java @@ -24,14 +24,11 @@ import org.elasticsearch.action.support.master.TransportMasterNodeAction; import org.elasticsearch.client.Client; import org.elasticsearch.client.ParentTaskAssigningClient; -import org.elasticsearch.cluster.AckedClusterStateUpdateTask; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.AliasMetaData; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; -import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.CheckedConsumer; import org.elasticsearch.common.Nullable; @@ -52,30 +49,36 @@ import org.elasticsearch.tasks.TaskId; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; -import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.MlTasks; import org.elasticsearch.xpack.core.ml.action.DeleteJobAction; import org.elasticsearch.xpack.core.ml.action.GetModelSnapshotsAction; import org.elasticsearch.xpack.core.ml.action.KillProcessAction; import org.elasticsearch.xpack.core.ml.action.util.PageParams; import org.elasticsearch.xpack.core.ml.job.config.Job; +import org.elasticsearch.xpack.core.ml.job.config.JobState; +import org.elasticsearch.xpack.core.ml.job.config.JobTaskState; import org.elasticsearch.xpack.core.ml.job.messages.Messages; import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndexFields; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.CategorizerState; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.Quantiles; +import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; +import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider; +import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider; import org.elasticsearch.xpack.ml.job.persistence.JobDataDeleter; import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider; import org.elasticsearch.xpack.ml.notifications.Auditor; import org.elasticsearch.xpack.ml.utils.MlIndicesUtils; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN; @@ -89,6 +92,8 @@ public class TransportDeleteJobAction extends TransportMasterNodeAction(); } @@ -137,6 +145,10 @@ protected void masterOperation(Task task, DeleteJobAction.Request request, Clust ActionListener listener) { logger.debug("Deleting job '{}'", request.getJobId()); + if (request.isForce() == false) { + checkJobIsNotOpen(request.getJobId(), state); + } + TaskId taskId = new TaskId(clusterService.localNode().getId(), task.getId()); ParentTaskAssigningClient parentTaskClient = new ParentTaskAssigningClient(client, taskId); @@ -175,7 +187,7 @@ protected void masterOperation(Task task, DeleteJobAction.Request request, Clust finalListener.onFailure(e); }); - markJobAsDeleting(request.getJobId(), markAsDeletingListener, request.isForce()); + markJobAsDeletingIfNotUsed(request.getJobId(), markAsDeletingListener); } private void notifyListeners(String jobId, @Nullable AcknowledgedResponse ack, @Nullable Exception error) { @@ -211,33 +223,14 @@ private void normalDeleteJob(ParentTaskAssigningClient parentTaskClient, DeleteJ } }; - // Step 3. When the physical storage has been deleted, remove from Cluster State + // Step 3. When the physical storage has been deleted, delete the job config document // ------- - CheckedConsumer deleteJobStateHandler = response -> clusterService.submitStateUpdateTask( - "delete-job-" + jobId, - new AckedClusterStateUpdateTask(request, ActionListener.wrap(apiResponseHandler, listener::onFailure)) { - - @Override - protected Boolean newResponse(boolean acknowledged) { - return acknowledged && response; - } - - @Override - public ClusterState execute(ClusterState currentState) { - MlMetadata currentMlMetadata = MlMetadata.getMlMetadata(currentState); - if (currentMlMetadata.getJobs().containsKey(jobId) == false) { - // We wouldn't have got here if the job never existed so - // the Job must have been deleted by another action. - // Don't error in this case - return currentState; - } - - MlMetadata.Builder builder = new MlMetadata.Builder(currentMlMetadata); - builder.deleteJob(jobId, currentState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE)); - return buildNewClusterState(currentState, builder); - } - }); - + CheckedConsumer deleteJobStateHandler = response -> jobConfigProvider.deleteJob(jobId, + ActionListener.wrap( + deleteResponse -> apiResponseHandler.accept(Boolean.TRUE), + listener::onFailure + ) + ); // Step 2. Remove the job from any calendars CheckedConsumer removeFromCalendarsHandler = response -> jobResultsProvider.removeJobFromCalendars(jobId, @@ -251,26 +244,26 @@ public ClusterState execute(ClusterState currentState) { private void deleteJobDocuments(ParentTaskAssigningClient parentTaskClient, String jobId, CheckedConsumer finishedHandler, Consumer failureHandler) { - final String indexName = AnomalyDetectorsIndex.getPhysicalIndexFromState(clusterService.state(), jobId); - final String indexPattern = indexName + "-*"; + AtomicReference indexName = new AtomicReference<>(); final ActionListener completionHandler = ActionListener.wrap( response -> finishedHandler.accept(response.isAcknowledged()), failureHandler); - // Step 7. If we did not drop the index and after DBQ state done, we delete the aliases + // Step 8. If we did not drop the index and after DBQ state done, we delete the aliases ActionListener dbqHandler = ActionListener.wrap( bulkByScrollResponse -> { if (bulkByScrollResponse == null) { // no action was taken by DBQ, assume Index was deleted completionHandler.onResponse(new AcknowledgedResponse(true)); } else { if (bulkByScrollResponse.isTimedOut()) { - logger.warn("[{}] DeleteByQuery for indices [{}, {}] timed out.", jobId, indexName, indexPattern); + logger.warn("[{}] DeleteByQuery for indices [{}, {}] timed out.", jobId, indexName.get(), + indexName.get() + "-*"); } if (!bulkByScrollResponse.getBulkFailures().isEmpty()) { logger.warn("[{}] {} failures and {} conflicts encountered while running DeleteByQuery on indices [{}, {}].", jobId, bulkByScrollResponse.getBulkFailures().size(), bulkByScrollResponse.getVersionConflicts(), - indexName, indexPattern); + indexName.get(), indexName.get() + "-*"); for (BulkItemResponse.Failure failure : bulkByScrollResponse.getBulkFailures()) { logger.warn("DBQ failure: " + failure); } @@ -280,12 +273,13 @@ private void deleteJobDocuments(ParentTaskAssigningClient parentTaskClient, Stri }, failureHandler); - // Step 6. If we did not delete the index, we run a delete by query + // Step 7. If we did not delete the index, we run a delete by query ActionListener deleteByQueryExecutor = ActionListener.wrap( response -> { if (response) { - logger.info("Running DBQ on [" + indexName + "," + indexPattern + "] for job [" + jobId + "]"); - DeleteByQueryRequest request = new DeleteByQueryRequest(indexName, indexPattern); + String indexPattern = indexName.get() + "-*"; + logger.info("Running DBQ on [" + indexName.get() + "," + indexPattern + "] for job [" + jobId + "]"); + DeleteByQueryRequest request = new DeleteByQueryRequest(indexName.get(), indexPattern); ConstantScoreQueryBuilder query = new ConstantScoreQueryBuilder(new TermQueryBuilder(Job.ID.getPreferredName(), jobId)); request.setQuery(query); @@ -301,15 +295,15 @@ private void deleteJobDocuments(ParentTaskAssigningClient parentTaskClient, Stri }, failureHandler); - // Step 5. If we have any hits, that means we are NOT the only job on this index, and should not delete it + // Step 6. If we have any hits, that means we are NOT the only job on this index, and should not delete it // if we do not have any hits, we can drop the index and then skip the DBQ and alias deletion ActionListener customIndexSearchHandler = ActionListener.wrap( searchResponse -> { if (searchResponse == null || searchResponse.getHits().totalHits > 0) { deleteByQueryExecutor.onResponse(true); // We need to run DBQ and alias deletion } else { - logger.info("Running DELETE Index on [" + indexName + "] for job [" + jobId + "]"); - DeleteIndexRequest request = new DeleteIndexRequest(indexName); + logger.info("Running DELETE Index on [" + indexName.get() + "] for job [" + jobId + "]"); + DeleteIndexRequest request = new DeleteIndexRequest(indexName.get()); request.indicesOptions(IndicesOptions.lenientExpandOpen()); // If we have deleted the index, then we don't need to delete the aliases or run the DBQ executeAsyncWithOrigin( @@ -331,9 +325,11 @@ private void deleteJobDocuments(ParentTaskAssigningClient parentTaskClient, Stri } ); - // Step 4. Determine if we are on a shared index by looking at `.ml-anomalies-shared` or the custom index's aliases - ActionListener deleteCategorizerStateHandler = ActionListener.wrap( - response -> { + // Step 5. Determine if we are on a shared index by looking at `.ml-anomalies-shared` or the custom index's aliases + ActionListener getJobHandler = ActionListener.wrap( + builder -> { + Job job = builder.build(); + indexName.set(job.getResultsIndexName()); if (indexName.equals(AnomalyDetectorsIndexFields.RESULTS_INDEX_PREFIX + AnomalyDetectorsIndexFields.RESULTS_INDEX_DEFAULT)) { //don't bother searching the index any further, we are on the default shared @@ -344,7 +340,7 @@ private void deleteJobDocuments(ParentTaskAssigningClient parentTaskClient, Stri .query(QueryBuilders.boolQuery().filter( QueryBuilders.boolQuery().mustNot(QueryBuilders.termQuery(Job.ID.getPreferredName(), jobId)))); - SearchRequest searchRequest = new SearchRequest(indexName); + SearchRequest searchRequest = new SearchRequest(indexName.get()); searchRequest.source(source); executeAsyncWithOrigin(parentTaskClient, ML_ORIGIN, SearchAction.INSTANCE, searchRequest, customIndexSearchHandler); } @@ -352,6 +348,14 @@ private void deleteJobDocuments(ParentTaskAssigningClient parentTaskClient, Stri failureHandler ); + // Step 4. Get the job as the result index name is required + ActionListener deleteCategorizerStateHandler = ActionListener.wrap( + response -> { + jobConfigProvider.getJob(jobId, getJobHandler); + }, + failureHandler + ); + // Step 3. Delete quantiles done, delete the categorizer state ActionListener deleteQuantilesHandler = ActionListener.wrap( response -> deleteCategorizerState(parentTaskClient, jobId, 1, deleteCategorizerStateHandler), @@ -554,36 +558,28 @@ public void onFailure(Exception e) { } } - private void markJobAsDeleting(String jobId, ActionListener listener, boolean force) { - clusterService.submitStateUpdateTask("mark-job-as-deleted", new ClusterStateUpdateTask() { - @Override - public ClusterState execute(ClusterState currentState) { - PersistentTasksCustomMetaData tasks = currentState.metaData().custom(PersistentTasksCustomMetaData.TYPE); - MlMetadata.Builder builder = new MlMetadata.Builder(MlMetadata.getMlMetadata(currentState)); - builder.markJobAsDeleting(jobId, tasks, force); - return buildNewClusterState(currentState, builder); - } - - @Override - public void onFailure(String source, Exception e) { - listener.onFailure(e); - } - - @Override - public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - logger.debug("Job [" + jobId + "] is successfully marked as deleted"); - listener.onResponse(true); - } - }); + private void checkJobIsNotOpen(String jobId, ClusterState state) { + PersistentTasksCustomMetaData tasks = state.metaData().custom(PersistentTasksCustomMetaData.TYPE); + PersistentTasksCustomMetaData.PersistentTask jobTask = MlTasks.getJobTask(jobId, tasks); + if (jobTask != null) { + JobTaskState jobTaskState = (JobTaskState) jobTask.getState(); + throw ExceptionsHelper.conflictStatusException("Cannot delete job [" + jobId + "] because the job is " + + ((jobTaskState == null) ? JobState.OPENING : jobTaskState.getState())); + } } - static boolean jobIsDeletedFromState(String jobId, ClusterState clusterState) { - return !MlMetadata.getMlMetadata(clusterState).getJobs().containsKey(jobId); - } + private void markJobAsDeletingIfNotUsed(String jobId, ActionListener listener) { - private static ClusterState buildNewClusterState(ClusterState currentState, MlMetadata.Builder builder) { - ClusterState.Builder newState = ClusterState.builder(currentState); - newState.metaData(MetaData.builder(currentState.getMetaData()).putCustom(MlMetadata.TYPE, builder.build()).build()); - return newState.build(); + datafeedConfigProvider.findDatafeedsForJobIds(Collections.singletonList(jobId), ActionListener.wrap( + datafeedIds -> { + if (datafeedIds.isEmpty() == false) { + listener.onFailure(ExceptionsHelper.conflictStatusException("Cannot delete job [" + jobId + "] because datafeed [" + + datafeedIds.iterator().next() + "] refers to it")); + return; + } + jobConfigProvider.markJobAsDeleting(jobId, listener); + }, + listener::onFailure + )); } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java index 26e9ee3019b04..8ca0cb3f50796 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java @@ -6,6 +6,7 @@ package org.elasticsearch.xpack.ml.job.persistence; import org.apache.lucene.search.join.ScoreMode; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.DocWriteRequest; @@ -25,6 +26,9 @@ import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.action.support.WriteRequest; +import org.elasticsearch.action.update.UpdateAction; +import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.client.Client; import org.elasticsearch.common.Strings; import org.elasticsearch.common.bytes.BytesReference; @@ -40,6 +44,7 @@ import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.IndexNotFoundException; +import org.elasticsearch.index.engine.DocumentMissingException; import org.elasticsearch.index.engine.VersionConflictEngineException; import org.elasticsearch.index.query.BoolQueryBuilder; import org.elasticsearch.index.query.QueryBuilder; @@ -64,6 +69,7 @@ import java.io.InputStream; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -211,23 +217,7 @@ public void deleteJob(String jobId, ActionListener actionListen DeleteRequest request = new DeleteRequest(AnomalyDetectorsIndex.configIndexName(), ElasticsearchMappings.DOC_TYPE, Job.documentId(jobId)); - executeAsyncWithOrigin(client, ML_ORIGIN, DeleteAction.INSTANCE, request, new ActionListener() { - @Override - public void onResponse(DeleteResponse deleteResponse) { - if (deleteResponse.getResult() == DocWriteResponse.Result.NOT_FOUND) { - actionListener.onFailure(ExceptionsHelper.missingJobException(jobId)); - return; - } - - assert deleteResponse.getResult() == DocWriteResponse.Result.DELETED; - actionListener.onResponse(deleteResponse); - } - - @Override - public void onFailure(Exception e) { - actionListener.onFailure(e); - } - }); + executeAsyncWithOrigin(client, ML_ORIGIN, DeleteAction.INSTANCE, request, actionListener); } /** @@ -423,6 +413,35 @@ public void onFailure(Exception e) { }); } + /** + * Sets the job's {@code deleting} field to true + * @param jobId The job to mark as deleting + * @param listener Responds with true if successful else an error + */ + public void markJobAsDeleting(String jobId, ActionListener listener) { + UpdateRequest updateRequest = new UpdateRequest(AnomalyDetectorsIndex.configIndexName(), + ElasticsearchMappings.DOC_TYPE, Job.documentId(jobId)); + updateRequest.retryOnConflict(3); + updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + updateRequest.doc(Collections.singletonMap(Job.DELETING.getPreferredName(), Boolean.TRUE)); + + executeAsyncWithOrigin(client, ML_ORIGIN, UpdateAction.INSTANCE, updateRequest, ActionListener.wrap( + response -> { + assert (response.getResult() == DocWriteResponse.Result.UPDATED) || + (response.getResult() == DocWriteResponse.Result.NOOP); + listener.onResponse(Boolean.TRUE); + }, + e -> { + ElasticsearchException[] causes = ElasticsearchException.guessRootCauses(e); + if (causes[0] instanceof DocumentMissingException) { + listener.onFailure(ExceptionsHelper.missingJobException(jobId)); + } else { + listener.onFailure(e); + } + } + )); + } + /** * Expands an expression into the set of matching names. {@code expresssion} * may be a wildcard, a job group, a job Id or a list of those. diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportDeleteJobActionTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportDeleteJobActionTests.java deleted file mode 100644 index 7464348adb9aa..0000000000000 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportDeleteJobActionTests.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.xpack.ml.action; - -import org.elasticsearch.cluster.ClusterName; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.metadata.MetaData; -import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.xpack.core.ml.MlMetadata; -import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase; - -import java.util.Date; - -public class TransportDeleteJobActionTests extends ESTestCase { - - public void testJobIsDeletedFromState() { - MlMetadata mlMetadata = MlMetadata.EMPTY_METADATA; - - ClusterState clusterState = ClusterState.builder(new ClusterName("_name")) - .metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlMetadata)) - .build(); - - assertTrue(TransportDeleteJobAction.jobIsDeletedFromState("job_id_1", clusterState)); - - MlMetadata.Builder mlBuilder = new MlMetadata.Builder(); - mlBuilder.putJob(BaseMlIntegTestCase.createScheduledJob("job_id_1").build(new Date()), false); - mlMetadata = mlBuilder.build(); - clusterState = ClusterState.builder(new ClusterName("_name")) - .metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlMetadata)) - .build(); - - assertFalse(TransportDeleteJobAction.jobIsDeletedFromState("job_id_1", clusterState)); - } -} diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportOpenJobActionTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportOpenJobActionTests.java index 8e5d72b4436e8..4a98b380b0929 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportOpenJobActionTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportOpenJobActionTests.java @@ -80,7 +80,7 @@ public void testValidate_jobMarkedAsDeleting() { jobBuilder.setDeleting(true); Exception e = expectThrows(ElasticsearchStatusException.class, () -> TransportOpenJobAction.validate("job_id", jobBuilder.build())); - assertEquals("Cannot open job [job_id] because it has been marked as deleted", e.getMessage()); + assertEquals("Cannot open job [job_id] because it is being deleted", e.getMessage()); } public void testValidate_jobWithoutVersion() { diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/JobConfigProviderIT.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/JobConfigProviderIT.java index 63f67c37dd944..c560d28ee7bf3 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/JobConfigProviderIT.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/JobConfigProviderIT.java @@ -174,8 +174,7 @@ public void testCrud() throws InterruptedException { exceptionHolder.set(null); blockingCall(actionListener -> jobConfigProvider.deleteJob(jobId, actionListener), deleteJobResponseHolder, exceptionHolder); - assertNull(deleteJobResponseHolder.get()); - assertEquals(ResourceNotFoundException.class, exceptionHolder.get().getClass()); + assertEquals(DocWriteResponse.Result.NOT_FOUND, deleteJobResponseHolder.get().getResult()); } public void testGetJobs() throws Exception { @@ -482,6 +481,29 @@ public void testValidateDatafeedJob() throws Exception { assertEquals(Messages.DATAFEED_AGGREGATIONS_REQUIRES_JOB_WITH_SUMMARY_COUNT_FIELD, exceptionHolder.get().getMessage()); } + public void testMarkAsDeleting() throws Exception { + AtomicReference responseHolder = new AtomicReference<>(); + AtomicReference exceptionHolder = new AtomicReference<>(); + + blockingCall(listener -> jobConfigProvider.markJobAsDeleting("missing-job", listener), responseHolder, exceptionHolder); + assertNull(responseHolder.get()); + assertEquals(ResourceNotFoundException.class, exceptionHolder.get().getClass()); + + String jobId = "mark-as-deleting-job"; + putJob(createJob(jobId, Collections.emptyList())); + client().admin().indices().prepareRefresh(AnomalyDetectorsIndex.configIndexName()).get(); + + exceptionHolder.set(null); + blockingCall(listener -> jobConfigProvider.markJobAsDeleting(jobId, listener), responseHolder, exceptionHolder); + assertNull(exceptionHolder.get()); + assertTrue(responseHolder.get()); + + // repeat the update for good measure + blockingCall(listener -> jobConfigProvider.markJobAsDeleting(jobId, listener), responseHolder, exceptionHolder); + assertTrue(responseHolder.get()); + assertNull(exceptionHolder.get()); + } + private static Job.Builder createJob(String jobId, List groups) { Detector.Builder d1 = new Detector.Builder("info_content", "domain"); d1.setOverFieldName("client"); From bde1479ed62838bd21c6b01630fa7f34ae4b848e Mon Sep 17 00:00:00 2001 From: David Kyle Date: Fri, 19 Oct 2018 11:47:53 +0100 Subject: [PATCH 2/3] Make intention when deleting job doc clearer --- .../ml/action/TransportDeleteJobAction.java | 3 ++- .../ml/job/persistence/JobConfigProvider.java | 27 ++++++++++++++++--- .../ml/integration/JobConfigProviderIT.java | 12 +++++++-- 3 files changed, 36 insertions(+), 6 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteJobAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteJobAction.java index 9e9c9fbcea5b1..d8e8b8f8178f7 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteJobAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteJobAction.java @@ -225,7 +225,8 @@ private void normalDeleteJob(ParentTaskAssigningClient parentTaskClient, DeleteJ // Step 3. When the physical storage has been deleted, delete the job config document // ------- - CheckedConsumer deleteJobStateHandler = response -> jobConfigProvider.deleteJob(jobId, + // Don't report an error if the document has already been deleted + CheckedConsumer deleteJobStateHandler = response -> jobConfigProvider.deleteJob(jobId, false, ActionListener.wrap( deleteResponse -> apiResponseHandler.accept(Boolean.TRUE), listener::onFailure diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java index 8ca0cb3f50796..4d006725a095a 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java @@ -208,16 +208,37 @@ public void onFailure(Exception e) { } /** - * Delete the anomaly detector job config document + * Delete the anomaly detector job config document. + * {@code errorIfMissing} controls whether or not an error if return + * if the document does not exist. * * @param jobId The job id + * @param errorIfMissing If the job document does not exist and this this true + * listener fails with a ResourceNotFoundException else + * the DeleteResponse is always return. * @param actionListener Deleted job listener */ - public void deleteJob(String jobId, ActionListener actionListener) { + public void deleteJob(String jobId, boolean errorIfMissing, ActionListener actionListener) { DeleteRequest request = new DeleteRequest(AnomalyDetectorsIndex.configIndexName(), ElasticsearchMappings.DOC_TYPE, Job.documentId(jobId)); - executeAsyncWithOrigin(client, ML_ORIGIN, DeleteAction.INSTANCE, request, actionListener); + executeAsyncWithOrigin(client, ML_ORIGIN, DeleteAction.INSTANCE, request, new ActionListener() { + @Override + public void onResponse(DeleteResponse deleteResponse) { + if (errorIfMissing) { + if (deleteResponse.getResult() == DocWriteResponse.Result.NOT_FOUND) { + actionListener.onFailure(ExceptionsHelper.missingJobException(jobId)); + return; + } + assert deleteResponse.getResult() == DocWriteResponse.Result.DELETED; + } + actionListener.onResponse(deleteResponse); + } + @Override + public void onFailure(Exception e) { + actionListener.onFailure(e); + } + }); } /** diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/JobConfigProviderIT.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/JobConfigProviderIT.java index c560d28ee7bf3..cb4284c874f77 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/JobConfigProviderIT.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/JobConfigProviderIT.java @@ -158,7 +158,7 @@ public void testCrud() throws InterruptedException { // Delete Job AtomicReference deleteJobResponseHolder = new AtomicReference<>(); - blockingCall(actionListener -> jobConfigProvider.deleteJob(jobId, actionListener), + blockingCall(actionListener -> jobConfigProvider.deleteJob(jobId, true, actionListener), deleteJobResponseHolder, exceptionHolder); assertNull(exceptionHolder.get()); assertThat(deleteJobResponseHolder.get().getResult(), equalTo(DocWriteResponse.Result.DELETED)); @@ -172,7 +172,15 @@ public void testCrud() throws InterruptedException { // Delete deleted job deleteJobResponseHolder.set(null); exceptionHolder.set(null); - blockingCall(actionListener -> jobConfigProvider.deleteJob(jobId, actionListener), + blockingCall(actionListener -> jobConfigProvider.deleteJob(jobId, true, actionListener), + deleteJobResponseHolder, exceptionHolder); + assertNull(deleteJobResponseHolder.get()); + assertEquals(ResourceNotFoundException.class, exceptionHolder.get().getClass()); + + // and again with errorIfMissing set false + deleteJobResponseHolder.set(null); + exceptionHolder.set(null); + blockingCall(actionListener -> jobConfigProvider.deleteJob(jobId, false, actionListener), deleteJobResponseHolder, exceptionHolder); assertEquals(DocWriteResponse.Result.NOT_FOUND, deleteJobResponseHolder.get().getResult()); } From a73d451992d25060218d9a23472596e61ccf8214 Mon Sep 17 00:00:00 2001 From: David Kyle Date: Fri, 19 Oct 2018 12:04:24 +0100 Subject: [PATCH 3/3] Fix typos --- .../xpack/ml/job/persistence/JobConfigProvider.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java index 4d006725a095a..2c19f081956f7 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java @@ -209,11 +209,11 @@ public void onFailure(Exception e) { /** * Delete the anomaly detector job config document. - * {@code errorIfMissing} controls whether or not an error if return + * {@code errorIfMissing} controls whether or not an error is returned * if the document does not exist. * * @param jobId The job id - * @param errorIfMissing If the job document does not exist and this this true + * @param errorIfMissing If the job document does not exist and this is true * listener fails with a ResourceNotFoundException else * the DeleteResponse is always return. * @param actionListener Deleted job listener