Skip to content

Commit

Permalink
[ML] Datafeed deprecation checks (#38026)
Browse files Browse the repository at this point in the history
Deprecation checks for the ML datafeed
query and aggregations.
  • Loading branch information
droberts195 authored Jan 30, 2019
1 parent 81c443c commit be78816
Show file tree
Hide file tree
Showing 8 changed files with 239 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
*/
package org.elasticsearch.xpack.core.deprecation;

import org.elasticsearch.Version;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionResponse;
Expand All @@ -23,9 +24,12 @@
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -67,16 +71,19 @@ public static class Response extends ActionResponse implements ToXContentObject
private List<DeprecationIssue> clusterSettingsIssues;
private List<DeprecationIssue> nodeSettingsIssues;
private Map<String, List<DeprecationIssue>> indexSettingsIssues;
private List<DeprecationIssue> mlSettingsIssues;

public Response() {
}

public Response(List<DeprecationIssue> clusterSettingsIssues,
List<DeprecationIssue> nodeSettingsIssues,
Map<String, List<DeprecationIssue>> indexSettingsIssues) {
Map<String, List<DeprecationIssue>> indexSettingsIssues,
List<DeprecationIssue> mlSettingsIssues) {
this.clusterSettingsIssues = clusterSettingsIssues;
this.nodeSettingsIssues = nodeSettingsIssues;
this.indexSettingsIssues = indexSettingsIssues;
this.mlSettingsIssues = mlSettingsIssues;
}

public List<DeprecationIssue> getClusterSettingsIssues() {
Expand All @@ -91,12 +98,21 @@ public Map<String, List<DeprecationIssue>> getIndexSettingsIssues() {
return indexSettingsIssues;
}

public List<DeprecationIssue> getMlSettingsIssues() {
return mlSettingsIssues;
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
clusterSettingsIssues = in.readList(DeprecationIssue::new);
nodeSettingsIssues = in.readList(DeprecationIssue::new);
indexSettingsIssues = in.readMapOfLists(StreamInput::readString, DeprecationIssue::new);
if (in.getVersion().onOrAfter(Version.V_6_7_0)) {
mlSettingsIssues = in.readList(DeprecationIssue::new);
} else {
mlSettingsIssues = Collections.emptyList();
}
}

@Override
Expand All @@ -105,6 +121,9 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeList(clusterSettingsIssues);
out.writeList(nodeSettingsIssues);
out.writeMapOfLists(indexSettingsIssues, StreamOutput::writeString, (o, v) -> v.writeTo(o));
if (out.getVersion().onOrAfter(Version.V_6_7_0)) {
out.writeList(mlSettingsIssues);
}
}

@Override
Expand All @@ -114,23 +133,24 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
.array("node_settings", nodeSettingsIssues.toArray())
.field("index_settings")
.map(indexSettingsIssues)
.array("ml_settings", mlSettingsIssues.toArray())
.endObject();
}


@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Response response = (Response) o;
return Objects.equals(clusterSettingsIssues, response.clusterSettingsIssues) &&
Objects.equals(nodeSettingsIssues, response.nodeSettingsIssues) &&
Objects.equals(indexSettingsIssues, response.indexSettingsIssues);
Objects.equals(indexSettingsIssues, response.indexSettingsIssues) &&
Objects.equals(mlSettingsIssues, response.mlSettingsIssues);
}

@Override
public int hashCode() {
return Objects.hash(clusterSettingsIssues, nodeSettingsIssues, indexSettingsIssues);
return Objects.hash(clusterSettingsIssues, nodeSettingsIssues, indexSettingsIssues, mlSettingsIssues);
}

