Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

update/delete historical detector;search AD tasks #362

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@
import com.amazon.opendistroforelasticsearch.ad.rest.RestGetAnomalyDetectorAction;
import com.amazon.opendistroforelasticsearch.ad.rest.RestIndexAnomalyDetectorAction;
import com.amazon.opendistroforelasticsearch.ad.rest.RestPreviewAnomalyDetectorAction;
import com.amazon.opendistroforelasticsearch.ad.rest.RestSearchADTasksAction;
import com.amazon.opendistroforelasticsearch.ad.rest.RestSearchAnomalyDetectorAction;
import com.amazon.opendistroforelasticsearch.ad.rest.RestSearchAnomalyDetectorInfoAction;
import com.amazon.opendistroforelasticsearch.ad.rest.RestSearchAnomalyResultAction;
Expand Down Expand Up @@ -152,6 +153,8 @@
import com.amazon.opendistroforelasticsearch.ad.transport.RCFPollingTransportAction;
import com.amazon.opendistroforelasticsearch.ad.transport.RCFResultAction;
import com.amazon.opendistroforelasticsearch.ad.transport.RCFResultTransportAction;
import com.amazon.opendistroforelasticsearch.ad.transport.SearchADTasksAction;
import com.amazon.opendistroforelasticsearch.ad.transport.SearchADTasksTransportAction;
import com.amazon.opendistroforelasticsearch.ad.transport.SearchAnomalyDetectorAction;
import com.amazon.opendistroforelasticsearch.ad.transport.SearchAnomalyDetectorInfoAction;
import com.amazon.opendistroforelasticsearch.ad.transport.SearchAnomalyDetectorInfoTransportAction;
Expand All @@ -164,6 +167,7 @@
import com.amazon.opendistroforelasticsearch.ad.transport.StopDetectorTransportAction;
import com.amazon.opendistroforelasticsearch.ad.transport.ThresholdResultAction;
import com.amazon.opendistroforelasticsearch.ad.transport.ThresholdResultTransportAction;
import com.amazon.opendistroforelasticsearch.ad.transport.handler.ADSearchHandler;
import com.amazon.opendistroforelasticsearch.ad.transport.handler.AnomalyIndexHandler;
import com.amazon.opendistroforelasticsearch.ad.transport.handler.AnomalyResultBulkIndexHandler;
import com.amazon.opendistroforelasticsearch.ad.transport.handler.DetectionStateHandler;
Expand Down Expand Up @@ -254,6 +258,7 @@ public List<RestHandler> getRestHandlers(
RestIndexAnomalyDetectorAction restIndexAnomalyDetectorAction = new RestIndexAnomalyDetectorAction(settings, clusterService);
RestSearchAnomalyDetectorAction searchAnomalyDetectorAction = new RestSearchAnomalyDetectorAction();
RestSearchAnomalyResultAction searchAnomalyResultAction = new RestSearchAnomalyResultAction();
RestSearchADTasksAction searchADTasksAction = new RestSearchADTasksAction();
RestDeleteAnomalyDetectorAction deleteAnomalyDetectorAction = new RestDeleteAnomalyDetectorAction();
RestExecuteAnomalyDetectorAction executeAnomalyDetectorAction = new RestExecuteAnomalyDetectorAction(settings, clusterService);
RestStatsAnomalyDetectorAction statsAnomalyDetectorAction = new RestStatsAnomalyDetectorAction(adStats, this.nodeFilter);
Expand All @@ -267,6 +272,7 @@ public List<RestHandler> getRestHandlers(
restIndexAnomalyDetectorAction,
searchAnomalyDetectorAction,
searchAnomalyResultAction,
searchADTasksAction,
deleteAnomalyDetectorAction,
executeAnomalyDetectorAction,
anomalyDetectorJobAction,
Expand Down Expand Up @@ -552,6 +558,8 @@ public Collection<Object> createComponents(
adTaskCacheManager
);

ADSearchHandler adSearchHandler = new ADSearchHandler(settings, clusterService, client);

// return objects used by Guice to inject dependencies for e.g.,
// transport action handler constructors
return ImmutableList
Expand All @@ -578,7 +586,8 @@ public Collection<Object> createComponents(
modelPartitioner,
cacheProvider,
adTaskManager,
adBatchTaskRunner
adBatchTaskRunner,
adSearchHandler
);
}

Expand Down Expand Up @@ -676,6 +685,7 @@ public List<NamedXContentRegistry.Entry> getNamedXContent() {
new ActionHandler<>(RCFPollingAction.INSTANCE, RCFPollingTransportAction.class),
new ActionHandler<>(SearchAnomalyDetectorAction.INSTANCE, SearchAnomalyDetectorTransportAction.class),
new ActionHandler<>(SearchAnomalyResultAction.INSTANCE, SearchAnomalyResultTransportAction.class),
new ActionHandler<>(SearchADTasksAction.INSTANCE, SearchADTasksTransportAction.class),
new ActionHandler<>(StatsAnomalyDetectorAction.INSTANCE, StatsAnomalyDetectorTransportAction.class),
new ActionHandler<>(DeleteAnomalyDetectorAction.INSTANCE, DeleteAnomalyDetectorTransportAction.class),
new ActionHandler<>(GetAnomalyDetectorAction.INSTANCE, GetAnomalyDetectorTransportAction.class),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,11 @@ public AnomalyDetector(
this.user = user;
this.detectorType = detectorType;
this.detectionDateRange = detectionDateRange;

// TODO: remove this check when we support HC historical detector
if (!isRealTimeDetector(detectionDateRange) && categoryFields != null && categoryFields.size() > 0) {
throw new IllegalArgumentException("Don't support high cardinality historical detector now");
}
}

public AnomalyDetector(StreamInput input) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistroforelasticsearch.ad.rest;

import com.amazon.opendistroforelasticsearch.ad.AnomalyDetectorPlugin;
import com.amazon.opendistroforelasticsearch.ad.constant.CommonName;
import com.amazon.opendistroforelasticsearch.ad.model.ADTask;
import com.amazon.opendistroforelasticsearch.ad.transport.SearchADTasksAction;

/**
* This class consists of the REST handler to search AD tasks.
*/
public class RestSearchADTasksAction extends AbstractSearchAction<ADTask> {

private static final String URL_PATH = AnomalyDetectorPlugin.AD_BASE_DETECTORS_URI + "/tasks/_search";
private final String SEARCH_ANOMALY_DETECTION_TASKS = "search_anomaly_detection_tasks";

public RestSearchADTasksAction() {
super(URL_PATH, CommonName.DETECTION_STATE_INDEX, ADTask.class, SearchADTasksAction.INSTANCE);
}

@Override
public String getName() {
return SEARCH_ANOMALY_DETECTION_TASKS;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package com.amazon.opendistroforelasticsearch.ad.rest.handler;

import static com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector.ANOMALY_DETECTORS_INDEX;
import static com.amazon.opendistroforelasticsearch.ad.util.ExceptionUtil.getShardsFailure;
import static com.amazon.opendistroforelasticsearch.ad.util.RestHandlerUtils.XCONTENT_WITH_TYPE;
import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken;

Expand Down Expand Up @@ -45,6 +44,7 @@
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.unit.TimeValue;
Expand All @@ -57,10 +57,12 @@
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.transport.TransportService;

import com.amazon.opendistroforelasticsearch.ad.constant.CommonName;
import com.amazon.opendistroforelasticsearch.ad.indices.AnomalyDetectionIndices;
import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector;
import com.amazon.opendistroforelasticsearch.ad.task.ADTaskManager;
import com.amazon.opendistroforelasticsearch.ad.transport.IndexAnomalyDetectorResponse;
import com.amazon.opendistroforelasticsearch.ad.util.RestHandlerUtils;
import com.amazon.opendistroforelasticsearch.commons.authuser.User;
Expand Down Expand Up @@ -94,15 +96,18 @@ public class IndexAnomalyDetectorActionHandler {
private final AnomalyDetectorActionHandler handler = new AnomalyDetectorActionHandler();
private final RestRequest.Method method;
private final Client client;
private final TransportService transportService;
private final NamedXContentRegistry xContentRegistry;
private final ActionListener<IndexAnomalyDetectorResponse> listener;
private final User user;
private final ADTaskManager adTaskManager;

/**
* Constructor function.
*
* @param clusterService ClusterService
* @param client ES node client that executes actions on the local node
* @param transportService ES transport service
* @param listener ES channel used to construct bytes / builder based outputs, and send responses
* @param anomalyDetectionIndices anomaly detector index manager
* @param detectorId detector identifier
Expand All @@ -117,10 +122,12 @@ public class IndexAnomalyDetectorActionHandler {
* @param method Rest Method type
* @param xContentRegistry Registry which is used for XContentParser
* @param user User context
* @param adTaskManager AD Task manager
*/
public IndexAnomalyDetectorActionHandler(
ClusterService clusterService,
Client client,
TransportService transportService,
ActionListener<IndexAnomalyDetectorResponse> listener,
AnomalyDetectionIndices anomalyDetectionIndices,
String detectorId,
Expand All @@ -134,10 +141,12 @@ public IndexAnomalyDetectorActionHandler(
Integer maxAnomalyFeatures,
RestRequest.Method method,
NamedXContentRegistry xContentRegistry,
User user
User user,
ADTaskManager adTaskManager
) {
this.clusterService = clusterService;
this.client = client;
this.transportService = transportService;
this.anomalyDetectionIndices = anomalyDetectionIndices;
this.listener = listener;
this.detectorId = detectorId;
Expand All @@ -152,6 +161,7 @@ public IndexAnomalyDetectorActionHandler(
this.method = method;
this.xContentRegistry = xContentRegistry;
this.user = user;
this.adTaskManager = adTaskManager;
}

/**
Expand Down Expand Up @@ -213,10 +223,31 @@ private void onGetAnomalyDetectorResponse(GetResponse response) {
try (XContentParser parser = RestHandlerUtils.createXContentParserFromRegistry(xContentRegistry, response.getSourceAsBytesRef())) {
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
AnomalyDetector existingDetector = AnomalyDetector.parse(parser, response.getId(), response.getVersion());
if (!hasCategoryField(existingDetector) && hasCategoryField(this.anomalyDetector)) {
validateAgainstExistingMultiEntityAnomalyDetector(detectorId);
// We have separate flows for realtime and historical detector currently. User
// can't change detector from realtime to historical, vice versa.
if (existingDetector.isRealTimeDetector() != anomalyDetector.isRealTimeDetector()) {
listener
.onFailure(
new ElasticsearchStatusException(
"Can't change detector type between realtime and historical detector",
RestStatus.BAD_REQUEST
)
);
return;
}

if (existingDetector.isRealTimeDetector()) {
validateDetector(existingDetector);
} else {
validateCategoricalField(detectorId);
adTaskManager.getLatestADTask(detectorId, (adTask) -> {
if (adTask.isPresent() && !adTaskManager.isADTaskEnded(adTask.get())) {
// can't update detector if there is AD task running
listener.onFailure(new ElasticsearchStatusException("Detector is running", RestStatus.INTERNAL_SERVER_ERROR));
} else {
// TODO: change to validateDetector method when we support HC historical detector
searchAdInputIndices(detectorId);
}
}, transportService, listener);
}
} catch (IOException e) {
String message = "Failed to parse anomaly detector " + detectorId;
Expand All @@ -226,6 +257,14 @@ private void onGetAnomalyDetectorResponse(GetResponse response) {

}

private void validateDetector(AnomalyDetector existingDetector) {
if (!hasCategoryField(existingDetector) && hasCategoryField(this.anomalyDetector)) {
validateAgainstExistingMultiEntityAnomalyDetector(detectorId);
} else {
validateCategoricalField(detectorId);
}
}

private boolean hasCategoryField(AnomalyDetector detector) {
return detector.getCategoryField() != null && !detector.getCategoryField().isEmpty();
}
Expand Down Expand Up @@ -464,7 +503,7 @@ private void indexAnomalyDetector(String detectorId) throws IOException {
client.index(indexRequest, new ActionListener<IndexResponse>() {
@Override
public void onResponse(IndexResponse indexResponse) {
String errorMsg = getShardsFailure(indexResponse);
String errorMsg = checkShardsFailure(indexResponse);
if (errorMsg != null) {
listener.onFailure(new ElasticsearchStatusException(errorMsg, indexResponse.status()));
return;
Expand All @@ -484,7 +523,15 @@ public void onResponse(IndexResponse indexResponse) {

@Override
public void onFailure(Exception e) {
listener.onFailure(e);
logger.warn("Failed to update detector", e);
if (e.getMessage() != null && e.getMessage().contains("version conflict")) {
listener
.onFailure(
new IllegalArgumentException("There was a problem updating the historical detector:[" + detectorId + "]")
);
} else {
listener.onFailure(e);
}
}
});
}
Expand All @@ -505,4 +552,14 @@ private void onCreateMappingsResponse(CreateIndexResponse response) throws IOExc
}
}

private String checkShardsFailure(IndexResponse response) {
StringBuilder failureReasons = new StringBuilder();
if (response.getShardInfo().getFailed() > 0) {
for (ReplicationResponse.ShardInfo.Failure failure : response.getShardInfo().getFailures()) {
failureReasons.append(failure);
}
return failureReasons.toString();
}
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -275,11 +275,11 @@ public void stopDetector(
);
}

private void getDetector(
public <T> void getDetector(
String detectorId,
Consumer<AnomalyDetector> realTimeDetectorConsumer,
Consumer<AnomalyDetector> historicalDetectorConsumer,
ActionListener<AnomalyDetectorJobResponse> listener
ActionListener<T> listener
) {
GetRequest getRequest = new GetRequest(AnomalyDetector.ANOMALY_DETECTORS_INDEX).id(detectorId);
client.get(getRequest, ActionListener.wrap(response -> {
Expand Down Expand Up @@ -425,7 +425,7 @@ private boolean lastUpdateTimeExpired(ADTask adTask) {
return adTask.getLastUpdateTime().plus(2 * pieceIntervalSeconds, ChronoUnit.SECONDS).isBefore(Instant.now());
}

private boolean isADTaskEnded(ADTask adTask) {
public boolean isADTaskEnded(ADTask adTask) {
return ADTaskState.STOPPED.name().equals(adTask.getState())
|| ADTaskState.FINISHED.name().equals(adTask.getState())
|| ADTaskState.FAILED.name().equals(adTask.getState());
Expand Down Expand Up @@ -512,7 +512,7 @@ public void getLatestADTaskProfile(String detectorId, TransportService transport
listener.onFailure(e);
}));
} else {
listener.onFailure(new ResourceNotFoundException(detectorId, "Can't find task for detector"));
listener.onFailure(new ResourceNotFoundException(detectorId, "Can't find latest task for detector"));
}
}, transportService, listener);
}
Expand Down Expand Up @@ -873,7 +873,7 @@ public void handleADTaskException(ADTask adTask, Exception e) {
return;
}
if (e instanceof ADTaskCancelledException) {
logger.warn("AD task cancelled: " + adTask.getTaskId());
logger.info("AD task cancelled, taskId: {}, detectorId: {}", adTask.getTaskId(), adTask.getDetectorId());
state = ADTaskState.STOPPED.name();
String stoppedBy = ((ADTaskCancelledException) e).getCancelledBy();
if (stoppedBy != null) {
Expand Down Expand Up @@ -952,6 +952,32 @@ public ADTaskCancellationState cancelLocalTaskByDetectorId(String detectorId, St
return cancellationState;
}

/**
* Delete AD tasks docs.
*
* @param detectorId detector id
* @param function AD function
* @param listener action listener
*/
public void deleteADTasks(String detectorId, AnomalyDetectorFunction function, ActionListener<DeleteResponse> listener) {
DeleteByQueryRequest request = new DeleteByQueryRequest(CommonName.DETECTION_STATE_INDEX);

BoolQueryBuilder query = new BoolQueryBuilder();
query.filter(new TermQueryBuilder(DETECTOR_ID_FIELD, detectorId));

request.setQuery(query);
client.execute(DeleteByQueryAction.INSTANCE, request, ActionListener.wrap(r -> {
logger.info("AD tasks deleted for detector {}", detectorId);
weicongs-amazon marked this conversation as resolved.
Show resolved Hide resolved
function.execute();
}, e -> {
if (e instanceof IndexNotFoundException) {
function.execute();
} else {
listener.onFailure(e);
}
}));
}

/**
* Remove detector from cache on coordinating node.
*
Expand Down
Loading