Skip to content

Commit

Permalink
Add analytics plugin usage stats to _xpack/usage
Browse files Browse the repository at this point in the history
Adds analytics plugin usage stats to _xpack/usage.

Closes elastic#54847
  • Loading branch information
imotov committed Apr 7, 2020
1 parent 2bc6996 commit b86bd57
Show file tree
Hide file tree
Showing 10 changed files with 352 additions and 144 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public List<PipelineAggregationSpec> getPipelineAggregations() {
CumulativeCardinalityPipelineAggregationBuilder.NAME,
CumulativeCardinalityPipelineAggregationBuilder::new,
CumulativeCardinalityPipelineAggregator::new,
usage.track(AnalyticsUsage.Item.CUMULATIVE_CARDINALITY,
usage.track(AnalyticsStatsAction.Item.CUMULATIVE_CARDINALITY,
checkLicense(CumulativeCardinalityPipelineAggregationBuilder.PARSER)))
);
}
Expand All @@ -86,24 +86,24 @@ public List<AggregationSpec> getAggregations() {
new AggregationSpec(
StringStatsAggregationBuilder.NAME,
StringStatsAggregationBuilder::new,
usage.track(AnalyticsUsage.Item.STRING_STATS, checkLicense(StringStatsAggregationBuilder.PARSER)))
usage.track(AnalyticsStatsAction.Item.STRING_STATS, checkLicense(StringStatsAggregationBuilder.PARSER)))
.addResultReader(InternalStringStats::new)
.setAggregatorRegistrar(StringStatsAggregationBuilder::registerAggregators),
new AggregationSpec(
BoxplotAggregationBuilder.NAME,
BoxplotAggregationBuilder::new,
usage.track(AnalyticsUsage.Item.BOXPLOT, checkLicense(BoxplotAggregationBuilder.PARSER)))
usage.track(AnalyticsStatsAction.Item.BOXPLOT, checkLicense(BoxplotAggregationBuilder.PARSER)))
.addResultReader(InternalBoxplot::new)
.setAggregatorRegistrar(BoxplotAggregationBuilder::registerAggregators),
new AggregationSpec(
TopMetricsAggregationBuilder.NAME,
TopMetricsAggregationBuilder::new,
usage.track(AnalyticsUsage.Item.TOP_METRICS, checkLicense(TopMetricsAggregationBuilder.PARSER)))
usage.track(AnalyticsStatsAction.Item.TOP_METRICS, checkLicense(TopMetricsAggregationBuilder.PARSER)))
.addResultReader(InternalTopMetrics::new),
new AggregationSpec(
TTestAggregationBuilder.NAME,
TTestAggregationBuilder::new,
usage.track(AnalyticsUsage.Item.T_TEST, checkLicense(TTestAggregationBuilder.PARSER)))
usage.track(AnalyticsStatsAction.Item.T_TEST, checkLicense(TTestAggregationBuilder.PARSER)))
.addResultReader(InternalTTest::new)
);
}
Expand Down Expand Up @@ -137,7 +137,7 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
ResourceWatcherService resourceWatcherService, ScriptService scriptService, NamedXContentRegistry xContentRegistry,
Environment environment, NodeEnvironment nodeEnvironment, NamedWriteableRegistry namedWriteableRegistry,
IndexNameExpressionResolver indexNameExpressionResolver) {
return singletonList(new AnalyticsUsage());
return singletonList(usage);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,53 +9,35 @@
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.xcontent.ContextParser;
import org.elasticsearch.xpack.core.analytics.action.AnalyticsStatsAction;
import org.elasticsearch.xpack.core.watcher.common.stats.Counters;

import java.util.EnumMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.Arrays;

/**
* Tracks usage of the Analytics aggregations.
*/
public class AnalyticsUsage {
/**
* Items to track.
*/
public enum Item {
BOXPLOT,
CUMULATIVE_CARDINALITY,
STRING_STATS,
TOP_METRICS,
T_TEST;
}

private final Map<Item, AtomicLong> trackers = new EnumMap<>(Item.class);
private final Counters counters = new Counters(
Arrays.stream(AnalyticsStatsAction.Item.values()).map(AnalyticsStatsAction.Item::statName).toArray(String[]::new));

public AnalyticsUsage() {
for (Item item: Item.values()) {
trackers.put(item, new AtomicLong(0));
}
}

/**
* Track successful parsing.
*/
public <C, T> ContextParser<C, T> track(Item item, ContextParser<C, T> realParser) {
AtomicLong usage = trackers.get(item);
public <C, T> ContextParser<C, T> track(AnalyticsStatsAction.Item item, ContextParser<C, T> realParser) {
String stat = item.statName();
return (parser, context) -> {
T value = realParser.parse(parser, context);
// Intentionally doesn't count unless the parser returns cleanly.
usage.incrementAndGet();
counters.inc(stat);
return value;
};
}

public AnalyticsStatsAction.NodeResponse stats(DiscoveryNode node) {
return new AnalyticsStatsAction.NodeResponse(node,
trackers.get(Item.BOXPLOT).get(),
trackers.get(Item.CUMULATIVE_CARDINALITY).get(),
trackers.get(Item.STRING_STATS).get(),
trackers.get(Item.TOP_METRICS).get(),
trackers.get(Item.T_TEST).get());
return new AnalyticsStatsAction.NodeResponse(node, counters);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
Expand All @@ -20,26 +21,49 @@
import org.elasticsearch.xpack.core.action.XPackUsageFeatureResponse;
import org.elasticsearch.xpack.core.action.XPackUsageFeatureTransportAction;
import org.elasticsearch.xpack.core.analytics.AnalyticsFeatureSetUsage;
import org.elasticsearch.xpack.core.analytics.action.AnalyticsStatsAction;
import org.elasticsearch.xpack.core.watcher.common.stats.Counters;

import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

public class AnalyticsUsageTransportAction extends XPackUsageFeatureTransportAction {
private final XPackLicenseState licenseState;
private final Client client;

@Inject
public AnalyticsUsageTransportAction(TransportService transportService, ClusterService clusterService, ThreadPool threadPool,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
XPackLicenseState licenseState) {
XPackLicenseState licenseState, Client client) {
super(XPackUsageFeatureAction.ANALYTICS.name(), transportService, clusterService,
threadPool, actionFilters, indexNameExpressionResolver);
this.licenseState = licenseState;
this.client = client;
}

@Override
protected void masterOperation(Task task, XPackUsageRequest request, ClusterState state,
ActionListener<XPackUsageFeatureResponse> listener) {
boolean available = licenseState.isDataScienceAllowed();
if (available) {
AnalyticsStatsAction.Request statsRequest = new AnalyticsStatsAction.Request();
statsRequest.setParentTask(clusterService.localNode().getId(), task.getId());
client.execute(AnalyticsStatsAction.INSTANCE, statsRequest, ActionListener.wrap(r ->
listener.onResponse(new XPackUsageFeatureResponse(usageFeatureResponse(true, true, r))),
listener::onFailure));
} else {
AnalyticsFeatureSetUsage usage = new AnalyticsFeatureSetUsage(false, true, Collections.emptyMap());
listener.onResponse(new XPackUsageFeatureResponse(usage));
}
}

AnalyticsFeatureSetUsage usage =
new AnalyticsFeatureSetUsage(available, true);
listener.onResponse(new XPackUsageFeatureResponse(usage));
static AnalyticsFeatureSetUsage usageFeatureResponse(boolean available, boolean enabled, AnalyticsStatsAction.Response r) {
List<Counters> countersPerNode = r.getNodes()
.stream()
.map(AnalyticsStatsAction.NodeResponse::getStats)
.collect(Collectors.toList());
Counters mergedCounters = Counters.merge(countersPerNode);
return new AnalyticsFeatureSetUsage(available, enabled, mergedCounters.toNestedMap());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,28 +5,54 @@
*/
package org.elasticsearch.xpack.analytics.action;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.XPackFeatureSet;
import org.elasticsearch.xpack.core.action.XPackUsageFeatureResponse;
import org.elasticsearch.xpack.core.analytics.AnalyticsFeatureSetUsage;
import org.elasticsearch.xpack.core.analytics.action.AnalyticsStatsAction;
import org.junit.Before;
import org.mockito.stubbing.Answer;

import java.util.Collections;

import static org.hamcrest.core.Is.is;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;

public class AnalyticsInfoTransportActionTests extends ESTestCase {

private XPackLicenseState licenseState;
private Task task;
private ClusterService clusterService;
private ClusterName clusterName;

@Before
public void init() {
licenseState = mock(XPackLicenseState.class);
task = mock(Task.class);
when(task.getId()).thenReturn(randomLong());
clusterService = mock(ClusterService.class);
DiscoveryNode discoveryNode = mock(DiscoveryNode.class);
when(discoveryNode.getId()).thenReturn(randomAlphaOfLength(10));
when(clusterService.localNode()).thenReturn(discoveryNode);
clusterName = mock(ClusterName.class);
}

public void testAvailable() throws Exception {
Expand All @@ -35,37 +61,59 @@ public void testAvailable() throws Exception {
boolean available = randomBoolean();
when(licenseState.isDataScienceAllowed()).thenReturn(available);
assertThat(featureSet.available(), is(available));

AnalyticsUsageTransportAction usageAction = new AnalyticsUsageTransportAction(mock(TransportService.class), null, null,
mock(ActionFilters.class), null, licenseState);
Client client = mockClient();
AnalyticsUsageTransportAction usageAction = new AnalyticsUsageTransportAction(mock(TransportService.class), clusterService, null,
mock(ActionFilters.class), null, licenseState, client);
PlainActionFuture<XPackUsageFeatureResponse> future = new PlainActionFuture<>();
usageAction.masterOperation(null, null, null, future);
usageAction.masterOperation(task, null, null, future);
XPackFeatureSet.Usage usage = future.get().getUsage();
assertThat(usage.available(), is(available));

BytesStreamOutput out = new BytesStreamOutput();
usage.writeTo(out);
XPackFeatureSet.Usage serializedUsage = new AnalyticsFeatureSetUsage(out.bytes().streamInput());
assertThat(serializedUsage.available(), is(available));
if (available) {
verify(client, times(1)).execute(any(), any(), any());
}
verifyNoMoreInteractions(client);
}

public void testEnabled() throws Exception {
AnalyticsInfoTransportAction featureSet = new AnalyticsInfoTransportAction(
mock(TransportService.class), mock(ActionFilters.class), licenseState);
assertThat(featureSet.enabled(), is(true));
assertTrue(featureSet.enabled());

boolean available = randomBoolean();
when(licenseState.isDataScienceAllowed()).thenReturn(available);
Client client = mockClient();
AnalyticsUsageTransportAction usageAction = new AnalyticsUsageTransportAction(mock(TransportService.class),
null, null, mock(ActionFilters.class), null, licenseState);
clusterService, null, mock(ActionFilters.class), null, licenseState, client);
PlainActionFuture<XPackUsageFeatureResponse> future = new PlainActionFuture<>();
usageAction.masterOperation(null, null, null, future);
usageAction.masterOperation(task, null, null, future);
XPackFeatureSet.Usage usage = future.get().getUsage();
assertTrue(usage.enabled());

BytesStreamOutput out = new BytesStreamOutput();
usage.writeTo(out);
XPackFeatureSet.Usage serializedUsage = new AnalyticsFeatureSetUsage(out.bytes().streamInput());
assertTrue(serializedUsage.enabled());
if (available) {
verify(client, times(1)).execute(any(), any(), any());
}
verifyNoMoreInteractions(client);
}

private Client mockClient() {
Client client = mock(Client.class);
doAnswer((Answer<Void>) invocation -> {
@SuppressWarnings("unchecked")
ActionListener<AnalyticsStatsAction.Response> listener =
(ActionListener<AnalyticsStatsAction.Response>) invocation.getArguments()[2];
listener.onResponse(new AnalyticsStatsAction.Response(clusterName, Collections.emptyList(), Collections.emptyList()));
return null;
}).when(client).execute(eq(AnalyticsStatsAction.INSTANCE), any(), any());
return client;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.analytics.AnalyticsUsage;
import org.elasticsearch.xpack.core.analytics.AnalyticsFeatureSetUsage;
import org.elasticsearch.xpack.core.analytics.action.AnalyticsStatsAction;

import java.io.IOException;
Expand All @@ -33,6 +34,7 @@
import static java.util.Collections.emptyList;
import static java.util.stream.Collectors.toList;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.xpack.analytics.action.AnalyticsUsageTransportAction.usageFeatureResponse;
import static org.hamcrest.Matchers.equalTo;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
Expand All @@ -58,20 +60,18 @@ public TransportAnalyticsStatsAction action(AnalyticsUsage usage) {
}

public void test() throws IOException {
for (AnalyticsUsage.Item item : AnalyticsUsage.Item.values()) {
for (AnalyticsStatsAction.Item item : AnalyticsStatsAction.Item.values()) {
AnalyticsUsage realUsage = new AnalyticsUsage();
AnalyticsUsage emptyUsage = new AnalyticsUsage();
ContextParser<Void, Void> parser = realUsage.track(item, (p, c) -> c);
ObjectPath unused = run(realUsage, emptyUsage);
assertThat(unused.evaluate("stats.0." + item.name().toLowerCase(Locale.ROOT) + "_usage"), equalTo(0));
assertThat(unused.evaluate("stats.1." + item.name().toLowerCase(Locale.ROOT) + "_usage"), equalTo(0));
assertThat(unused.evaluate("stats." + item.name().toLowerCase(Locale.ROOT) + "_usage"), equalTo(0));
int count = between(1, 10000);
for (int i = 0; i < count; i++) {
assertNull(parser.parse(null, null));
}
ObjectPath used = run(realUsage, emptyUsage);
assertThat(item.name(), used.evaluate("stats.0." + item.name().toLowerCase(Locale.ROOT) + "_usage"), equalTo(count));
assertThat(item.name(), used.evaluate("stats.1." + item.name().toLowerCase(Locale.ROOT) + "_usage"), equalTo(0));
assertThat(item.name(), used.evaluate("stats." + item.name().toLowerCase(Locale.ROOT) + "_usage"), equalTo(count));
}
}

Expand All @@ -83,11 +83,9 @@ private ObjectPath run(AnalyticsUsage... nodeUsages) throws IOException {
AnalyticsStatsAction.Response response = new AnalyticsStatsAction.Response(
new ClusterName("cluster_name"), nodeResponses, emptyList());

AnalyticsFeatureSetUsage usage = usageFeatureResponse(true, true, response);
try (XContentBuilder builder = jsonBuilder()) {
builder.startObject();
response.toXContent(builder, ToXContent.EMPTY_PARAMS);
builder.endObject();

usage.toXContent(builder, ToXContent.EMPTY_PARAMS);
return ObjectPath.createFromXContent(JsonXContent.jsonXContent, BytesReference.bytes(builder));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,4 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeNamedWriteable(usage);
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,4 @@ public void writeTo(StreamOutput out) throws IOException {
}
}

}
}
Loading

0 comments on commit b86bd57

Please sign in to comment.