/**
Expand All @@ -145,22 +165,30 @@ public int hashCode() {
* @param indexNameExpressionResolver Used to resolve indices into their concrete names
* @param indices The list of index expressions to evaluate using `indexNameExpressionResolver`
* @param indicesOptions The options to use when resolving and filtering which indices to check
* @param datafeeds The ml datafeed configurations
* @param clusterSettingsChecks The list of cluster-level checks
* @param nodeSettingsChecks The list of node-level checks
* @param indexSettingsChecks The list of index-level checks that will be run across all specified
* concrete indices
* @param mlSettingsCheck The list of ml checks
* @return The list of deprecation issues found in the cluster
*/
public static DeprecationInfoAction.Response from(List<NodeInfo> nodesInfo, List<NodeStats> nodesStats, ClusterState state,
IndexNameExpressionResolver indexNameExpressionResolver,
String[] indices, IndicesOptions indicesOptions,
List<Function<ClusterState,DeprecationIssue>>clusterSettingsChecks,
List<BiFunction<List<NodeInfo>, List<NodeStats>, DeprecationIssue>> nodeSettingsChecks,
List<Function<IndexMetaData, DeprecationIssue>> indexSettingsChecks) {
IndexNameExpressionResolver indexNameExpressionResolver,
String[] indices, IndicesOptions indicesOptions,
List<DatafeedConfig> datafeeds,
List<Function<ClusterState,DeprecationIssue>>clusterSettingsChecks,
List<BiFunction<List<NodeInfo>, List<NodeStats>, DeprecationIssue>> nodeSettingsChecks,
List<Function<IndexMetaData, DeprecationIssue>> indexSettingsChecks,
List<Function<DatafeedConfig, DeprecationIssue>> mlSettingsCheck) {
List<DeprecationIssue> clusterSettingsIssues = filterChecks(clusterSettingsChecks,
(c) -> c.apply(state));
List<DeprecationIssue> nodeSettingsIssues = filterChecks(nodeSettingsChecks,
(c) -> c.apply(nodesInfo, nodesStats));
List<DeprecationIssue> mlSettingsIssues = new ArrayList<>();
for (DatafeedConfig config : datafeeds) {
mlSettingsIssues.addAll(filterChecks(mlSettingsCheck, (c) -> c.apply(config)));
}

String[] concreteIndexNames = indexNameExpressionResolver.concreteIndexNames(state, indicesOptions, indices);

Expand All @@ -174,7 +202,7 @@ public static DeprecationInfoAction.Response from(List<NodeInfo> nodesInfo, List
}
}

return new DeprecationInfoAction.Response(clusterSettingsIssues, nodeSettingsIssues, indexSettingsIssues);
return new DeprecationInfoAction.Response(clusterSettingsIssues, nodeSettingsIssues, indexSettingsIssues, mlSettingsIssues);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -665,7 +665,7 @@ public void setParsedQuery(QueryBuilder query) {
}
}

