Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Transform] add support for percentile aggs #51808

Merged
merged 2 commits into from
Feb 4, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1109,6 +1109,70 @@ public void testContinuousDateNanos() throws Exception {
deleteIndex(indexName);
}

/**
 * End-to-end test of a pivot transform that uses a multi-value {@code percentiles}
 * aggregation alongside a single-value {@code avg} aggregation. Creates the transform,
 * runs it to completion, and verifies that the percentile results are written to the
 * destination index as a nested object with one field per requested percent, where the
 * decimal point in a percent is replaced by an underscore (e.g. {@code p.99_9} for 99.9).
 */
public void testPivotWithPercentile() throws Exception {
    String transformId = "percentile_pivot";
    String transformIndex = "percentile_pivot_reviews";
    setupDataAccessRole(DATA_ACCESS_ROLE, REVIEWS_INDEX_NAME, transformIndex);
    final Request createTransformRequest = createRequestWithAuth(
        "PUT",
        getTransformEndpoint() + transformId,
        BASIC_AUTH_VALUE_TRANSFORM_ADMIN_WITH_SOME_DATA_ACCESS
    );
    // Pivot config: group by user_id; aggregate the average of "stars" plus the
    // 5th/50th/90th/99.9th percentiles of "stars".
    String config = "{"
        + " \"source\": {\"index\":\""
        + REVIEWS_INDEX_NAME
        + "\"},"
        + " \"dest\": {\"index\":\""
        + transformIndex
        + "\"},"
        + " \"frequency\": \"1s\","
        + " \"pivot\": {"
        + "   \"group_by\": {"
        + "     \"reviewer\": {"
        + "       \"terms\": {"
        + "         \"field\": \"user_id\""
        + " } } },"
        + "   \"aggregations\": {"
        + "     \"avg_rating\": {"
        + "       \"avg\": {"
        + "         \"field\": \"stars\""
        + " } },"
        + "    \"p\": {"
        + "    \"percentiles\" : {"
        + "       \"field\": \"stars\", "
        + "      \"percents\": [5, 50, 90, 99.9]"
        + "    }"
        + "  } } }"
        + "}";
    createTransformRequest.setJsonEntity(config);
    Map<String, Object> createTransformResponse = entityAsMap(client().performRequest(createTransformRequest));
    assertThat(createTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE));

    startAndWaitForTransform(transformId, transformIndex);
    assertTrue(indexExists(transformIndex));
    // get and check some users
    // (expected averages come from the fixed REVIEWS_INDEX_NAME test fixture data)
    assertOnePivotValue(transformIndex + "/_search?q=reviewer:user_11", 3.846153846);
    assertOnePivotValue(transformIndex + "/_search?q=reviewer:user_26", 3.918918918);

    // user_4: check each percentile output field; note "99.9" is flattened to "99_9"
    Map<String, Object> searchResult = getAsMap(transformIndex + "/_search?q=reviewer:user_4");
    assertEquals(1, XContentMapValues.extractValue("hits.total.value", searchResult));
    Number actual = (Number) ((List<?>) XContentMapValues.extractValue("hits.hits._source.p.5", searchResult)).get(0);
    assertEquals(1, actual.longValue());
    actual = (Number) ((List<?>) XContentMapValues.extractValue("hits.hits._source.p.50", searchResult)).get(0);
    assertEquals(5, actual.longValue());
    actual = (Number) ((List<?>) XContentMapValues.extractValue("hits.hits._source.p.99_9", searchResult)).get(0);
    assertEquals(5, actual.longValue());

    // user_11: same checks against a second bucket to ensure per-bucket extraction works
    searchResult = getAsMap(transformIndex + "/_search?q=reviewer:user_11");
    assertEquals(1, XContentMapValues.extractValue("hits.total.value", searchResult));
    actual = (Number) ((List<?>) XContentMapValues.extractValue("hits.hits._source.p.5", searchResult)).get(0);
    assertEquals(1, actual.longValue());
    actual = (Number) ((List<?>) XContentMapValues.extractValue("hits.hits._source.p.50", searchResult)).get(0);
    assertEquals(4, actual.longValue());
    actual = (Number) ((List<?>) XContentMapValues.extractValue("hits.hits._source.p.99_9", searchResult)).get(0);
    assertEquals(5, actual.longValue());
}

