Add bytes from geogrid aggregation to request circuit-breaker #50720

Closed · wants to merge 11 commits
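For orientation before the file-by-file diffs: the change charges the request circuit breaker for every growth of the per-document tile buffer and releases those bytes once a segment's values are no longer needed. The sketch below is illustrative only, written against a stand-in SimpleBreaker interface rather than Elasticsearch's CircuitBreaker; the real accounting lives in CellIdSource.GeoShapeCellValues and GeoGridAggregator further down.

// Minimal sketch of the charge-on-grow / release-on-swap pattern this PR applies.
// SimpleBreaker and TrackedLongBuffer are invented for illustration.
interface SimpleBreaker {
    // may throw a breaking exception when the request budget is exceeded
    void addEstimateBytesAndMaybeBreak(long bytes, String label);
    // release with a negative delta, never breaks
    void addWithoutBreaking(long bytes);
}

final class TrackedLongBuffer {
    private final SimpleBreaker breaker;
    private long[] values = new long[1];

    TrackedLongBuffer(SimpleBreaker breaker) {
        this.breaker = breaker;
        // account for the initial array of length 1
        breaker.addEstimateBytesAndMaybeBreak(Long.BYTES, "geogrid-cell-counter");
    }

    void resize(int newSize) {
        if (newSize > values.length) {
            long delta = (long) (newSize - values.length) * Long.BYTES;
            // charge the growth; an oversized request fails here rather than exhausting the heap
            breaker.addEstimateBytesAndMaybeBreak(delta, "geogrid-cell-counter");
            values = java.util.Arrays.copyOf(values, newSize);
        }
    }

    void release() {
        // hand the reserved bytes back once the values are swapped out
        breaker.addWithoutBreaking(-(long) values.length * Long.BYTES);
    }
}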
@@ -20,6 +20,7 @@
package org.elasticsearch.search.aggregations.bucket.composite;

import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ObjectParser;
@@ -113,7 +114,8 @@ protected CompositeValuesSourceConfig innerBuild(QueryShardContext queryShardCon
ValuesSource.Geo geoValue = (ValuesSource.Geo) orig;
// is specified in the builder.
final MappedFieldType fieldType = config.fieldContext() != null ? config.fieldContext().fieldType() : null;
CellIdSource cellIdSource = new CellIdSource(geoValue, precision, GeoGridTiler.GeoTileGridTiler.INSTANCE);
CircuitBreaker breaker = queryShardContext.bigArrays().breakerService().getBreaker(CircuitBreaker.REQUEST);
CellIdSource cellIdSource = new CellIdSource(geoValue, precision, GeoGridTiler.GeoTileGridTiler.INSTANCE, breaker);
return new CompositeValuesSourceConfig(name, fieldType, cellIdSource, DocValueFormat.GEOTILE, order(),
missingBucket(), script() != null);
} else {
@@ -20,6 +20,7 @@

import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.SortedNumericDocValues;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.index.fielddata.AbstractSortingNumericDocValues;
import org.elasticsearch.index.fielddata.MultiGeoValues;
import org.elasticsearch.index.fielddata.SortedBinaryDocValues;
@@ -38,12 +39,13 @@ public class CellIdSource extends ValuesSource.Numeric {
private final ValuesSource.Geo valuesSource;
private final int precision;
private final GeoGridTiler encoder;
private final CircuitBreaker circuitBreaker;

public CellIdSource(Geo valuesSource, int precision, GeoGridTiler encoder) {
public CellIdSource(Geo valuesSource, int precision, GeoGridTiler encoder, CircuitBreaker circuitBreaker) {
this.valuesSource = valuesSource;
//different GeoPoints could map to the same or different hashing cells.
this.precision = precision;
this.encoder = encoder;
this.circuitBreaker = circuitBreaker;
}

public int precision() {
@@ -68,7 +70,7 @@ public SortedNumericDocValues longValues(LeafReaderContext ctx) {
return new GeoPointCellValues(geoValues, precision, encoder);
} else if (CoreValuesSourceType.GEOSHAPE == vs || CoreValuesSourceType.GEO == vs) {
// docValues are geo shapes
return new GeoShapeCellValues(geoValues, precision, encoder);
return new GeoShapeCellValues(geoValues, precision, encoder, circuitBreaker);
} else {
throw new IllegalArgumentException("unsupported geo type");
}
@@ -85,19 +87,32 @@ public SortedBinaryDocValues bytesValues(LeafReaderContext ctx) {
}

/** Sorted numeric doc values for geo shapes */
protected static class GeoShapeCellValues extends AbstractSortingNumericDocValues {
protected static class GeoShapeCellValues extends BytesTrackingSortingNumericDocValues {
private MultiGeoValues geoValues;
private int precision;
private GeoGridTiler tiler;
private CircuitBreaker circuitBreaker;
private int maxReservedArrayLength;

protected GeoShapeCellValues(MultiGeoValues geoValues, int precision, GeoGridTiler tiler) {
protected GeoShapeCellValues(MultiGeoValues geoValues, int precision, GeoGridTiler tiler, CircuitBreaker circuitBreaker) {
this.geoValues = geoValues;
this.precision = precision;
this.tiler = tiler;
this.circuitBreaker = circuitBreaker;
// account for initialized values array of length 1
this.maxReservedArrayLength = 1;
circuitBreaker.addEstimateBytesAndMaybeBreak(Long.BYTES, "geogrid-cell-counter");
}

protected void resizeCell(int newSize) {
int oldValuesLength = values.length;
resize(newSize);
int newValuesLength = values.length;
if (newValuesLength > oldValuesLength && maxReservedArrayLength < newValuesLength) {
maxReservedArrayLength = newValuesLength;
long bytesDiff = (newValuesLength - oldValuesLength) * Long.BYTES;
circuitBreaker.addEstimateBytesAndMaybeBreak(bytesDiff, "geogrid-cell-counter");
}
}

protected void add(int idx, long value) {
@@ -114,7 +129,6 @@ public boolean advanceExact(int docId) throws IOException {
if (geoValues.advanceExact(docId)) {
ValuesSourceType vs = geoValues.valuesSourceType();
MultiGeoValues.GeoValue target = geoValues.nextValue();
// TODO(talevy): determine reasonable circuit-breaker here
resize(0);
tiler.setValues(this, target, precision);
sort();
@@ -126,7 +140,7 @@ public boolean advanceExact(int docId) throws IOException {
}

/** Sorted numeric doc values for geo points */
protected static class GeoPointCellValues extends AbstractSortingNumericDocValues {
protected static class GeoPointCellValues extends BytesTrackingSortingNumericDocValues {
private MultiGeoValues geoValues;
private int precision;
private GeoGridTiler tiler;
@@ -159,7 +173,7 @@ public boolean advanceExact(int docId) throws IOException {
}

/** Sorted numeric doc values for precision 0 */
protected static class AllCellValues extends AbstractSortingNumericDocValues {
protected static class AllCellValues extends BytesTrackingSortingNumericDocValues {
private MultiGeoValues geoValues;

protected AllCellValues(MultiGeoValues geoValues, GeoGridTiler tiler) {
@@ -179,4 +193,10 @@ public boolean advanceExact(int docId) throws IOException {
return geoValues.advanceExact(docId);
}
}

abstract static class BytesTrackingSortingNumericDocValues extends AbstractSortingNumericDocValues {
long getValuesBytes() {
return values.length * Long.BYTES;
}
}
}
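
For context on how the breaker-aware values class above is driven (the tilers themselves are untouched by this diff), a GeoGridTiler implementation is expected to size the buffer once per value and then fill it, roughly as in this fragment. It assumes the same package as CellIdSource (as the real tilers are, so the protected resizeCell/add are reachable); computeIntersectingTiles(...) is a hypothetical helper, not a real method.

// Illustrative fragment only; not part of the diff.
class ExampleTiler {
    int setValues(CellIdSource.GeoShapeCellValues cellValues, MultiGeoValues.GeoValue geoValue, int precision) {
        long[] tiles = computeIntersectingTiles(geoValue, precision); // hypothetical helper
        // resizeCell(...) grows the backing long[] and charges the growth to the request
        // breaker, so a shape that expands into too many tiles trips the breaker
        // instead of silently consuming memory
        cellValues.resizeCell(tiles.length);
        for (int i = 0; i < tiles.length; i++) {
            cellValues.add(i, tiles[i]);
        }
        return tiles.length;
    }

    private long[] computeIntersectingTiles(MultiGeoValues.GeoValue geoValue, int precision) {
        throw new UnsupportedOperationException("illustration only");
    }
}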
@@ -19,7 +19,6 @@
package org.elasticsearch.search.aggregations.bucket.geogrid;

import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.SortedNumericDocValues;
import org.apache.lucene.search.ScoreMode;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.util.LongHash;
@@ -46,6 +45,7 @@ public abstract class GeoGridAggregator<T extends InternalGeoGrid> extends Bucke
protected final int shardSize;
protected final CellIdSource valuesSource;
protected final LongHash bucketOrds;
private CellIdSource.BytesTrackingSortingNumericDocValues values;

GeoGridAggregator(String name, AggregatorFactories factories, CellIdSource valuesSource,
int requiredSize, int shardSize, SearchContext aggregationContext, Aggregator parent,
@@ -65,10 +65,24 @@ public ScoreMode scoreMode() {
return super.scoreMode();
}

@Override
protected void preGetSubLeafCollectors() {
if (values != null) {
addRequestCircuitBreakerBytes(-values.getValuesBytes());
}
}

@Override
public void doPostCollection() {
if (values != null) {
addRequestCircuitBreakerBytes(-values.getValuesBytes());
}
}

@Override
public LeafBucketCollector getLeafCollector(LeafReaderContext ctx,
final LeafBucketCollector sub) throws IOException {
final SortedNumericDocValues values = valuesSource.longValues(ctx);
values = (CellIdSource.BytesTrackingSortingNumericDocValues) valuesSource.longValues(ctx);
return new LeafBucketCollectorBase(sub, null) {
@Override
public void collect(int doc, long bucket) throws IOException {
@@ -19,6 +19,7 @@

package org.elasticsearch.search.aggregations.bucket.geogrid;

import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.index.query.QueryShardContext;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregatorFactories;
@@ -76,7 +77,8 @@ protected Aggregator doCreateInternal(final ValuesSource.Geo valuesSource,
if (collectsFromSingleBucket == false) {
return asMultiBucketAggregator(this, searchContext, parent);
}
CellIdSource cellIdSource = new CellIdSource(valuesSource, precision, GeoGridTiler.GeoHashGridTiler.INSTANCE);
CellIdSource cellIdSource = new CellIdSource(valuesSource, precision, GeoGridTiler.GeoHashGridTiler.INSTANCE,
searchContext.bigArrays().breakerService().getBreaker(CircuitBreaker.REQUEST));
return new GeoHashGridAggregator(name, factories, cellIdSource, requiredSize, shardSize, searchContext, parent,
pipelineAggregators, metaData);
}
@@ -19,6 +19,7 @@

package org.elasticsearch.search.aggregations.bucket.geogrid;

import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.index.query.QueryShardContext;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregatorFactories;
@@ -76,7 +77,8 @@ protected Aggregator doCreateInternal(final ValuesSource.Geo valuesSource,
if (collectsFromSingleBucket == false) {
return asMultiBucketAggregator(this, searchContext, parent);
}
CellIdSource cellIdSource = new CellIdSource(valuesSource, precision, GeoGridTiler.GeoTileGridTiler.INSTANCE);
CellIdSource cellIdSource = new CellIdSource(valuesSource, precision, GeoGridTiler.GeoTileGridTiler.INSTANCE,
searchContext.bigArrays().breakerService().getBreaker(CircuitBreaker.REQUEST));
return new GeoTileGridAggregator(name, factories, cellIdSource, requiredSize, shardSize, searchContext, parent,
pipelineAggregators, metaData);
}
@@ -34,6 +34,7 @@
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.store.Directory;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.search.aggregations.AggregatorTestCase;
import org.elasticsearch.search.aggregations.BucketCollector;
import org.elasticsearch.search.aggregations.LeafBucketCollector;
@@ -67,7 +68,8 @@ public void testReplay() throws Exception {
Query rewrittenQuery = indexSearcher.rewrite(termQuery);
TopDocs topDocs = indexSearcher.search(termQuery, numDocs);

SearchContext searchContext = createSearchContext(indexSearcher, createIndexSettings(), rewrittenQuery, null);
SearchContext searchContext = createSearchContext(indexSearcher, createIndexSettings(), rewrittenQuery, null,
new NoneCircuitBreakerService());
when(searchContext.query()).thenReturn(rewrittenQuery);
BestBucketsDeferringCollector collector = new BestBucketsDeferringCollector(searchContext, false) {
@Override
@@ -29,18 +29,32 @@
import org.apache.lucene.search.Query;
import org.apache.lucene.store.Directory;
import org.elasticsearch.common.CheckedConsumer;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.breaker.CircuitBreakingException;
import org.elasticsearch.common.breaker.NoopCircuitBreaker;
import org.elasticsearch.common.geo.CentroidCalculator;
import org.elasticsearch.common.geo.GeoShapeCoordinateEncoder;
import org.elasticsearch.common.geo.GeoTestUtils;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.geo.GeometryTestUtils;
import org.elasticsearch.geometry.Geometry;
import org.elasticsearch.geometry.MultiPoint;
import org.elasticsearch.geometry.Point;
import org.elasticsearch.index.fielddata.MultiGeoValues;
import org.elasticsearch.index.mapper.BinaryGeoShapeDocValuesField;
import org.elasticsearch.index.mapper.GeoPointFieldMapper;
import org.elasticsearch.index.mapper.GeoShapeFieldMapper;
import org.elasticsearch.index.mapper.GeoShapeIndexer;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.indices.breaker.BreakerSettings;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregatorTestCase;
import org.elasticsearch.search.aggregations.support.AggregationInspectionHelper;
import org.elasticsearch.test.ESTestCase;

import java.io.IOException;
import java.util.ArrayList;
@@ -51,6 +65,9 @@
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Function;

import static org.elasticsearch.common.geo.GeoTestUtils.triangleTreeReader;

public abstract class GeoGridAggregatorTestCase<T extends InternalGeoGridBucket> extends AggregatorTestCase {

@@ -71,34 +88,39 @@ public abstract class GeoGridAggregatorTestCase<T extends InternalGeoGridBucket>
*/
protected abstract GeoGridAggregationBuilder createBuilder(String name);

/**
* return which geogrid tiler is used
*/
protected abstract GeoGridTiler geoGridTiler();

public void testNoDocs() throws IOException {
testCase(new MatchAllDocsQuery(), FIELD_NAME, randomPrecision(), iw -> {
// Intentionally not writing any docs
}, geoGrid -> {
assertEquals(0, geoGrid.getBuckets().size());
}, new GeoPointFieldMapper.GeoPointFieldType());
}, new GeoPointFieldMapper.GeoPointFieldType(), new NoneCircuitBreakerService());

testCase(new MatchAllDocsQuery(), FIELD_NAME, randomPrecision(), iw -> {
// Intentionally not writing any docs
}, geoGrid -> {
assertEquals(0, geoGrid.getBuckets().size());
}, new GeoShapeFieldMapper.GeoShapeFieldType());
}, new GeoShapeFieldMapper.GeoShapeFieldType(), new NoneCircuitBreakerService());
}

public void testFieldMissing() throws IOException {
testCase(new MatchAllDocsQuery(), "wrong_field", randomPrecision(), iw -> {
iw.addDocument(Collections.singleton(new LatLonDocValuesField(FIELD_NAME, 10D, 10D)));
}, geoGrid -> {
assertEquals(0, geoGrid.getBuckets().size());
}, new GeoPointFieldMapper.GeoPointFieldType());
}, new GeoPointFieldMapper.GeoPointFieldType(), new NoneCircuitBreakerService());

testCase(new MatchAllDocsQuery(), "wrong_field", randomPrecision(), iw -> {
iw.addDocument(Collections.singleton(
new BinaryGeoShapeDocValuesField(FIELD_NAME, GeoTestUtils.toDecodedTriangles(new Point(10D, 10D)),
new CentroidCalculator(new Point(10D, 10D)))));
}, geoGrid -> {
assertEquals(0, geoGrid.getBuckets().size());
}, new GeoShapeFieldMapper.GeoShapeFieldType());
}, new GeoShapeFieldMapper.GeoShapeFieldType(), new NoneCircuitBreakerService());
}

public void testGeoPointWithSeveralDocs() throws IOException {
@@ -140,7 +162,7 @@ public void testGeoPointWithSeveralDocs() throws IOException {
assertEquals((long) expectedCountPerGeoHash.get(bucket.getKeyAsString()), bucket.getDocCount());
}
assertTrue(AggregationInspectionHelper.hasValue(geoHashGrid));
}, new GeoPointFieldMapper.GeoPointFieldType());
}, new GeoPointFieldMapper.GeoPointFieldType(), new NoneCircuitBreakerService());
}

public void testGeoShapeWithSeveralDocs() throws IOException {
@@ -191,11 +213,45 @@ public void testGeoShapeWithSeveralDocs() throws IOException {
assertEquals((long) expectedCountPerGeoHash.get(bucket.getKeyAsString()), bucket.getDocCount());
}
assertTrue(AggregationInspectionHelper.hasValue(geoHashGrid));
}, new GeoShapeFieldMapper.GeoShapeFieldType());
}, new GeoShapeFieldMapper.GeoShapeFieldType(), new NoneCircuitBreakerService());
}

public void testGeoShapeTrippedCircuitBreaker() throws IOException {
GeoGridTiler tiler = geoGridTiler();
int precision = randomIntBetween(1, 9); // does not go up to MAX_ZOOM for performance reasons
[Inline review thread on the precision limit above]
Contributor: Just verifying this won't nuke CI, since it has to run at least once to determine the number of tiles? I'm assuming that's why it's limited to precision 9, but thought I'd double check :)
Contributor (Author): Yeah, it is just that I am not interested in tiling a shape into hundreds of thousands of tiles. I've run this test over 1000 iterations and things seem to run snappily.
Contributor: 👍


@SuppressWarnings("unchecked") Function<Boolean, Geometry> geometryGen = ESTestCase.randomFrom(
GeometryTestUtils::randomLine,
GeometryTestUtils::randomPoint,
GeometryTestUtils::randomPolygon,
GeometryTestUtils::randomMultiLine,
GeometryTestUtils::randomMultiPoint
);
Geometry geometry = new GeoShapeIndexer(true, "indexer").prepareForIndexing(geometryGen.apply(false));

// get expected number of tiles to find
CellIdSource.GeoShapeCellValues values = new CellIdSource.GeoShapeCellValues(null, precision, tiler,
new NoopCircuitBreaker("test"));
int numTiles = geoGridTiler().setValues(values, new MultiGeoValues.GeoShapeValue(triangleTreeReader(geometry,
GeoShapeCoordinateEncoder.INSTANCE)), precision);

CircuitBreakerService circuitBreakerService = new HierarchyCircuitBreakerService(Settings.EMPTY, new ClusterSettings(Settings.EMPTY,
ClusterSettings.BUILT_IN_CLUSTER_SETTINGS));
BreakerSettings settings = new BreakerSettings(CircuitBreaker.REQUEST, numTiles * Long.BYTES - 1, 1.0);
circuitBreakerService.registerBreaker(settings);

expectThrows(CircuitBreakingException.class, () -> testCase(new MatchAllDocsQuery(), FIELD_NAME, precision, iw -> {
Document document = new Document();
document.add(new BinaryGeoShapeDocValuesField(FIELD_NAME,
GeoTestUtils.toDecodedTriangles(geometry), new CentroidCalculator(geometry)));
iw.addDocument(document);
document.clear();
}, internalGeoGrid -> {}, new GeoShapeFieldMapper.GeoShapeFieldType(), circuitBreakerService));
}

private void testCase(Query query, String field, int precision, CheckedConsumer<RandomIndexWriter, IOException> buildIndex,
Consumer<InternalGeoGrid<T>> verify, MappedFieldType fieldType) throws IOException {
Consumer<InternalGeoGrid<T>> verify, MappedFieldType fieldType,
CircuitBreakerService circuitBreakerService) throws IOException {
Directory directory = newDirectory();
RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory);
buildIndex.accept(indexWriter);
@@ -209,13 +265,18 @@ private void testCase(Query query, String field, int precision, CheckedConsumer<
fieldType.setHasDocValues(true);
fieldType.setName(FIELD_NAME);

Aggregator aggregator = createAggregator(aggregationBuilder, indexSearcher, fieldType);
aggregator.preCollection();
indexSearcher.search(query, aggregator);
aggregator.postCollection();
verify.accept((InternalGeoGrid<T>) aggregator.buildAggregation(0L));

indexReader.close();
directory.close();
try {
Aggregator aggregator = createAggregator(aggregationBuilder, indexSearcher, circuitBreakerService, fieldType);
aggregator.preCollection();
if (aggregator instanceof GeoGridAggregator) {
((GeoGridAggregator) aggregator).preGetSubLeafCollectors();
}
indexSearcher.search(query, aggregator);
aggregator.postCollection();
verify.accept((InternalGeoGrid<T>) aggregator.buildAggregation(0L));
} finally {
indexReader.close();
directory.close();
}
}
}