void setQuery(Map<String, Object> query) {
public void setQuery(Map<String, Object> query) {
this.query = ExceptionsHelper.requireNonNull(query, QUERY.getPreferredName());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.test.AbstractStreamableTestCase;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfigTests;

import java.io.IOException;
import java.util.Arrays;
Expand All @@ -45,13 +47,15 @@ protected DeprecationInfoAction.Response createTestInstance() {
.limit(randomIntBetween(0, 10)).collect(Collectors.toList());
List<DeprecationIssue> nodeIssues = Stream.generate(DeprecationIssueTests::createTestInstance)
.limit(randomIntBetween(0, 10)).collect(Collectors.toList());
List<DeprecationIssue> mlIssues = Stream.generate(DeprecationIssueTests::createTestInstance)
.limit(randomIntBetween(0, 10)).collect(Collectors.toList());
Map<String, List<DeprecationIssue>> indexIssues = new HashMap<>();
for (int i = 0; i < randomIntBetween(0, 10); i++) {
List<DeprecationIssue> perIndexIssues = Stream.generate(DeprecationIssueTests::createTestInstance)
.limit(randomIntBetween(0, 10)).collect(Collectors.toList());
indexIssues.put(randomAlphaOfLength(10), perIndexIssues);
}
return new DeprecationInfoAction.Response(clusterIssues, nodeIssues, indexIssues);
return new DeprecationInfoAction.Response(clusterIssues, nodeIssues, indexIssues, mlIssues);
}

@Override
Expand Down Expand Up @@ -80,12 +84,14 @@ public void testFrom() throws IOException {
List<NodeStats> nodeStats = Collections.singletonList(new NodeStats(discoveryNode, 0L, null,
null, null, null, null, null, null, null, null,
null, null, null, null));
List<DatafeedConfig> datafeeds = Collections.singletonList(DatafeedConfigTests.createRandomizedDatafeedConfig("foo"));
IndexNameExpressionResolver resolver = new IndexNameExpressionResolver();
IndicesOptions indicesOptions = IndicesOptions.fromOptions(false, false,
true, true);
boolean clusterIssueFound = randomBoolean();
boolean nodeIssueFound = randomBoolean();
boolean indexIssueFound = randomBoolean();
boolean mlIssueFound = randomBoolean();
DeprecationIssue foundIssue = DeprecationIssueTests.createTestInstance();
List<Function<ClusterState, DeprecationIssue>> clusterSettingsChecks =
Collections.unmodifiableList(Arrays.asList(
Expand All @@ -100,10 +106,14 @@ public void testFrom() throws IOException {
Collections.unmodifiableList(Arrays.asList(
(idx) -> indexIssueFound ? foundIssue : null
));
List<Function<DatafeedConfig, DeprecationIssue>> mlSettingsChecks =
Collections.unmodifiableList(Arrays.asList(
(idx) -> mlIssueFound ? foundIssue : null
));

DeprecationInfoAction.Response response = DeprecationInfoAction.Response.from(nodeInfos, nodeStats, state,
resolver, Strings.EMPTY_ARRAY, indicesOptions,
clusterSettingsChecks, nodeSettingsChecks, indexSettingsChecks);
resolver, Strings.EMPTY_ARRAY, indicesOptions, datafeeds,
clusterSettingsChecks, nodeSettingsChecks, indexSettingsChecks, mlSettingsChecks);

if (clusterIssueFound) {
assertThat(response.getClusterSettingsIssues(), equalTo(Collections.singletonList(foundIssue)));
Expand All @@ -123,5 +133,11 @@ public void testFrom() throws IOException {
} else {
assertTrue(response.getIndexSettingsIssues().isEmpty());
}

if (mlIssueFound) {
assertThat(response.getMlSettingsIssues(), equalTo(Collections.singletonList(foundIssue)));
} else {
assertTrue(response.getMlSettingsIssues().isEmpty());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.xpack.core.deprecation.DeprecationInfoAction;
import org.elasticsearch.xpack.core.deprecation.DeprecationIssue;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;

import java.util.Arrays;
import java.util.Collections;
Expand Down Expand Up @@ -41,6 +42,12 @@ private DeprecationChecks() {
Collections.unmodifiableList(Arrays.asList(
IndexDeprecationChecks::oldIndicesCheck));

static List<Function<DatafeedConfig, DeprecationIssue>> ML_SETTINGS_CHECKS =
Collections.unmodifiableList(Arrays.asList(
MlDeprecationChecks::checkDataFeedAggregations,
MlDeprecationChecks::checkDataFeedQuery
));

/**
* helper utility function to reduce repeat of running a specific {@link List} of checks.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.deprecation;

import org.elasticsearch.xpack.core.deprecation.DeprecationIssue;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;

import java.util.List;

/**
* Check the {@link DatafeedConfig} query and aggregations for deprecated usages.
*/
final class MlDeprecationChecks {

private MlDeprecationChecks() {
}

static DeprecationIssue checkDataFeedQuery(DatafeedConfig datafeedConfig) {
List<String> deprecations = datafeedConfig.getQueryDeprecations();
if (deprecations.isEmpty()) {
return null;
} else {
return new DeprecationIssue(DeprecationIssue.Level.WARNING,
"Datafeed [" + datafeedConfig.getId() + "] uses deprecated query options",
"https://www.elastic.co/guide/en/elasticsearch/reference/master/breaking-changes-7.0.html#breaking_70_search_changes",
deprecations.toString());
}
}

static DeprecationIssue checkDataFeedAggregations(DatafeedConfig datafeedConfig) {
List<String> deprecations = datafeedConfig.getAggDeprecations();
if (deprecations.isEmpty()) {
return null;
} else {
return new DeprecationIssue(DeprecationIssue.Level.WARNING,
"Datafeed [" + datafeedConfig.getId() + "] uses deprecated aggregation options",
"https://www.elastic.co/guide/en/elasticsearch/reference/master/breaking-changes-7.0.html" +
"#breaking_70_aggregations_changes", deprecations.toString());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,24 +19,32 @@
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.license.LicenseUtils;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.XPackField;
import org.elasticsearch.xpack.core.XPackSettings;
import org.elasticsearch.xpack.core.deprecation.DeprecationInfoAction;
import org.elasticsearch.xpack.core.ml.action.GetDatafeedsAction;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;

import java.util.Collections;
import java.util.List;

public class TransportDeprecationInfoAction extends TransportMasterNodeReadAction<DeprecationInfoAction.Request,
DeprecationInfoAction.Response> {

private final XPackLicenseState licenseState;
private final NodeClient client;
private final IndexNameExpressionResolver indexNameExpressionResolver;
private final Settings settings;

@Inject
public TransportDeprecationInfoAction(TransportService transportService, ClusterService clusterService,
public TransportDeprecationInfoAction(Settings settings, TransportService transportService, ClusterService clusterService,
ThreadPool threadPool, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver,
XPackLicenseState licenseState, NodeClient client) {
Expand All @@ -45,6 +53,7 @@ public TransportDeprecationInfoAction(TransportService transportService, Cluster
this.licenseState = licenseState;
this.client = client;
this.indexNameExpressionResolver = indexNameExpressionResolver;
this.settings = settings;
}

@Override
Expand Down Expand Up @@ -83,16 +92,37 @@ protected final void masterOperation(final DeprecationInfoAction.Request request
if (nodesStatsResponse.hasFailures()) {
throw nodesStatsResponse.failures().get(0);
}
listener.onResponse(DeprecationInfoAction.Response.from(nodesInfoResponse.getNodes(),
nodesStatsResponse.getNodes(), state, indexNameExpressionResolver,
request.indices(), request.indicesOptions(),
DeprecationChecks.CLUSTER_SETTINGS_CHECKS, DeprecationChecks.NODE_SETTINGS_CHECKS,
DeprecationChecks.INDEX_SETTINGS_CHECKS));

getDatafeedConfigs(ActionListener.wrap(
datafeeds -> {
listener.onResponse(
DeprecationInfoAction.Response.from(nodesInfoResponse.getNodes(),
nodesStatsResponse.getNodes(), state, indexNameExpressionResolver,
request.indices(), request.indicesOptions(), datafeeds,
DeprecationChecks.CLUSTER_SETTINGS_CHECKS,
DeprecationChecks.NODE_SETTINGS_CHECKS,
DeprecationChecks.INDEX_SETTINGS_CHECKS,
DeprecationChecks.ML_SETTINGS_CHECKS));
},
listener::onFailure
));
}, listener::onFailure),
client.admin().cluster()::nodesStats);
}, listener::onFailure), client.admin().cluster()::nodesInfo);
} else {
listener.onFailure(LicenseUtils.newComplianceException(XPackField.DEPRECATION));
}
}

private void getDatafeedConfigs(ActionListener<List<DatafeedConfig>> listener) {
if (XPackSettings.MACHINE_LEARNING_ENABLED.get(settings) == false) {
listener.onResponse(Collections.emptyList());
} else {
ClientHelper.executeAsyncWithOrigin(client, ClientHelper.DEPRECATION_ORIGIN, GetDatafeedsAction.INSTANCE,
new GetDatafeedsAction.Request(GetDatafeedsAction.ALL), ActionListener.wrap(
datafeedsResponse -> listener.onResponse(datafeedsResponse.getResponse().results()),
listener::onFailure
));
}
}
}
Loading

0 comments on commit be78816

Please sign in to comment.