Add geo_shape support for the geo_centroid aggregation (#55602) (#55819)
This commit leverages the new geo_shape doc values
to register a new geo_centroid aggregator that works
on geo_shape fields.
talevy authored Apr 27, 2020
1 parent 8df5cff commit 6ba5148
Showing 17 changed files with 540 additions and 34 deletions.
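The change registers the existing geo_centroid aggregation for geo_shape doc values, gated behind a Gold license check (SPATIAL_GEO_CENTROID). As orientation before the diff, here is a minimal usage sketch on the Java side; the field name "geometry" is a hypothetical geo_shape mapping, and the builder calls shown are the standard aggregation API rather than anything introduced by this commit:

import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.metrics.GeoCentroidAggregationBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;

public class GeoShapeCentroidUsageSketch {

    // Builds a search request body that runs geo_centroid over a geo_shape field.
    // The field name "geometry" is a hypothetical mapping; the builder API is the
    // same one already used for geo_point fields.
    public static SearchSourceBuilder centroidOverShapes() {
        GeoCentroidAggregationBuilder centroid = AggregationBuilders
            .geoCentroid("centroid")
            .field("geometry");
        return new SearchSourceBuilder()
            .size(0)                 // only the aggregation result is needed
            .aggregation(centroid);
    }
}

On a geo_shape field the resolved values source type is GeoShapeValuesSourceType, so the registration added below routes such a request to the new GeoShapeCentroidAggregator instead of failing.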
@@ -30,7 +30,7 @@
@FunctionalInterface
public interface GeoCentroidAggregatorSupplier extends AggregatorSupplier {

GeoCentroidAggregator build(String name, SearchContext context, Aggregator parent,
MetricsAggregator build(String name, SearchContext context, Aggregator parent,
ValuesSource valuesSource,
Map<String, Object> metadata) throws IOException;
}
@@ -53,7 +53,7 @@ public static double decodeLongitude(long encodedLatLon) {
return GeoEncodingUtils.decodeLongitude((int) (encodedLatLon & 0xFFFFFFFFL));
}

InternalGeoCentroid(String name, GeoPoint centroid, long count, Map<String, Object> metadata) {
public InternalGeoCentroid(String name, GeoPoint centroid, long count, Map<String, Object> metadata) {
super(name, metadata);
assert (centroid == null) == (count == 0);
this.centroid = centroid;
@@ -45,7 +45,8 @@ public enum Feature {
SECURITY_TOKEN_SERVICE(OperationMode.GOLD, false),
SECURITY_API_KEY_SERVICE(OperationMode.MISSING, false),
SECURITY_AUTHORIZATION_REALM(OperationMode.PLATINUM, true),
SECURITY_AUTHORIZATION_ENGINE(OperationMode.PLATINUM, true);
SECURITY_AUTHORIZATION_ENGINE(OperationMode.PLATINUM, true),
SPATIAL_GEO_CENTROID(OperationMode.GOLD, true);

final OperationMode minimumOperationMode;
final boolean needsActive;
x-pack/plugin/spatial/build.gradle (3 changes: 2 additions & 1 deletion)
@@ -18,14 +18,15 @@ dependencies {

restResources {
restApi {
includeCore '_common', 'indices', 'index', 'search'
includeCore '_common', 'bulk', 'indices', 'index', 'search'
}
restTests {
includeCore 'geo_shape'
}
}

testClusters.integTest {
setting 'xpack.license.self_generated.type', 'trial'
testDistribution = 'DEFAULT'
}

@@ -9,18 +9,23 @@
import org.elasticsearch.geo.GeoPlugin;
import org.elasticsearch.index.mapper.Mapper;
import org.elasticsearch.ingest.Processor;
import org.elasticsearch.license.LicenseUtils;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.plugins.IngestPlugin;
import org.elasticsearch.plugins.MapperPlugin;
import org.elasticsearch.plugins.SearchPlugin;
import org.elasticsearch.search.aggregations.metrics.GeoBoundsAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.GeoBoundsAggregatorSupplier;
import org.elasticsearch.search.aggregations.metrics.GeoCentroidAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.GeoCentroidAggregatorSupplier;
import org.elasticsearch.search.aggregations.support.ValuesSourceRegistry;
import org.elasticsearch.xpack.core.XPackPlugin;
import org.elasticsearch.xpack.spatial.index.mapper.GeoShapeWithDocValuesFieldMapper;
import org.elasticsearch.xpack.spatial.index.mapper.PointFieldMapper;
import org.elasticsearch.xpack.spatial.index.mapper.ShapeFieldMapper;
import org.elasticsearch.xpack.spatial.index.query.ShapeQueryBuilder;
import org.elasticsearch.xpack.spatial.ingest.CircleProcessor;
import org.elasticsearch.xpack.spatial.aggregations.metrics.GeoShapeCentroidAggregator;
import org.elasticsearch.xpack.spatial.search.aggregations.metrics.GeoShapeBoundsAggregator;
import org.elasticsearch.xpack.spatial.search.aggregations.support.GeoShapeValuesSource;
import org.elasticsearch.xpack.spatial.search.aggregations.support.GeoShapeValuesSourceType;
@@ -42,6 +47,11 @@ public Collection<Module> createGuiceModules() {
});
}

// to be overridden by tests
protected XPackLicenseState getLicenseState() {
return XPackPlugin.getSharedLicenseState();
}

@Override
public Map<String, Mapper.TypeParser> getMappers() {
Map<String, Mapper.TypeParser> mappers = new HashMap<>(super.getMappers());
@@ -58,18 +68,30 @@ public List<QuerySpec<?>> getQueries() {

@Override
public List<Consumer<ValuesSourceRegistry.Builder>> getAggregationExtentions() {
return org.elasticsearch.common.collect.List.of(SpatialPlugin::registerGeoShapeBoundsAggregator);
return org.elasticsearch.common.collect.List.of(this::registerGeoShapeBoundsAggregator,
this::registerGeoShapeCentroidAggregator);
}

@Override
public Map<String, Processor.Factory> getProcessors(Processor.Parameters parameters) {
return Collections.singletonMap(CircleProcessor.TYPE, new CircleProcessor.Factory());
}

public static void registerGeoShapeBoundsAggregator(ValuesSourceRegistry.Builder builder) {
public void registerGeoShapeBoundsAggregator(ValuesSourceRegistry.Builder builder) {
builder.register(GeoBoundsAggregationBuilder.NAME, GeoShapeValuesSourceType.instance(),
(GeoBoundsAggregatorSupplier) (name, aggregationContext, parent, valuesSource, wrapLongitude, metadata)
-> new GeoShapeBoundsAggregator(name, aggregationContext, parent, (GeoShapeValuesSource) valuesSource,
wrapLongitude, metadata));
}

public void registerGeoShapeCentroidAggregator(ValuesSourceRegistry.Builder builder) {
builder.register(GeoCentroidAggregationBuilder.NAME, GeoShapeValuesSourceType.instance(),
(GeoCentroidAggregatorSupplier) (name, aggregationContext, parent, valuesSource, metadata)
-> {
if (getLicenseState().isAllowed(XPackLicenseState.Feature.SPATIAL_GEO_CENTROID)) {
return new GeoShapeCentroidAggregator(name, aggregationContext, parent, (GeoShapeValuesSource) valuesSource, metadata);
}
throw LicenseUtils.newComplianceException("geo_centroid aggregation on geo_shape fields");
});
}
}
@@ -0,0 +1,156 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/


package org.elasticsearch.xpack.spatial.aggregations.metrics;

import org.apache.lucene.index.LeafReaderContext;
import org.elasticsearch.common.geo.GeoPoint;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.ByteArray;
import org.elasticsearch.common.util.DoubleArray;
import org.elasticsearch.common.util.LongArray;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.LeafBucketCollector;
import org.elasticsearch.search.aggregations.LeafBucketCollectorBase;
import org.elasticsearch.search.aggregations.metrics.CompensatedSum;
import org.elasticsearch.search.aggregations.metrics.InternalGeoCentroid;
import org.elasticsearch.search.aggregations.metrics.MetricsAggregator;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.xpack.spatial.index.fielddata.DimensionalShapeType;
import org.elasticsearch.xpack.spatial.index.fielddata.MultiGeoShapeValues;
import org.elasticsearch.xpack.spatial.search.aggregations.support.GeoShapeValuesSource;

import java.io.IOException;
import java.util.Map;

/**
* A geo metric aggregator that computes a geo-centroid from a {@code geo_shape} type field
*/
public final class GeoShapeCentroidAggregator extends MetricsAggregator {
private final GeoShapeValuesSource valuesSource;
private DoubleArray lonSum, lonCompensations, latSum, latCompensations, weightSum, weightCompensations;
private LongArray counts;
private ByteArray dimensionalShapeTypes;

public GeoShapeCentroidAggregator(String name, SearchContext context, Aggregator parent,
GeoShapeValuesSource valuesSource, Map<String, Object> metadata) throws IOException {
super(name, context, parent, metadata);
this.valuesSource = valuesSource;
if (valuesSource != null) {
final BigArrays bigArrays = context.bigArrays();
lonSum = bigArrays.newDoubleArray(1, true);
lonCompensations = bigArrays.newDoubleArray(1, true);
latSum = bigArrays.newDoubleArray(1, true);
latCompensations = bigArrays.newDoubleArray(1, true);
weightSum = bigArrays.newDoubleArray(1, true);
weightCompensations = bigArrays.newDoubleArray(1, true);
counts = bigArrays.newLongArray(1, true);
dimensionalShapeTypes = bigArrays.newByteArray(1, true);
}
}

@Override
public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException {
if (valuesSource == null) {
return LeafBucketCollector.NO_OP_COLLECTOR;
}
final BigArrays bigArrays = context.bigArrays();
final MultiGeoShapeValues values = valuesSource.geoShapeValues(ctx);
final CompensatedSum compensatedSumLat = new CompensatedSum(0, 0);
final CompensatedSum compensatedSumLon = new CompensatedSum(0, 0);
final CompensatedSum compensatedSumWeight = new CompensatedSum(0, 0);

return new LeafBucketCollectorBase(sub, values) {
@Override
public void collect(int doc, long bucket) throws IOException {
latSum = bigArrays.grow(latSum, bucket + 1);
lonSum = bigArrays.grow(lonSum, bucket + 1);
weightSum = bigArrays.grow(weightSum, bucket + 1);
lonCompensations = bigArrays.grow(lonCompensations, bucket + 1);
latCompensations = bigArrays.grow(latCompensations, bucket + 1);
weightCompensations = bigArrays.grow(weightCompensations, bucket + 1);
counts = bigArrays.grow(counts, bucket + 1);
dimensionalShapeTypes = bigArrays.grow(dimensionalShapeTypes, bucket + 1);

if (values.advanceExact(doc)) {
final int valueCount = values.docValueCount();
// increment by the number of points for this document
counts.increment(bucket, valueCount);
// Compute the sum of double values with Kahan summation algorithm which is more
// accurate than naive summation.
DimensionalShapeType shapeType = DimensionalShapeType.fromOrdinalByte(dimensionalShapeTypes.get(bucket));
double sumLat = latSum.get(bucket);
double compensationLat = latCompensations.get(bucket);
double sumLon = lonSum.get(bucket);
double compensationLon = lonCompensations.get(bucket);
double sumWeight = weightSum.get(bucket);
double compensatedWeight = weightCompensations.get(bucket);

compensatedSumLat.reset(sumLat, compensationLat);
compensatedSumLon.reset(sumLon, compensationLon);
compensatedSumWeight.reset(sumWeight, compensatedWeight);

// update the sum
for (int i = 0; i < valueCount; ++i) {
MultiGeoShapeValues.GeoShapeValue value = values.nextValue();
int compares = shapeType.compareTo(value.dimensionalShapeType());
if (compares < 0) {
double coordinateWeight = value.weight();
compensatedSumLat.reset(coordinateWeight * value.lat(), 0.0);
compensatedSumLon.reset(coordinateWeight * value.lon(), 0.0);
compensatedSumWeight.reset(coordinateWeight, 0.0);
dimensionalShapeTypes.set(bucket, (byte) value.dimensionalShapeType().ordinal());
} else if (compares == 0) {
double coordinateWeight = value.weight();
// weighted latitude
compensatedSumLat.add(coordinateWeight * value.lat());
// weighted longitude
compensatedSumLon.add(coordinateWeight * value.lon());
// weight
compensatedSumWeight.add(coordinateWeight);
}
// else (compares > 0)
// do not modify centroid calculation since shape is of lower dimension than the running dimension

}
lonSum.set(bucket, compensatedSumLon.value());
lonCompensations.set(bucket, compensatedSumLon.delta());
latSum.set(bucket, compensatedSumLat.value());
latCompensations.set(bucket, compensatedSumLat.delta());
weightSum.set(bucket, compensatedSumWeight.value());
weightCompensations.set(bucket, compensatedSumWeight.delta());
}
}
};
}

@Override
public InternalAggregation buildAggregation(long bucket) {
if (valuesSource == null || bucket >= counts.size()) {
return buildEmptyAggregation();
}
final long bucketCount = counts.get(bucket);
final double bucketWeight = weightSum.get(bucket);
final GeoPoint bucketCentroid = (bucketWeight > 0)
? new GeoPoint(latSum.get(bucket) / bucketWeight, lonSum.get(bucket) / bucketWeight)
: null;
return new InternalGeoCentroid(name, bucketCentroid, bucketCount, metadata());
}

@Override
public InternalAggregation buildEmptyAggregation() {
return new InternalGeoCentroid(name, null, 0L, metadata());
}

@Override
public void doClose() {
Releasables.close(latSum, latCompensations, lonSum, lonCompensations, counts, weightSum, weightCompensations,
dimensionalShapeTypes);
}
}
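The collector above keeps per-bucket weighted latitude/longitude sums with compensated (Kahan) summation, and only shapes matching the highest dimensional type seen so far contribute to the running sums. A stripped-down sketch of that accumulation, leaving out the BigArrays bookkeeping and the dimensional-shape precedence; the array inputs are hypothetical stand-ins for the weighted shape values:

import org.elasticsearch.search.aggregations.metrics.CompensatedSum;

public class WeightedCentroidSketch {

    // Accumulates weighted lat/lon sums with compensated (Kahan) summation and
    // divides by the total weight at the end, as the aggregator does per bucket.
    // Returns {lat, lon}, or null when the total weight is zero (no centroid).
    public static double[] centroid(double[] lats, double[] lons, double[] weights) {
        CompensatedSum latSum = new CompensatedSum(0, 0);
        CompensatedSum lonSum = new CompensatedSum(0, 0);
        CompensatedSum weightSum = new CompensatedSum(0, 0);
        for (int i = 0; i < lats.length; i++) {
            latSum.add(weights[i] * lats[i]);
            lonSum.add(weights[i] * lons[i]);
            weightSum.add(weights[i]);
        }
        double totalWeight = weightSum.value();
        return totalWeight > 0
            ? new double[] { latSum.value() / totalWeight, lonSum.value() / totalWeight }
            : null;
    }
}

In the aggregator itself, the per-value weight comes from MultiGeoShapeValues.GeoShapeValue#weight(), and the final division happens in buildAggregation.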
@@ -0,0 +1,28 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.spatial;

import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.license.License;
import org.elasticsearch.license.TestUtils;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.test.VersionUtils;

/**
* This class overrides the {@link SpatialPlugin} in order
* to provide the integration test clusters a hook into a real
* {@link XPackLicenseState}. In the cases that this is used, the
* actual license's operation mode is not important
*/
public class LocalStateSpatialPlugin extends SpatialPlugin {
protected XPackLicenseState getLicenseState() {
TestUtils.UpdatableLicenseState licenseState = new TestUtils.UpdatableLicenseState();
License.OperationMode operationMode = License.OperationMode.TRIAL;
licenseState.update(operationMode, true, VersionUtils.randomVersion(LuceneTestCase.random()));
return licenseState;
}
}
@@ -0,0 +1,54 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.spatial;

import org.elasticsearch.ElasticsearchSecurityException;
import org.elasticsearch.license.License;
import org.elasticsearch.license.TestUtils;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.search.aggregations.metrics.GeoCentroidAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.GeoCentroidAggregatorSupplier;
import org.elasticsearch.search.aggregations.support.ValuesSourceRegistry;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.VersionUtils;
import org.elasticsearch.xpack.spatial.search.aggregations.support.GeoShapeValuesSourceType;

import java.util.List;
import java.util.function.Consumer;

import static org.hamcrest.Matchers.equalTo;

public class SpatialPluginTests extends ESTestCase {

public void testGeoCentroidLicenseCheck() {
for (License.OperationMode operationMode : License.OperationMode.values()) {
SpatialPlugin plugin = getPluginWithOperationMode(operationMode);
ValuesSourceRegistry.Builder registryBuilder = new ValuesSourceRegistry.Builder();
List<Consumer<ValuesSourceRegistry.Builder>> registrar = plugin.getAggregationExtentions();
registrar.forEach(c -> c.accept(registryBuilder));
ValuesSourceRegistry registry = registryBuilder.build();
GeoCentroidAggregatorSupplier centroidSupplier = (GeoCentroidAggregatorSupplier) registry.getAggregator(
GeoShapeValuesSourceType.instance(), GeoCentroidAggregationBuilder.NAME);
if (License.OperationMode.TRIAL != operationMode &&
License.OperationMode.compare(operationMode, License.OperationMode.GOLD) < 0) {
ElasticsearchSecurityException exception = expectThrows(ElasticsearchSecurityException.class,
() -> centroidSupplier.build(null, null, null, null, null));
assertThat(exception.getMessage(),
equalTo("current license is non-compliant for [geo_centroid aggregation on geo_shape fields]"));
}
}
}

private SpatialPlugin getPluginWithOperationMode(License.OperationMode operationMode) {
return new SpatialPlugin() {
protected XPackLicenseState getLicenseState() {
TestUtils.UpdatableLicenseState licenseState = new TestUtils.UpdatableLicenseState();
licenseState.update(operationMode, true, VersionUtils.randomVersion(random()));
return licenseState;
}
};
}
}