diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorPlugin.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorPlugin.java index 5b2e930e..311cd771 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorPlugin.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorPlugin.java @@ -68,6 +68,7 @@ import com.amazon.opendistroforelasticsearch.ad.transport.ThresholdResultAction; import com.amazon.opendistroforelasticsearch.ad.transport.ThresholdResultTransportAction; import com.amazon.opendistroforelasticsearch.ad.util.IndexUtils; +import com.amazon.opendistroforelasticsearch.ad.util.Throttler; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.gson.Gson; @@ -196,7 +197,9 @@ public Collection createComponents( NamedWriteableRegistry namedWriteableRegistry ) { Settings settings = environment.settings(); - ClientUtil clientUtil = new ClientUtil(settings, client); + Clock clock = Clock.systemUTC(); + Throttler throttler = new Throttler(clock); + ClientUtil clientUtil = new ClientUtil(settings, client, throttler); IndexUtils indexUtils = new IndexUtils(client, clientUtil, clusterService); anomalyDetectionIndices = new AnomalyDetectionIndices(client, clusterService, threadPool, settings, clientUtil); this.clusterService = clusterService; @@ -209,7 +212,6 @@ public Collection createComponents( JvmService jvmService = new JvmService(environment.settings()); RandomCutForestSerDe rcfSerde = new RandomCutForestSerDe(); CheckpointDao checkpoint = new CheckpointDao(client, clientUtil, CommonName.CHECKPOINT_INDEX_NAME); - Clock clock = Clock.systemUTC(); ModelManager modelManager = new ModelManager( clusterService, diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/HourlyCron.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/HourlyCron.java index aff9c50b..16055aeb 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/HourlyCron.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/HourlyCron.java @@ -43,6 +43,8 @@ public HourlyCron(ClusterService clusterService, Client client) { public void run() { DiscoveryNode[] dataNodes = clusterService.state().nodes().getDataNodes().values().toArray(DiscoveryNode.class); + // we also add the cancel query function here based on query text from the negative cache. + CronRequest modelDeleteRequest = new CronRequest(dataNodes); client.execute(CronAction.INSTANCE, modelDeleteRequest, ActionListener.wrap(response -> { if (response.hasFailures()) { diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/feature/SearchFeatureDao.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/feature/SearchFeatureDao.java index 1b8818ce..09ccba25 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/feature/SearchFeatureDao.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/feature/SearchFeatureDao.java @@ -115,6 +115,10 @@ public Optional getLatestDataTime(AnomalyDetector detector) { /** * Gets features for the given time period. + * This function also adds given detector to negative cache before sending es request. + * Once response/exception is received within timeout, this request will be treated as complete + * and cleared from the negative cache. + * Otherwise this detector entry remain in the negative to reject further request. * * @param detector info about indices, documents, feature query * @param startTime epoch milliseconds at the beginning of the period @@ -124,8 +128,10 @@ public Optional getLatestDataTime(AnomalyDetector detector) { */ public Optional getFeaturesForPeriod(AnomalyDetector detector, long startTime, long endTime) { SearchRequest searchRequest = createFeatureSearchRequest(detector, startTime, endTime, Optional.empty()); + + // send throttled request: this request will clear the negative cache if the request finished within timeout return clientUtil - .timedRequest(searchRequest, logger, client::search) + .throttledTimedRequest(searchRequest, logger, client::search, detector) .flatMap(resp -> parseResponse(resp, detector.getEnabledFeatureIds())); } diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/ADStateManager.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/ADStateManager.java index 76246e62..eb57123c 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/ADStateManager.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/ADStateManager.java @@ -209,4 +209,13 @@ public void addPressure(String nodeId) { public void resetBackpressureCounter(String nodeId) { backpressureMuter.remove(nodeId); } + + /** + * Check if there is running query on given detector + * @param detector Anomaly Detector + * @return true if given detector has a running query else false + */ + public boolean hasRunningQuery(AnomalyDetector detector) { + return clientUtil.hasRunningQuery(detector); + } } diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/AnomalyResultTransportAction.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/AnomalyResultTransportAction.java index e0727764..b4cdfe2f 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/AnomalyResultTransportAction.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/AnomalyResultTransportAction.java @@ -249,6 +249,11 @@ protected void doExecute(Task task, ActionRequest actionRequest, ActionListener< return; } AnomalyDetector anomalyDetector = detector.get(); + if (stateManager.hasRunningQuery(anomalyDetector)) { + LOG.error("There is one query running for detectorId: {}", anomalyDetector.getDetectorId()); + listener.onFailure(new EndRunException(adID, "There is one query running on AnomalyDetector", true)); + return; + } String thresholdModelID = modelManager.getThresholdModelId(adID); Optional thresholdNode = hashRing.getOwningNode(thresholdModelID); diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/ClientUtil.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/ClientUtil.java index fa96c832..136583fb 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/ClientUtil.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/ClientUtil.java @@ -24,6 +24,9 @@ import java.util.function.BiConsumer; import java.util.function.Function; +import com.amazon.opendistroforelasticsearch.ad.common.exception.EndRunException; +import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector; + import org.apache.logging.log4j.Logger; import org.elasticsearch.ElasticsearchTimeoutException; import org.elasticsearch.action.ActionType; @@ -42,11 +45,13 @@ public class ClientUtil { private volatile TimeValue requestTimeout; private Client client; + private final Throttler throttler; @Inject - public ClientUtil(Settings setting, Client client) { + public ClientUtil(Settings setting, Client client, Throttler throttler) { this.requestTimeout = REQUEST_TIMEOUT.get(setting); this.client = client; + this.throttler = throttler; } /** @@ -152,4 +157,69 @@ public Response ) { return function.apply(request).actionGet(requestTimeout); } + + /** + * Send a nonblocking request with a timeout and return response. The request will first be put into + * the negative cache. Once the request complete, it will be removed from the negative cache. + * + * @param request request like index/search/get + * @param LOG log + * @param consumer functional interface to operate as a client request like client::get + * @param ActionRequest + * @param ActionResponse + * @param detector Anomaly Detector + * @return the response + * @throws EndRunException when there is already a query running + * @throws ElasticsearchTimeoutException when we cannot get response within time. + * @throws IllegalStateException when the waiting thread is interrupted + */ + public Optional throttledTimedRequest( + Request request, + Logger LOG, + BiConsumer> consumer, + AnomalyDetector detector + ) { + try { + // if key already exist, reject the request and throws exception + if (!throttler.insertFilteredQuery(detector.getDetectorId(), request)) { + LOG.error("There is one query running for detectorId: {}", detector.getDetectorId()); + throw new EndRunException(detector.getDetectorId(), "There is one query running on AnomalyDetector", true); + } + AtomicReference respReference = new AtomicReference<>(); + final CountDownLatch latch = new CountDownLatch(1); + + try { + consumer.accept(request, new LatchedActionListener(ActionListener.wrap(response -> { + // clear negative cache + throttler.clearFilteredQuery(detector.getDetectorId()); + respReference.set(response); + }, exception -> { + // clear negative cache + throttler.clearFilteredQuery(detector.getDetectorId()); + LOG.error("Cannot get response for request {}, error: {}", request, exception); + }), latch)); + } catch (Exception e) { + LOG.error("Failed to process the request for detectorId: {}.", detector.getDetectorId()); + throttler.clearFilteredQuery(detector.getDetectorId()); + throw e; + } + + if (!latch.await(requestTimeout.getSeconds(), TimeUnit.SECONDS)) { + throw new ElasticsearchTimeoutException("Cannot get response within time limit: " + request.toString()); + } + return Optional.ofNullable(respReference.get()); + } catch (InterruptedException e1) { + LOG.error(CommonErrorMessages.WAIT_ERR_MSG); + throw new IllegalStateException(e1); + } + } + + /** + * Check if there is running query on given detector + * @param detector Anomaly Detector + * @return true if given detector has a running query else false + */ + public boolean hasRunningQuery(AnomalyDetector detector) { + return throttler.getFilteredQuery(detector.getDetectorId()).isPresent(); + } } diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/Throttler.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/Throttler.java new file mode 100644 index 00000000..4c5ebaed --- /dev/null +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/Throttler.java @@ -0,0 +1,66 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package com.amazon.opendistroforelasticsearch.ad.util; + +import java.time.Clock; +import java.time.Instant; +import java.util.AbstractMap; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import org.elasticsearch.action.ActionRequest; + +/** + * Utility functions for throttling query. + */ +public class Throttler { + // negativeCache is used to reject search query if given detector already has one query running + // key is detectorId, value is an entry. Key is ActionRequest and value is the timestamp + private final ConcurrentHashMap> negativeCache; + private final Clock clock; + + public Throttler(Clock clock) { + this.negativeCache = new ConcurrentHashMap<>(); + this.clock = clock; + } + + /** + * Get negative cache value(ActionRequest, Instant) for given detector + * @param detectorId AnomalyDetector ID + * @return negative cache value(ActionRequest, Instant) + */ + public Optional> getFilteredQuery(String detectorId) { + return Optional.ofNullable(negativeCache.get(detectorId)); + } + + /** + * Insert the negative cache entry for given detector + * If key already exists, return false. Otherwise true. + * @param detectorId AnomalyDetector ID + * @param request ActionRequest + * @return true if key doesn't exist otherwise false. + */ + public synchronized boolean insertFilteredQuery(String detectorId, ActionRequest request) { + return negativeCache.putIfAbsent(detectorId, new AbstractMap.SimpleEntry<>(request, clock.instant())) == null; + } + + /** + * Clear the negative cache for given detector. + * @param detectorId AnomalyDetector ID + */ + public void clearFilteredQuery(String detectorId) { + negativeCache.remove(detectorId); + } +} diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/ad/feature/FeatureManagerTests.java b/src/test/java/com/amazon/opendistroforelasticsearch/ad/feature/FeatureManagerTests.java index 20751c5c..bc8865cd 100644 --- a/src/test/java/com/amazon/opendistroforelasticsearch/ad/feature/FeatureManagerTests.java +++ b/src/test/java/com/amazon/opendistroforelasticsearch/ad/feature/FeatureManagerTests.java @@ -30,6 +30,7 @@ import com.amazon.opendistroforelasticsearch.ad.dataprocessor.SingleFeatureLinearUniformInterpolator; import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector; import com.amazon.opendistroforelasticsearch.ad.model.IntervalTimeConfiguration; +import com.amazon.opendistroforelasticsearch.ad.transport.ADStateManager; import junitparams.JUnitParamsRunner; import junitparams.Parameters; @@ -88,6 +89,9 @@ public class FeatureManagerTests { @Mock private Clock clock; + @Mock + private ADStateManager stateManager; + private FeatureManager featureManager; @Before @@ -308,6 +312,7 @@ public void clear_deleteFeatures() { for (int i = 1; i <= shingleSize; i++) { start = i * 10; end = (i + 1) * 10; + when(searchFeatureDao.getFeaturesForPeriod(detector, start, end)).thenReturn(Optional.of(new double[] { i })); featureManager.getCurrentFeatures(detector, start, end); } @@ -329,6 +334,7 @@ public void maintenance_removeStaleData() { for (int i = 1; i <= shingleSize; i++) { start = i * 10; end = (i + 1) * 10; + when(searchFeatureDao.getFeaturesForPeriod(detector, start, end)).thenReturn(Optional.of(new double[] { i })); featureManager.getCurrentFeatures(detector, start, end); } @@ -351,6 +357,7 @@ public void maintenance_keepRecentData() { for (int i = 1; i <= shingleSize; i++) { start = i * 10; end = (i + 1) * 10; + when(searchFeatureDao.getFeaturesForPeriod(detector, start, end)).thenReturn(Optional.of(new double[] { i })); featureManager.getCurrentFeatures(detector, start, end); } diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/ad/feature/SearchFeatureDaoTests.java b/src/test/java/com/amazon/opendistroforelasticsearch/ad/feature/SearchFeatureDaoTests.java index f2fc0928..25ebdf7a 100644 --- a/src/test/java/com/amazon/opendistroforelasticsearch/ad/feature/SearchFeatureDaoTests.java +++ b/src/test/java/com/amazon/opendistroforelasticsearch/ad/feature/SearchFeatureDaoTests.java @@ -32,6 +32,7 @@ import com.amazon.opendistroforelasticsearch.ad.dataprocessor.SingleFeatureLinearUniformInterpolator; import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector; import com.amazon.opendistroforelasticsearch.ad.model.IntervalTimeConfiguration; +import com.amazon.opendistroforelasticsearch.ad.transport.ADStateManager; import com.amazon.opendistroforelasticsearch.ad.util.ClientUtil; import com.amazon.opendistroforelasticsearch.ad.util.ParseUtils; import junitparams.JUnitParamsRunner; @@ -127,6 +128,8 @@ public class SearchFeatureDaoTests { private Aggregations aggs; @Mock private Max max; + @Mock + private ADStateManager stateManager; @Mock private AnomalyDetector detector; @@ -171,6 +174,15 @@ public void setup() throws Exception { .timedRequest(eq(searchRequest), anyObject(), Matchers.>>anyObject()); when(searchResponse.getAggregations()).thenReturn(aggregations); + doReturn(Optional.of(searchResponse)) + .when(clientUtil) + .throttledTimedRequest( + eq(searchRequest), + anyObject(), + Matchers.>>anyObject(), + anyObject() + ); + multiSearchRequest = new MultiSearchRequest(); SearchRequest request = new SearchRequest(detector.getIndices().toArray(new String[0])) .preference(SearchFeatureDao.FEATURE_SAMPLE_PREFERENCE); diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/ad/indices/AnomalyDetectionIndicesTests.java b/src/test/java/com/amazon/opendistroforelasticsearch/ad/indices/AnomalyDetectionIndicesTests.java index 69a922e5..8887c593 100644 --- a/src/test/java/com/amazon/opendistroforelasticsearch/ad/indices/AnomalyDetectionIndicesTests.java +++ b/src/test/java/com/amazon/opendistroforelasticsearch/ad/indices/AnomalyDetectionIndicesTests.java @@ -24,6 +24,7 @@ import com.amazon.opendistroforelasticsearch.ad.model.AnomalyResult; import com.amazon.opendistroforelasticsearch.ad.util.RestHandlerUtils; +import com.amazon.opendistroforelasticsearch.ad.util.Throttler; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.client.Client; @@ -39,6 +40,7 @@ import org.junit.Before; import java.io.IOException; +import java.time.Clock; import java.util.HashSet; import java.util.Set; @@ -70,7 +72,9 @@ public void setup() { clusterSetting = new ClusterSettings(settings, clusterSettings); clusterService = TestHelpers.createClusterService(client().threadPool(), clusterSetting); client = mock(Client.class); - requestUtil = new ClientUtil(settings, client); + Clock clock = Clock.systemUTC(); + Throttler throttler = new Throttler(clock); + requestUtil = new ClientUtil(settings, client, throttler); indices = new AnomalyDetectionIndices(client(), clusterService, client().threadPool(), settings, requestUtil); } diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/ADStateManagerTests.java b/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/ADStateManagerTests.java index 7706e4cf..c2583461 100644 --- a/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/ADStateManagerTests.java +++ b/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/ADStateManagerTests.java @@ -38,9 +38,12 @@ import com.amazon.opendistroforelasticsearch.ad.ml.ModelManager; import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector; import com.amazon.opendistroforelasticsearch.ad.util.ClientUtil; +import com.amazon.opendistroforelasticsearch.ad.util.Throttler; +import com.google.common.collect.ImmutableMap; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.get.GetRequest; import org.elasticsearch.action.get.GetResponse; +import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.client.Client; import org.elasticsearch.index.get.GetResult; import org.elasticsearch.index.mapper.MapperService; @@ -64,6 +67,7 @@ public class ADStateManagerTests extends ESTestCase { private Client client; private Clock clock; private Duration duration; + private Throttler throttler; @Override protected NamedXContentRegistry xContentRegistry() { @@ -86,12 +90,13 @@ public void setUp() throws Exception { .build(); clock = mock(Clock.class); duration = Duration.ofHours(1); + throttler = new Throttler(clock); stateManager = new ADStateManager( client, xContentRegistry(), modelManager, settings, - new ClientUtil(settings, client), + new ClientUtil(settings, client, throttler), clock, duration ); @@ -203,4 +208,12 @@ public void testMaintenancRemove() throws IOException { assertEquals(0, states.size()); } + + public void testHasRunningQuery() throws IOException { + AnomalyDetector detector = TestHelpers.randomAnomalyDetector(ImmutableMap.of(), null); + SearchRequest dummySearchRequest = new SearchRequest(); + assertFalse(stateManager.hasRunningQuery(detector)); + throttler.insertFilteredQuery(detector.getDetectorId(), dummySearchRequest); + assertTrue(stateManager.hasRunningQuery(detector)); + } } diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/ADStatsTransportActionTests.java b/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/ADStatsTransportActionTests.java index bf1e7a6e..f1250350 100644 --- a/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/ADStatsTransportActionTests.java +++ b/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/ADStatsTransportActionTests.java @@ -23,6 +23,7 @@ import com.amazon.opendistroforelasticsearch.ad.stats.suppliers.ModelsOnNodeSupplier; import com.amazon.opendistroforelasticsearch.ad.util.ClientUtil; import com.amazon.opendistroforelasticsearch.ad.util.IndexUtils; +import com.amazon.opendistroforelasticsearch.ad.util.Throttler; import org.elasticsearch.action.FailedNodeException; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.client.Client; @@ -32,6 +33,7 @@ import org.junit.Before; import org.junit.Test; +import java.time.Clock; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -55,7 +57,9 @@ public void setUp() throws Exception { super.setUp(); Client client = client(); - IndexUtils indexUtils = new IndexUtils(client, new ClientUtil(Settings.EMPTY, client), clusterService()); + Clock clock = mock(Clock.class); + Throttler throttler = new Throttler(clock); + IndexUtils indexUtils = new IndexUtils(client, new ClientUtil(Settings.EMPTY, client, throttler), clusterService()); ModelManager modelManager = mock(ModelManager.class); clusterStatName1 = "clusterStat1"; diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/AnomalyResultTests.java b/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/AnomalyResultTests.java index 37557dd0..b7b4ccd0 100644 --- a/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/AnomalyResultTests.java +++ b/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/AnomalyResultTests.java @@ -40,6 +40,7 @@ import static org.mockito.Mockito.verify; import java.io.IOException; +import java.time.Clock; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -81,6 +82,7 @@ import com.amazon.opendistroforelasticsearch.ad.util.ClientUtil; import com.amazon.opendistroforelasticsearch.ad.util.ColdStartRunner; import com.amazon.opendistroforelasticsearch.ad.util.IndexUtils; +import com.amazon.opendistroforelasticsearch.ad.util.Throttler; import org.elasticsearch.ElasticsearchTimeoutException; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; @@ -188,6 +190,7 @@ public void setUp() throws Exception { List userIndex = new ArrayList<>(); userIndex.add("test*"); when(detector.getIndices()).thenReturn(userIndex); + when(detector.getDetectorId()).thenReturn("testDetectorId"); when(stateManager.getAnomalyDetector(any(String.class))).thenReturn(Optional.of(detector)); hashRing = mock(HashRing.class); @@ -232,8 +235,9 @@ public void setUp() throws Exception { }).when(client).index(any(), any()); indexNameResolver = new IndexNameExpressionResolver(); - - ClientUtil clientUtil = new ClientUtil(Settings.EMPTY, client); + Clock clock = mock(Clock.class); + Throttler throttler = new Throttler(clock); + ClientUtil clientUtil = new ClientUtil(Settings.EMPTY, client, throttler); IndexUtils indexUtils = new IndexUtils(client, clientUtil, clusterService); Map> statsMap = new HashMap>() { @@ -723,6 +727,34 @@ public void testMute() { assertThat(exception.getMessage(), containsString(AnomalyResultTransportAction.NODE_UNRESPONSIVE_ERR_MSG)); } + public void testRejectRequestBasedOnNegativeCache() { + when(stateManager.hasRunningQuery(detector)).thenReturn(true); + AnomalyResultTransportAction action = spy( + new AnomalyResultTransportAction( + new ActionFilters(Collections.emptySet()), + transportService, + client, + settings, + stateManager, + runner, + anomalyDetectionIndices, + featureQuery, + normalModelManager, + hashRing, + clusterService, + indexNameResolver, + threadPool, + adCircuitBreakerService, + adStats + ) + ); + AnomalyResultRequest request = new AnomalyResultRequest(adID, 100, 200); + PlainActionFuture listener = new PlainActionFuture<>(); + action.doExecute(null, request, listener); + Throwable exception = assertException(listener, AnomalyDetectionException.class); + assertThat(exception.getMessage(), containsString("There is one query running on AnomalyDetector")); + } + public void testRCFLatchAwaitException() throws InterruptedException { // These constructors register handler in transport service @@ -994,7 +1026,6 @@ public void testOnFailureNull() throws IOException { public void testColdStartNoTrainingData() throws Exception { when(featureQuery.getColdStartData(any(AnomalyDetector.class))).thenReturn(Optional.empty()); - AnomalyResultTransportAction action = new AnomalyResultTransportAction( new ActionFilters(Collections.emptySet()), transportService, diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/ad/util/IndexUtilsTests.java b/src/test/java/com/amazon/opendistroforelasticsearch/ad/util/IndexUtilsTests.java index 896a1a8a..d3660076 100644 --- a/src/test/java/com/amazon/opendistroforelasticsearch/ad/util/IndexUtilsTests.java +++ b/src/test/java/com/amazon/opendistroforelasticsearch/ad/util/IndexUtilsTests.java @@ -22,6 +22,10 @@ import org.junit.Before; import org.junit.Test; +import java.time.Clock; + +import static org.mockito.Mockito.mock; + public class IndexUtilsTests extends ESIntegTestCase { private ClientUtil clientUtil; @@ -29,7 +33,9 @@ public class IndexUtilsTests extends ESIntegTestCase { @Before public void setup() { Client client = client(); - clientUtil = new ClientUtil(Settings.EMPTY, client); + Clock clock = mock(Clock.class); + Throttler throttler = new Throttler(clock); + clientUtil = new ClientUtil(Settings.EMPTY, client, throttler); } @Test diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/ad/util/ThrottlerTests.java b/src/test/java/com/amazon/opendistroforelasticsearch/ad/util/ThrottlerTests.java new file mode 100644 index 00000000..df91946e --- /dev/null +++ b/src/test/java/com/amazon/opendistroforelasticsearch/ad/util/ThrottlerTests.java @@ -0,0 +1,70 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistroforelasticsearch.ad.util; + +import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.test.ESTestCase; +import org.junit.Before; +import org.junit.Test; + +import java.time.Clock; +import static org.mockito.Mockito.mock; +import static org.powermock.api.mockito.PowerMockito.when; + +public class ThrottlerTests extends ESTestCase { + private Throttler throttler; + + @Before + public void setup() { + Clock clock = mock(Clock.class); + this.throttler = new Throttler(clock); + } + + @Test + public void testGetFilteredQuery() { + AnomalyDetector detector = mock(AnomalyDetector.class); + when(detector.getDetectorId()).thenReturn("test detector Id"); + SearchRequest dummySearchRequest = new SearchRequest(); + throttler.insertFilteredQuery(detector.getDetectorId(), dummySearchRequest); + // case 1: key exists + assertTrue(throttler.getFilteredQuery(detector.getDetectorId()).isPresent()); + // case 2: key doesn't exist + assertFalse(throttler.getFilteredQuery("different test detector Id").isPresent()); + } + + @Test + public void testInsertFilteredQuery() { + AnomalyDetector detector = mock(AnomalyDetector.class); + when(detector.getDetectorId()).thenReturn("test detector Id"); + SearchRequest dummySearchRequest = new SearchRequest(); + // first time: key doesn't exist + assertTrue(throttler.insertFilteredQuery(detector.getDetectorId(), dummySearchRequest)); + // second time: key exists + assertFalse(throttler.insertFilteredQuery(detector.getDetectorId(), dummySearchRequest)); + } + + @Test + public void testClearFilteredQuery() { + AnomalyDetector detector = mock(AnomalyDetector.class); + when(detector.getDetectorId()).thenReturn("test detector Id"); + SearchRequest dummySearchRequest = new SearchRequest(); + assertTrue(throttler.insertFilteredQuery(detector.getDetectorId(), dummySearchRequest)); + throttler.clearFilteredQuery(detector.getDetectorId()); + assertTrue(throttler.insertFilteredQuery(detector.getDetectorId(), dummySearchRequest)); + } + +}