Add moving percentiles pipeline aggregation #55441
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,159 @@ | ||
[role="xpack"] | ||
[testenv="basic"] | ||
[[search-aggregations-pipeline-moving-percentiles-aggregation]] | ||
=== Moving Percentiles Aggregation | ||
|
||
Given an ordered series of <<search-aggregations-metrics-percentile-aggregation, percentiles>>, the Moving Percentiles aggregation | ||
slides a window across those percentiles and lets the user compute the cumulative percentiles over that window. | ||
|
||
This is conceptually very similar to the <<search-aggregations-pipeline-movfn-aggregation, Moving Function>> pipeline aggregation, | ||
except it works on the percentile sketches instead of the actual bucket values. | ||
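To make the idea of a "cumulative percentile" concrete, here is a minimal sketch, not the implementation in this PR. It assumes each bucket holds its raw values and swaps in an exact nearest-rank percentile over the merged values in place of merging percentile sketches. The class name, method names, and the choice to return `NaN` for an empty window are all hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Illustrative only: exact percentiles over merged raw values stand in for merged sketches. */
final class MovingPercentileSketch {

    /** Nearest-rank percentile of a non-empty list; p is expected in (0, 100]. */
    static double percentile(List<Double> values, double p) {
        List<Double> sorted = new ArrayList<>(values);
        Collections.sort(sorted);
        int rank = (int) Math.ceil(p / 100.0 * sorted.size());
        return sorted.get(Math.max(rank, 1) - 1);
    }

    /**
     * For each bucket i, merges the values of up to `window` preceding buckets
     * (current bucket excluded, i.e. the shift = 0 case) and takes the percentile
     * of the merged values. Returns NaN where the window is empty (an assumption
     * made for this sketch only).
     */
    static double[] movingPercentile(List<List<Double>> buckets, int window, double p) {
        double[] out = new double[buckets.size()];
        for (int i = 0; i < buckets.size(); i++) {
            List<Double> merged = new ArrayList<>();
            for (int j = Math.max(0, i - window); j < i; j++) {
                merged.addAll(buckets.get(j));
            }
            out[i] = merged.isEmpty() ? Double.NaN : percentile(merged, p);
        }
        return out;
    }
}
```

The point of the sketch is that the result for a bucket is a percentile of everything in the window taken together, not an average of the per-bucket percentiles.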
|
||
==== Syntax | ||
|
||
A `moving_percentiles` aggregation looks like this in isolation: | ||
|
||
[source,js] | ||
-------------------------------------------------- | ||
{ | ||
"moving_percentiles": { | ||
"buckets_path": "the_percentile", | ||
"window": 10 | ||
} | ||
} | ||
-------------------------------------------------- | ||
// NOTCONSOLE | ||
|
||
[[moving-percentiles-params]] | ||
.`moving_percentiles` Parameters | ||
[options="header"] | ||
|=== | ||
|Parameter Name |Description |Required |Default Value | ||
|`buckets_path` |Path to the percentile of interest (see <<buckets-path-syntax, `buckets_path` Syntax>> for more details) |Required | | ||
|`window` |The size of the window to "slide" across the histogram. |Required | | ||
|`shift` |<<shift-parameter, Shift>> of window position. |Optional | 0 | ||
|=== | ||
|
||
`moving_percentiles` aggregations must be embedded inside of a `histogram` or `date_histogram` aggregation. They can be | ||
embedded like any other metric aggregation: | ||
|
||
[source,console] | ||
-------------------------------------------------- | ||
POST /_search | ||
{ | ||
"size": 0, | ||
"aggs": { | ||
"my_date_histo":{ <1> | ||
"date_histogram":{ | ||
"field":"date", | ||
"calendar_interval":"1M" | ||
}, | ||
"aggs":{ | ||
"the_percentile":{ <2> | ||
"percentiles":{ | ||
"field": "price", | ||
"percents": [ 1.0, 99.0 ] | ||
} | ||
}, | ||
"the_movperc": { | ||
"moving_percentiles": { | ||
"buckets_path": "the_percentile", <3> | ||
"window": 10 | ||
} | ||
} | ||
} | ||
} | ||
} | ||
} | ||
-------------------------------------------------- | ||
// TEST[setup:sales] | ||
|
||
<1> A `date_histogram` named "my_date_histo" is constructed on the "date" field, with one-month intervals. | ||
<2> A `percentiles` metric is used to calculate the percentiles of the "price" field. | ||
<3> Finally, we specify a `moving_percentiles` aggregation which uses "the_percentile" sketch as its input. | ||
|
||
Moving percentiles are built by first specifying a `histogram` or `date_histogram` over a field. You then add | ||
a percentiles metric inside of that histogram. Finally, the `moving_percentiles` aggregation is embedded inside the histogram. | ||
The `buckets_path` parameter is then used to "point" at the percentiles aggregation inside of the histogram (see | ||
<<buckets-path-syntax>> for a description of the syntax for `buckets_path`). | ||
|
||
And the following may be the response: | ||
|
||
[source,console-result] | ||
-------------------------------------------------- | ||
{ | ||
"took": 11, | ||
"timed_out": false, | ||
"_shards": ..., | ||
"hits": ..., | ||
"aggregations": { | ||
"my_date_histo": { | ||
"buckets": [ | ||
{ | ||
"key_as_string": "2015/01/01 00:00:00", | ||
"key": 1420070400000, | ||
"doc_count": 3, | ||
"the_percentile": { | ||
"values": { | ||
"1.0": 150.0, | ||
"99.0": 200.0 | ||
} | ||
} | ||
}, | ||
{ | ||
"key_as_string": "2015/02/01 00:00:00", | ||
"key": 1422748800000, | ||
"doc_count": 2, | ||
"the_percentile": { | ||
"values": { | ||
"1.0": 10.0, | ||
"99.0": 50.0 | ||
} | ||
}, | ||
"the_movperc": { | ||
"values": { | ||
"1.0": 150.0, | ||
"99.0": 200.0 | ||
} | ||
} | ||
}, | ||
{ | ||
"key_as_string": "2015/03/01 00:00:00", | ||
"key": 1425168000000, | ||
"doc_count": 2, | ||
"the_percentile": { | ||
"values": { | ||
"1.0": 175.0, | ||
"99.0": 200.0 | ||
} | ||
}, | ||
"the_movperc": { | ||
"values": { | ||
"1.0": 10.0, | ||
"99.0": 200.0 | ||
} | ||
} | ||
} | ||
] | ||
} | ||
} | ||
} | ||
-------------------------------------------------- | ||
// TESTRESPONSE[s/"took": 11/"took": $body.took/] | ||
// TESTRESPONSE[s/"_shards": \.\.\./"_shards": $body._shards/] | ||
// TESTRESPONSE[s/"hits": \.\.\./"hits": $body.hits/] | ||
|
||
The output format of the `moving_percentiles` aggregation is inherited from the format of the referenced | ||
<<search-aggregations-metrics-percentile-aggregation,`percentiles`>> aggregation. | ||
|
||
[[shift-parameter]] | ||
==== shift parameter | ||
|
||
By default (with `shift = 0`), the window that is offered for calculation is the last `n` values excluding the current bucket. | ||
Increasing `shift` by 1 moves the starting window position by `1` to the right. | ||
|
||
- To include the current bucket in the window, use `shift = 1`. | ||
- For center alignment (`n / 2` values before and after the current bucket), use `shift = window / 2`. | ||
- For right alignment (`n` values after the current bucket), use `shift = window`. | ||
|
||
If either window edge moves outside the borders of the data series, the window shrinks to include only the available values. |
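The interaction between `window` and `shift` can be sketched as index arithmetic. This is a minimal sketch under one assumption: that the window for bucket `i` is the half-open index range `[i - window + shift, i + shift)`, clamped to the series. That reading follows the description above but is not taken from this PR's aggregator, and the class and method names are hypothetical.

```java
/** Illustrative only: one plausible reading of how `window` and `shift` select buckets. */
final class ShiftWindow {

    /** Clamps an index into [0, size] so the window shrinks at the series borders. */
    static int clamp(int index, int size) {
        return Math.max(0, Math.min(index, size));
    }

    /** Inclusive start index of the window for bucket i. */
    static int from(int i, int window, int shift, int size) {
        return clamp(i - window + shift, size);
    }

    /** Exclusive end index of the window for bucket i. */
    static int to(int i, int window, int shift, int size) {
        return clamp(i + shift, size);
    }
}
```

Under this assumption, for a series of 10 buckets with `window = 4` and the current bucket at index 5, `shift = 0` selects indices `[1, 5)`, `shift = 1` selects `[2, 6)`, and `shift = 2` selects `[3, 7)`. At index 1 with `shift = 0` the start is clamped to 0, so the window shrinks to `[0, 1)`.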
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,129 @@ | ||
/* | ||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
* or more contributor license agreements. Licensed under the Elastic License; | ||
* you may not use this file except in compliance with the Elastic License. | ||
*/ | ||
package org.elasticsearch.xpack.analytics.movingPercentiles; | ||
|
||
import org.elasticsearch.common.ParseField; | ||
import org.elasticsearch.common.io.stream.StreamInput; | ||
import org.elasticsearch.common.io.stream.StreamOutput; | ||
import org.elasticsearch.common.xcontent.ConstructingObjectParser; | ||
import org.elasticsearch.common.xcontent.XContentBuilder; | ||
import org.elasticsearch.search.aggregations.pipeline.AbstractPipelineAggregationBuilder; | ||
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; | ||
|
||
import java.io.IOException; | ||
import java.util.Map; | ||
import java.util.Objects; | ||
|
||
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg; | ||
|
||
public class MovingPercentilesPipelineAggregationBuilder | ||
extends AbstractPipelineAggregationBuilder<MovingPercentilesPipelineAggregationBuilder> { | ||
public static final String NAME = "moving_percentiles"; | ||
private static final ParseField WINDOW = new ParseField("window"); | ||
private static final ParseField SHIFT = new ParseField("shift"); | ||
|
||
public static final ConstructingObjectParser<MovingPercentilesPipelineAggregationBuilder, String> PARSER = | ||
new ConstructingObjectParser<>(NAME, false, (args, name) -> { | ||
return new MovingPercentilesPipelineAggregationBuilder(name, (String) args[0], (int) args[1]); | ||
}); | ||
static { | ||
PARSER.declareString(constructorArg(), BUCKETS_PATH_FIELD); | ||
PARSER.declareInt(constructorArg(), WINDOW); | ||
PARSER.declareInt(MovingPercentilesPipelineAggregationBuilder::setShift, SHIFT); | ||
} | ||
|
||
private final int window; | ||
private int shift; | ||
|
||
public MovingPercentilesPipelineAggregationBuilder(String name, String bucketsPath, int window) { | ||
super(name, NAME, new String[] { bucketsPath }); | ||
if (window <= 0) { | ||
throw new IllegalArgumentException("[" + WINDOW.getPreferredName() + "] must be a positive, non-zero integer."); | ||
} | ||
this.window = window; | ||
} | ||
|
||
/** | ||
* Read from a stream. | ||
*/ | ||
public MovingPercentilesPipelineAggregationBuilder(StreamInput in) throws IOException { | ||
super(in, NAME); | ||
window = in.readVInt(); | ||
shift = in.readInt(); | ||
} | ||
|
||
@Override | ||
protected final void doWriteTo(StreamOutput out) throws IOException { | ||
out.writeVInt(window); | ||
out.writeInt(shift); | ||
} | ||
|
||
/** | ||
* Returns the window size for this aggregation | ||
*/ | ||
public int getWindow() { | ||
return window; | ||
} | ||
|
||
/** | ||
* Returns the shift for this aggregation | ||
*/ | ||
public int getShift() { | ||
return shift; | ||
} | ||
|
||
/** | ||
* Sets the shift for this aggregation | ||
*/ | ||
public void setShift(int shift) { | ||
this.shift = shift; | ||
} | ||
|
||
@Override | ||
protected PipelineAggregator createInternal(Map<String, Object> metaData) { | ||
return new MovingPercentilesPipelineAggregator(name, bucketsPaths, getWindow(), getShift(), metaData); | ||
} | ||
|
||
@Override | ||
protected void validate(ValidationContext context) { | ||
if (bucketsPaths.length != 1) { | ||
context.addBucketPathValidationError("must contain a single entry for aggregation [" + name + "]"); | ||
} | ||
context.validateParentAggSequentiallyOrdered(NAME, name); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I wonder if @nik9000 has ideas how we could validate that the pipeline targets a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think it may be worth adding something near |
||
} | ||
|
||
@Override | ||
protected final XContentBuilder internalXContent(XContentBuilder builder, Params params) throws IOException { | ||
builder.field(BUCKETS_PATH_FIELD.getPreferredName(), bucketsPaths[0]); | ||
builder.field(WINDOW.getPreferredName(), window); | ||
builder.field(SHIFT.getPreferredName(), shift); | ||
return builder; | ||
} | ||
|
||
@Override | ||
public int hashCode() { | ||
return Objects.hash(super.hashCode(), window, shift); | ||
} | ||
|
||
@Override | ||
public boolean equals(Object obj) { | ||
if (this == obj) return true; | ||
if (obj == null || getClass() != obj.getClass()) return false; | ||
if (super.equals(obj) == false) return false; | ||
MovingPercentilesPipelineAggregationBuilder other = (MovingPercentilesPipelineAggregationBuilder) obj; | ||
return window == other.window && shift == other.shift; | ||
} | ||
|
||
@Override | ||
public String getWriteableName() { | ||
return NAME; | ||
} | ||
|
||
@Override | ||
protected boolean overrideBucketsPath() { | ||
return true; | ||
} | ||
} |
Would you kindly add javadoc to these while you are here?