Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Upgrade mapping #278

Merged
merged 3 commits into from
Oct 20, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,7 @@ List<String> jacocoExclusions = [
'com.amazon.opendistroforelasticsearch.ad.transport.EntityProfileRequest',
'com.amazon.opendistroforelasticsearch.ad.util.MultiResponsesDelegateActionListener',
'com.amazon.opendistroforelasticsearch.ad.transport.GetAnomalyDetectorRequest',
'com.amazon.opendistroforelasticsearch.ad.util.ThrowingSupplierWrapper',
]

jacocoTestCoverageVerification {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@
import com.amazon.opendistroforelasticsearch.ad.common.exception.AnomalyDetectionException;
import com.amazon.opendistroforelasticsearch.ad.common.exception.EndRunException;
import com.amazon.opendistroforelasticsearch.ad.common.exception.InternalFailure;
import com.amazon.opendistroforelasticsearch.ad.indices.ADIndex;
import com.amazon.opendistroforelasticsearch.ad.indices.AnomalyDetectionIndices;
import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetectorJob;
import com.amazon.opendistroforelasticsearch.ad.model.AnomalyResult;
import com.amazon.opendistroforelasticsearch.ad.model.FeatureData;
Expand Down Expand Up @@ -82,6 +84,7 @@ public class AnomalyDetectorJobRunner implements ScheduledJobRunner {
private AnomalyIndexHandler<AnomalyResult> anomalyResultHandler;
private ConcurrentHashMap<String, Integer> detectorEndRunExceptionCount;
private DetectionStateHandler detectionStateHandler;
private AnomalyDetectionIndices indexUtil;

public static AnomalyDetectorJobRunner getJobRunnerInstance() {
if (INSTANCE != null) {
Expand Down Expand Up @@ -126,6 +129,10 @@ public void setDetectionStateHandler(DetectionStateHandler detectionStateHandler
this.detectionStateHandler = detectionStateHandler;
}

public void setIndexUtil(AnomalyDetectionIndices indexUtil) {
this.indexUtil = indexUtil;
}

@Override
public void runJob(ScheduledJobParameter jobParameter, JobExecutionContext context) {
String detectorId = jobParameter.getName();
Expand Down Expand Up @@ -202,6 +209,7 @@ protected void runAdJob(
}

try {
indexUtil.updateMappingIfNecessary();
AnomalyResultRequest request = new AnomalyResultRequest(
detectorId,
detectionStartTime.toEpochMilli(),
Expand Down Expand Up @@ -451,7 +459,8 @@ private void indexAnomalyResult(
executionStartTime,
Instant.now(),
response.getError(),
user
user,
indexUtil.getSchemaVersion(ADIndex.RESULT)
);
anomalyResultHandler.index(anomalyResult, detectorId);
detectionStateHandler.saveError(response.getError(), detectorId);
Expand Down Expand Up @@ -508,7 +517,8 @@ private void indexAnomalyResultException(
executionStartTime,
Instant.now(),
errorMessage,
user
user,
indexUtil.getSchemaVersion(ADIndex.RESULT)
);
anomalyResultHandler.index(anomalyResult, detectorId);
detectionStateHandler.saveError(errorMessage, detectorId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,13 +227,10 @@ public List<RestHandler> getRestHandlers(
jobRunner.setAnomalyResultHandler(anomalyResultHandler);
jobRunner.setDetectionStateHandler(detectorStateHandler);
jobRunner.setSettings(settings);
jobRunner.setIndexUtil(anomalyDetectionIndices);

RestGetAnomalyDetectorAction restGetAnomalyDetectorAction = new RestGetAnomalyDetectorAction();
RestIndexAnomalyDetectorAction restIndexAnomalyDetectorAction = new RestIndexAnomalyDetectorAction(
settings,
clusterService,
anomalyDetectionIndices
);
RestIndexAnomalyDetectorAction restIndexAnomalyDetectorAction = new RestIndexAnomalyDetectorAction(settings, clusterService);
RestSearchAnomalyDetectorAction searchAnomalyDetectorAction = new RestSearchAnomalyDetectorAction();
RestSearchAnomalyResultAction searchAnomalyResultAction = new RestSearchAnomalyResultAction();
RestDeleteAnomalyDetectorAction deleteAnomalyDetectorAction = new RestDeleteAnomalyDetectorAction();
Expand Down Expand Up @@ -432,7 +429,7 @@ public Collection<Object> createComponents(
AnomalyDetectorSettings.NUM_MIN_SAMPLES,
settings,
threadPool,
AnomalyDetectorSettings.MAX_CACHE_HANDLING_PER_SECOND
AnomalyDetectorSettings.MAX_CACHE_MISS_HANDLING_PER_SECOND.get(settings)
);

CacheProvider cacheProvider = new CacheProvider(cache);
Expand Down Expand Up @@ -582,7 +579,8 @@ public List<Setting<?>> getSettings() {
AnomalyDetectorSettings.MAX_ENTITIES_FOR_PREVIEW,
AnomalyDetectorSettings.INDEX_PRESSURE_SOFT_LIMIT,
AnomalyDetectorSettings.MAX_PRIMARY_SHARDS,
AnomalyDetectorSettings.FILTER_BY_BACKEND_ROLES
AnomalyDetectorSettings.FILTER_BY_BACKEND_ROLES,
AnomalyDetectorSettings.MAX_CACHE_MISS_HANDLING_PER_SECOND
);
return unmodifiableList(Stream.concat(enabledSetting.stream(), systemSetting.stream()).collect(Collectors.toList()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;

import com.amazon.opendistroforelasticsearch.ad.constant.CommonValue;
import com.amazon.opendistroforelasticsearch.ad.feature.FeatureManager;
import com.amazon.opendistroforelasticsearch.ad.feature.Features;
import com.amazon.opendistroforelasticsearch.ad.ml.ModelManager;
Expand Down Expand Up @@ -158,7 +159,8 @@ private List<AnomalyResult> parsePreviewResult(
null,
null,
entity,
detector.getUser()
detector.getUser(),
CommonValue.NO_SCHEMA_VERSION
);
} else {
result = new AnomalyResult(
Expand All @@ -173,7 +175,8 @@ private List<AnomalyResult> parsePreviewResult(
null,
null,
entity,
detector.getUser()
detector.getUser(),
CommonValue.NO_SCHEMA_VERSION
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package com.amazon.opendistroforelasticsearch.ad.caching;

import static com.amazon.opendistroforelasticsearch.ad.settings.AnomalyDetectorSettings.COOLDOWN_MINUTES;
import static com.amazon.opendistroforelasticsearch.ad.settings.AnomalyDetectorSettings.MAX_CACHE_MISS_HANDLING_PER_SECOND;

import java.time.Clock;
import java.time.Duration;
Expand Down Expand Up @@ -82,7 +83,7 @@ public class PriorityCache implements EntityCache {
private int coolDownMinutes;
private ThreadPool threadPool;
private Random random;
private final RateLimiter cacheMissHandlingLimiter;
private RateLimiter cacheMissHandlingLimiter;

public PriorityCache(
CheckpointDao checkpointDao,
Expand Down Expand Up @@ -125,6 +126,9 @@ public PriorityCache(
this.random = new Random(42);

this.cacheMissHandlingLimiter = RateLimiter.create(cacheMissRateHandlingLimiter);
clusterService
.getClusterSettings()
.addSettingsUpdateConsumer(MAX_CACHE_MISS_HANDLING_PER_SECOND, it -> this.cacheMissHandlingLimiter = RateLimiter.create(it));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,4 +73,6 @@ public class CommonName {
public static final String IP_TYPE = "ip";

public static final String TOTAL_UPDATES = "total_updates";

public static final String SCHEMA_VERSION_FIELD = "schema_version";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistroforelasticsearch.ad.constant;

public class CommonValue {
// unknown or no schema version
public static Integer NO_SCHEMA_VERSION = 0;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistroforelasticsearch.ad.indices;

import java.util.function.Supplier;

import com.amazon.opendistroforelasticsearch.ad.constant.CommonName;
import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector;
import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetectorJob;
import com.amazon.opendistroforelasticsearch.ad.model.DetectorInternalState;
import com.amazon.opendistroforelasticsearch.ad.util.ThrowingSupplierWrapper;

/**
* Represent an AD index
*
*/
public enum ADIndex {

// throw RuntimeException since we don't know how to handle the case when the mapping reading throws IOException
RESULT(
CommonName.ANOMALY_RESULT_INDEX_ALIAS,
true,
ThrowingSupplierWrapper.throwingSupplierWrapper(AnomalyDetectionIndices::getAnomalyResultMappings)
),
CONFIG(
AnomalyDetector.ANOMALY_DETECTORS_INDEX,
false,
ThrowingSupplierWrapper.throwingSupplierWrapper(AnomalyDetectionIndices::getAnomalyDetectorMappings)
),
JOB(
AnomalyDetectorJob.ANOMALY_DETECTOR_JOB_INDEX,
false,
ThrowingSupplierWrapper.throwingSupplierWrapper(AnomalyDetectionIndices::getAnomalyDetectorJobMappings)
),
CHECKPOINT(
CommonName.CHECKPOINT_INDEX_NAME,
false,
ThrowingSupplierWrapper.throwingSupplierWrapper(AnomalyDetectionIndices::getCheckpointMappings)
),
STATE(
DetectorInternalState.DETECTOR_STATE_INDEX,
false,
ThrowingSupplierWrapper.throwingSupplierWrapper(AnomalyDetectionIndices::getDetectorStateMappings)
);

private final String indexName;
// whether we use an alias for the index
private final boolean alias;
private final String mapping;

ADIndex(String name, boolean alias, Supplier<String> mappingSupplier) {
this.indexName = name;
this.alias = alias;
this.mapping = mappingSupplier.get();
}

public String getIndexName() {
return indexName;
}

public boolean isAlias() {
return alias;
}

public String getMapping() {
return mapping;
}

}
Loading