private void createDateNanoIndex(String indexName, int numDocs) throws IOException {
// create mapping
try (XContentBuilder builder = jsonBuilder()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,14 @@
import org.elasticsearch.search.aggregations.metrics.GeoBounds;
import org.elasticsearch.search.aggregations.metrics.GeoCentroid;
import org.elasticsearch.search.aggregations.metrics.NumericMetricsAggregation.SingleValue;
import org.elasticsearch.search.aggregations.metrics.Percentile;
import org.elasticsearch.search.aggregations.metrics.Percentiles;
import org.elasticsearch.search.aggregations.metrics.ScriptedMetric;
import org.elasticsearch.xpack.core.transform.TransformField;
import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerStats;
import org.elasticsearch.xpack.core.transform.transforms.pivot.GroupConfig;
import org.elasticsearch.xpack.transform.transforms.IDGenerator;
import org.elasticsearch.xpack.transform.utils.OutputFieldNameConverter;

import java.util.Arrays;
import java.util.Collection;
Expand All @@ -46,6 +49,7 @@ public final class AggregationResultUtils {
tempMap.put(ScriptedMetric.class.getName(), new ScriptedMetricAggExtractor());
tempMap.put(GeoCentroid.class.getName(), new GeoCentroidAggExtractor());
tempMap.put(GeoBounds.class.getName(), new GeoBoundsAggExtractor());
tempMap.put(Percentiles.class.getName(), new PercentilesAggExtractor());
TYPE_VALUE_EXTRACTOR_MAP = Collections.unmodifiableMap(tempMap);
}

Expand All @@ -59,12 +63,14 @@ public final class AggregationResultUtils {
* @param stats stats collector
* @return a map containing the results of the aggregation in a consumable way
*/
public static Stream<Map<String, Object>> extractCompositeAggregationResults(CompositeAggregation agg,
GroupConfig groups,
Collection<AggregationBuilder> aggregationBuilders,
Collection<PipelineAggregationBuilder> pipelineAggs,
Map<String, String> fieldTypeMap,
TransformIndexerStats stats) {
public static Stream<Map<String, Object>> extractCompositeAggregationResults(
CompositeAggregation agg,
GroupConfig groups,
Collection<AggregationBuilder> aggregationBuilders,
Collection<PipelineAggregationBuilder> pipelineAggs,
Map<String, String> fieldTypeMap,
TransformIndexerStats stats
) {
return agg.getBuckets().stream().map(bucket -> {
stats.incrementNumDocuments(bucket.getDocCount());
Map<String, Object> document = new HashMap<>();
Expand All @@ -82,7 +88,7 @@ public static Stream<Map<String, Object>> extractCompositeAggregationResults(Com
List<String> aggNames = aggregationBuilders.stream().map(AggregationBuilder::getName).collect(Collectors.toList());
aggNames.addAll(pipelineAggs.stream().map(PipelineAggregationBuilder::getName).collect(Collectors.toList()));

for (String aggName: aggNames) {
for (String aggName : aggNames) {
Aggregation aggResult = bucket.getAggregations().get(aggName);
// This indicates not that the value contained in the `aggResult` is null, but that the `aggResult` is not
// present at all in the `bucket.getAggregations`. This could occur in the case of a `bucket_selector` agg, which
Expand All @@ -109,16 +115,19 @@ static AggValueExtractor getExtractor(Aggregation aggregation) {
return TYPE_VALUE_EXTRACTOR_MAP.get(GeoCentroid.class.getName());
} else if (aggregation instanceof GeoBounds) {
return TYPE_VALUE_EXTRACTOR_MAP.get(GeoBounds.class.getName());
} else if (aggregation instanceof Percentiles) {
return TYPE_VALUE_EXTRACTOR_MAP.get(Percentiles.class.getName());
} else {
// Execution should never reach this point!
// Creating transforms with unsupported aggregations shall not be possible
throw new AggregationExtractionException("unsupported aggregation [{}] with name [{}]",
throw new AggregationExtractionException(
"unsupported aggregation [{}] with name [{}]",
aggregation.getType(),
aggregation.getName());
aggregation.getName()
);
}
}


@SuppressWarnings("unchecked")
static void updateDocument(Map<String, Object> document, String fieldName, Object value) {
String[] fieldTokens = fieldName.split("\\.");
Expand All @@ -132,23 +141,23 @@ static void updateDocument(Map<String, Object> document, String fieldName, Objec
if (i == fieldTokens.length - 1) {
if (internalMap.containsKey(token)) {
if (internalMap.get(token) instanceof Map) {
throw new AggregationExtractionException("mixed object types of nested and non-nested fields [{}]",
fieldName);
throw new AggregationExtractionException("mixed object types of nested and non-nested fields [{}]", fieldName);
} else {
throw new AggregationExtractionException("duplicate key value pairs key [{}] old value [{}] duplicate value [{}]",
throw new AggregationExtractionException(
"duplicate key value pairs key [{}] old value [{}] duplicate value [{}]",
fieldName,
internalMap.get(token),
value);
value
);
}
}
internalMap.put(token, value);
} else {
if (internalMap.containsKey(token)) {
if (internalMap.get(token) instanceof Map) {
internalMap = (Map<String, Object>)internalMap.get(token);
internalMap = (Map<String, Object>) internalMap.get(token);
} else {
throw new AggregationExtractionException("mixed object types of nested and non-nested fields [{}]",
fieldName);
throw new AggregationExtractionException("mixed object types of nested and non-nested fields [{}]", fieldName);
}
} else {
Map<String, Object> newMap = new HashMap<>();
Expand All @@ -172,34 +181,48 @@ interface AggValueExtractor {
static class SingleValueAggExtractor implements AggValueExtractor {
@Override
public Object value(Aggregation agg, String fieldType) {
SingleValue aggregation = (SingleValue)agg;
SingleValue aggregation = (SingleValue) agg;
// If the double is invalid, this indicates sparse data
if (Numbers.isValidDouble(aggregation.value()) == false) {
return null;
}
// If the type is numeric or if the formatted string is the same as simply making the value a string,
// gather the `value` type, otherwise utilize `getValueAsString` so we don't lose formatted outputs.
if (isNumericType(fieldType) ||
aggregation.getValueAsString().equals(String.valueOf(aggregation.value()))){
// gather the `value` type, otherwise utilize `getValueAsString` so we don't lose formatted outputs.
if (isNumericType(fieldType) || aggregation.getValueAsString().equals(String.valueOf(aggregation.value()))) {
return aggregation.value();
} else {
return aggregation.getValueAsString();
}
}
}

/**
 * Extracts the results of a {@code percentiles} aggregation into a map of
 * percent-derived field name to percentile value, so the multi-value result can be
 * written into the destination document as a nested object.
 */
static class PercentilesAggExtractor implements AggValueExtractor {
    @Override
    public Object value(Aggregation agg, String fieldType) {
        Percentiles percentilesAgg = (Percentiles) agg;
        HashMap<String, Double> valueByPercentile = new HashMap<>();
        // field names are derived from the percent (e.g. 99.9 -> "99_9") so they are
        // valid as document field names
        percentilesAgg.forEach(
            percentile -> valueByPercentile.put(OutputFieldNameConverter.fromDouble(percentile.getPercent()), percentile.getValue())
        );
        return valueByPercentile;
    }
}

static class ScriptedMetricAggExtractor implements AggValueExtractor {
@Override
public Object value(Aggregation agg, String fieldType) {
ScriptedMetric aggregation = (ScriptedMetric)agg;
ScriptedMetric aggregation = (ScriptedMetric) agg;
return aggregation.aggregation();
}
}

static class GeoCentroidAggExtractor implements AggValueExtractor {
@Override
public Object value(Aggregation agg, String fieldType) {
GeoCentroid aggregation = (GeoCentroid)agg;
GeoCentroid aggregation = (GeoCentroid) agg;
// if the account is `0` iff there is no contained centroid
return aggregation.count() > 0 ? aggregation.centroid().toString() : null;
}
Expand All @@ -208,38 +231,49 @@ public Object value(Aggregation agg, String fieldType) {
static class GeoBoundsAggExtractor implements AggValueExtractor {
@Override
public Object value(Aggregation agg, String fieldType) {
GeoBounds aggregation = (GeoBounds)agg;
GeoBounds aggregation = (GeoBounds) agg;
if (aggregation.bottomRight() == null || aggregation.topLeft() == null) {
return null;
}
final Map<String, Object> geoShape = new HashMap<>();
// If the two geo_points are equal, it is a point
if (aggregation.topLeft().equals(aggregation.bottomRight())) {
geoShape.put(ShapeParser.FIELD_TYPE.getPreferredName(), PointBuilder.TYPE.shapeName());
geoShape.put(ShapeParser.FIELD_COORDINATES.getPreferredName(),
Arrays.asList(aggregation.topLeft().getLon(), aggregation.bottomRight().getLat()));
// If only the lat or the lon of the two geo_points are equal, than we know it should be a line
geoShape.put(
ShapeParser.FIELD_COORDINATES.getPreferredName(),
Arrays.asList(aggregation.topLeft().getLon(), aggregation.bottomRight().getLat())
);
// If only the lat or the lon of the two geo_points are equal, than we know it should be a line
} else if (Double.compare(aggregation.topLeft().getLat(), aggregation.bottomRight().getLat()) == 0
|| Double.compare(aggregation.topLeft().getLon(), aggregation.bottomRight().getLon()) == 0) {
geoShape.put(ShapeParser.FIELD_TYPE.getPreferredName(), LineStringBuilder.TYPE.shapeName());
geoShape.put(ShapeParser.FIELD_COORDINATES.getPreferredName(),
Arrays.asList(
new Double[]{aggregation.topLeft().getLon(), aggregation.topLeft().getLat()},
new Double[]{aggregation.bottomRight().getLon(), aggregation.bottomRight().getLat()}));
} else {
// neither points are equal, we have a polygon that is a square
geoShape.put(ShapeParser.FIELD_TYPE.getPreferredName(), PolygonBuilder.TYPE.shapeName());
final GeoPoint tl = aggregation.topLeft();
final GeoPoint br = aggregation.bottomRight();
geoShape.put(ShapeParser.FIELD_COORDINATES.getPreferredName(),
Collections.singletonList(Arrays.asList(
new Double[]{tl.getLon(), tl.getLat()},
new Double[]{br.getLon(), tl.getLat()},
new Double[]{br.getLon(), br.getLat()},
new Double[]{tl.getLon(), br.getLat()},
new Double[]{tl.getLon(), tl.getLat()})));
}
geoShape.put(ShapeParser.FIELD_TYPE.getPreferredName(), LineStringBuilder.TYPE.shapeName());
geoShape.put(
ShapeParser.FIELD_COORDINATES.getPreferredName(),
Arrays.asList(
new Double[] { aggregation.topLeft().getLon(), aggregation.topLeft().getLat() },
new Double[] { aggregation.bottomRight().getLon(), aggregation.bottomRight().getLat() }
)
);
} else {
// neither points are equal, we have a polygon that is a square
geoShape.put(ShapeParser.FIELD_TYPE.getPreferredName(), PolygonBuilder.TYPE.shapeName());
final GeoPoint tl = aggregation.topLeft();
final GeoPoint br = aggregation.bottomRight();
geoShape.put(
ShapeParser.FIELD_COORDINATES.getPreferredName(),
Collections.singletonList(
Arrays.asList(
new Double[] { tl.getLon(), tl.getLat() },
new Double[] { br.getLon(), tl.getLat() },
new Double[] { br.getLon(), br.getLat() },
new Double[] { tl.getLon(), br.getLat() },
new Double[] { tl.getLon(), tl.getLat() }
)
)
);
}
return geoShape;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,14 @@

package org.elasticsearch.xpack.transform.transforms.pivot;

import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.PercentilesAggregationBuilder;
import org.elasticsearch.xpack.transform.utils.OutputFieldNameConverter;

import java.util.Arrays;
import java.util.Collections;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand All @@ -17,6 +24,7 @@ public final class Aggregations {
private static final String DYNAMIC = "_dynamic";
// the field mapping should be determined explicitly from the source field mapping if possible.
private static final String SOURCE = "_source";

private Aggregations() {}

/**
Expand All @@ -40,7 +48,8 @@ enum AggregationType {
SCRIPTED_METRIC("scripted_metric", DYNAMIC),
WEIGHTED_AVG("weighted_avg", DYNAMIC),
BUCKET_SELECTOR("bucket_selector", DYNAMIC),
BUCKET_SCRIPT("bucket_script", DYNAMIC);
BUCKET_SCRIPT("bucket_script", DYNAMIC),
PERCENTILES("percentiles", "double");

private final String aggregationType;
private final String targetMapping;
Expand All @@ -59,8 +68,9 @@ public String getTargetMapping() {
}
}

private static Set<String> aggregationSupported = Stream.of(AggregationType.values()).map(AggregationType::name)
.collect(Collectors.toSet());
private static Set<String> aggregationSupported = Stream.of(AggregationType.values())
.map(AggregationType::name)
.collect(Collectors.toSet());

public static boolean isSupportedByTransform(String aggregationType) {
return aggregationSupported.contains(aggregationType.toUpperCase(Locale.ROOT));
Expand All @@ -74,4 +84,19 @@ public static String resolveTargetMapping(String aggregationType, String sourceT
AggregationType agg = AggregationType.valueOf(aggregationType.toUpperCase(Locale.ROOT));
return agg.getTargetMapping().equals(SOURCE) ? sourceType : agg.getTargetMapping();
}

/**
 * Resolves the output field name(s) and aggregation type(s) an aggregation produces.
 *
 * Multi-value aggregations such as {@code percentiles} emit one output field per
 * configured percent (named {@code "<agg name>.<percent>"}); all other aggregations
 * emit exactly one field named after the aggregation itself.
 *
 * @param agg the aggregation builder to inspect
 * @return map of output field name to aggregation type
 */
public static Map<String, String> getAggregationOutputTypes(AggregationBuilder agg) {
    if ((agg instanceof PercentilesAggregationBuilder) == false) {
        // catch all: single output field named after the aggregation
        return Collections.singletonMap(agg.getName(), agg.getType());
    }
    PercentilesAggregationBuilder percentilesAgg = (PercentilesAggregationBuilder) agg;
    final String aggName = agg.getName();
    final String aggType = agg.getType();

    // note: eclipse does not like a method reference on agg.getType() here;
    // the merge function (first, second) -> first drops duplicate percents
    return Arrays.stream(percentilesAgg.percentiles())
        .mapToObj(OutputFieldNameConverter::fromDouble)
        .collect(Collectors.toMap(percent -> aggName + "." + percent, percent -> { return aggType; }, (first, second) -> first));
}

}
Loading