diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorPlugin.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorPlugin.java index 420f17c1..97a20e90 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorPlugin.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorPlugin.java @@ -96,6 +96,7 @@ import com.amazon.opendistroforelasticsearch.ad.rest.RestGetAnomalyDetectorAction; import com.amazon.opendistroforelasticsearch.ad.rest.RestIndexAnomalyDetectorAction; import com.amazon.opendistroforelasticsearch.ad.rest.RestPreviewAnomalyDetectorAction; +import com.amazon.opendistroforelasticsearch.ad.rest.RestSearchADTasksAction; import com.amazon.opendistroforelasticsearch.ad.rest.RestSearchAnomalyDetectorAction; import com.amazon.opendistroforelasticsearch.ad.rest.RestSearchAnomalyDetectorInfoAction; import com.amazon.opendistroforelasticsearch.ad.rest.RestSearchAnomalyResultAction; @@ -152,6 +153,8 @@ import com.amazon.opendistroforelasticsearch.ad.transport.RCFPollingTransportAction; import com.amazon.opendistroforelasticsearch.ad.transport.RCFResultAction; import com.amazon.opendistroforelasticsearch.ad.transport.RCFResultTransportAction; +import com.amazon.opendistroforelasticsearch.ad.transport.SearchADTasksAction; +import com.amazon.opendistroforelasticsearch.ad.transport.SearchADTasksTransportAction; import com.amazon.opendistroforelasticsearch.ad.transport.SearchAnomalyDetectorAction; import com.amazon.opendistroforelasticsearch.ad.transport.SearchAnomalyDetectorInfoAction; import com.amazon.opendistroforelasticsearch.ad.transport.SearchAnomalyDetectorInfoTransportAction; @@ -164,6 +167,7 @@ import com.amazon.opendistroforelasticsearch.ad.transport.StopDetectorTransportAction; import com.amazon.opendistroforelasticsearch.ad.transport.ThresholdResultAction; import com.amazon.opendistroforelasticsearch.ad.transport.ThresholdResultTransportAction; +import com.amazon.opendistroforelasticsearch.ad.transport.handler.ADSearchHandler; import com.amazon.opendistroforelasticsearch.ad.transport.handler.AnomalyIndexHandler; import com.amazon.opendistroforelasticsearch.ad.transport.handler.AnomalyResultBulkIndexHandler; import com.amazon.opendistroforelasticsearch.ad.transport.handler.DetectionStateHandler; @@ -254,6 +258,7 @@ public List getRestHandlers( RestIndexAnomalyDetectorAction restIndexAnomalyDetectorAction = new RestIndexAnomalyDetectorAction(settings, clusterService); RestSearchAnomalyDetectorAction searchAnomalyDetectorAction = new RestSearchAnomalyDetectorAction(); RestSearchAnomalyResultAction searchAnomalyResultAction = new RestSearchAnomalyResultAction(); + RestSearchADTasksAction searchADTasksAction = new RestSearchADTasksAction(); RestDeleteAnomalyDetectorAction deleteAnomalyDetectorAction = new RestDeleteAnomalyDetectorAction(); RestExecuteAnomalyDetectorAction executeAnomalyDetectorAction = new RestExecuteAnomalyDetectorAction(settings, clusterService); RestStatsAnomalyDetectorAction statsAnomalyDetectorAction = new RestStatsAnomalyDetectorAction(adStats, this.nodeFilter); @@ -267,6 +272,7 @@ public List getRestHandlers( restIndexAnomalyDetectorAction, searchAnomalyDetectorAction, searchAnomalyResultAction, + searchADTasksAction, deleteAnomalyDetectorAction, executeAnomalyDetectorAction, anomalyDetectorJobAction, @@ -552,6 +558,8 @@ public Collection createComponents( adTaskCacheManager ); + ADSearchHandler adSearchHandler = new ADSearchHandler(settings, clusterService, client); + // return objects used by Guice to inject dependencies for e.g., // transport action handler constructors return ImmutableList @@ -578,7 +586,8 @@ public Collection createComponents( modelPartitioner, cacheProvider, adTaskManager, - adBatchTaskRunner + adBatchTaskRunner, + adSearchHandler ); } @@ -676,6 +685,7 @@ public List getNamedXContent() { new ActionHandler<>(RCFPollingAction.INSTANCE, RCFPollingTransportAction.class), new ActionHandler<>(SearchAnomalyDetectorAction.INSTANCE, SearchAnomalyDetectorTransportAction.class), new ActionHandler<>(SearchAnomalyResultAction.INSTANCE, SearchAnomalyResultTransportAction.class), + new ActionHandler<>(SearchADTasksAction.INSTANCE, SearchADTasksTransportAction.class), new ActionHandler<>(StatsAnomalyDetectorAction.INSTANCE, StatsAnomalyDetectorTransportAction.class), new ActionHandler<>(DeleteAnomalyDetectorAction.INSTANCE, DeleteAnomalyDetectorTransportAction.class), new ActionHandler<>(GetAnomalyDetectorAction.INSTANCE, GetAnomalyDetectorTransportAction.class), diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/model/AnomalyDetector.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/model/AnomalyDetector.java index 4cc373a3..581a5dcc 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/model/AnomalyDetector.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/model/AnomalyDetector.java @@ -227,6 +227,11 @@ public AnomalyDetector( this.user = user; this.detectorType = detectorType; this.detectionDateRange = detectionDateRange; + + // TODO: remove this check when we support HC historical detector + if (!isRealTimeDetector(detectionDateRange) && categoryFields != null && categoryFields.size() > 0) { + throw new IllegalArgumentException("Don't support high cardinality historical detector now"); + } } public AnomalyDetector(StreamInput input) throws IOException { diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/rest/RestSearchADTasksAction.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/rest/RestSearchADTasksAction.java new file mode 100644 index 00000000..b12b5fac --- /dev/null +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/rest/RestSearchADTasksAction.java @@ -0,0 +1,40 @@ +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistroforelasticsearch.ad.rest; + +import com.amazon.opendistroforelasticsearch.ad.AnomalyDetectorPlugin; +import com.amazon.opendistroforelasticsearch.ad.constant.CommonName; +import com.amazon.opendistroforelasticsearch.ad.model.ADTask; +import com.amazon.opendistroforelasticsearch.ad.transport.SearchADTasksAction; + +/** + * This class consists of the REST handler to search AD tasks. + */ +public class RestSearchADTasksAction extends AbstractSearchAction { + + private static final String URL_PATH = AnomalyDetectorPlugin.AD_BASE_DETECTORS_URI + "/tasks/_search"; + private final String SEARCH_ANOMALY_DETECTION_TASKS = "search_anomaly_detection_tasks"; + + public RestSearchADTasksAction() { + super(URL_PATH, CommonName.DETECTION_STATE_INDEX, ADTask.class, SearchADTasksAction.INSTANCE); + } + + @Override + public String getName() { + return SEARCH_ANOMALY_DETECTION_TASKS; + } + +} diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/rest/handler/IndexAnomalyDetectorActionHandler.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/rest/handler/IndexAnomalyDetectorActionHandler.java index 78f4d2ab..a34bbf4f 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/rest/handler/IndexAnomalyDetectorActionHandler.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/rest/handler/IndexAnomalyDetectorActionHandler.java @@ -16,7 +16,6 @@ package com.amazon.opendistroforelasticsearch.ad.rest.handler; import static com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector.ANOMALY_DETECTORS_INDEX; -import static com.amazon.opendistroforelasticsearch.ad.util.ExceptionUtil.getShardsFailure; import static com.amazon.opendistroforelasticsearch.ad.util.RestHandlerUtils.XCONTENT_WITH_TYPE; import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken; @@ -45,6 +44,7 @@ import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.WriteRequest; +import org.elasticsearch.action.support.replication.ReplicationResponse; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.unit.TimeValue; @@ -57,10 +57,12 @@ import org.elasticsearch.rest.RestRequest; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.transport.TransportService; import com.amazon.opendistroforelasticsearch.ad.constant.CommonName; import com.amazon.opendistroforelasticsearch.ad.indices.AnomalyDetectionIndices; import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector; +import com.amazon.opendistroforelasticsearch.ad.task.ADTaskManager; import com.amazon.opendistroforelasticsearch.ad.transport.IndexAnomalyDetectorResponse; import com.amazon.opendistroforelasticsearch.ad.util.RestHandlerUtils; import com.amazon.opendistroforelasticsearch.commons.authuser.User; @@ -94,15 +96,18 @@ public class IndexAnomalyDetectorActionHandler { private final AnomalyDetectorActionHandler handler = new AnomalyDetectorActionHandler(); private final RestRequest.Method method; private final Client client; + private final TransportService transportService; private final NamedXContentRegistry xContentRegistry; private final ActionListener listener; private final User user; + private final ADTaskManager adTaskManager; /** * Constructor function. * * @param clusterService ClusterService * @param client ES node client that executes actions on the local node + * @param transportService ES transport service * @param listener ES channel used to construct bytes / builder based outputs, and send responses * @param anomalyDetectionIndices anomaly detector index manager * @param detectorId detector identifier @@ -117,10 +122,12 @@ public class IndexAnomalyDetectorActionHandler { * @param method Rest Method type * @param xContentRegistry Registry which is used for XContentParser * @param user User context + * @param adTaskManager AD Task manager */ public IndexAnomalyDetectorActionHandler( ClusterService clusterService, Client client, + TransportService transportService, ActionListener listener, AnomalyDetectionIndices anomalyDetectionIndices, String detectorId, @@ -134,10 +141,12 @@ public IndexAnomalyDetectorActionHandler( Integer maxAnomalyFeatures, RestRequest.Method method, NamedXContentRegistry xContentRegistry, - User user + User user, + ADTaskManager adTaskManager ) { this.clusterService = clusterService; this.client = client; + this.transportService = transportService; this.anomalyDetectionIndices = anomalyDetectionIndices; this.listener = listener; this.detectorId = detectorId; @@ -152,6 +161,7 @@ public IndexAnomalyDetectorActionHandler( this.method = method; this.xContentRegistry = xContentRegistry; this.user = user; + this.adTaskManager = adTaskManager; } /** @@ -213,10 +223,31 @@ private void onGetAnomalyDetectorResponse(GetResponse response) { try (XContentParser parser = RestHandlerUtils.createXContentParserFromRegistry(xContentRegistry, response.getSourceAsBytesRef())) { ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser); AnomalyDetector existingDetector = AnomalyDetector.parse(parser, response.getId(), response.getVersion()); - if (!hasCategoryField(existingDetector) && hasCategoryField(this.anomalyDetector)) { - validateAgainstExistingMultiEntityAnomalyDetector(detectorId); + // We have separate flows for realtime and historical detector currently. User + // can't change detector from realtime to historical, vice versa. + if (existingDetector.isRealTimeDetector() != anomalyDetector.isRealTimeDetector()) { + listener + .onFailure( + new ElasticsearchStatusException( + "Can't change detector type between realtime and historical detector", + RestStatus.BAD_REQUEST + ) + ); + return; + } + + if (existingDetector.isRealTimeDetector()) { + validateDetector(existingDetector); } else { - validateCategoricalField(detectorId); + adTaskManager.getLatestADTask(detectorId, (adTask) -> { + if (adTask.isPresent() && !adTaskManager.isADTaskEnded(adTask.get())) { + // can't update detector if there is AD task running + listener.onFailure(new ElasticsearchStatusException("Detector is running", RestStatus.INTERNAL_SERVER_ERROR)); + } else { + // TODO: change to validateDetector method when we support HC historical detector + searchAdInputIndices(detectorId); + } + }, transportService, listener); } } catch (IOException e) { String message = "Failed to parse anomaly detector " + detectorId; @@ -226,6 +257,14 @@ private void onGetAnomalyDetectorResponse(GetResponse response) { } + private void validateDetector(AnomalyDetector existingDetector) { + if (!hasCategoryField(existingDetector) && hasCategoryField(this.anomalyDetector)) { + validateAgainstExistingMultiEntityAnomalyDetector(detectorId); + } else { + validateCategoricalField(detectorId); + } + } + private boolean hasCategoryField(AnomalyDetector detector) { return detector.getCategoryField() != null && !detector.getCategoryField().isEmpty(); } @@ -464,7 +503,7 @@ private void indexAnomalyDetector(String detectorId) throws IOException { client.index(indexRequest, new ActionListener() { @Override public void onResponse(IndexResponse indexResponse) { - String errorMsg = getShardsFailure(indexResponse); + String errorMsg = checkShardsFailure(indexResponse); if (errorMsg != null) { listener.onFailure(new ElasticsearchStatusException(errorMsg, indexResponse.status())); return; @@ -484,7 +523,15 @@ public void onResponse(IndexResponse indexResponse) { @Override public void onFailure(Exception e) { - listener.onFailure(e); + logger.warn("Failed to update detector", e); + if (e.getMessage() != null && e.getMessage().contains("version conflict")) { + listener + .onFailure( + new IllegalArgumentException("There was a problem updating the historical detector:[" + detectorId + "]") + ); + } else { + listener.onFailure(e); + } } }); } @@ -505,4 +552,14 @@ private void onCreateMappingsResponse(CreateIndexResponse response) throws IOExc } } + private String checkShardsFailure(IndexResponse response) { + StringBuilder failureReasons = new StringBuilder(); + if (response.getShardInfo().getFailed() > 0) { + for (ReplicationResponse.ShardInfo.Failure failure : response.getShardInfo().getFailures()) { + failureReasons.append(failure); + } + return failureReasons.toString(); + } + return null; + } } diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/task/ADTaskManager.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/task/ADTaskManager.java index 5aad2e21..cc3fe1e3 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/task/ADTaskManager.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/task/ADTaskManager.java @@ -275,11 +275,11 @@ public void stopDetector( ); } - private void getDetector( + public void getDetector( String detectorId, Consumer realTimeDetectorConsumer, Consumer historicalDetectorConsumer, - ActionListener listener + ActionListener listener ) { GetRequest getRequest = new GetRequest(AnomalyDetector.ANOMALY_DETECTORS_INDEX).id(detectorId); client.get(getRequest, ActionListener.wrap(response -> { @@ -425,7 +425,7 @@ private boolean lastUpdateTimeExpired(ADTask adTask) { return adTask.getLastUpdateTime().plus(2 * pieceIntervalSeconds, ChronoUnit.SECONDS).isBefore(Instant.now()); } - private boolean isADTaskEnded(ADTask adTask) { + public boolean isADTaskEnded(ADTask adTask) { return ADTaskState.STOPPED.name().equals(adTask.getState()) || ADTaskState.FINISHED.name().equals(adTask.getState()) || ADTaskState.FAILED.name().equals(adTask.getState()); @@ -512,7 +512,7 @@ public void getLatestADTaskProfile(String detectorId, TransportService transport listener.onFailure(e); })); } else { - listener.onFailure(new ResourceNotFoundException(detectorId, "Can't find task for detector")); + listener.onFailure(new ResourceNotFoundException(detectorId, "Can't find latest task for detector")); } }, transportService, listener); } @@ -873,7 +873,7 @@ public void handleADTaskException(ADTask adTask, Exception e) { return; } if (e instanceof ADTaskCancelledException) { - logger.warn("AD task cancelled: " + adTask.getTaskId()); + logger.info("AD task cancelled, taskId: {}, detectorId: {}", adTask.getTaskId(), adTask.getDetectorId()); state = ADTaskState.STOPPED.name(); String stoppedBy = ((ADTaskCancelledException) e).getCancelledBy(); if (stoppedBy != null) { @@ -952,6 +952,32 @@ public ADTaskCancellationState cancelLocalTaskByDetectorId(String detectorId, St return cancellationState; } + /** + * Delete AD tasks docs. + * + * @param detectorId detector id + * @param function AD function + * @param listener action listener + */ + public void deleteADTasks(String detectorId, AnomalyDetectorFunction function, ActionListener listener) { + DeleteByQueryRequest request = new DeleteByQueryRequest(CommonName.DETECTION_STATE_INDEX); + + BoolQueryBuilder query = new BoolQueryBuilder(); + query.filter(new TermQueryBuilder(DETECTOR_ID_FIELD, detectorId)); + + request.setQuery(query); + client.execute(DeleteByQueryAction.INSTANCE, request, ActionListener.wrap(r -> { + logger.info("AD tasks deleted for detector {}", detectorId); + function.execute(); + }, e -> { + if (e instanceof IndexNotFoundException) { + function.execute(); + } else { + listener.onFailure(e); + } + })); + } + /** * Remove detector from cache on coordinating node. * diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/DeleteAnomalyDetectorTransportAction.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/DeleteAnomalyDetectorTransportAction.java index b5d6d94d..e780b11f 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/DeleteAnomalyDetectorTransportAction.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/DeleteAnomalyDetectorTransportAction.java @@ -52,6 +52,7 @@ import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetectorJob; import com.amazon.opendistroforelasticsearch.ad.rest.handler.AnomalyDetectorFunction; import com.amazon.opendistroforelasticsearch.ad.settings.AnomalyDetectorSettings; +import com.amazon.opendistroforelasticsearch.ad.task.ADTaskManager; import com.amazon.opendistroforelasticsearch.ad.util.RestHandlerUtils; import com.amazon.opendistroforelasticsearch.commons.authuser.User; @@ -60,7 +61,9 @@ public class DeleteAnomalyDetectorTransportAction extends HandledTransportAction private static final Logger LOG = LogManager.getLogger(DeleteAnomalyDetectorTransportAction.class); private final Client client; private final ClusterService clusterService; + private final TransportService transportService; private NamedXContentRegistry xContentRegistry; + private final ADTaskManager adTaskManager; private volatile Boolean filterByEnabled; @Inject @@ -70,12 +73,15 @@ public DeleteAnomalyDetectorTransportAction( Client client, ClusterService clusterService, Settings settings, - NamedXContentRegistry xContentRegistry + NamedXContentRegistry xContentRegistry, + ADTaskManager adTaskManager ) { super(DeleteAnomalyDetectorAction.NAME, transportService, actionFilters, DeleteAnomalyDetectorRequest::new); + this.transportService = transportService; this.client = client; this.clusterService = clusterService; this.xContentRegistry = xContentRegistry; + this.adTaskManager = adTaskManager; filterByEnabled = AnomalyDetectorSettings.FILTER_BY_BACKEND_ROLES.get(settings); clusterService.getClusterSettings().addSettingsUpdateConsumer(FILTER_BY_BACKEND_ROLES, it -> filterByEnabled = it); } @@ -92,7 +98,22 @@ protected void doExecute(Task task, DeleteAnomalyDetectorRequest request, Action detectorId, filterByEnabled, listener, - () -> getDetectorJob(detectorId, listener, () -> deleteAnomalyDetectorJobDoc(detectorId, listener)), + () -> adTaskManager + .getDetector( + detectorId, + // realtime detector + detector -> getDetectorJob(detectorId, listener, () -> deleteAnomalyDetectorJobDoc(detectorId, listener)), + // historical detector + detector -> adTaskManager.getLatestADTask(detectorId, adTask -> { + if (adTask.isPresent() && !adTaskManager.isADTaskEnded(adTask.get())) { + listener + .onFailure(new ElasticsearchStatusException("Detector is running", RestStatus.INTERNAL_SERVER_ERROR)); + } else { + adTaskManager.deleteADTasks(detectorId, () -> deleteDetectorStateDoc(detectorId, listener), listener); + } + }, transportService, listener), + listener + ), client, clusterService, xContentRegistry diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/IndexAnomalyDetectorTransportAction.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/IndexAnomalyDetectorTransportAction.java index 64e9aac0..4b4c4cfc 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/IndexAnomalyDetectorTransportAction.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/IndexAnomalyDetectorTransportAction.java @@ -48,14 +48,17 @@ import com.amazon.opendistroforelasticsearch.ad.rest.handler.AnomalyDetectorFunction; import com.amazon.opendistroforelasticsearch.ad.rest.handler.IndexAnomalyDetectorActionHandler; import com.amazon.opendistroforelasticsearch.ad.settings.AnomalyDetectorSettings; +import com.amazon.opendistroforelasticsearch.ad.task.ADTaskManager; import com.amazon.opendistroforelasticsearch.commons.authuser.User; public class IndexAnomalyDetectorTransportAction extends HandledTransportAction { private static final Logger LOG = LogManager.getLogger(IndexAnomalyDetectorTransportAction.class); private final Client client; + private final TransportService transportService; private final AnomalyDetectionIndices anomalyDetectionIndices; private final ClusterService clusterService; private final NamedXContentRegistry xContentRegistry; + private final ADTaskManager adTaskManager; private volatile Boolean filterByEnabled; @Inject @@ -66,13 +69,16 @@ public IndexAnomalyDetectorTransportAction( ClusterService clusterService, Settings settings, AnomalyDetectionIndices anomalyDetectionIndices, - NamedXContentRegistry xContentRegistry + NamedXContentRegistry xContentRegistry, + ADTaskManager adTaskManager ) { super(IndexAnomalyDetectorAction.NAME, transportService, actionFilters, IndexAnomalyDetectorRequest::new); this.client = client; + this.transportService = transportService; this.clusterService = clusterService; this.anomalyDetectionIndices = anomalyDetectionIndices; this.xContentRegistry = xContentRegistry; + this.adTaskManager = adTaskManager; filterByEnabled = AnomalyDetectorSettings.FILTER_BY_BACKEND_ROLES.get(settings); clusterService.getClusterSettings().addSettingsUpdateConsumer(FILTER_BY_BACKEND_ROLES, it -> filterByEnabled = it); } @@ -143,6 +149,7 @@ protected void adExecute(IndexAnomalyDetectorRequest request, User user, ActionL IndexAnomalyDetectorActionHandler indexAnomalyDetectorActionHandler = new IndexAnomalyDetectorActionHandler( clusterService, client, + transportService, listener, anomalyDetectionIndices, detectorId, @@ -156,7 +163,8 @@ protected void adExecute(IndexAnomalyDetectorRequest request, User user, ActionL maxAnomalyFeatures, method, xContentRegistry, - user + user, + adTaskManager ); try { indexAnomalyDetectorActionHandler.start(); diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/SearchADTasksAction.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/SearchADTasksAction.java new file mode 100644 index 00000000..3eba89b9 --- /dev/null +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/SearchADTasksAction.java @@ -0,0 +1,31 @@ +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistroforelasticsearch.ad.transport; + +import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.search.SearchResponse; + +import com.amazon.opendistroforelasticsearch.ad.constant.CommonValue; + +public class SearchADTasksAction extends ActionType { + // External Action which used for public facing RestAPIs. + public static final String NAME = CommonValue.EXTERNAL_ACTION_PREFIX + "tasks/search"; + public static final SearchADTasksAction INSTANCE = new SearchADTasksAction(); + + private SearchADTasksAction() { + super(NAME, SearchResponse::new); + } +} diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/SearchADTasksTransportAction.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/SearchADTasksTransportAction.java new file mode 100644 index 00000000..b18957c8 --- /dev/null +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/SearchADTasksTransportAction.java @@ -0,0 +1,42 @@ +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistroforelasticsearch.ad.transport; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.HandledTransportAction; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.transport.TransportService; + +import com.amazon.opendistroforelasticsearch.ad.transport.handler.ADSearchHandler; + +public class SearchADTasksTransportAction extends HandledTransportAction { + private ADSearchHandler searchHandler; + + @Inject + public SearchADTasksTransportAction(TransportService transportService, ActionFilters actionFilters, ADSearchHandler searchHandler) { + super(SearchADTasksAction.NAME, transportService, actionFilters, SearchRequest::new); + this.searchHandler = searchHandler; + } + + @Override + protected void doExecute(Task task, SearchRequest request, ActionListener listener) { + searchHandler.search(request, listener); + } +} diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/SearchAnomalyDetectorTransportAction.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/SearchAnomalyDetectorTransportAction.java index 8c32e776..1246ee75 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/SearchAnomalyDetectorTransportAction.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/SearchAnomalyDetectorTransportAction.java @@ -15,91 +15,32 @@ package com.amazon.opendistroforelasticsearch.ad.transport; -import static com.amazon.opendistroforelasticsearch.ad.settings.AnomalyDetectorSettings.FILTER_BY_BACKEND_ROLES; -import static com.amazon.opendistroforelasticsearch.ad.util.ParseUtils.addUserBackendRolesFilter; -import static com.amazon.opendistroforelasticsearch.ad.util.ParseUtils.getUserContext; - -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.HandledTransportAction; -import org.elasticsearch.client.Client; -import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.tasks.Task; import org.elasticsearch.transport.TransportService; -import com.amazon.opendistroforelasticsearch.ad.settings.AnomalyDetectorSettings; -import com.amazon.opendistroforelasticsearch.commons.authuser.User; +import com.amazon.opendistroforelasticsearch.ad.transport.handler.ADSearchHandler; public class SearchAnomalyDetectorTransportAction extends HandledTransportAction { - private final Logger logger = LogManager.getLogger(SearchAnomalyDetectorTransportAction.class); - - private final Client client; - private volatile Boolean filterEnabled; + private ADSearchHandler searchHandler; @Inject public SearchAnomalyDetectorTransportAction( - Settings settings, TransportService transportService, - ClusterService clusterService, ActionFilters actionFilters, - Client client + ADSearchHandler searchHandler ) { super(SearchAnomalyDetectorAction.NAME, transportService, actionFilters, SearchRequest::new); - this.client = client; - filterEnabled = AnomalyDetectorSettings.FILTER_BY_BACKEND_ROLES.get(settings); - clusterService.getClusterSettings().addSettingsUpdateConsumer(FILTER_BY_BACKEND_ROLES, it -> filterEnabled = it); + this.searchHandler = searchHandler; } @Override protected void doExecute(Task task, SearchRequest request, ActionListener listener) { - User user = getUserContext(client); - try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) { - validateRole(request, user, listener); - } catch (Exception e) { - logger.error(e); - listener.onFailure(e); - } - } - - private void validateRole(SearchRequest request, User user, ActionListener listener) { - if (user == null) { - // Auth Header is empty when 1. Security is disabled. 2. When user is super-admin - // Proceed with search - search(request, listener); - } else if (!filterEnabled) { - // Security is enabled and filter is disabled - // Proceed with search as user is already authenticated to hit this API. - search(request, listener); - } else { - // Security is enabled and filter is enabled - try { - addUserBackendRolesFilter(user, request.source()); - logger.debug("Filtering result by " + user.getBackendRoles()); - search(request, listener); - } catch (Exception e) { - listener.onFailure(e); - } - } - } - - private void search(SearchRequest request, ActionListener listener) { - client.search(request, new ActionListener() { - @Override - public void onResponse(SearchResponse searchResponse) { - listener.onResponse(searchResponse); - } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - }); + searchHandler.search(request, listener); } } diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/SearchAnomalyResultTransportAction.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/SearchAnomalyResultTransportAction.java index 5ac713b6..e625f556 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/SearchAnomalyResultTransportAction.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/SearchAnomalyResultTransportAction.java @@ -15,92 +15,33 @@ package com.amazon.opendistroforelasticsearch.ad.transport; -import static com.amazon.opendistroforelasticsearch.ad.settings.AnomalyDetectorSettings.FILTER_BY_BACKEND_ROLES; -import static com.amazon.opendistroforelasticsearch.ad.util.ParseUtils.addUserBackendRolesFilter; -import static com.amazon.opendistroforelasticsearch.ad.util.ParseUtils.getUserContext; - -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.HandledTransportAction; -import org.elasticsearch.client.Client; -import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.tasks.Task; import org.elasticsearch.transport.TransportService; -import com.amazon.opendistroforelasticsearch.ad.settings.AnomalyDetectorSettings; -import com.amazon.opendistroforelasticsearch.commons.authuser.User; +import com.amazon.opendistroforelasticsearch.ad.transport.handler.ADSearchHandler; public class SearchAnomalyResultTransportAction extends HandledTransportAction { - private final Logger logger = LogManager.getLogger(SearchAnomalyResultTransportAction.class); - - private final Client client; - private volatile Boolean filterEnabled; + private ADSearchHandler searchHandler; @Inject public SearchAnomalyResultTransportAction( - Settings settings, TransportService transportService, - ClusterService clusterService, ActionFilters actionFilters, - Client client + ADSearchHandler searchHandler ) { super(SearchAnomalyResultAction.NAME, transportService, actionFilters, SearchRequest::new); - this.client = client; - filterEnabled = AnomalyDetectorSettings.FILTER_BY_BACKEND_ROLES.get(settings); - clusterService.getClusterSettings().addSettingsUpdateConsumer(FILTER_BY_BACKEND_ROLES, it -> filterEnabled = it); + this.searchHandler = searchHandler; } @Override protected void doExecute(Task task, SearchRequest request, ActionListener listener) { - User user = getUserContext(client); - try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) { - validateRole(request, user, listener); - } catch (Exception e) { - logger.error(e); - listener.onFailure(e); - } - } - - private void validateRole(SearchRequest request, User user, ActionListener listener) { - if (user == null) { - // Auth Header is empty when 1. Security is disabled. 2. When user is super-admin - // Proceed with search - search(request, listener); - } else if (!filterEnabled) { - // Security is enabled and filter is disabled - // Proceed with search as user is already authenticated to hit this API. - search(request, listener); - } else { - // Security is enabled and filter is enabled - try { - addUserBackendRolesFilter(user, request.source()); - logger.debug("Filtering result by " + user.getBackendRoles()); - search(request, listener); - } catch (Exception e) { - listener.onFailure(e); - } - } - } - - private void search(SearchRequest request, ActionListener listener) { - client.search(request, new ActionListener() { - @Override - public void onResponse(SearchResponse searchResponse) { - listener.onResponse(searchResponse); - } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - }); + searchHandler.search(request, listener); } } diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/handler/ADSearchHandler.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/handler/ADSearchHandler.java new file mode 100644 index 00000000..8efa09fc --- /dev/null +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/handler/ADSearchHandler.java @@ -0,0 +1,84 @@ +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistroforelasticsearch.ad.transport.handler; + +import static com.amazon.opendistroforelasticsearch.ad.settings.AnomalyDetectorSettings.FILTER_BY_BACKEND_ROLES; +import static com.amazon.opendistroforelasticsearch.ad.util.ParseUtils.addUserBackendRolesFilter; +import static com.amazon.opendistroforelasticsearch.ad.util.ParseUtils.getUserContext; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.ThreadContext; + +import com.amazon.opendistroforelasticsearch.ad.settings.AnomalyDetectorSettings; +import com.amazon.opendistroforelasticsearch.commons.authuser.User; + +/** + * Handle general search request, check user role and return search response. + */ +public class ADSearchHandler { + private final Logger logger = LogManager.getLogger(ADSearchHandler.class); + private final Client client; + private volatile Boolean filterEnabled; + + public ADSearchHandler(Settings settings, ClusterService clusterService, Client client) { + this.client = client; + filterEnabled = AnomalyDetectorSettings.FILTER_BY_BACKEND_ROLES.get(settings); + clusterService.getClusterSettings().addSettingsUpdateConsumer(FILTER_BY_BACKEND_ROLES, it -> filterEnabled = it); + } + + /** + * Validate user role, add backend role filter if filter enabled + * and execute search. + * + * @param request search request + * @param listener action listerner + */ + public void search(SearchRequest request, ActionListener listener) { + User user = getUserContext(client); + try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) { + validateRole(request, user, listener); + } catch (Exception e) { + logger.error(e); + listener.onFailure(e); + } + } + + private void validateRole(SearchRequest request, User user, ActionListener listener) { + if (user == null || !filterEnabled) { + // Case 1: user == null when 1. Security is disabled. 2. When user is super-admin + // Case 2: If Security is enabled and filter is disabled, proceed with search as + // user is already authenticated to hit this API. + client.search(request, listener); + } else { + // Security is enabled and filter is enabled + try { + addUserBackendRolesFilter(user, request.source()); + logger.debug("Filtering result by " + user.getBackendRoles()); + client.search(request, listener); + } catch (Exception e) { + listener.onFailure(e); + } + } + } + +} diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/ad/ADIntegTestCase.java b/src/test/java/com/amazon/opendistroforelasticsearch/ad/ADIntegTestCase.java index 60daab9e..0b518504 100644 --- a/src/test/java/com/amazon/opendistroforelasticsearch/ad/ADIntegTestCase.java +++ b/src/test/java/com/amazon/opendistroforelasticsearch/ad/ADIntegTestCase.java @@ -60,6 +60,7 @@ import com.amazon.opendistroforelasticsearch.ad.indices.AnomalyDetectionIndices; import com.amazon.opendistroforelasticsearch.ad.model.ADTask; import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector; +import com.amazon.opendistroforelasticsearch.ad.model.AnomalyResult; import com.amazon.opendistroforelasticsearch.ad.util.RestHandlerUtils; import com.carrotsearch.hppc.cursors.ObjectObjectCursor; @@ -100,6 +101,10 @@ public String createDetector(AnomalyDetector detector) throws IOException { return indexDoc(AnomalyDetector.ANOMALY_DETECTORS_INDEX, detector.toXContent(jsonBuilder(), XCONTENT_WITH_TYPE)); } + public String createADResult(AnomalyResult adResult) throws IOException { + return indexDoc(CommonName.ANOMALY_RESULT_INDEX_ALIAS, adResult.toXContent(jsonBuilder(), XCONTENT_WITH_TYPE)); + } + public String createADTask(ADTask adTask) throws IOException { return indexDoc(CommonName.DETECTION_STATE_INDEX, adTask.toXContent(jsonBuilder(), XCONTENT_WITH_TYPE)); } @@ -108,6 +113,10 @@ public void createDetectorIndex() throws IOException { createIndex(AnomalyDetector.ANOMALY_DETECTORS_INDEX, AnomalyDetectionIndices.getAnomalyDetectorMappings()); } + public void createADResultIndex() throws IOException { + createIndex(CommonName.ANOMALY_RESULT_INDEX_ALIAS, AnomalyDetectionIndices.getAnomalyResultMappings()); + } + public void createDetectionStateIndex() throws IOException { createIndex(CommonName.DETECTION_STATE_INDEX, AnomalyDetectionIndices.getDetectionStateMappings()); } diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/ad/HistoricalDetectorIntegTestCase.java b/src/test/java/com/amazon/opendistroforelasticsearch/ad/HistoricalDetectorIntegTestCase.java index 12b9b136..929ff395 100644 --- a/src/test/java/com/amazon/opendistroforelasticsearch/ad/HistoricalDetectorIntegTestCase.java +++ b/src/test/java/com/amazon/opendistroforelasticsearch/ad/HistoricalDetectorIntegTestCase.java @@ -18,11 +18,16 @@ import static com.amazon.opendistroforelasticsearch.ad.model.ADTask.DETECTOR_ID_FIELD; import static com.amazon.opendistroforelasticsearch.ad.model.ADTask.EXECUTION_START_TIME_FIELD; import static com.amazon.opendistroforelasticsearch.ad.model.ADTask.IS_LATEST_FIELD; +import static com.amazon.opendistroforelasticsearch.ad.util.RestHandlerUtils.START_JOB; +import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM; +import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; import java.io.IOException; import java.time.Instant; import java.time.temporal.ChronoUnit; import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -33,13 +38,16 @@ import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.index.query.BoolQueryBuilder; import org.elasticsearch.index.query.TermQueryBuilder; +import org.elasticsearch.plugins.Plugin; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.aggregations.AggregationBuilder; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.sort.SortOrder; +import org.elasticsearch.test.transport.MockTransportService; import com.amazon.opendistroforelasticsearch.ad.constant.CommonName; +import com.amazon.opendistroforelasticsearch.ad.mock.plugin.MockReindexPlugin; import com.amazon.opendistroforelasticsearch.ad.model.ADTask; import com.amazon.opendistroforelasticsearch.ad.model.ADTaskState; import com.amazon.opendistroforelasticsearch.ad.model.ADTaskType; @@ -47,6 +55,10 @@ import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetectorJob; import com.amazon.opendistroforelasticsearch.ad.model.DetectionDateRange; import com.amazon.opendistroforelasticsearch.ad.model.Feature; +import com.amazon.opendistroforelasticsearch.ad.transport.AnomalyDetectorJobAction; +import com.amazon.opendistroforelasticsearch.ad.transport.AnomalyDetectorJobRequest; +import com.amazon.opendistroforelasticsearch.ad.transport.AnomalyDetectorJobResponse; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; public abstract class HistoricalDetectorIntegTestCase extends ADIntegTestCase { @@ -55,6 +67,15 @@ public abstract class HistoricalDetectorIntegTestCase extends ADIntegTestCase { protected int detectionIntervalInMinutes = 1; protected int DEFAULT_TEST_DATA_DOCS = 3000; + @Override + protected Collection> getMockPlugins() { + final ArrayList> plugins = new ArrayList<>(); + plugins.add(MockReindexPlugin.class); + plugins.addAll(super.getMockPlugins()); + plugins.remove(MockTransportService.TestPlugin.class); + return Collections.unmodifiableList(plugins); + } + public void ingestTestData(String testIndex, Instant startTime, int detectionIntervalInMinutes, String type) { ingestTestData(testIndex, startTime, detectionIntervalInMinutes, type, DEFAULT_TEST_DATA_DOCS); } @@ -180,4 +201,19 @@ public ADTask toADTask(GetResponse doc) throws IOException { public AnomalyDetectorJob toADJob(GetResponse doc) throws IOException { return AnomalyDetectorJob.parse(TestHelpers.parser(doc.getSourceAsString())); } + + public ADTask startHistoricalDetector(Instant startTime, Instant endTime) throws IOException { + DetectionDateRange dateRange = new DetectionDateRange(startTime, endTime); + AnomalyDetector detector = TestHelpers + .randomDetector(dateRange, ImmutableList.of(maxValueFeature()), testIndex, detectionIntervalInMinutes, timeField); + String detectorId = createDetector(detector); + AnomalyDetectorJobRequest request = new AnomalyDetectorJobRequest( + detectorId, + UNASSIGNED_SEQ_NO, + UNASSIGNED_PRIMARY_TERM, + START_JOB + ); + AnomalyDetectorJobResponse response = client().execute(AnomalyDetectorJobAction.INSTANCE, request).actionGet(10000); + return getADTask(response.getId()); + } } diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/ad/HistoricalDetectorRestTestCase.java b/src/test/java/com/amazon/opendistroforelasticsearch/ad/HistoricalDetectorRestTestCase.java index 8d6cd5f7..66773ca9 100644 --- a/src/test/java/com/amazon/opendistroforelasticsearch/ad/HistoricalDetectorRestTestCase.java +++ b/src/test/java/com/amazon/opendistroforelasticsearch/ad/HistoricalDetectorRestTestCase.java @@ -171,4 +171,36 @@ protected String startHistoricalDetector(String detectorId) throws IOException { return taskId; } + protected ADTaskProfile waitUntilGetTaskProfile(String detectorId) throws InterruptedException { + int i = 0; + ADTaskProfile adTaskProfile = null; + while (adTaskProfile == null && i < 200) { + try { + adTaskProfile = getADTaskProfile(detectorId); + } catch (Exception e) {} finally { + Thread.sleep(100); + } + i++; + } + assertNotNull(adTaskProfile); + return adTaskProfile; + } + + protected ADTaskProfile waitUntilTaskFinished(String detectorId) throws InterruptedException { + int i = 0; + ADTaskProfile adTaskProfile = null; + while ((adTaskProfile == null || TestHelpers.historicalDetectorRunningStats.contains(adTaskProfile.getAdTask().getState())) + && i < 30) { + try { + adTaskProfile = getADTaskProfile(detectorId); + } catch (Exception e) { + e.printStackTrace(); + } finally { + Thread.sleep(1000); + } + i++; + } + assertNotNull(adTaskProfile); + return adTaskProfile; + } } diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/ad/TestHelpers.java b/src/test/java/com/amazon/opendistroforelasticsearch/ad/TestHelpers.java index 2c7d57bd..37a3b34c 100644 --- a/src/test/java/com/amazon/opendistroforelasticsearch/ad/TestHelpers.java +++ b/src/test/java/com/amazon/opendistroforelasticsearch/ad/TestHelpers.java @@ -17,6 +17,7 @@ import static org.apache.http.entity.ContentType.APPLICATION_JSON; import static org.elasticsearch.cluster.node.DiscoveryNodeRole.BUILT_IN_ROLES; +import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken; import static org.elasticsearch.index.query.AbstractQueryBuilder.parseInnerQueryBuilder; import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; import static org.elasticsearch.test.ESTestCase.buildNewFakeTransportAddress; @@ -61,6 +62,7 @@ import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; import org.elasticsearch.action.admin.indices.mapping.get.GetFieldMappingsResponse.FieldMappingMetadata; import org.elasticsearch.action.get.GetResponse; +import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.ShardSearchFailure; import org.elasticsearch.client.AdminClient; @@ -97,6 +99,8 @@ import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.get.GetResult; import org.elasticsearch.index.mapper.MapperService; +import org.elasticsearch.index.query.BoolQueryBuilder; +import org.elasticsearch.index.query.MatchAllQueryBuilder; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHits; @@ -947,4 +951,46 @@ public static SearchHits createSearchHits(int totalHits) { public static DiscoveryNode randomDiscoveryNode() { return new DiscoveryNode(UUIDs.randomBase64UUID(), buildNewFakeTransportAddress(), Version.CURRENT); } + + public static SearchRequest matchAllRequest() { + BoolQueryBuilder query = new BoolQueryBuilder().filter(new MatchAllQueryBuilder()); + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder().query(query); + return new SearchRequest().source(searchSourceBuilder); + } + + public static Map parseStatsResult(String statsResult) throws IOException { + XContentParser parser = TestHelpers.parser(statsResult); + ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser); + Map adStats = new HashMap<>(); + while (parser.nextToken() != XContentParser.Token.END_OBJECT) { + String fieldName = parser.currentName(); + parser.nextToken(); + if (fieldName.equals("nodes")) { + Map nodesAdStats = new HashMap<>(); + adStats.put("nodes", nodesAdStats); + while (parser.nextToken() != XContentParser.Token.END_OBJECT) { + String nodeId = parser.currentName(); + Map nodeAdStats = new HashMap<>(); + nodesAdStats.put(nodeId, nodeAdStats); + parser.nextToken(); + while (parser.nextToken() != XContentParser.Token.END_OBJECT) { + String nodeStatName = parser.currentName(); + XContentParser.Token token = parser.nextToken(); + if (nodeStatName.equals("models")) { + parser.skipChildren(); + } else if (nodeStatName.contains("_count")) { + nodeAdStats.put(nodeStatName, parser.longValue()); + } else { + nodeAdStats.put(nodeStatName, parser.text()); + } + } + } + } else if (fieldName.contains("_count")) { + adStats.put(fieldName, parser.longValue()); + } else { + adStats.put(fieldName, parser.text()); + } + } + return adStats; + } } diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/ad/model/AnomalyDetectorTests.java b/src/test/java/com/amazon/opendistroforelasticsearch/ad/model/AnomalyDetectorTests.java index 1ffffd98..83890b6e 100644 --- a/src/test/java/com/amazon/opendistroforelasticsearch/ad/model/AnomalyDetectorTests.java +++ b/src/test/java/com/amazon/opendistroforelasticsearch/ad/model/AnomalyDetectorTests.java @@ -497,4 +497,30 @@ public void testNullFeatureAttributes() throws IOException { assertNotNull(anomalyDetector.getFeatureAttributes()); assertEquals(0, anomalyDetector.getFeatureAttributes().size()); } + + public void testHistoricalHCDetector() { + expectThrows( + IllegalArgumentException.class, + () -> new AnomalyDetector( + randomAlphaOfLength(5), + randomLong(), + randomAlphaOfLength(5), + randomAlphaOfLength(5), + randomAlphaOfLength(5), + ImmutableList.of(randomAlphaOfLength(5)), + null, + TestHelpers.randomQuery(), + TestHelpers.randomIntervalTimeConfiguration(), + TestHelpers.randomIntervalTimeConfiguration(), + null, + null, + 1, + Instant.now(), + ImmutableList.of(randomAlphaOfLength(5)), + TestHelpers.randomUser(), + null, + TestHelpers.randomDetectionDateRange() + ) + ); + } } diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/ad/rest/HistoricalDetectorRestApiIT.java b/src/test/java/com/amazon/opendistroforelasticsearch/ad/rest/HistoricalDetectorRestApiIT.java index 3ad07ce7..7a45646a 100644 --- a/src/test/java/com/amazon/opendistroforelasticsearch/ad/rest/HistoricalDetectorRestApiIT.java +++ b/src/test/java/com/amazon/opendistroforelasticsearch/ad/rest/HistoricalDetectorRestApiIT.java @@ -17,11 +17,17 @@ import static com.amazon.opendistroforelasticsearch.ad.TestHelpers.AD_BASE_STATS_URI; import static com.amazon.opendistroforelasticsearch.ad.settings.AnomalyDetectorSettings.BATCH_TASK_PIECE_INTERVAL_SECONDS; +import static com.amazon.opendistroforelasticsearch.ad.settings.AnomalyDetectorSettings.MAX_BATCH_TASK_PER_NODE; + +import java.io.IOException; +import java.util.Map; import org.apache.http.util.EntityUtils; import org.elasticsearch.client.Response; +import org.elasticsearch.client.ResponseException; import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.rest.RestStatus; +import org.junit.Before; import com.amazon.opendistroforelasticsearch.ad.HistoricalDetectorRestTestCase; import com.amazon.opendistroforelasticsearch.ad.TestHelpers; @@ -34,6 +40,15 @@ public class HistoricalDetectorRestApiIT extends HistoricalDetectorRestTestCase { + @Before + @Override + public void setUp() throws Exception { + super.setUp(); + updateClusterSettings(BATCH_TASK_PIECE_INTERVAL_SECONDS.getKey(), 5); + updateClusterSettings(MAX_BATCH_TASK_PER_NODE.getKey(), 10); + } + + @SuppressWarnings("unchecked") public void testHistoricalDetectorWorkflow() throws Exception { // create historical detector AnomalyDetector detector = createHistoricalDetector(); @@ -42,17 +57,26 @@ public void testHistoricalDetectorWorkflow() throws Exception { // start historical detector String taskId = startHistoricalDetector(detectorId); - // get task stats - Response statsResponse = TestHelpers.makeRequest(client(), "GET", AD_BASE_STATS_URI, ImmutableMap.of(), "", null); - String statsResult = EntityUtils.toString(statsResponse.getEntity()); - assertTrue(statsResult.contains("\"ad_executing_batch_task_count\":1")); - // get task profile - ADTaskProfile adTaskProfile = getADTaskProfile(detectorId); + ADTaskProfile adTaskProfile = waitUntilGetTaskProfile(detectorId); + ADTask adTask = adTaskProfile.getAdTask(); assertEquals(taskId, adTask.getTaskId()); assertTrue(TestHelpers.historicalDetectorRunningStats.contains(adTask.getState())); + // get task stats + Response statsResponse = TestHelpers.makeRequest(client(), "GET", AD_BASE_STATS_URI, ImmutableMap.of(), "", null); + String statsResult = EntityUtils.toString(statsResponse.getEntity()); + Map stringObjectMap = TestHelpers.parseStatsResult(statsResult); + assertTrue((long) stringObjectMap.get("historical_single_entity_detector_count") > 0); + Map nodes = (Map) stringObjectMap.get("nodes"); + long totalBatchTaskExecution = 0; + for (String key : nodes.keySet()) { + Map nodeStats = (Map) nodes.get(key); + totalBatchTaskExecution += (long) nodeStats.get("ad_total_batch_task_execution_count"); + } + assertTrue(totalBatchTaskExecution > 0); + // get historical detector with AD task ToXContentObject[] result = getHistoricalAnomalyDetector(detectorId, true, client()); AnomalyDetector parsedDetector = (AnomalyDetector) result[0]; @@ -63,22 +87,174 @@ public void testHistoricalDetectorWorkflow() throws Exception { assertNotNull(parsedADTask); assertEquals(taskId, parsedADTask.getTaskId()); + // get task profile + ADTaskProfile endTaskProfile = waitUntilTaskFinished(detectorId); + ADTask stoppedAdTask = endTaskProfile.getAdTask(); + assertEquals(taskId, stoppedAdTask.getTaskId()); + assertEquals(ADTaskState.FINISHED.name(), stoppedAdTask.getState()); + } + + @SuppressWarnings("unchecked") + public void testStopHistoricalDetector() throws Exception { + // create historical detector + AnomalyDetector detector = createHistoricalDetector(); + String detectorId = detector.getDetectorId(); + + // start historical detector + String taskId = startHistoricalDetector(detectorId); + + waitUntilGetTaskProfile(detectorId); + // stop historical detector Response stopDetectorResponse = stopAnomalyDetector(detectorId, client()); assertEquals(RestStatus.OK, restStatus(stopDetectorResponse)); // get task profile - ADTaskProfile stoppedAdTaskProfile = getADTaskProfile(detectorId); - int i = 0; - while (TestHelpers.historicalDetectorRunningStats.contains(stoppedAdTaskProfile.getAdTask().getState()) && i < 10) { - stoppedAdTaskProfile = getADTaskProfile(detectorId); - Thread.sleep(2000); - i++; - } + ADTaskProfile stoppedAdTaskProfile = waitUntilTaskFinished(detectorId); ADTask stoppedAdTask = stoppedAdTaskProfile.getAdTask(); assertEquals(taskId, stoppedAdTask.getTaskId()); assertEquals(ADTaskState.STOPPED.name(), stoppedAdTask.getState()); updateClusterSettings(BATCH_TASK_PIECE_INTERVAL_SECONDS.getKey(), 1); + + // get AD stats + Response statsResponse = TestHelpers.makeRequest(client(), "GET", AD_BASE_STATS_URI, ImmutableMap.of(), "", null); + String statsResult = EntityUtils.toString(statsResponse.getEntity()); + Map stringObjectMap = TestHelpers.parseStatsResult(statsResult); + assertTrue((long) stringObjectMap.get("historical_single_entity_detector_count") > 0); + Map nodes = (Map) stringObjectMap.get("nodes"); + long cancelledTaskCount = 0; + for (String key : nodes.keySet()) { + Map nodeStats = (Map) nodes.get(key); + cancelledTaskCount += (long) nodeStats.get("ad_canceled_batch_task_count"); + } + assertTrue(cancelledTaskCount == 1); + } + + public void testUpdateHistoricalDetector() throws IOException { + // create historical detector + AnomalyDetector detector = createHistoricalDetector(); + String detectorId = detector.getDetectorId(); + + // update historical detector + AnomalyDetector newDetector = randomAnomalyDetector(detector); + Response updateResponse = TestHelpers + .makeRequest( + client(), + "PUT", + TestHelpers.AD_BASE_DETECTORS_URI + "/" + detectorId + "?refresh=true", + ImmutableMap.of(), + toHttpEntity(newDetector), + null + ); + Map responseBody = entityAsMap(updateResponse); + assertEquals(detector.getDetectorId(), responseBody.get("_id")); + assertEquals((detector.getVersion().intValue() + 1), (int) responseBody.get("_version")); + + // get historical detector + AnomalyDetector updatedDetector = getAnomalyDetector(detector.getDetectorId(), client()); + assertNotEquals(updatedDetector.getLastUpdateTime(), detector.getLastUpdateTime()); + assertEquals(newDetector.getName(), updatedDetector.getName()); + assertEquals(newDetector.getDescription(), updatedDetector.getDescription()); + } + + public void testUpdateRunningHistoricalDetector() throws Exception { + // create historical detector + AnomalyDetector detector = createHistoricalDetector(); + String detectorId = detector.getDetectorId(); + + // start historical detector + startHistoricalDetector(detectorId); + + // update historical detector + AnomalyDetector newDetector = randomAnomalyDetector(detector); + TestHelpers + .assertFailWith( + ResponseException.class, + "Detector is running", + () -> TestHelpers + .makeRequest( + client(), + "PUT", + TestHelpers.AD_BASE_DETECTORS_URI + "/" + detectorId + "?refresh=true", + ImmutableMap.of(), + toHttpEntity(newDetector), + null + ) + ); + + waitUntilTaskFinished(detectorId); + } + + public void testDeleteHistoricalDetector() throws IOException { + // create historical detector + AnomalyDetector detector = createHistoricalDetector(); + String detectorId = detector.getDetectorId(); + + // delete detector + Response response = TestHelpers + .makeRequest(client(), "DELETE", TestHelpers.AD_BASE_DETECTORS_URI + "/" + detectorId, ImmutableMap.of(), "", null); + assertEquals(RestStatus.OK, restStatus(response)); + } + + public void testDeleteRunningHistoricalDetector() throws Exception { + // create historical detector + AnomalyDetector detector = createHistoricalDetector(); + String detectorId = detector.getDetectorId(); + + // start historical detector + startHistoricalDetector(detectorId); + + // delete detector + TestHelpers + .assertFailWith( + ResponseException.class, + "Detector is running", + () -> TestHelpers + .makeRequest(client(), "DELETE", TestHelpers.AD_BASE_DETECTORS_URI + "/" + detectorId, ImmutableMap.of(), "", null) + ); + + waitUntilTaskFinished(detectorId); + } + + public void testSearchTasks() throws IOException, InterruptedException { + // create historical detector + AnomalyDetector detector = createHistoricalDetector(); + String detectorId = detector.getDetectorId(); + + // start historical detector + String taskId = startHistoricalDetector(detectorId); + + waitUntilTaskFinished(detectorId); + + String query = String.format("{\"query\":{\"term\":{\"detector_id\":{\"value\":\"%s\"}}}}", detectorId); + Response response = TestHelpers + .makeRequest(client(), "POST", TestHelpers.AD_BASE_DETECTORS_URI + "/tasks/_search", ImmutableMap.of(), query, null); + String searchResult = EntityUtils.toString(response.getEntity()); + assertTrue(searchResult.contains(taskId)); + assertTrue(searchResult.contains(detector.getDetectorId())); + } + + private AnomalyDetector randomAnomalyDetector(AnomalyDetector detector) { + return new AnomalyDetector( + detector.getDetectorId(), + null, + randomAlphaOfLength(5), + randomAlphaOfLength(5), + detector.getTimeField(), + detector.getIndices(), + detector.getFeatureAttributes(), + detector.getFilterQuery(), + detector.getDetectionInterval(), + detector.getWindowDelay(), + detector.getShingleSize(), + detector.getUiMetadata(), + detector.getSchemaVersion(), + detector.getLastUpdateTime(), + detector.getCategoryField(), + detector.getUser(), + detector.getDetectorType(), + detector.getDetectionDateRange() + ); } } diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/ADBatchAnomalyResultTransportActionTests.java b/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/ADBatchAnomalyResultTransportActionTests.java index 9dc252d7..9322c040 100644 --- a/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/ADBatchAnomalyResultTransportActionTests.java +++ b/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/ADBatchAnomalyResultTransportActionTests.java @@ -140,6 +140,7 @@ public void testHistoricalDetectorWithNonExistingIndex() throws IOException { public void testHistoricalDetectorExceedsMaxRunningTaskLimit() throws IOException, InterruptedException { updateTransientSettings(ImmutableMap.of(MAX_BATCH_TASK_PER_NODE.getKey(), 1)); + updateTransientSettings(ImmutableMap.of(BATCH_TASK_PIECE_INTERVAL_SECONDS.getKey(), 5)); DetectionDateRange dateRange = new DetectionDateRange(startTime, endTime); for (int i = 0; i < getDataNodes().size(); i++) { client().execute(ADBatchAnomalyResultAction.INSTANCE, adBatchAnomalyResultRequest(dateRange)); diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/AnomalyDetectorJobTransportActionTests.java b/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/AnomalyDetectorJobTransportActionTests.java index 304a0f0d..319235a8 100644 --- a/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/AnomalyDetectorJobTransportActionTests.java +++ b/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/AnomalyDetectorJobTransportActionTests.java @@ -28,9 +28,6 @@ import java.io.IOException; import java.time.Instant; import java.time.temporal.ChronoUnit; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; import java.util.List; import java.util.Set; import java.util.concurrent.CountDownLatch; @@ -44,16 +41,13 @@ import org.elasticsearch.common.lucene.uid.Versions; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.IndexNotFoundException; -import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.ESIntegTestCase; -import org.elasticsearch.test.transport.MockTransportService; import org.junit.After; import org.junit.Before; import com.amazon.opendistroforelasticsearch.ad.HistoricalDetectorIntegTestCase; import com.amazon.opendistroforelasticsearch.ad.TestHelpers; import com.amazon.opendistroforelasticsearch.ad.constant.CommonName; -import com.amazon.opendistroforelasticsearch.ad.mock.plugin.MockReindexPlugin; import com.amazon.opendistroforelasticsearch.ad.mock.transport.MockAnomalyDetectorJobAction; import com.amazon.opendistroforelasticsearch.ad.model.ADTask; import com.amazon.opendistroforelasticsearch.ad.model.ADTaskProfile; @@ -93,15 +87,6 @@ protected Settings nodeSettings(int nodeOrdinal) { .build(); } - @Override - protected Collection> getMockPlugins() { - final ArrayList> plugins = new ArrayList<>(); - plugins.add(MockReindexPlugin.class); - plugins.addAll(super.getMockPlugins()); - plugins.remove(MockTransportService.TestPlugin.class); - return Collections.unmodifiableList(plugins); - } - public void testDetectorIndexNotFound() { deleteDetectorIndex(); String detectorId = randomAlphaOfLength(5); @@ -124,27 +109,12 @@ public void testDetectorNotFound() { } public void testValidHistoricalDetector() throws IOException, InterruptedException { - ADTask adTask = startHistoricalDetector(); + ADTask adTask = startHistoricalDetector(startTime, endTime); Thread.sleep(10000); ADTask finishedTask = getADTask(adTask.getTaskId()); assertEquals(ADTaskState.FINISHED.name(), finishedTask.getState()); } - private ADTask startHistoricalDetector() throws IOException { - DetectionDateRange dateRange = new DetectionDateRange(startTime, endTime); - AnomalyDetector detector = TestHelpers - .randomDetector(dateRange, ImmutableList.of(maxValueFeature()), testIndex, detectionIntervalInMinutes, timeField); - String detectorId = createDetector(detector); - AnomalyDetectorJobRequest request = new AnomalyDetectorJobRequest( - detectorId, - UNASSIGNED_SEQ_NO, - UNASSIGNED_PRIMARY_TERM, - START_JOB - ); - AnomalyDetectorJobResponse response = client().execute(AnomalyDetectorJobAction.INSTANCE, request).actionGet(10000); - return getADTask(response.getId()); - } - public void testStartHistoricalDetectorWithUser() throws IOException, InterruptedException { DetectionDateRange dateRange = new DetectionDateRange(startTime, endTime); AnomalyDetector detector = TestHelpers @@ -327,7 +297,7 @@ public void testStopRealtimeDetector() throws IOException { } public void testStopHistoricalDetector() throws IOException, InterruptedException { - ADTask adTask = startHistoricalDetector(); + ADTask adTask = startHistoricalDetector(startTime, endTime); assertEquals(ADTaskState.INIT.name(), adTask.getState()); AnomalyDetectorJobRequest request = stopDetectorJobRequest(adTask.getDetectorId()); client().execute(AnomalyDetectorJobAction.INSTANCE, request).actionGet(10000); @@ -338,7 +308,7 @@ public void testStopHistoricalDetector() throws IOException, InterruptedExceptio } public void testProfileHistoricalDetector() throws IOException, InterruptedException { - ADTask adTask = startHistoricalDetector(); + ADTask adTask = startHistoricalDetector(startTime, endTime); GetAnomalyDetectorRequest request = taskProfileRequest(adTask.getDetectorId()); GetAnomalyDetectorResponse response = client().execute(GetAnomalyDetectorAction.INSTANCE, request).actionGet(10000); assertNotNull(response.getDetectorProfile().getAdTaskProfile()); @@ -362,8 +332,8 @@ public void testProfileHistoricalDetector() throws IOException, InterruptedExcep } public void testProfileWithMultipleRunningTask() throws IOException { - ADTask adTask1 = startHistoricalDetector(); - ADTask adTask2 = startHistoricalDetector(); + ADTask adTask1 = startHistoricalDetector(startTime, endTime); + ADTask adTask2 = startHistoricalDetector(startTime, endTime); GetAnomalyDetectorRequest request1 = taskProfileRequest(adTask1.getDetectorId()); GetAnomalyDetectorRequest request2 = taskProfileRequest(adTask2.getDetectorId()); diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/DeleteAnomalyDetectorActionTests.java b/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/DeleteAnomalyDetectorActionTests.java index 6e7a1e61..df7a6d97 100644 --- a/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/DeleteAnomalyDetectorActionTests.java +++ b/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/DeleteAnomalyDetectorActionTests.java @@ -40,10 +40,12 @@ import org.junit.Test; import com.amazon.opendistroforelasticsearch.ad.settings.AnomalyDetectorSettings; +import com.amazon.opendistroforelasticsearch.ad.task.ADTaskManager; public class DeleteAnomalyDetectorActionTests extends ESIntegTestCase { private DeleteAnomalyDetectorTransportAction action; private ActionListener response; + private ADTaskManager adTaskManager; @Override @Before @@ -55,13 +57,15 @@ public void setUp() throws Exception { Collections.unmodifiableSet(new HashSet<>(Arrays.asList(AnomalyDetectorSettings.FILTER_BY_BACKEND_ROLES))) ); when(clusterService.getClusterSettings()).thenReturn(clusterSettings); + adTaskManager = mock(ADTaskManager.class); action = new DeleteAnomalyDetectorTransportAction( mock(TransportService.class), mock(ActionFilters.class), client(), clusterService, Settings.EMPTY, - xContentRegistry() + xContentRegistry(), + adTaskManager ); response = new ActionListener() { @Override diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/IndexAnomalyDetectorTransportActionTests.java b/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/IndexAnomalyDetectorTransportActionTests.java index 18f01e73..e3d88280 100644 --- a/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/IndexAnomalyDetectorTransportActionTests.java +++ b/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/IndexAnomalyDetectorTransportActionTests.java @@ -43,6 +43,7 @@ import com.amazon.opendistroforelasticsearch.ad.indices.AnomalyDetectionIndices; import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector; import com.amazon.opendistroforelasticsearch.ad.settings.AnomalyDetectorSettings; +import com.amazon.opendistroforelasticsearch.ad.task.ADTaskManager; import com.amazon.opendistroforelasticsearch.commons.ConfigConstants; public class IndexAnomalyDetectorTransportActionTests extends ESIntegTestCase { @@ -52,6 +53,7 @@ public class IndexAnomalyDetectorTransportActionTests extends ESIntegTestCase { private ActionListener response; private ClusterService clusterService; private ClusterSettings clusterSettings; + private ADTaskManager adTaskManager; @Override @Before @@ -64,6 +66,7 @@ public void setUp() throws Exception { ); when(clusterService.getClusterSettings()).thenReturn(clusterSettings); + adTaskManager = mock(ADTaskManager.class); action = new IndexAnomalyDetectorTransportAction( mock(TransportService.class), mock(ActionFilters.class), @@ -71,7 +74,8 @@ public void setUp() throws Exception { clusterService, indexSettings(), mock(AnomalyDetectionIndices.class), - xContentRegistry() + xContentRegistry(), + adTaskManager ); task = mock(Task.class); request = new IndexAnomalyDetectorRequest( @@ -123,7 +127,8 @@ public void testIndexTransportActionWithUserAndFilterOn() { clusterService, settings, mock(AnomalyDetectionIndices.class), - xContentRegistry() + xContentRegistry(), + adTaskManager ); transportAction.doExecute(task, request, response); } @@ -146,7 +151,8 @@ public void testIndexTransportActionWithUserAndFilterOff() { clusterService, settings, mock(AnomalyDetectionIndices.class), - xContentRegistry() + xContentRegistry(), + adTaskManager ); transportAction.doExecute(task, request, response); } diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/SearchADTasksActionTests.java b/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/SearchADTasksActionTests.java new file mode 100644 index 00000000..52f97574 --- /dev/null +++ b/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/SearchADTasksActionTests.java @@ -0,0 +1,48 @@ +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistroforelasticsearch.ad.transport; + +import static com.amazon.opendistroforelasticsearch.ad.TestHelpers.matchAllRequest; + +import java.io.IOException; + +import org.elasticsearch.action.search.SearchResponse; +import org.junit.Test; + +import com.amazon.opendistroforelasticsearch.ad.HistoricalDetectorIntegTestCase; +import com.amazon.opendistroforelasticsearch.ad.TestHelpers; +import com.amazon.opendistroforelasticsearch.ad.constant.CommonName; + +public class SearchADTasksActionTests extends HistoricalDetectorIntegTestCase { + + @Test + public void testSearchADTasksAction() throws IOException { + createDetectionStateIndex(); + String adTaskId = createADTask(TestHelpers.randomAdTask()); + + SearchResponse searchResponse = client().execute(SearchADTasksAction.INSTANCE, matchAllRequest()).actionGet(10000); + assertEquals(1, searchResponse.getInternalResponse().hits().getTotalHits().value); + assertEquals(adTaskId, searchResponse.getInternalResponse().hits().getAt(0).getId()); + } + + @Test + public void testNoIndex() { + deleteIndexIfExists(CommonName.DETECTION_STATE_INDEX); + SearchResponse searchResponse = client().execute(SearchADTasksAction.INSTANCE, matchAllRequest()).actionGet(10000); + assertEquals(0, searchResponse.getInternalResponse().hits().getTotalHits().value); + } + +} diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/SearchADTasksTransportActionTests.java b/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/SearchADTasksTransportActionTests.java new file mode 100644 index 00000000..c21d8c7a --- /dev/null +++ b/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/SearchADTasksTransportActionTests.java @@ -0,0 +1,93 @@ +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistroforelasticsearch.ad.transport; + +import static com.amazon.opendistroforelasticsearch.ad.settings.AnomalyDetectorSettings.BATCH_TASK_PIECE_INTERVAL_SECONDS; +import static com.amazon.opendistroforelasticsearch.ad.settings.AnomalyDetectorSettings.MAX_BATCH_TASK_PER_NODE; + +import java.io.IOException; +import java.time.Instant; +import java.time.temporal.ChronoUnit; + +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.IndexNotFoundException; +import org.elasticsearch.index.query.BoolQueryBuilder; +import org.elasticsearch.index.query.TermQueryBuilder; +import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.test.ESIntegTestCase; +import org.junit.Before; + +import com.amazon.opendistroforelasticsearch.ad.HistoricalDetectorIntegTestCase; +import com.amazon.opendistroforelasticsearch.ad.constant.CommonName; +import com.amazon.opendistroforelasticsearch.ad.model.ADTask; + +@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 2) +public class SearchADTasksTransportActionTests extends HistoricalDetectorIntegTestCase { + + private Instant startTime; + private Instant endTime; + private String type = "error"; + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + startTime = Instant.now().minus(10, ChronoUnit.DAYS); + endTime = Instant.now(); + ingestTestData(testIndex, startTime, detectionIntervalInMinutes, type, 2000); + } + + @Override + protected Settings nodeSettings(int nodeOrdinal) { + return Settings + .builder() + .put(super.nodeSettings(nodeOrdinal)) + .put(BATCH_TASK_PIECE_INTERVAL_SECONDS.getKey(), 1) + .put(MAX_BATCH_TASK_PER_NODE.getKey(), 1) + .build(); + } + + public void testSearchWithoutTaskIndex() { + SearchRequest request = searchRequest(false); + expectThrows(IndexNotFoundException.class, () -> client().execute(SearchADTasksAction.INSTANCE, request).actionGet(10000)); + } + + public void testSearchWithNoTasks() throws IOException { + createDetectionStateIndex(); + SearchRequest request = searchRequest(false); + SearchResponse response = client().execute(SearchADTasksAction.INSTANCE, request).actionGet(10000); + assertEquals(0, response.getHits().getTotalHits().value); + } + + public void testSearchWithExistingTask() throws IOException { + startHistoricalDetector(startTime, endTime); + SearchRequest searchRequest = searchRequest(true); + SearchResponse response = client().execute(SearchADTasksAction.INSTANCE, searchRequest).actionGet(10000); + assertEquals(1, response.getHits().getTotalHits().value); + } + + private SearchRequest searchRequest(boolean isLatest) { + SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); + BoolQueryBuilder query = new BoolQueryBuilder(); + query.filter(new TermQueryBuilder(ADTask.IS_LATEST_FIELD, isLatest)); + sourceBuilder.query(query); + SearchRequest request = new SearchRequest().source(sourceBuilder).indices(CommonName.DETECTION_STATE_INDEX); + return request; + } + +} diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/SearchAnomalyDetectorActionTests.java b/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/SearchAnomalyDetectorActionTests.java index 18948499..8eefcc10 100644 --- a/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/SearchAnomalyDetectorActionTests.java +++ b/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/SearchAnomalyDetectorActionTests.java @@ -15,102 +15,65 @@ package com.amazon.opendistroforelasticsearch.ad.transport; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; +import static com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector.DETECTOR_TYPE_FIELD; import java.io.IOException; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashSet; +import java.time.Instant; +import java.time.temporal.ChronoUnit; -import org.apache.lucene.index.IndexNotFoundException; -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.action.support.ActionFilters; -import org.elasticsearch.client.Client; -import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.common.settings.ClusterSettings; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.concurrent.ThreadContext; -import org.elasticsearch.tasks.Task; -import org.elasticsearch.test.ESIntegTestCase; -import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.TransportService; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Ignore; -import org.junit.Test; +import org.elasticsearch.index.query.BoolQueryBuilder; +import org.elasticsearch.index.query.MatchAllQueryBuilder; +import org.elasticsearch.index.query.TermQueryBuilder; +import org.elasticsearch.search.builder.SearchSourceBuilder; -import com.amazon.opendistroforelasticsearch.ad.settings.AnomalyDetectorSettings; +import com.amazon.opendistroforelasticsearch.ad.HistoricalDetectorIntegTestCase; +import com.amazon.opendistroforelasticsearch.ad.TestHelpers; +import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector; +import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetectorType; +import com.google.common.collect.ImmutableList; -public class SearchAnomalyDetectorActionTests extends ESIntegTestCase { - private SearchAnomalyDetectorTransportAction action; - private Task task; - private ActionListener response; - private ClusterService clusterService; - private Client client; +public class SearchAnomalyDetectorActionTests extends HistoricalDetectorIntegTestCase { - @Override - @Before - public void setUp() throws Exception { - super.setUp(); - clusterService = mock(ClusterService.class); - ClusterSettings clusterSettings = new ClusterSettings( - Settings.EMPTY, - Collections.unmodifiableSet(new HashSet<>(Arrays.asList(AnomalyDetectorSettings.FILTER_BY_BACKEND_ROLES))) - ); - when(clusterService.getClusterSettings()).thenReturn(clusterSettings); - ThreadPool threadPool = mock(ThreadPool.class); - client = mock(Client.class); - ThreadContext threadContext = new ThreadContext(Settings.EMPTY); - when(client.threadPool()).thenReturn(threadPool); - when(client.threadPool().getThreadContext()).thenReturn(threadContext); + private String indexName = "test-data"; + private Instant startTime = Instant.now().minus(2, ChronoUnit.DAYS); - action = new SearchAnomalyDetectorTransportAction( - Settings.EMPTY, - mock(TransportService.class), - clusterService, - mock(ActionFilters.class), - client - ); - task = mock(Task.class); - response = new ActionListener() { - @Override - public void onResponse(SearchResponse searchResponse) { - Assert.assertEquals(searchResponse.getSuccessfulShards(), 5); - } + public void testSearchDetectorAction() throws IOException { + ingestTestData(indexName, startTime, 1, "test", 3000); + String detectorType = AnomalyDetectorType.REALTIME_SINGLE_ENTITY.name(); + AnomalyDetector detector = TestHelpers + .randomAnomalyDetector( + ImmutableList.of(indexName), + ImmutableList.of(TestHelpers.randomFeature(true)), + null, + Instant.now(), + detectorType, + 1, + null, + false + ); + createDetectorIndex(); + String detectorId = createDetector(detector); - @Override - public void onFailure(Exception e) { - Assert.assertFalse(IndexNotFoundException.class == e.getClass()); - } - }; - } + BoolQueryBuilder query = new BoolQueryBuilder().filter(new TermQueryBuilder(DETECTOR_TYPE_FIELD, detectorType)); + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder().query(query); + SearchRequest request = new SearchRequest().source(searchSourceBuilder); - // Ignoring this test as this is flaky. - @Ignore - @Test - public void testSearchResponse() throws IOException { - // Will call response.onResponse as Index exists - Settings indexSettings = Settings.builder().put("index.number_of_shards", 5).put("index.number_of_replicas", 1).build(); - CreateIndexRequest indexRequest = new CreateIndexRequest("my-test-index", indexSettings); - client().admin().indices().create(indexRequest).actionGet(); - SearchRequest searchRequest = new SearchRequest("my-test-index"); - action.doExecute(task, searchRequest, response); + SearchResponse searchResponse = client().execute(SearchAnomalyDetectorAction.INSTANCE, request).actionGet(10000); + assertEquals(1, searchResponse.getInternalResponse().hits().getTotalHits().value); + assertEquals(detectorId, searchResponse.getInternalResponse().hits().getAt(0).getId()); } - @Test - public void testSearchDetectorAction() { - Assert.assertNotNull(SearchAnomalyDetectorAction.INSTANCE.name()); - Assert.assertEquals(SearchAnomalyDetectorAction.INSTANCE.name(), SearchAnomalyDetectorAction.NAME); - } + public void testNoIndex() { + deleteIndexIfExists(AnomalyDetector.ANOMALY_DETECTORS_INDEX); - @Test - public void testNoIndex() throws IOException { - // No Index, will call response.onFailure - SearchRequest searchRequest = new SearchRequest("my-test-index"); - action.doExecute(task, searchRequest, response); + BoolQueryBuilder query = new BoolQueryBuilder().filter(new MatchAllQueryBuilder()); + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder().query(query); + SearchRequest request = new SearchRequest().source(searchSourceBuilder); + + SearchResponse searchResponse = client().execute(SearchAnomalyDetectorAction.INSTANCE, request).actionGet(10000); + assertEquals(0, searchResponse.getInternalResponse().hits().getTotalHits().value); } + } diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/SearchAnomalyResultActionTests.java b/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/SearchAnomalyResultActionTests.java index c107704c..83994ea4 100644 --- a/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/SearchAnomalyResultActionTests.java +++ b/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/SearchAnomalyResultActionTests.java @@ -15,101 +15,35 @@ package com.amazon.opendistroforelasticsearch.ad.transport; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; +import static com.amazon.opendistroforelasticsearch.ad.TestHelpers.matchAllRequest; import java.io.IOException; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashSet; -import org.apache.lucene.index.IndexNotFoundException; -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; -import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.action.support.ActionFilters; -import org.elasticsearch.client.Client; -import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.common.settings.ClusterSettings; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.concurrent.ThreadContext; -import org.elasticsearch.tasks.Task; -import org.elasticsearch.test.ESIntegTestCase; -import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.TransportService; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Ignore; import org.junit.Test; -import com.amazon.opendistroforelasticsearch.ad.settings.AnomalyDetectorSettings; +import com.amazon.opendistroforelasticsearch.ad.HistoricalDetectorIntegTestCase; +import com.amazon.opendistroforelasticsearch.ad.TestHelpers; +import com.amazon.opendistroforelasticsearch.ad.constant.CommonName; -public class SearchAnomalyResultActionTests extends ESIntegTestCase { - private SearchAnomalyResultTransportAction action; - private Task task; - private ActionListener response; - private ClusterService clusterService; - private Client client; +public class SearchAnomalyResultActionTests extends HistoricalDetectorIntegTestCase { - @Override - @Before - public void setUp() throws Exception { - super.setUp(); - clusterService = mock(ClusterService.class); - ClusterSettings clusterSettings = new ClusterSettings( - Settings.EMPTY, - Collections.unmodifiableSet(new HashSet<>(Arrays.asList(AnomalyDetectorSettings.FILTER_BY_BACKEND_ROLES))) - ); - when(clusterService.getClusterSettings()).thenReturn(clusterSettings); - ThreadPool threadPool = mock(ThreadPool.class); - client = mock(Client.class); - ThreadContext threadContext = new ThreadContext(Settings.EMPTY); - when(client.threadPool()).thenReturn(threadPool); - when(client.threadPool().getThreadContext()).thenReturn(threadContext); - action = new SearchAnomalyResultTransportAction( - Settings.EMPTY, - mock(TransportService.class), - clusterService, - mock(ActionFilters.class), - client - ); - task = mock(Task.class); - response = new ActionListener() { - @Override - public void onResponse(SearchResponse searchResponse) { - Assert.assertEquals(searchResponse.getSuccessfulShards(), 5); - } + @Test + public void testSearchResultAction() throws IOException { + createADResultIndex(); + String adResultId = createADResult(TestHelpers.randomAnomalyDetectResult()); - @Override - public void onFailure(Exception e) { - Assert.assertFalse(IndexNotFoundException.class == e.getClass()); - } - }; - } + SearchResponse searchResponse = client().execute(SearchAnomalyResultAction.INSTANCE, matchAllRequest()).actionGet(10000); + assertEquals(1, searchResponse.getInternalResponse().hits().getTotalHits().value); - // Ignoring this test as this is flaky. - @Ignore - @Test - public void testSearchResponse() throws IOException { - // Will call response.onResponse as Index exists - Settings indexSettings = Settings.builder().put("index.number_of_shards", 5).put("index.number_of_replicas", 1).build(); - CreateIndexRequest indexRequest = new CreateIndexRequest("my-test-index", indexSettings); - client().admin().indices().create(indexRequest).actionGet(); - SearchRequest searchRequest = new SearchRequest("my-test-index"); - action.doExecute(task, searchRequest, response); + assertEquals(adResultId, searchResponse.getInternalResponse().hits().getAt(0).getId()); } @Test - public void testSearchResultAction() { - Assert.assertNotNull(SearchAnomalyResultAction.INSTANCE.name()); - Assert.assertEquals(SearchAnomalyResultAction.INSTANCE.name(), SearchAnomalyResultAction.NAME); + public void testNoIndex() { + deleteIndexIfExists(CommonName.ANOMALY_RESULT_INDEX_ALIAS); + SearchResponse searchResponse = client().execute(SearchAnomalyResultAction.INSTANCE, matchAllRequest()).actionGet(10000); + assertEquals(0, searchResponse.getInternalResponse().hits().getTotalHits().value); } - @Test - public void testNoIndex() throws IOException { - // No Index, will call response.onFailure - SearchRequest searchRequest = new SearchRequest("my-test-index"); - action.doExecute(task, searchRequest, response); - } } diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/handler/ADSearchHandlerTests.java b/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/handler/ADSearchHandlerTests.java new file mode 100644 index 00000000..50a98537 --- /dev/null +++ b/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/handler/ADSearchHandlerTests.java @@ -0,0 +1,98 @@ +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistroforelasticsearch.ad.transport.handler; + +import static com.amazon.opendistroforelasticsearch.ad.TestHelpers.matchAllRequest; +import static com.amazon.opendistroforelasticsearch.ad.settings.AnomalyDetectorSettings.FILTER_BY_BACKEND_ROLES; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.threadpool.ThreadPool; +import org.junit.Before; + +import com.amazon.opendistroforelasticsearch.ad.ADUnitTestCase; +import com.amazon.opendistroforelasticsearch.commons.ConfigConstants; + +public class ADSearchHandlerTests extends ADUnitTestCase { + + private Client client; + private Settings settings; + private ClusterService clusterService; + private ADSearchHandler searchHandler; + private ClusterSettings clusterSettings; + + private SearchRequest request; + + private ActionListener listener; + + @SuppressWarnings("unchecked") + @Before + @Override + public void setUp() throws Exception { + super.setUp(); + settings = Settings.builder().put(FILTER_BY_BACKEND_ROLES.getKey(), false).build(); + clusterSettings = clusterSetting(settings, FILTER_BY_BACKEND_ROLES); + clusterService = new ClusterService(settings, clusterSettings, null); + client = mock(Client.class); + searchHandler = new ADSearchHandler(settings, clusterService, client); + + ThreadContext threadContext = new ThreadContext(settings); + threadContext.putTransient(ConfigConstants.OPENDISTRO_SECURITY_USER_INFO_THREAD_CONTEXT, "alice|odfe,aes|engineering,operations"); + org.elasticsearch.threadpool.ThreadPool mockThreadPool = mock(ThreadPool.class); + when(client.threadPool()).thenReturn(mockThreadPool); + when(client.threadPool().getThreadContext()).thenReturn(threadContext); + when(mockThreadPool.getThreadContext()).thenReturn(threadContext); + + request = mock(SearchRequest.class); + listener = mock(ActionListener.class); + } + + public void testSearchException() { + doThrow(new RuntimeException("test")).when(client).search(any(), any()); + searchHandler.search(request, listener); + verify(listener, times(1)).onFailure(any()); + } + + public void testFilterEnabledWithWrongSearch() { + settings = Settings.builder().put(FILTER_BY_BACKEND_ROLES.getKey(), true).build(); + clusterService = new ClusterService(settings, clusterSettings, null); + + searchHandler = new ADSearchHandler(settings, clusterService, client); + searchHandler.search(request, listener); + verify(listener, times(1)).onFailure(any()); + } + + public void testFilterEnabled() { + settings = Settings.builder().put(FILTER_BY_BACKEND_ROLES.getKey(), true).build(); + clusterService = new ClusterService(settings, clusterSettings, null); + + searchHandler = new ADSearchHandler(settings, clusterService, client); + searchHandler.search(matchAllRequest(), listener); + verify(client, times(1)).search(any(), any()); + } +} diff --git a/src/test/java/org/elasticsearch/action/admin/indices/mapping/get/IndexAnomalyDetectorActionHandlerTests.java b/src/test/java/org/elasticsearch/action/admin/indices/mapping/get/IndexAnomalyDetectorActionHandlerTests.java index f65f218b..84c80302 100644 --- a/src/test/java/org/elasticsearch/action/admin/indices/mapping/get/IndexAnomalyDetectorActionHandlerTests.java +++ b/src/test/java/org/elasticsearch/action/admin/indices/mapping/get/IndexAnomalyDetectorActionHandlerTests.java @@ -51,6 +51,7 @@ import org.elasticsearch.rest.RestRequest; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; @@ -62,6 +63,7 @@ import com.amazon.opendistroforelasticsearch.ad.indices.AnomalyDetectionIndices; import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector; import com.amazon.opendistroforelasticsearch.ad.rest.handler.IndexAnomalyDetectorActionHandler; +import com.amazon.opendistroforelasticsearch.ad.task.ADTaskManager; import com.amazon.opendistroforelasticsearch.ad.transport.IndexAnomalyDetectorResponse; /** @@ -77,6 +79,7 @@ public class IndexAnomalyDetectorActionHandlerTests extends AbstractADTest { private IndexAnomalyDetectorActionHandler handler; private ClusterService clusterService; private NodeClient clientMock; + private TransportService transportService; private ActionListener channel; private AnomalyDetectionIndices anomalyDetectionIndices; private String detectorId; @@ -90,6 +93,7 @@ public class IndexAnomalyDetectorActionHandlerTests extends AbstractADTest { private Integer maxAnomalyFeatures; private Settings settings; private RestRequest.Method method; + private ADTaskManager adTaskManager; /** * Mockito does not allow mock final methods. Make my own delegates and mock them. @@ -131,6 +135,7 @@ public void setUp() throws Exception { settings = Settings.EMPTY; clusterService = mock(ClusterService.class); clientMock = spy(new NodeClient(settings, null)); + transportService = mock(TransportService.class); channel = mock(ActionListener.class); @@ -156,9 +161,12 @@ public void setUp() throws Exception { method = RestRequest.Method.POST; + adTaskManager = mock(ADTaskManager.class); + handler = new IndexAnomalyDetectorActionHandler( clusterService, clientMock, + transportService, channel, anomalyDetectionIndices, detectorId, @@ -172,7 +180,8 @@ public void setUp() throws Exception { maxAnomalyFeatures, method, xContentRegistry(), - null + null, + adTaskManager ); } @@ -204,6 +213,7 @@ public void testNoCategoricalField() throws IOException { handler = new IndexAnomalyDetectorActionHandler( clusterService, clientMock, + transportService, channel, anomalyDetectionIndices, detectorId, @@ -218,7 +228,8 @@ public void testNoCategoricalField() throws IOException { maxAnomalyFeatures, method, xContentRegistry(), - null + null, + adTaskManager ); handler.start(); @@ -267,6 +278,7 @@ public void doE handler = new IndexAnomalyDetectorActionHandler( clusterService, client, + transportService, channel, anomalyDetectionIndices, detectorId, @@ -280,7 +292,8 @@ public void doE maxAnomalyFeatures, method, xContentRegistry(), - null + null, + adTaskManager ); ArgumentCaptor response = ArgumentCaptor.forClass(Exception.class); @@ -342,6 +355,7 @@ public void doE handler = new IndexAnomalyDetectorActionHandler( clusterService, clientSpy, + transportService, channel, anomalyDetectionIndices, detectorId, @@ -355,7 +369,8 @@ public void doE maxAnomalyFeatures, method, xContentRegistry(), - null + null, + adTaskManager ); ArgumentCaptor response = ArgumentCaptor.forClass(Exception.class); @@ -434,6 +449,7 @@ public void doE handler = new IndexAnomalyDetectorActionHandler( clusterService, clientSpy, + transportService, channel, anomalyDetectionIndices, detectorId, @@ -447,7 +463,8 @@ public void doE maxAnomalyFeatures, RestRequest.Method.PUT, xContentRegistry(), - null + null, + adTaskManager ); ArgumentCaptor response = ArgumentCaptor.forClass(Exception.class); @@ -554,6 +571,7 @@ public void testTenMultiEntityDetectorsUpdateSingleEntityAdToMulti() throws IOEx handler = new IndexAnomalyDetectorActionHandler( clusterService, clientMock, + transportService, channel, anomalyDetectionIndices, detectorId, @@ -567,7 +585,8 @@ public void testTenMultiEntityDetectorsUpdateSingleEntityAdToMulti() throws IOEx maxAnomalyFeatures, RestRequest.Method.PUT, xContentRegistry(), - null + null, + adTaskManager ); handler.start(); @@ -626,6 +645,7 @@ public void testTenMultiEntityDetectorsUpdateExistingMultiEntityAd() throws IOEx handler = new IndexAnomalyDetectorActionHandler( clusterService, clientMock, + transportService, channel, anomalyDetectionIndices, detectorId, @@ -639,7 +659,8 @@ public void testTenMultiEntityDetectorsUpdateExistingMultiEntityAd() throws IOEx maxAnomalyFeatures, RestRequest.Method.PUT, xContentRegistry(), - null + null, + adTaskManager ); handler.start();