Add Variable Width Histogram Aggregation #42035

Merged
merged 33 commits into from
Jun 23, 2020
Changes from 28 commits
a653632
Create a new aggregation to dynamically calculate numerical histogram…
jamesdorfman May 8, 2019
f18127f
Resolve merge conflicts
jamesdorfman May 22, 2020
622ae94
Remove unused parse method
jamesdorfman May 23, 2020
81abb74
Implement binary search for BigArrays
jamesdorfman May 23, 2020
952e468
Make the collector private
jamesdorfman May 23, 2020
e1cabc9
Fix formatting
jamesdorfman May 23, 2020
62102bc
Fix formatting
jamesdorfman May 23, 2020
5f3ceda
Fix formatting
jamesdorfman May 23, 2020
c4d07af
Add asciidoc test
jamesdorfman May 28, 2020
52fa59c
Move BinarySearcher to a dedicated class and improve its tests
jamesdorfman May 28, 2020
989302f
Fix a documentation formatting issue
jamesdorfman May 28, 2020
c114964
Add a test for BinarySearcher that compares results against Arrays.bi…
jamesdorfman May 28, 2020
25a00f3
Add test for backwards compatibility
jamesdorfman May 28, 2020
2cf8966
Enable randomized tests for InternalVariableWidthHistogram and fix va…
jamesdorfman May 29, 2020
1e98358
Add documentation to BinarySearcher
jamesdorfman May 29, 2020
60f95ed
Remove contents of assertReduced() and rely on manual tests for now. …
jamesdorfman May 29, 2020
4b734e8
Add randomized tests to VariableWidthHistogramAggregator and fix bugs…
jamesdorfman May 30, 2020
e6938a7
Remove some logging
jamesdorfman May 30, 2020
5ba9dc9
Force some tests to use a single segment & add a new multi-segment te…
jamesdorfman Jun 3, 2020
3470ff2
Rename parameter cacheLimit to initialBuffer
jamesdorfman Jun 9, 2020
77faec9
Reduce all buckets at least once, so that their sub aggregations can …
jamesdorfman Jun 10, 2020
94768e9
Make centroid the bucket key & add min and max to XContent
jamesdorfman Jun 11, 2020
9fdb29f
Improve the bucket bounds adjustment when there is an overlap between…
jamesdorfman Jun 11, 2020
78618ca
Add more tests for bucket maxes
jamesdorfman Jun 12, 2020
60e2ab0
Fix failing rest-api-spec test to reflect that keys are now centroids…
jamesdorfman Jun 12, 2020
201bba0
Add bucket mins and maxes to the expected query response in the doc
jamesdorfman Jun 12, 2020
1f5c049
Merge remote-tracking branch 'upstream/master' into auto_cluster_hist…
jamesdorfman Jun 15, 2020
548190e
Remove obsolete methods after merging: consumeBucketsAndMaybeBreak() …
jamesdorfman Jun 16, 2020
440e202
Fix checkstyle errors
jamesdorfman Jun 17, 2020
a590503
Add variable_width_histogram to unsupported list for transform
jamesdorfman Jun 17, 2020
a87ddcb
Add license to VariableWidthHistogramAggregatorSupplier.java
jamesdorfman Jun 18, 2020
40bff4b
Lower the max number of buckets created in InternalVariableWidthHisto…
jamesdorfman Jun 19, 2020
1a27e07
Merge remote-tracking branch 'upstream/master' into auto_cluster_hist…
jamesdorfman Jun 19, 2020
@@ -111,6 +111,8 @@
import org.elasticsearch.search.aggregations.bucket.histogram.ParsedAutoDateHistogram;
import org.elasticsearch.search.aggregations.bucket.histogram.ParsedDateHistogram;
import org.elasticsearch.search.aggregations.bucket.histogram.ParsedHistogram;
import org.elasticsearch.search.aggregations.bucket.histogram.ParsedVariableWidthHistogram;
import org.elasticsearch.search.aggregations.bucket.histogram.VariableWidthHistogramAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.missing.MissingAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.missing.ParsedMissing;
import org.elasticsearch.search.aggregations.bucket.nested.NestedAggregationBuilder;
@@ -1929,6 +1931,8 @@ static List<NamedXContentRegistry.Entry> getDefaultNamedXContents() {
map.put(HistogramAggregationBuilder.NAME, (p, c) -> ParsedHistogram.fromXContent(p, (String) c));
map.put(DateHistogramAggregationBuilder.NAME, (p, c) -> ParsedDateHistogram.fromXContent(p, (String) c));
map.put(AutoDateHistogramAggregationBuilder.NAME, (p, c) -> ParsedAutoDateHistogram.fromXContent(p, (String) c));
map.put(VariableWidthHistogramAggregationBuilder.NAME,
(p, c) -> ParsedVariableWidthHistogram.fromXContent(p, (String) c));
map.put(StringTerms.NAME, (p, c) -> ParsedStringTerms.fromXContent(p, (String) c));
map.put(LongTerms.NAME, (p, c) -> ParsedLongTerms.fromXContent(p, (String) c));
map.put(DoubleTerms.NAME, (p, c) -> ParsedDoubleTerms.fromXContent(p, (String) c));
@@ -0,0 +1,91 @@
[[search-aggregations-bucket-variablewidthhistogram-aggregation]]
=== Variable Width Histogram Aggregation

Member

I talked with @polyfractal and @giladgal about this PR and we'd like to mark it as experimental[] so we can make breaking changes to its API. It is different enough from what we've done in the past that we want a little time not subject to semver.

@jamesdorfman, there is nothing you have to do for this, but you'll see an issue tracking the experimental-nature of the agg pretty soon.

This is a multi-bucket aggregation similar to <<search-aggregations-bucket-histogram-aggregation>>.
However, the width of each bucket is not specified. Rather, a target number of buckets is provided and bucket intervals
are dynamically determined based on the document distribution. This is done using a simple one-pass document clustering algorithm
that aims to obtain low distances between bucket centroids. Unlike other multi-bucket aggregations, the intervals will not
necessarily have a uniform width.

TIP: The number of buckets returned will always be less than or equal to the target number.

Requesting a target of 2 buckets.

[source,console]
--------------------------------------------------
POST /sales/_search?size=0
{
"aggs" : {
"prices" : {
"variable_width_histogram" : {
"field" : "price",
"buckets" : 2
}
}
}
}
--------------------------------------------------
// TEST[setup:sales]

Response:

[source,console-result]
--------------------------------------------------
{
...
"aggregations": {
"prices" : {
"buckets": [
{
"min": 10.0,
"key": 30.0,
"max": 50.0,
"doc_count": 2
},
{
"min": 150.0,
"key": 185.0,
"max": 200.0,
"doc_count": 5
}
]
}
}
}
--------------------------------------------------
// TESTRESPONSE[s/\.\.\./"took": $body.took,"timed_out": false,"_shards": $body._shards,"hits": $body.hits,/]

==== Clustering Algorithm
Each shard fetches the first `initial_buffer` documents and stores them in memory. Once the buffer is full, these documents
are sorted and linearly separated into `3/4 * shard_size` buckets.
Next, each remaining document is either collected into the nearest bucket, or placed into a new bucket if it is distant
from all the existing ones. At most `shard_size` total buckets are created.
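
The initial bucketing step described above can be sketched as follows. This is a minimal standalone illustration, not the aggregator's code: the class `InitialBucketingSketch`, its `Bucket` type, and the choice to split the sorted buffer into runs of roughly equal size are all assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class InitialBucketingSketch {

    /** Hypothetical bucket summary: centroid (mean), bounds, and document count. */
    static final class Bucket {
        final double centroid;
        final double min;
        final double max;
        final int docCount;

        Bucket(double centroid, double min, double max, int docCount) {
            this.centroid = centroid;
            this.min = min;
            this.max = max;
            this.docCount = docCount;
        }
    }

    /**
     * Sorts the buffered values and splits them into at most (3/4 * shardSize)
     * contiguous runs. Equal-sized runs are an assumption of this sketch.
     */
    static List<Bucket> initialBuckets(double[] buffer, int shardSize) {
        double[] sorted = buffer.clone();
        Arrays.sort(sorted);
        int target = Math.max(1, (3 * shardSize) / 4);
        int numBuckets = Math.min(target, sorted.length);
        List<Bucket> buckets = new ArrayList<>();
        for (int b = 0; b < numBuckets; b++) {
            // Integer arithmetic spreads the values as evenly as possible.
            int start = (int) ((long) b * sorted.length / numBuckets);
            int end = (int) ((long) (b + 1) * sorted.length / numBuckets);
            double sum = 0;
            for (int i = start; i < end; i++) {
                sum += sorted[i];
            }
            buckets.add(new Bucket(sum / (end - start), sorted[start], sorted[end - 1], end - start));
        }
        return buckets;
    }
}
```

How "distant" a later document must be before it opens a new bucket is not specified in this section, so the sketch stops at the initial split.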

In the reduce step, the coordinating node sorts the buckets from all shards by their centroids. Then, the two buckets
with the nearest centroids are repeatedly merged until the target number of buckets is achieved.
This merging procedure is a form of https://en.wikipedia.org/wiki/Hierarchical_clustering[agglomerative hierarchical clustering].
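
As a rough illustration of the reduce step, the sketch below repeatedly merges the two buckets with the nearest centroids. It is not the code from this PR: the names `NearestCentroidMergeSketch`, `Bucket`, `merge`, and `reduce` are hypothetical, and taking the merged centroid to be the doc-count-weighted mean is an assumption. Because the buckets are kept sorted by centroid, the nearest pair is always adjacent, so only neighbours are compared.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

final class NearestCentroidMergeSketch {

    /** Hypothetical bucket summary; docCount is assumed to be positive. */
    static final class Bucket {
        final double centroid;
        final double min;
        final double max;
        final long docCount;

        Bucket(double centroid, double min, double max, long docCount) {
            this.centroid = centroid;
            this.min = min;
            this.max = max;
            this.docCount = docCount;
        }
    }

    /** Merges two buckets; the weighted-mean centroid is an assumption of this sketch. */
    static Bucket merge(Bucket a, Bucket b) {
        long count = a.docCount + b.docCount;
        double centroid = (a.centroid * a.docCount + b.centroid * b.docCount) / count;
        return new Bucket(centroid, Math.min(a.min, b.min), Math.max(a.max, b.max), count);
    }

    /** Sorts by centroid, then merges the closest adjacent pair until the target is reached. */
    static List<Bucket> reduce(List<Bucket> shardBuckets, int targetBuckets) {
        List<Bucket> buckets = new ArrayList<>(shardBuckets);
        buckets.sort(Comparator.comparingDouble((Bucket b) -> b.centroid));
        int target = Math.max(1, targetBuckets);
        while (buckets.size() > target) {
            int best = 0;
            double bestGap = Double.POSITIVE_INFINITY;
            for (int i = 0; i + 1 < buckets.size(); i++) {
                double gap = buckets.get(i + 1).centroid - buckets.get(i).centroid;
                if (gap < bestGap) {
                    bestGap = gap;
                    best = i;
                }
            }
            Bucket right = buckets.remove(best + 1);
            // The weighted mean lies between the two old centroids, so order is preserved.
            buckets.set(best, merge(buckets.get(best), right));
        }
        return buckets;
    }
}
```

The linear scan for the closest pair keeps the example short; a production implementation would likely track gaps more efficiently.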

TIP: A shard can return fewer than `shard_size` buckets, but it cannot return more.

==== Shard size
The `shard_size` parameter specifies the number of buckets that the coordinating node will request from each shard.
A higher `shard_size` leads each shard to produce smaller buckets. This reduces the likelihood of buckets overlapping
after the reduction step. Increasing the `shard_size` will improve the accuracy of the histogram, but it will
also make it more expensive to compute the final result because bigger priority queues will have to be managed on a
shard level, and the data transfers between the nodes and the client will be larger.

TIP: Parameters `buckets`, `shard_size`, and `initial_buffer` are optional. By default, `buckets = 10`, `shard_size = 500` and `initial_buffer = min(50 * shard_size, 50000)`.
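
To make the default arithmetic concrete: with the default `shard_size` of 500, the default `initial_buffer` works out to `min(50 * 500, 50000) = 25000`. The small sketch below only restates the formula from the tip above; the class and method names are hypothetical and are not taken from the PR.

```java
final class VariableWidthDefaultsSketch {

    // Defaults as stated in the documentation text above.
    static final int DEFAULT_BUCKETS = 10;
    static final int DEFAULT_SHARD_SIZE = 500;

    /** initial_buffer = min(50 * shard_size, 50000), computed in long to avoid int overflow. */
    static int defaultInitialBuffer(int shardSize) {
        return (int) Math.min(50L * shardSize, 50_000L);
    }
}
```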

==== Initial Buffer
The `initial_buffer` parameter can be used to specify the number of individual documents that will be stored in memory
on a shard before the initial bucketing algorithm is run. Bucket distribution is determined using this sample
of `initial_buffer` documents. So, although a higher `initial_buffer` will use more memory, it will lead to more representative
clusters.

==== Bucket bounds are approximate
During the reduce step, the coordinating node continuously merges the two buckets with the nearest centroids. If two buckets have
overlapping bounds but distant centroids, then it is possible that they will not be merged. Because of this, after
reduction the maximum value in some interval (`max`) might be greater than the minimum value in the subsequent
bucket (`min`). To reduce the impact of this error, when such an overlap occurs the bound between these intervals is adjusted to be `(max + min) / 2`.
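
The adjustment can be illustrated with a short sketch. It assumes the buckets are already in centroid order and that both sides of an overlapping boundary are set to the same midpoint; the class and method names are hypothetical, and this is not the PR's implementation.

```java
final class BucketBoundsSketch {

    /**
     * mins[i] and maxes[i] are the bounds of bucket i, in centroid order.
     * Wherever bucket i's max exceeds bucket i+1's min, both are moved
     * to (max + min) / 2. The arrays are adjusted in place.
     */
    static void adjustOverlaps(double[] mins, double[] maxes) {
        for (int i = 0; i + 1 < mins.length; i++) {
            if (maxes[i] > mins[i + 1]) {
                double midpoint = (maxes[i] + mins[i + 1]) / 2;
                maxes[i] = midpoint;
                mins[i + 1] = midpoint;
            }
        }
    }
}
```

For example, buckets bounded by `[0, 10]` and `[8, 18]` overlap, so the shared bound becomes `(10 + 8) / 2 = 9`.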

TIP: Bucket bounds are very sensitive to outliers.
@@ -0,0 +1,50 @@
setup:
- do:
indices.create:
index: test
body:
settings:
number_of_replicas: 0
mappings:
properties:
number:
type: integer
- do:
bulk:
refresh: true
index: test
body:
- '{"index": {}}'
- '{"number": -3}'
- '{"index": {}}'
- '{"number": -2}'
- '{"index": {}}'
- '{"number": 1}'
- '{"index": {}}'
- '{"number": 4}'
- '{"index": {}}'
- '{"number": 5}'

---
"basic":
- skip:
version: " - 7.99.99"
reason: added in 8.0.0 (to be backported to 7.9.0)
- do:
search:
body:
size: 0
aggs:
histo:
variable_width_histogram:
field: number
buckets: 3
- match: { hits.total.value: 5 }
- length: { aggregations.histo.buckets: 3 }
- match: { aggregations.histo.buckets.0.key: -2.5 }
- match: { aggregations.histo.buckets.0.doc_count: 2 }
- match: { aggregations.histo.buckets.1.key: 1.0 }
- match: { aggregations.histo.buckets.1.doc_count: 1 }
- match: { aggregations.histo.buckets.2.key: 4.5 }
- match: { aggregations.histo.buckets.2.doc_count: 2 }

30 changes: 30 additions & 0 deletions server/src/main/java/org/elasticsearch/common/util/BigArrays.java
@@ -691,6 +691,35 @@ public DoubleArray grow(DoubleArray array, long minSize) {
return resize(array, newSize);
}

public static class DoubleBinarySearcher extends BinarySearcher{
Member

It's probably worth a note that this class isn't thread safe. Just to cover all your bases.


DoubleArray array;
double searchFor;

public DoubleBinarySearcher(DoubleArray array){
this.array = array;
this.searchFor = Integer.MIN_VALUE;
}

@Override
protected int compare(int index) {
// Prevent use of BinarySearcher.search() and force the use of DoubleBinarySearcher.search()
assert this.searchFor != Integer.MIN_VALUE;

return Double.compare(array.get(index), searchFor);
}

@Override
protected double distance(int index) {
return Math.abs(array.get(index) - searchFor);
}

public int search(int from, int to, double searchFor) {
this.searchFor = searchFor;
return super.search(from, to);
Member

Maybe search should be protected if it only makes sense to call it from a subclass? Then you wouldn't need the assert.

}
}

/**
* Allocate a new {@link FloatArray}.
* @param size the initial length of the array
@@ -782,3 +811,4 @@ public <T> ObjectArray<T> grow(ObjectArray<T> array, long minSize) {
return resize(array, newSize);
}
}

117 changes: 117 additions & 0 deletions server/src/main/java/org/elasticsearch/common/util/BinarySearcher.java
@@ -0,0 +1,117 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.common.util;

/**
* Performs binary search on an arbitrary data structure.
*
* To do a search, create a subclass and implement custom {@link #compare(int)} and {@link #distance(int)} methods.
*
* {@link BinarySearcher} knows nothing about the value being searched for or the underlying data structure.
* These things should be determined by the subclass in its overridden methods.
*
* Refer to {@link BigArrays.DoubleBinarySearcher} for an example.
*
* NOTE: this class is not thread safe
*/
abstract public class BinarySearcher{

/**
* @return a negative integer, zero, or a positive integer if the array's value at <code>index</code> is less than,
* equal to, or greater than the value being searched for.
*/
abstract protected int compare(int index);

/**
* @return the magnitude of the distance between the element at <code>index</code> and the value being searched for.
* It will usually be <code>Math.abs(array[index] - searchValue)</code>.
*/
abstract protected double distance(int index);

/**
* @return the index whose underlying value is closest to the value being searched for.
*/
private int getClosestIndex(int index1, int index2){
if(distance(index1) < distance(index2)){
return index1;
} else {
return index2;
}
}

/**
* Uses a binary search to determine the index of the element within the index range {from, ... , to} that is
* closest to the search value.
*
* Unlike most binary search implementations, the value being searched for is not an argument to the search method.
* Rather, this value should be stored by the subclass along with the underlying array.
*
* @return the index of the closest element.
*
* Requires: The underlying array should be sorted.
**/
public int search(int from, int to){
while(from < to){
int mid = (from + to) >>> 1;
Member

👍

int compareResult = compare(mid);

if(compareResult == 0){
// arr[mid] == value
return mid;
} else if(compareResult < 0){
// arr[mid] < val

if(mid < to) {
// Check if val is between (mid, mid + 1) before setting left = mid + 1
// (mid < to) ensures that mid + 1 is not out of bounds
int compareValAfterMid = compare(mid + 1);
if (compareValAfterMid > 0) {
return getClosestIndex(mid, mid + 1);
}
} else if(mid == to){
// val > arr[mid] and there are no more elements above mid, so mid is the closest
return mid;
}

from = mid + 1;
} else{
// arr[mid] > val

if(mid > from) {
// Check if val is between (mid - 1, mid)
// (mid > from) ensures that mid - 1 is not out of bounds
int compareValBeforeMid = compare(mid - 1);
if (compareValBeforeMid < 0) {
// val is between indices (mid - 1), mid
return getClosestIndex(mid, mid - 1);
}
} else if(mid == 0){
// val < arr[mid] and there are no more candidates below mid, so mid is the closest
return mid;
}

to = mid - 1;
}
}

return from;
}

}
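
For readers who want to experiment with closest-element search outside Elasticsearch, here is a self-contained sketch over a plain `double[]`. It is not the `BinarySearcher` loop above: it uses a different formulation (a lower-bound search followed by a comparison with the left neighbour), the class and method names are hypothetical, and ties are resolved toward the lower index, which may differ from the PR's behaviour.

```java
final class ClosestIndexSearchSketch {

    /**
     * Returns the index of the element in a sorted, non-empty array that is
     * closest to value. On a tie, the lower index wins (a choice of this sketch).
     */
    static int closestIndex(double[] sorted, double value) {
        if (sorted.length == 0) {
            throw new IllegalArgumentException("array must not be empty");
        }
        int lo = 0;
        int hi = sorted.length - 1;
        // Lower-bound search: find the first index whose element is >= value,
        // or the last index if every element is smaller.
        while (lo < hi) {
            int mid = (lo + hi) >>> 1;
            if (sorted[mid] < value) {
                lo = mid + 1;
            } else {
                hi = mid;
            }
        }
        // The closest element is either at lo or immediately to its left.
        if (lo > 0 && Math.abs(sorted[lo - 1] - value) <= Math.abs(sorted[lo] - value)) {
            return lo - 1;
        }
        return lo;
    }
}
```

With the array `{1, 4, 9, 16}`, searching for `5` yields index 1 (the value 4), and searching for `100` yields index 3 (the last element).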
@@ -114,9 +114,11 @@
import org.elasticsearch.search.aggregations.bucket.global.GlobalAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.global.InternalGlobal;
import org.elasticsearch.search.aggregations.bucket.histogram.AutoDateHistogramAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.histogram.VariableWidthHistogramAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.histogram.HistogramAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.histogram.InternalAutoDateHistogram;
import org.elasticsearch.search.aggregations.bucket.histogram.InternalVariableWidthHistogram;
import org.elasticsearch.search.aggregations.bucket.histogram.InternalDateHistogram;
import org.elasticsearch.search.aggregations.bucket.histogram.InternalHistogram;
import org.elasticsearch.search.aggregations.bucket.missing.InternalMissing;
@@ -432,6 +434,10 @@ private ValuesSourceRegistry registerAggregations(List<SearchPlugin> plugins) {
AutoDateHistogramAggregationBuilder.PARSER)
.addResultReader(InternalAutoDateHistogram::new)
.setAggregatorRegistrar(AutoDateHistogramAggregationBuilder::registerAggregators), builder);
registerAggregation(new AggregationSpec(VariableWidthHistogramAggregationBuilder.NAME, VariableWidthHistogramAggregationBuilder::new,
VariableWidthHistogramAggregationBuilder.PARSER)
.addResultReader(InternalVariableWidthHistogram::new)
.setAggregatorRegistrar(VariableWidthHistogramAggregationBuilder::registerAggregators), builder);
registerAggregation(new AggregationSpec(GeoDistanceAggregationBuilder.NAME, GeoDistanceAggregationBuilder::new,
GeoDistanceAggregationBuilder::parse)
.addResultReader(InternalGeoDistance::new)
@@ -68,5 +68,9 @@ final class CommonFields extends ParseField.CommonFields {
public static final ParseField FROM_AS_STRING = new ParseField("from_as_string");
public static final ParseField TO = new ParseField("to");
public static final ParseField TO_AS_STRING = new ParseField("to_as_string");
public static final ParseField MIN = new ParseField("min");
public static final ParseField MIN_AS_STRING = new ParseField("min_as_string");
public static final ParseField MAX = new ParseField("max");
public static final ParseField MAX_AS_STRING = new ParseField("max_as_string");
}
}
@@ -101,6 +101,12 @@ public final void collectExistingBucket(LeafBucketCollector subCollector, int do
subCollector.collect(doc, bucketOrd);
}

/**
* This only tidies up doc counts. Call {@link MergingBucketsDeferringCollector#mergeBuckets(long[])} to merge the actual
* ordinals and doc ID deltas.
*
* Refer to that method for documentation about the merge map.
*/
public final void mergeBuckets(long[] mergeMap, long newNumBuckets) {
try (IntArray oldDocCounts = docCounts) {
docCounts = bigArrays.newIntArray(newNumBuckets, true);