Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Moving Preview Anomaly Detectors to Transport layer #321

Merged
merged 6 commits into from
Jan 6, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,9 @@ List<String> jacocoExclusions = [
'com.amazon.opendistroforelasticsearch.ad.transport.GetAnomalyDetectorRequest',
'com.amazon.opendistroforelasticsearch.ad.transport.IndexAnomalyDetectorResponse',
'com.amazon.opendistroforelasticsearch.ad.transport.IndexAnomalyDetectorTransportAction',
'com.amazon.opendistroforelasticsearch.ad.transport.PreviewAnomalyDetectorTransportAction*',
saratvemulapalli marked this conversation as resolved.
Show resolved Hide resolved
'com.amazon.opendistroforelasticsearch.ad.transport.PreviewAnomalyDetectorResponse',
saratvemulapalli marked this conversation as resolved.
Show resolved Hide resolved
'com.amazon.opendistroforelasticsearch.ad.transport.PreviewAnomalyDetectorRequest',

// TODO: hc caused coverage to drop
'com.amazon.opendistroforelasticsearch.ad.NodeStateManager',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@
import com.amazon.opendistroforelasticsearch.ad.rest.RestExecuteAnomalyDetectorAction;
import com.amazon.opendistroforelasticsearch.ad.rest.RestGetAnomalyDetectorAction;
import com.amazon.opendistroforelasticsearch.ad.rest.RestIndexAnomalyDetectorAction;
import com.amazon.opendistroforelasticsearch.ad.rest.RestPreviewAnomalyDetectorAction;
import com.amazon.opendistroforelasticsearch.ad.rest.RestSearchAnomalyDetectorAction;
import com.amazon.opendistroforelasticsearch.ad.rest.RestSearchAnomalyDetectorInfoAction;
import com.amazon.opendistroforelasticsearch.ad.rest.RestSearchAnomalyResultAction;
Expand Down Expand Up @@ -129,6 +130,8 @@
import com.amazon.opendistroforelasticsearch.ad.transport.GetAnomalyDetectorTransportAction;
import com.amazon.opendistroforelasticsearch.ad.transport.IndexAnomalyDetectorAction;
import com.amazon.opendistroforelasticsearch.ad.transport.IndexAnomalyDetectorTransportAction;
import com.amazon.opendistroforelasticsearch.ad.transport.PreviewAnomalyDetectorAction;
import com.amazon.opendistroforelasticsearch.ad.transport.PreviewAnomalyDetectorTransportAction;
import com.amazon.opendistroforelasticsearch.ad.transport.ProfileAction;
import com.amazon.opendistroforelasticsearch.ad.transport.ProfileTransportAction;
import com.amazon.opendistroforelasticsearch.ad.transport.RCFPollingAction;
Expand Down Expand Up @@ -232,14 +235,11 @@ public List<RestHandler> getRestHandlers(
RestSearchAnomalyDetectorAction searchAnomalyDetectorAction = new RestSearchAnomalyDetectorAction();
RestSearchAnomalyResultAction searchAnomalyResultAction = new RestSearchAnomalyResultAction();
RestDeleteAnomalyDetectorAction deleteAnomalyDetectorAction = new RestDeleteAnomalyDetectorAction();
RestExecuteAnomalyDetectorAction executeAnomalyDetectorAction = new RestExecuteAnomalyDetectorAction(
settings,
clusterService,
anomalyDetectorRunner
);
RestExecuteAnomalyDetectorAction executeAnomalyDetectorAction = new RestExecuteAnomalyDetectorAction(settings, clusterService);
RestStatsAnomalyDetectorAction statsAnomalyDetectorAction = new RestStatsAnomalyDetectorAction(adStats, this.nodeFilter);
RestAnomalyDetectorJobAction anomalyDetectorJobAction = new RestAnomalyDetectorJobAction(settings, clusterService);
RestSearchAnomalyDetectorInfoAction searchAnomalyDetectorInfoAction = new RestSearchAnomalyDetectorInfoAction();
RestPreviewAnomalyDetectorAction previewAnomalyDetectorAction = new RestPreviewAnomalyDetectorAction();

return ImmutableList
.of(
Expand All @@ -251,7 +251,8 @@ public List<RestHandler> getRestHandlers(
executeAnomalyDetectorAction,
anomalyDetectorJobAction,
statsAnomalyDetectorAction,
searchAnomalyDetectorInfoAction
searchAnomalyDetectorInfoAction,
previewAnomalyDetectorAction
);
}

Expand Down Expand Up @@ -607,7 +608,8 @@ public List<NamedXContentRegistry.Entry> getNamedXContent() {
new ActionHandler<>(ADResultBulkAction.INSTANCE, ADResultBulkTransportAction.class),
new ActionHandler<>(EntityResultAction.INSTANCE, EntityResultTransportAction.class),
new ActionHandler<>(EntityProfileAction.INSTANCE, EntityProfileTransportAction.class),
new ActionHandler<>(SearchAnomalyDetectorInfoAction.INSTANCE, SearchAnomalyDetectorInfoTransportAction.class)
new ActionHandler<>(SearchAnomalyDetectorInfoAction.INSTANCE, SearchAnomalyDetectorInfoTransportAction.class),
new ActionHandler<>(PreviewAnomalyDetectorAction.INSTANCE, PreviewAnomalyDetectorTransportAction.class)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,8 @@

package com.amazon.opendistroforelasticsearch.ad.rest;

import static com.amazon.opendistroforelasticsearch.ad.settings.AnomalyDetectorSettings.MAX_ANOMALY_FEATURES;
import static com.amazon.opendistroforelasticsearch.ad.settings.AnomalyDetectorSettings.REQUEST_TIMEOUT;
import static com.amazon.opendistroforelasticsearch.ad.util.RestHandlerUtils.DETECTOR_ID;
import static com.amazon.opendistroforelasticsearch.ad.util.RestHandlerUtils.PREVIEW;
import static com.amazon.opendistroforelasticsearch.ad.util.RestHandlerUtils.RUN;
import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken;

Expand All @@ -29,35 +27,23 @@
import org.apache.commons.lang.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.BytesRestResponse;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.rest.action.RestActionListener;
import org.elasticsearch.rest.action.RestToXContentListener;

import com.amazon.opendistroforelasticsearch.ad.AnomalyDetectorPlugin;
import com.amazon.opendistroforelasticsearch.ad.AnomalyDetectorRunner;
import com.amazon.opendistroforelasticsearch.ad.constant.CommonErrorMessages;
import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector;
import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetectorExecutionInput;
import com.amazon.opendistroforelasticsearch.ad.settings.EnabledSetting;
import com.amazon.opendistroforelasticsearch.ad.transport.AnomalyResultAction;
import com.amazon.opendistroforelasticsearch.ad.transport.AnomalyResultRequest;
import com.amazon.opendistroforelasticsearch.ad.util.RestHandlerUtils;
import com.google.common.collect.ImmutableList;

/**
Expand All @@ -66,21 +52,14 @@
public class RestExecuteAnomalyDetectorAction extends BaseRestHandler {

public static final String DETECT_DATA_ACTION = "execute_anomaly_detector";
public static final String ANOMALY_RESULT = "anomaly_result";
public static final String ANOMALY_DETECTOR = "anomaly_detector";
private final AnomalyDetectorRunner anomalyDetectorRunner;
// TODO: apply timeout config
private volatile TimeValue requestTimeout;
private volatile Integer maxAnomalyFeatures;

private final Logger logger = LogManager.getLogger(RestExecuteAnomalyDetectorAction.class);

public RestExecuteAnomalyDetectorAction(Settings settings, ClusterService clusterService, AnomalyDetectorRunner anomalyDetectorRunner) {
this.anomalyDetectorRunner = anomalyDetectorRunner;
public RestExecuteAnomalyDetectorAction(Settings settings, ClusterService clusterService) {
this.requestTimeout = REQUEST_TIMEOUT.get(settings);
maxAnomalyFeatures = MAX_ANOMALY_FEATURES.get(settings);
clusterService.getClusterSettings().addSettingsUpdateConsumer(REQUEST_TIMEOUT, it -> requestTimeout = it);
clusterService.getClusterSettings().addSettingsUpdateConsumer(MAX_ANOMALY_FEATURES, it -> maxAnomalyFeatures = it);
}

@Override
Expand All @@ -102,42 +81,15 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
return;
}

if (rawPath.endsWith(PREVIEW)) {
if (input.getDetector() != null) {
error = validateDetector(input.getDetector());
if (StringUtils.isNotBlank(error)) {
channel.sendResponse(new BytesRestResponse(RestStatus.BAD_REQUEST, error));
return;
}
anomalyDetectorRunner
.executeDetector(
input.getDetector(),
input.getPeriodStart(),
input.getPeriodEnd(),
getPreviewDetectorActionListener(channel, input.getDetector())
);
} else {
preivewAnomalyDetector(client, channel, input);
}
} else if (rawPath.endsWith(RUN)) {
AnomalyResultRequest getRequest = new AnomalyResultRequest(
input.getDetectorId(),
input.getPeriodStart().toEpochMilli(),
input.getPeriodEnd().toEpochMilli()
);
client.execute(AnomalyResultAction.INSTANCE, getRequest, new RestToXContentListener<>(channel));
}
AnomalyResultRequest getRequest = new AnomalyResultRequest(
input.getDetectorId(),
input.getPeriodStart().toEpochMilli(),
input.getPeriodEnd().toEpochMilli()
);
client.execute(AnomalyResultAction.INSTANCE, getRequest, new RestToXContentListener<>(channel));
};
}

private String validateDetector(AnomalyDetector detector) {
if (detector.getFeatureAttributes().isEmpty()) {
return "Can't preview detector without feature";
} else {
return RestHandlerUtils.validateAnomalyDetector(detector, maxAnomalyFeatures);
}
}

private AnomalyDetectorExecutionInput getAnomalyDetectorExecutionInput(RestRequest request) throws IOException {
String detectorId = null;
if (request.hasParam(DETECTOR_ID)) {
Expand Down Expand Up @@ -166,75 +118,6 @@ private String validateAdExecutionInput(AnomalyDetectorExecutionInput input) {
return null;
}

private void preivewAnomalyDetector(NodeClient client, RestChannel channel, AnomalyDetectorExecutionInput input) {
if (!StringUtils.isBlank(input.getDetectorId())) {
GetRequest getRequest = new GetRequest(AnomalyDetector.ANOMALY_DETECTORS_INDEX).id(input.getDetectorId());
try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) {
client.get(getRequest, onGetAnomalyDetectorResponse(channel, input));
} catch (Exception e) {
logger.error("Fail to get detector for preview", e);
channel.sendResponse(new BytesRestResponse(RestStatus.INTERNAL_SERVER_ERROR, e.getMessage()));
}
} else {
channel.sendResponse(new BytesRestResponse(RestStatus.NOT_FOUND, "Wrong input, no detector id"));
}
}

private RestActionListener<GetResponse> onGetAnomalyDetectorResponse(RestChannel channel, AnomalyDetectorExecutionInput input) {
return new RestActionListener<GetResponse>(channel) {
@Override
protected void processResponse(GetResponse response) throws Exception {
if (!response.isExists()) {
XContentBuilder message = channel
.newErrorBuilder()
.startObject()
.field("message", "Can't find anomaly detector with id:" + response.getId())
.endObject();
channel.sendResponse(new BytesRestResponse(RestStatus.NOT_FOUND, message));
return;
}
XContentParser parser = XContentType.JSON
.xContent()
.createParser(
channel.request().getXContentRegistry(),
LoggingDeprecationHandler.INSTANCE,
response.getSourceAsBytesRef().streamInput()
);

ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
AnomalyDetector detector = AnomalyDetector.parse(parser, response.getId(), response.getVersion());

anomalyDetectorRunner
.executeDetector(
detector,
input.getPeriodStart(),
input.getPeriodEnd(),
getPreviewDetectorActionListener(channel, detector)
);
}
};
}

private ActionListener getPreviewDetectorActionListener(RestChannel channel, AnomalyDetector detector) {
return ActionListener.wrap(anomalyResult -> {
XContentBuilder builder = channel
.newBuilder()
.startObject()
.field(ANOMALY_RESULT, anomalyResult)
.field(ANOMALY_DETECTOR, detector)
.endObject();
channel.sendResponse(new BytesRestResponse(RestStatus.OK, builder));
}, exception -> {
logger.error("Unexpected error running anomaly detector " + detector.getDetectorId(), exception);
try {
XContentBuilder builder = channel.newBuilder().startObject().field(ANOMALY_DETECTOR, detector).endObject();
channel.sendResponse(new BytesRestResponse(RestStatus.INTERNAL_SERVER_ERROR, builder));
} catch (IOException e) {
logger.error("Fail to send back exception message" + detector.getDetectorId(), exception);
}
});
}

@Override
public List<Route> routes() {
return ImmutableList
Expand All @@ -243,11 +126,6 @@ public List<Route> routes() {
new Route(
RestRequest.Method.POST,
String.format(Locale.ROOT, "%s/{%s}/%s", AnomalyDetectorPlugin.AD_BASE_DETECTORS_URI, DETECTOR_ID, RUN)
),
// preview detector
new Route(
RestRequest.Method.POST,
String.format(Locale.ROOT, "%s/{%s}/%s", AnomalyDetectorPlugin.AD_BASE_DETECTORS_URI, DETECTOR_ID, PREVIEW)
)
);
}
Expand Down
Loading