-
Notifications
You must be signed in to change notification settings - Fork 25k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add Normalize Pipeline Aggregation (#56399)
This aggregation will perform normalizations of metrics for a given series of data in the form of bucket values. The aggregations supports the following normalizations - rescale 0-1 - rescale 0-100 - percentage of sum - mean normalization - z-score normalization - softmax normalization To specify which normalization is to be used, it can be specified in the normalize agg's `normalizer` field. For example: ``` { "normalize": { "buckets_path": <>, "normalizer": "percent" } } ``` Closes #51005.
- Loading branch information
Showing
15 changed files
with
996 additions
and
2 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
182 changes: 182 additions & 0 deletions
182
docs/reference/aggregations/pipeline/normalize-aggregation.asciidoc
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,182 @@ | ||
[role="xpack"] | ||
[testenv="basic"] | ||
[[search-aggregations-pipeline-normalize-aggregation]] | ||
=== Normalize Aggregation | ||
|
||
A parent pipeline aggregation which calculates the specific normalized/rescaled value for a specific bucket value. | ||
Values that cannot be normalized, will be skipped using the <<gap-policy, skip gap policy>>. | ||
|
||
==== Syntax | ||
|
||
A `normalize` aggregation looks like this in isolation: | ||
|
||
[source,js] | ||
-------------------------------------------------- | ||
{ | ||
"normalize": { | ||
"buckets_path": "normalized", | ||
"method": "percent_of_sum" | ||
} | ||
} | ||
-------------------------------------------------- | ||
// NOTCONSOLE | ||
|
||
[[normalize_pipeline-params]] | ||
.`normalize_pipeline` Parameters | ||
[options="header"] | ||
|=== | ||
|Parameter Name |Description |Required |Default Value | ||
|`buckets_path` |The path to the buckets we wish to normalize (see <<buckets-path-syntax, `buckets_path` syntax>> for more details) |Required | | ||
|`method` | The specific <<normalize_pipeline-method, method>> to apply | Required | | ||
|`format` |format to apply to the output value of this aggregation |Optional |`null` | ||
|=== | ||
|
||
==== Methods | ||
[[normalize_pipeline-method]] | ||
|
||
The Normalize Aggregation supports multiple methods to transform the bucket values. Each method definition will use | ||
the following original set of bucket values as examples: `[5, 5, 10, 50, 10, 20]`. | ||
|
||
_rescale_0_1_:: | ||
This method rescales the data such that the minimum number is zero, and the maximum number is 1, with the rest normalized | ||
linearly in-between. | ||
|
||
x' = (x - min_x) / (max_x - min_x) | ||
|
||
[0, 0, .1111, 1, .1111, .3333] | ||
|
||
_rescale_0_100_:: | ||
This method rescales the data such that the minimum number is zero, and the maximum number is 1, with the rest normalized | ||
linearly in-between. | ||
|
||
x' = 100 * (x - min_x) / (max_x - min_x) | ||
|
||
[0, 0, 11.11, 100, 11.11, 33.33] | ||
|
||
_percent_of_sum_:: | ||
This method normalizes each value so that it represents a percentage of the total sum it attributes to. | ||
|
||
x' = x / sum_x | ||
|
||
[5%, 5%, 10%, 50%, 10%, 20%] | ||
|
||
|
||
_mean_:: | ||
This method normalizes such that each value is normalized by how much it differs from the average. | ||
|
||
x' = (x - mean_x) / (max_x - min_x) | ||
|
||
[4.63, 4.63, 9.63, 49.63, 9.63, 9.63, 19.63] | ||
|
||
_zscore_:: | ||
This method normalizes such that each value represents how far it is from the mean relative to the standard deviation | ||
|
||
x' = (x - mean_x) / stdev_x | ||
|
||
[-0.68, -0.68, -0.39, 1.94, -0.39, 0.19] | ||
|
||
_softmax_:: | ||
This method normalizes such that each value is exponentiated and relative to the sum of the exponents of the original values. | ||
|
||
x' = e^x / sum_e_x | ||
|
||
[2.862E-20, 2.862E-20, 4.248E-18, 0.999, 9.357E-14, 4.248E-18] | ||
|
||
|
||
==== Example | ||
|
||
The following snippet calculates the percent of total sales for each month: | ||
|
||
[source,console] | ||
-------------------------------------------------- | ||
POST /sales/_search | ||
{ | ||
"size": 0, | ||
"aggs" : { | ||
"sales_per_month" : { | ||
"date_histogram" : { | ||
"field" : "date", | ||
"calendar_interval" : "month" | ||
}, | ||
"aggs": { | ||
"sales": { | ||
"sum": { | ||
"field": "price" | ||
} | ||
}, | ||
"percent_of_total_sales": { | ||
"normalize": { | ||
"buckets_path": "sales", <1> | ||
"method": "percent_of_sum", <2> | ||
"format": "00.00%" <3> | ||
} | ||
} | ||
} | ||
} | ||
} | ||
} | ||
-------------------------------------------------- | ||
// TEST[setup:sales] | ||
|
||
<1> `buckets_path` instructs this normalize aggregation to use the output of the `sales` aggregation for rescaling | ||
<2> `method` sets which rescaling to apply. In this case, `percent_of_sum` will calculate the sales value as a percent of all sales | ||
in the parent bucket | ||
<3> `format` influences how to format the metric as a string using Java's `DecimalFormat` pattern. In this case, multiplying by 100 | ||
and adding a '%' | ||
|
||
And the following may be the response: | ||
|
||
[source,console-result] | ||
-------------------------------------------------- | ||
{ | ||
"took": 11, | ||
"timed_out": false, | ||
"_shards": ..., | ||
"hits": ..., | ||
"aggregations": { | ||
"sales_per_month": { | ||
"buckets": [ | ||
{ | ||
"key_as_string": "2015/01/01 00:00:00", | ||
"key": 1420070400000, | ||
"doc_count": 3, | ||
"sales": { | ||
"value": 550.0 | ||
}, | ||
"percent_of_total_sales": { | ||
"value": 0.5583756345177665, | ||
"value_as_string": "55.84%" | ||
} | ||
}, | ||
{ | ||
"key_as_string": "2015/02/01 00:00:00", | ||
"key": 1422748800000, | ||
"doc_count": 2, | ||
"sales": { | ||
"value": 60.0 | ||
}, | ||
"percent_of_total_sales": { | ||
"value": 0.06091370558375635, | ||
"value_as_string": "06.09%" | ||
} | ||
}, | ||
{ | ||
"key_as_string": "2015/03/01 00:00:00", | ||
"key": 1425168000000, | ||
"doc_count": 2, | ||
"sales": { | ||
"value": 375.0 | ||
}, | ||
"percent_of_total_sales": { | ||
"value": 0.38071065989847713, | ||
"value_as_string": "38.07%" | ||
} | ||
} | ||
] | ||
} | ||
} | ||
} | ||
-------------------------------------------------- | ||
// TESTRESPONSE[s/"took": 11/"took": $body.took/] | ||
// TESTRESPONSE[s/"_shards": \.\.\./"_shards": $body._shards/] | ||
// TESTRESPONSE[s/"hits": \.\.\./"hits": $body.hits/] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
146 changes: 146 additions & 0 deletions
146
...java/org/elasticsearch/xpack/analytics/normalize/NormalizePipelineAggregationBuilder.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,146 @@ | ||
/* | ||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
* or more contributor license agreements. Licensed under the Elastic License; | ||
* you may not use this file except in compliance with the Elastic License. | ||
*/ | ||
|
||
package org.elasticsearch.xpack.analytics.normalize; | ||
|
||
import org.elasticsearch.common.ParseField; | ||
import org.elasticsearch.common.io.stream.StreamInput; | ||
import org.elasticsearch.common.io.stream.StreamOutput; | ||
import org.elasticsearch.common.xcontent.ConstructingObjectParser; | ||
import org.elasticsearch.common.xcontent.XContentBuilder; | ||
import org.elasticsearch.search.DocValueFormat; | ||
import org.elasticsearch.search.aggregations.pipeline.AbstractPipelineAggregationBuilder; | ||
import org.elasticsearch.search.aggregations.pipeline.BucketMetricsParser; | ||
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; | ||
|
||
import java.io.IOException; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.Objects; | ||
import java.util.function.DoubleUnaryOperator; | ||
import java.util.function.Function; | ||
|
||
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg; | ||
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg; | ||
import static org.elasticsearch.search.aggregations.pipeline.PipelineAggregator.Parser.FORMAT; | ||
import static org.elasticsearch.xpack.analytics.normalize.NormalizePipelineMethods.Mean; | ||
import static org.elasticsearch.xpack.analytics.normalize.NormalizePipelineMethods.Percent; | ||
import static org.elasticsearch.xpack.analytics.normalize.NormalizePipelineMethods.RescaleZeroToOne; | ||
import static org.elasticsearch.xpack.analytics.normalize.NormalizePipelineMethods.RescaleZeroToOneHundred; | ||
import static org.elasticsearch.xpack.analytics.normalize.NormalizePipelineMethods.Softmax; | ||
import static org.elasticsearch.xpack.analytics.normalize.NormalizePipelineMethods.ZScore; | ||
|
||
public class NormalizePipelineAggregationBuilder extends AbstractPipelineAggregationBuilder<NormalizePipelineAggregationBuilder> { | ||
public static final String NAME = "normalize"; | ||
static final ParseField METHOD_FIELD = new ParseField("method"); | ||
|
||
@SuppressWarnings("unchecked") | ||
public static final ConstructingObjectParser<NormalizePipelineAggregationBuilder, String> PARSER = new ConstructingObjectParser<>( | ||
NAME, false, (args, name) -> new NormalizePipelineAggregationBuilder(name, (String) args[0], | ||
(String) args[1], (List<String>) args[2])); | ||
|
||
static { | ||
PARSER.declareString(optionalConstructorArg(), FORMAT); | ||
PARSER.declareString(constructorArg(), METHOD_FIELD); | ||
PARSER.declareStringArray(constructorArg(), BUCKETS_PATH_FIELD); | ||
} | ||
|
||
static final Map<String, Function<double[], DoubleUnaryOperator>> NAME_MAP = Map.of( | ||
RescaleZeroToOne.NAME, RescaleZeroToOne::new, | ||
RescaleZeroToOneHundred.NAME, RescaleZeroToOneHundred::new, | ||
Mean.NAME, Mean::new, | ||
ZScore.NAME, ZScore::new, | ||
Percent.NAME, Percent::new, | ||
Softmax.NAME, Softmax::new | ||
); | ||
|
||
static String validateMethodName(String name) { | ||
if (NAME_MAP.containsKey(name)) { | ||
return name; | ||
} | ||
throw new IllegalArgumentException("invalid method [" + name + "]"); | ||
} | ||
|
||
private final String format; | ||
private final String method; | ||
|
||
public NormalizePipelineAggregationBuilder(String name, String format, String method, List<String> bucketsPath) { | ||
super(name, NAME, bucketsPath.toArray(new String[0])); | ||
this.format = format; | ||
this.method = validateMethodName(method); | ||
} | ||
|
||
/** | ||
* Read from a stream. | ||
*/ | ||
public NormalizePipelineAggregationBuilder(StreamInput in) throws IOException { | ||
super(in, NAME); | ||
format = in.readOptionalString(); | ||
method = in.readString(); | ||
} | ||
|
||
@Override | ||
protected final void doWriteTo(StreamOutput out) throws IOException { | ||
out.writeOptionalString(format); | ||
out.writeString(method); | ||
} | ||
|
||
/** | ||
* Gets the format to use on the output of this aggregation. | ||
*/ | ||
public String format() { | ||
return format; | ||
} | ||
|
||
protected DocValueFormat formatter() { | ||
if (format != null) { | ||
return new DocValueFormat.Decimal(format); | ||
} else { | ||
return DocValueFormat.RAW; | ||
} | ||
} | ||
|
||
@Override | ||
protected PipelineAggregator createInternal(Map<String, Object> metadata) { | ||
return new NormalizePipelineAggregator(name, bucketsPaths, formatter(), NAME_MAP.get(method), metadata); | ||
} | ||
|
||
@Override | ||
protected void validate(ValidationContext context) { | ||
if (bucketsPaths.length != 1) { | ||
context.addBucketPathValidationError("must contain a single entry for aggregation [" + name + "]"); | ||
} | ||
context.validateHasParent(NAME, name); | ||
} | ||
|
||
@Override | ||
protected final XContentBuilder internalXContent(XContentBuilder builder, Params params) throws IOException { | ||
if (format != null) { | ||
builder.field(BucketMetricsParser.FORMAT.getPreferredName(), format); | ||
} | ||
builder.field(METHOD_FIELD.getPreferredName(), method); | ||
return builder; | ||
} | ||
|
||
@Override | ||
public int hashCode() { | ||
return Objects.hash(super.hashCode(), format, method); | ||
} | ||
|
||
@Override | ||
public boolean equals(Object obj) { | ||
if (this == obj) return true; | ||
if (obj == null || getClass() != obj.getClass()) return false; | ||
if (super.equals(obj) == false) return false; | ||
NormalizePipelineAggregationBuilder other = (NormalizePipelineAggregationBuilder) obj; | ||
return Objects.equals(format, other.format) && Objects.equals(method, other.method); | ||
} | ||
|
||
@Override | ||
public String getWriteableName() { | ||
return NAME; | ||
} | ||
} |
Oops, something went wrong.