Add moving percentiles pipeline aggregation #55441

Merged: 9 commits, May 12, 2020

Changes shown below are from 4 commits.
@@ -0,0 +1,159 @@
[role="xpack"]
[testenv="basic"]
[[search-aggregations-pipeline-moving-percentiles-aggregation]]
=== Moving Percentiles Aggregation

Given an ordered series of <<search-aggregations-metrics-percentile-aggregation, percentiles>>, the Moving Percentiles aggregation
will slide a window across those percentiles and allow the user to compute the cumulative percentile.

This is conceptually very similar to the <<search-aggregations-pipeline-movfn-aggregation, Moving Function>> pipeline aggregation,
except it works on the percentile sketches instead of the actual bucket values.
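
To make the sketch-merging idea concrete, here is a minimal illustrative sketch (not part of this documentation or of this PR): merge the per-bucket HDR sketches that fall inside the window, then read a percentile from the merged sketch. It assumes HdrHistogram's `DoubleHistogram` (the sketch type behind the HDR-based percentiles implementation); the `windowSketches` name is invented for the example.

[source,java]
--------------------------------------------------
import org.HdrHistogram.DoubleHistogram;

import java.util.List;

class WindowPercentileSketch {
    // Merge the sketches of all buckets inside the window, then read a percentile
    // from the merged sketch. This is what "working on the percentile sketches"
    // means, as opposed to sliding a function over already-computed bucket values.
    static double windowPercentile(List<DoubleHistogram> windowSketches, double percent) {
        DoubleHistogram merged = new DoubleHistogram(3);   // 3 significant value digits
        for (DoubleHistogram sketch : windowSketches) {
            merged.add(sketch);                            // combine the bucket's sketch into the window's sketch
        }
        return merged.getValueAtPercentile(percent);       // e.g. percent = 99.0
    }
}
--------------------------------------------------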

==== Syntax

A `moving_percentiles` aggregation looks like this in isolation:

[source,js]
--------------------------------------------------
{
  "moving_percentiles": {
    "buckets_path": "the_percentile",
    "window": 10
  }
}
--------------------------------------------------
// NOTCONSOLE

[[moving-percentiles-params]]
.`moving_percentiles` Parameters
[options="header"]
|===
|Parameter Name |Description |Required |Default Value
|`buckets_path` |Path to the percentile of interest (see <<buckets-path-syntax, `buckets_path` Syntax>> for more details) |Required |
|`window` |The size of the window to "slide" across the histogram. |Required |
|`shift` |<<shift-parameter, Shift>> of window position. |Optional | 0
|===

`moving_percentiles` aggregations must be embedded inside of a `histogram` or `date_histogram` aggregation. They can be
embedded like any other metric aggregation:

[source,console]
--------------------------------------------------
POST /_search
{
  "size": 0,
  "aggs": {
    "my_date_histo": {                          <1>
      "date_histogram": {
        "field": "date",
        "calendar_interval": "1M"
      },
      "aggs": {
        "the_percentile": {                     <2>
          "percentiles": {
            "field": "price",
            "percents": [ 1.0, 99.0 ]
          }
        },
        "the_movperc": {
          "moving_percentiles": {
            "buckets_path": "the_percentile",   <3>
            "window": 10
          }
        }
      }
    }
  }
}
--------------------------------------------------
// TEST[setup:sales]

<1> A `date_histogram` named "my_date_histo" is constructed on the "date" field, with one-month calendar intervals
<2> A `percentiles` metric is used to calculate the percentiles of a field.
<3> Finally, we specify a `moving_percentiles` aggregation which uses "the_percentile" sketch as its input.

Moving percentiles are built by first specifying a `histogram` or `date_histogram` over a field. You then add
a percentiles metric inside of that histogram. Finally, the `moving_percentiles` aggregation is embedded inside the histogram.
The `buckets_path` parameter is then used to "point" at the percentiles aggregation inside of the histogram (see
<<buckets-path-syntax>> for a description of the syntax for `buckets_path`).

And the following may be the response:

[source,console-result]
--------------------------------------------------
{
  "took": 11,
  "timed_out": false,
  "_shards": ...,
  "hits": ...,
  "aggregations": {
    "my_date_histo": {
      "buckets": [
        {
          "key_as_string": "2015/01/01 00:00:00",
          "key": 1420070400000,
          "doc_count": 3,
          "the_percentile": {
            "values": {
              "1.0": 150.0,
              "99.0": 200.0
            }
          }
        },
        {
          "key_as_string": "2015/02/01 00:00:00",
          "key": 1422748800000,
          "doc_count": 2,
          "the_percentile": {
            "values": {
              "1.0": 10.0,
              "99.0": 50.0
            }
          },
          "the_movperc": {
            "values": {
              "1.0": 150.0,
              "99.0": 200.0
            }
          }
        },
        {
          "key_as_string": "2015/03/01 00:00:00",
          "key": 1425168000000,
          "doc_count": 2,
          "the_percentile": {
            "values": {
              "1.0": 175.0,
              "99.0": 200.0
            }
          },
          "the_movperc": {
            "values": {
              "1.0": 10.0,
              "99.0": 200.0
            }
          }
        }
      ]
    }
  }
}
--------------------------------------------------
// TESTRESPONSE[s/"took": 11/"took": $body.took/]
// TESTRESPONSE[s/"_shards": \.\.\./"_shards": $body._shards/]
// TESTRESPONSE[s/"hits": \.\.\./"hits": $body.hits/]

The output format of the `moving_percentiles` aggregation is inherited from the format of the referenced
<<search-aggregations-metrics-percentile-aggregation,`percentiles`>> aggregation.

[[shift-parameter]]
==== shift parameter

By default (with `shift = 0`), the window that is offered for calculation is the last `n` values excluding the current bucket.
Increasing `shift` by 1 moves the starting window position by `1` to the right.

- To include the current bucket in the window, use `shift = 1`.
- For center alignment (`n / 2` values before and after the current bucket), use `shift = window / 2`.
- For right alignment (`n` values after the current bucket), use `shift = window`.

If either edge of the window moves outside the borders of the data series, the window shrinks to include only the available values.
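
As an illustration of these rules, the window for the bucket at zero-based position `i` can be derived with a small index computation. The helper below is a sketch consistent with the behaviour described above, not code from this PR:

[source,java]
--------------------------------------------------
class WindowBounds {
    // Returns {from (inclusive), to (exclusive)}: the bucket indices that feed the
    // moving percentile of the bucket at position i.
    static int[] bounds(int i, int window, int shift, int seriesLength) {
        int from = Math.max(0, i - window + shift);   // clamp at the start of the series
        int to = Math.min(seriesLength, i + shift);   // clamp at the end of the series
        return new int[] { from, to };
    }
}
--------------------------------------------------

With `shift = 0` this yields the `window` buckets immediately before (and excluding) the current bucket; at either border of the series the range simply shrinks to the buckets that exist.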
docs/reference/rest-api/usage.asciidoc (3 changes: 2 additions & 1 deletion)
@@ -269,7 +269,8 @@ GET /_xpack/usage
       "top_metrics_usage" : 0,
       "cumulative_cardinality_usage" : 0,
       "t_test_usage" : 0,
-      "string_stats_usage" : 0
+      "string_stats_usage" : 0,
+      "moving_percentiles_usage" : 0
     }
   }
 }
AbstractInternalHDRPercentiles.java
@@ -86,7 +86,7 @@ public double value(String name) {
         return value(Double.parseDouble(name));
     }

-    DocValueFormat formatter() {
+    public DocValueFormat formatter() {
         return format;
     }

@@ -96,10 +96,18 @@ public long getEstimatedMemoryFootprint() {
         return state.getEstimatedFootprintInBytes();
     }

-    DoubleHistogram getState() {
+    public DoubleHistogram getState() {
         return state;
     }

+    public double[] getKeys() {
+        return keys;
+    }
+
+    public boolean keyed() {
+        return keyed;
+    }
+
     @Override
     public AbstractInternalHDRPercentiles reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
         DoubleHistogram merged = null;

Member (review comment on `getState()`): Would you kindly add javadoc to these while you are here?
AbstractInternalTDigestPercentiles.java
@@ -72,18 +72,26 @@ public double value(String name) {

     public abstract double value(double key);

-    DocValueFormat formatter() {
+    public DocValueFormat formatter() {
         return format;
     }

     public long getEstimatedMemoryFootprint() {
         return state.byteSize();
     }

-    TDigestState getState() {
+    public TDigestState getState() {
         return state;
     }

+    public double[] getKeys() {
+        return keys;
+    }
+
+    public boolean keyed() {
+        return keyed;
+    }
+
     @Override
     public AbstractInternalTDigestPercentiles reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
         TDigestState merged = null;
AnalyticsPlugin.java
@@ -36,6 +36,7 @@
 import org.elasticsearch.xpack.analytics.boxplot.InternalBoxplot;
 import org.elasticsearch.xpack.analytics.cumulativecardinality.CumulativeCardinalityPipelineAggregationBuilder;
 import org.elasticsearch.xpack.analytics.mapper.HistogramFieldMapper;
+import org.elasticsearch.xpack.analytics.movingPercentiles.MovingPercentilesPipelineAggregationBuilder;
 import org.elasticsearch.xpack.analytics.stringstats.InternalStringStats;
 import org.elasticsearch.xpack.analytics.stringstats.StringStatsAggregationBuilder;
 import org.elasticsearch.xpack.analytics.topmetrics.InternalTopMetrics;
@@ -52,6 +53,7 @@
 import org.elasticsearch.xpack.core.action.XPackUsageFeatureAction;
 import org.elasticsearch.xpack.core.analytics.action.AnalyticsStatsAction;

+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
@@ -71,13 +73,18 @@ public AnalyticsPlugin() { }

     @Override
     public List<PipelineAggregationSpec> getPipelineAggregations() {
-        return singletonList(
-            new PipelineAggregationSpec(
-                CumulativeCardinalityPipelineAggregationBuilder.NAME,
-                CumulativeCardinalityPipelineAggregationBuilder::new,
-                usage.track(AnalyticsStatsAction.Item.CUMULATIVE_CARDINALITY,
-                    checkLicense(CumulativeCardinalityPipelineAggregationBuilder.PARSER)))
-        );
+        List<PipelineAggregationSpec> pipelineAggs = new ArrayList<>();
+        pipelineAggs.add(new PipelineAggregationSpec(
+            CumulativeCardinalityPipelineAggregationBuilder.NAME,
+            CumulativeCardinalityPipelineAggregationBuilder::new,
+            usage.track(AnalyticsStatsAction.Item.CUMULATIVE_CARDINALITY,
+                checkLicense(CumulativeCardinalityPipelineAggregationBuilder.PARSER))));
+        pipelineAggs.add(new PipelineAggregationSpec(
+            MovingPercentilesPipelineAggregationBuilder.NAME,
+            MovingPercentilesPipelineAggregationBuilder::new,
+            usage.track(AnalyticsStatsAction.Item.MOVING_PERCENTILES,
+                checkLicense(MovingPercentilesPipelineAggregationBuilder.PARSER))));
+        return pipelineAggs;
     }

     @Override
MovingPercentilesPipelineAggregationBuilder.java (new file)
@@ -0,0 +1,129 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License;
 * you may not use this file except in compliance with the Elastic License.
 */
package org.elasticsearch.xpack.analytics.movingPercentiles;

import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.search.aggregations.pipeline.AbstractPipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;

import java.io.IOException;
import java.util.Map;
import java.util.Objects;

import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;

public class MovingPercentilesPipelineAggregationBuilder
        extends AbstractPipelineAggregationBuilder<MovingPercentilesPipelineAggregationBuilder> {
    public static final String NAME = "moving_percentiles";
    private static final ParseField WINDOW = new ParseField("window");
    private static final ParseField SHIFT = new ParseField("shift");

    public static final ConstructingObjectParser<MovingPercentilesPipelineAggregationBuilder, String> PARSER =
        new ConstructingObjectParser<>(NAME, false, (args, name) -> {
            return new MovingPercentilesPipelineAggregationBuilder(name, (String) args[0], (int) args[1]);
        });
    static {
        PARSER.declareString(constructorArg(), BUCKETS_PATH_FIELD);
        PARSER.declareInt(constructorArg(), WINDOW);
        PARSER.declareInt(MovingPercentilesPipelineAggregationBuilder::setShift, SHIFT);
    }

    private final int window;
    private int shift;

    public MovingPercentilesPipelineAggregationBuilder(String name, String bucketsPath, int window) {
        super(name, NAME, new String[] { bucketsPath });
        if (window <= 0) {
            throw new IllegalArgumentException("[" + WINDOW.getPreferredName() + "] must be a positive, non-zero integer.");
        }
        this.window = window;
    }

    /**
     * Read from a stream.
     */
    public MovingPercentilesPipelineAggregationBuilder(StreamInput in) throws IOException {
        super(in, NAME);
        window = in.readVInt();
        shift = in.readInt();
    }

    @Override
    protected final void doWriteTo(StreamOutput out) throws IOException {
        out.writeVInt(window);
        out.writeInt(shift);
    }

    /**
     * Returns the window size for this aggregation
     */
    public int getWindow() {
        return window;
    }

    /**
     * Returns the shift for this aggregation
     */
    public int getShift() {
        return shift;
    }

    /**
     * Sets the shift for this aggregation
     */
    public void setShift(int shift) {
        this.shift = shift;
    }

    @Override
    protected PipelineAggregator createInternal(Map<String, Object> metaData) {
        return new MovingPercentilesPipelineAggregator(name, bucketsPaths, getWindow(), getShift(), metaData);
    }

    @Override
    protected void validate(ValidationContext context) {
        if (bucketsPaths.length != 1) {
            context.addBucketPathValidationError("must contain a single entry for aggregation [" + name + "]");
        }
        context.validateParentAggSequentiallyOrdered(NAME, name);

Contributor (review comment on `validate`): I wonder if @nik9000 has ideas how we could validate that the pipeline targets a percentiles agg up front in the builder, instead of down below during runtime? Might not be possible yet...

Member: I think it may be worth adding something near bucketCardinality that returns the type of buckets. That'd let us do this ordered assertion without instanceofs. I'd be happy to take a stab at it in a follow up because I have ideas. No need to delay this PR for it though.

    }

    @Override
    protected final XContentBuilder internalXContent(XContentBuilder builder, Params params) throws IOException {
        builder.field(BUCKETS_PATH_FIELD.getPreferredName(), bucketsPaths[0]);
        builder.field(WINDOW.getPreferredName(), window);
        builder.field(SHIFT.getPreferredName(), shift);
        return builder;
    }

    @Override
    public int hashCode() {
        return Objects.hash(super.hashCode(), window, shift);
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj) return true;
        if (obj == null || getClass() != obj.getClass()) return false;
        if (super.equals(obj) == false) return false;
        MovingPercentilesPipelineAggregationBuilder other = (MovingPercentilesPipelineAggregationBuilder) obj;
        return window == other.window && shift == other.shift;
    }

    @Override
    public String getWriteableName() {
        return NAME;
    }

    @Override
    protected boolean overrideBucketsPath() {
        return true;
    }
}
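
For context, here is a minimal usage sketch (not part of this change) showing how the new builder could be wired into the same request as the documentation example through the Java API. It assumes the standard `AggregationBuilders` factories and `DateHistogramInterval`; the aggregation and field names simply mirror the docs example.

```java
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
import org.elasticsearch.xpack.analytics.movingPercentiles.MovingPercentilesPipelineAggregationBuilder;

class MovingPercentilesExample {
    static DateHistogramAggregationBuilder monthlyPricePercentiles() {
        // Window of 10 buckets over the sketches produced by "the_percentile".
        MovingPercentilesPipelineAggregationBuilder movPerc =
            new MovingPercentilesPipelineAggregationBuilder("the_movperc", "the_percentile", 10);
        movPerc.setShift(0); // default alignment: the 10 buckets before (and excluding) the current one

        return AggregationBuilders.dateHistogram("my_date_histo")
            .field("date")
            .calendarInterval(DateHistogramInterval.MONTH)
            .subAggregation(AggregationBuilders.percentiles("the_percentile").field("price").percentiles(1.0, 99.0))
            .subAggregation(movPerc);
    }
}
```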