Skip to content

Commit

Permalink
Delete forecast API (elastic#31134)
Browse files Browse the repository at this point in the history
  • Loading branch information
benwtrent committed Aug 28, 2018
1 parent d7e4a49 commit d40b201
Show file tree
Hide file tree
Showing 8 changed files with 299 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.elasticsearch.xpack.core.ml.action.DeleteDatafeedAction;
import org.elasticsearch.xpack.core.ml.action.DeleteExpiredDataAction;
import org.elasticsearch.xpack.core.ml.action.DeleteFilterAction;
import org.elasticsearch.xpack.core.ml.action.DeleteForecastAction;
import org.elasticsearch.xpack.core.ml.action.DeleteJobAction;
import org.elasticsearch.xpack.core.ml.action.DeleteModelSnapshotAction;
import org.elasticsearch.xpack.core.ml.action.FinalizeJobExecutionAction;
Expand Down Expand Up @@ -254,6 +255,7 @@ public List<Action<? extends ActionResponse>> getClientActions() {
UpdateProcessAction.INSTANCE,
DeleteExpiredDataAction.INSTANCE,
ForecastJobAction.INSTANCE,
DeleteForecastAction.INSTANCE,
GetCalendarsAction.INSTANCE,
PutCalendarAction.INSTANCE,
DeleteCalendarAction.INSTANCE,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.ml.action;

import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshotField;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;

import java.io.IOException;

public class DeleteForecastAction extends Action<AcknowledgedResponse> {

public static final DeleteForecastAction INSTANCE = new DeleteForecastAction();
public static final String NAME = "cluster:admin/xpack/ml/job/forecast/delete";

private DeleteForecastAction() {
super(NAME);
}

@Override
public AcknowledgedResponse newResponse() {
return new AcknowledgedResponse();
}

public static class Request extends ActionRequest {

private String jobId;
private String forecastId;

public Request() {
}

public Request(String jobId, String forecastId) {
this.jobId = ExceptionsHelper.requireNonNull(jobId, Job.ID.getPreferredName());
this.forecastId = ExceptionsHelper.requireNonNull(forecastId, ModelSnapshotField.SNAPSHOT_ID.getPreferredName());
}

public String getJobId() {
return jobId;
}

public String getForecastId() {
return forecastId;
}

@Override
public ActionRequestValidationException validate() {
return null;
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
jobId = in.readString();
forecastId = in.readString();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(jobId);
out.writeString(forecastId);
}
}

public static class RequestBuilder extends ActionRequestBuilder<Request, AcknowledgedResponse> {

public RequestBuilder(ElasticsearchClient client, DeleteForecastAction action) {
super(client, action, new Request());
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,8 @@ public final class Messages {
public static final String REST_JOB_NOT_CLOSED_REVERT = "Can only revert to a model snapshot when the job is closed.";
public static final String REST_NO_SUCH_MODEL_SNAPSHOT = "No model snapshot with id [{0}] exists for job [{1}]";
public static final String REST_START_AFTER_END = "Invalid time range: end time ''{0}'' is earlier than start time ''{1}''.";

public static final String REST_NO_SUCH_FORECAST = "No forecast with id [{0}] exists for job [{1}]";
public static final String REST_BAD_FORECAST_STATE = "Forecast [{0}] for job [{1}] needs to be either FAILED or FINISHED to be deleted";
public static final String FIELD_CANNOT_BE_NULL = "Field [{0}] cannot be null";

private Messages() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.xpack.core.ml.action.DeleteForecastAction;
import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig;
import org.elasticsearch.xpack.core.ml.job.config.AnalysisLimits;
import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
Expand Down Expand Up @@ -276,6 +278,55 @@ public void testOverflowToDisk() throws Exception {

}

public void testDelete() throws Exception {
Detector.Builder detector = new Detector.Builder("mean", "value");

TimeValue bucketSpan = TimeValue.timeValueHours(1);
AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Collections.singletonList(detector.build()));
analysisConfig.setBucketSpan(bucketSpan);
DataDescription.Builder dataDescription = new DataDescription.Builder();
dataDescription.setTimeFormat("epoch");

Job.Builder job = new Job.Builder("forecast-it-test-delete");
job.setAnalysisConfig(analysisConfig);
job.setDataDescription(dataDescription);

registerJob(job);
putJob(job);
openJob(job.getId());

long now = Instant.now().getEpochSecond();
long timestamp = now - 50 * bucketSpan.seconds();
List<String> data = new ArrayList<>();
while (timestamp < now) {
data.add(createJsonRecord(createRecord(timestamp, 10.0)));
data.add(createJsonRecord(createRecord(timestamp, 30.0)));
timestamp += bucketSpan.seconds();
}

postData(job.getId(), data.stream().collect(Collectors.joining()));
flushJob(job.getId(), false);

String forecastIdDefaultDurationDefaultExpiry = forecast(job.getId(), null, null);

waitForecastToFinish(job.getId(), forecastIdDefaultDurationDefaultExpiry);
closeJob(job.getId());
ForecastRequestStats forecastStats = getForecastStats(job.getId(), forecastIdDefaultDurationDefaultExpiry);
assertNotNull(forecastStats);

DeleteForecastAction.Request request = new DeleteForecastAction.Request(job.getId(), forecastIdDefaultDurationDefaultExpiry);
AcknowledgedResponse response = client().execute(DeleteForecastAction.INSTANCE, request).actionGet();
assertTrue(response.isAcknowledged());

forecastStats = getForecastStats(job.getId(), forecastIdDefaultDurationDefaultExpiry);
assertNull(forecastStats);

ElasticsearchException e = expectThrows(ElasticsearchException.class,
() -> client().execute(DeleteForecastAction.INSTANCE, request).actionGet());
assertThat(e.getMessage(),
equalTo(String.format("No forecast with id [%s] exists for job [%s]", forecastIdDefaultDurationDefaultExpiry, job.getId())));
}

private void createDataWithLotsOfClientIps(TimeValue bucketSpan, Job.Builder job) throws IOException {
long now = Instant.now().getEpochSecond();
long timestamp = now - 15 * bucketSpan.seconds();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import org.elasticsearch.xpack.core.ml.action.DeleteDatafeedAction;
import org.elasticsearch.xpack.core.ml.action.DeleteExpiredDataAction;
import org.elasticsearch.xpack.core.ml.action.DeleteFilterAction;
import org.elasticsearch.xpack.core.ml.action.DeleteForecastAction;
import org.elasticsearch.xpack.core.ml.action.DeleteJobAction;
import org.elasticsearch.xpack.core.ml.action.DeleteModelSnapshotAction;
import org.elasticsearch.xpack.core.ml.action.FinalizeJobExecutionAction;
Expand Down Expand Up @@ -114,6 +115,7 @@
import org.elasticsearch.xpack.ml.action.TransportDeleteDatafeedAction;
import org.elasticsearch.xpack.ml.action.TransportDeleteExpiredDataAction;
import org.elasticsearch.xpack.ml.action.TransportDeleteFilterAction;
import org.elasticsearch.xpack.ml.action.TransportDeleteForecastAction;
import org.elasticsearch.xpack.ml.action.TransportDeleteJobAction;
import org.elasticsearch.xpack.ml.action.TransportDeleteModelSnapshotAction;
import org.elasticsearch.xpack.ml.action.TransportFinalizeJobExecutionAction;
Expand Down Expand Up @@ -200,6 +202,7 @@
import org.elasticsearch.xpack.ml.rest.filter.RestPutFilterAction;
import org.elasticsearch.xpack.ml.rest.filter.RestUpdateFilterAction;
import org.elasticsearch.xpack.ml.rest.job.RestCloseJobAction;
import org.elasticsearch.xpack.ml.rest.job.RestDeleteForecastAction;
import org.elasticsearch.xpack.ml.rest.job.RestDeleteJobAction;
import org.elasticsearch.xpack.ml.rest.job.RestFlushJobAction;
import org.elasticsearch.xpack.ml.rest.job.RestForecastJobAction;
Expand Down Expand Up @@ -489,6 +492,7 @@ public List<RestHandler> getRestHandlers(Settings settings, RestController restC
new RestDeleteModelSnapshotAction(settings, restController),
new RestDeleteExpiredDataAction(settings, restController),
new RestForecastJobAction(settings, restController),
new RestDeleteForecastAction(settings, restController),
new RestGetCalendarsAction(settings, restController),
new RestPutCalendarAction(settings, restController),
new RestDeleteCalendarAction(settings, restController),
Expand Down Expand Up @@ -545,6 +549,7 @@ public List<RestHandler> getRestHandlers(Settings settings, RestController restC
new ActionHandler<>(UpdateProcessAction.INSTANCE, TransportUpdateProcessAction.class),
new ActionHandler<>(DeleteExpiredDataAction.INSTANCE, TransportDeleteExpiredDataAction.class),
new ActionHandler<>(ForecastJobAction.INSTANCE, TransportForecastJobAction.class),
new ActionHandler<>(DeleteForecastAction.INSTANCE, TransportDeleteForecastAction.class),
new ActionHandler<>(GetCalendarsAction.INSTANCE, TransportGetCalendarsAction.class),
new ActionHandler<>(PutCalendarAction.INSTANCE, TransportPutCalendarAction.class),
new ActionHandler<>(DeleteCalendarAction.INSTANCE, TransportDeleteCalendarAction.class),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.action;

import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.DeleteByQueryAction;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.ml.action.DeleteForecastAction;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.core.ml.job.results.Forecast;
import org.elasticsearch.xpack.core.ml.job.results.ForecastRequestStats;
import org.elasticsearch.xpack.core.ml.job.results.ForecastRequestStats.ForecastRequestStatus;
import org.elasticsearch.xpack.core.ml.job.results.Result;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider;

import java.util.EnumSet;
import java.util.Set;

import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;


public class TransportDeleteForecastAction extends HandledTransportAction<DeleteForecastAction.Request,
AcknowledgedResponse> {

private static final String RESULTS_INDEX_PATTERN = AnomalyDetectorsIndex.jobResultsIndexPrefix() + "*";
private final Client client;
private final JobResultsProvider jobResultsProvider;
private static final Set<ForecastRequestStatus> DELETABLE_STATUSES =
EnumSet.of(ForecastRequestStatus.FINISHED, ForecastRequestStatus.FAILED);

@Inject
public TransportDeleteForecastAction(Settings settings, TransportService transportService, ActionFilters actionFilters, Client client,
JobResultsProvider jobResultsProvider) {
super(settings, DeleteForecastAction.NAME, transportService, actionFilters,
DeleteForecastAction.Request::new);
this.client = client;
this.jobResultsProvider = jobResultsProvider;
}

@Override
protected void doExecute(Task task, DeleteForecastAction.Request request, ActionListener<AcknowledgedResponse> listener) {
final String forecastId = request.getForecastId();
final String jobId = request.getJobId();
jobResultsProvider.getForecastRequestStats(jobId, forecastId, (forecastRequestStats) -> {
if (forecastRequestStats == null) {
listener.onFailure(
new ResourceNotFoundException(Messages.getMessage(Messages.REST_NO_SUCH_FORECAST, forecastId, jobId)));
return;
}

if (DELETABLE_STATUSES.contains(forecastRequestStats.getStatus())) {
DeleteByQueryRequest deleteByQueryRequest = buildDeleteByQuery(jobId, forecastId);
executeAsyncWithOrigin(client, ML_ORIGIN, DeleteByQueryAction.INSTANCE, deleteByQueryRequest, ActionListener.wrap(
response -> {
if (response.getDeleted() > 0) {
logger.info("Deleted forecast [{}] from job [{}]", forecastId, jobId);
listener.onResponse(new AcknowledgedResponse(true));
} else {
listener.onFailure(
new ResourceNotFoundException(Messages.getMessage(Messages.REST_NO_SUCH_FORECAST, forecastId, jobId)));
}
},
listener::onFailure));
} else {
listener.onFailure(
ExceptionsHelper.conflictStatusException(Messages.getMessage(Messages.REST_BAD_FORECAST_STATE, forecastId, jobId)));
}
}, listener::onFailure);
}

private DeleteByQueryRequest buildDeleteByQuery(String jobId, String forecastId) {
SearchRequest searchRequest = new SearchRequest();
// We need to create the DeleteByQueryRequest before we modify the SearchRequest
// because the constructor of the former wipes the latter
DeleteByQueryRequest request = new DeleteByQueryRequest(searchRequest);

searchRequest.indices(RESULTS_INDEX_PATTERN);
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery()
.minimumShouldMatch(1)
.must(QueryBuilders.termsQuery(Result.RESULT_TYPE.getPreferredName(),
ForecastRequestStats.RESULT_TYPE_VALUE,
Forecast.RESULT_TYPE_VALUE))
.should(QueryBuilders.boolQuery()
.must(QueryBuilders.termQuery(Job.ID.getPreferredName(), jobId))
.must(QueryBuilders.termQuery(Forecast.FORECAST_ID.getPreferredName(), forecastId)));
QueryBuilder query = QueryBuilders.boolQuery().filter(boolQuery);
searchRequest.source(new SearchSourceBuilder().query(query));
return request;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.rest.job;

import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.RestToXContentListener;
import org.elasticsearch.xpack.core.ml.action.DeleteForecastAction;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.results.Forecast;
import org.elasticsearch.xpack.ml.MachineLearning;

import java.io.IOException;

public class RestDeleteForecastAction extends BaseRestHandler {

public RestDeleteForecastAction(Settings settings, RestController controller) {
super(settings);
controller.registerHandler(RestRequest.Method.DELETE,
MachineLearning.BASE_PATH +
"anomaly_detectors/{" + Job.ID.getPreferredName() +
"}/_forecast/{" + Forecast.FORECAST_ID.getPreferredName() + "}",
this);
}

@Override
public String getName() {
return "xpack_ml_delete_forecast_action";
}

@Override
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
String jobId = restRequest.param(Job.ID.getPreferredName());
String forecastId = restRequest.param(Forecast.FORECAST_ID.getPreferredName());
final DeleteForecastAction.Request request = new DeleteForecastAction.Request(jobId, forecastId);
return channel -> client.execute(DeleteForecastAction.INSTANCE, request, new RestToXContentListener<>(channel));
}
}

0 comments on commit d40b201

Please sign in to comment.