From e036af14fcb8bbe518f8b1645cae9f6092296c87 Mon Sep 17 00:00:00 2001 From: penghuo Date: Fri, 4 Jun 2021 15:43:49 -0700 Subject: [PATCH 1/2] Support construct AggregationResponseParser during Aggregator build stage --- .../value/OpenSearchExprValueFactory.java | 16 +- .../OpenSearchAggregationResponseParser.java | 114 ----------- .../response/OpenSearchResponse.java | 2 +- .../agg/CompositeAggregationParser.java | 51 +++++ .../opensearch/response/agg/FilterParser.java | 38 ++++ .../opensearch/response/agg/MetricParser.java | 36 ++++ .../response/agg/MetricParserHelper.java | 56 +++++ .../agg/NoBucketAggregationParser.java | 41 ++++ .../OpenSearchAggregationResponseParser.java | 31 +++ .../response/agg/SingleValueParser.java | 39 ++++ .../opensearch/response/agg/StatsParser.java | 41 ++++ .../sql/opensearch/response/agg/Utils.java | 27 +++ .../opensearch/storage/OpenSearchIndex.java | 4 +- .../storage/OpenSearchIndexScan.java | 10 +- .../aggregation/AggregationQueryBuilder.java | 47 +++-- .../dsl/MetricAggregationBuilder.java | 93 ++++++--- .../response/AggregationResponseUtils.java | 4 + ...enSearchAggregationResponseParserTest.java | 192 ++++++++++++------ .../response/OpenSearchResponseTest.java | 42 ++-- .../AggregationQueryBuilderTest.java | 17 +- .../dsl/MetricAggregationBuilderTest.java | 2 +- 21 files changed, 650 insertions(+), 253 deletions(-) delete mode 100644 opensearch/src/main/java/org/opensearch/sql/opensearch/response/OpenSearchAggregationResponseParser.java create mode 100644 opensearch/src/main/java/org/opensearch/sql/opensearch/response/agg/CompositeAggregationParser.java create mode 100644 opensearch/src/main/java/org/opensearch/sql/opensearch/response/agg/FilterParser.java create mode 100644 opensearch/src/main/java/org/opensearch/sql/opensearch/response/agg/MetricParser.java create mode 100644 opensearch/src/main/java/org/opensearch/sql/opensearch/response/agg/MetricParserHelper.java create mode 100644 opensearch/src/main/java/org/opensearch/sql/opensearch/response/agg/NoBucketAggregationParser.java create mode 100644 opensearch/src/main/java/org/opensearch/sql/opensearch/response/agg/OpenSearchAggregationResponseParser.java create mode 100644 opensearch/src/main/java/org/opensearch/sql/opensearch/response/agg/SingleValueParser.java create mode 100644 opensearch/src/main/java/org/opensearch/sql/opensearch/response/agg/StatsParser.java create mode 100644 opensearch/src/main/java/org/opensearch/sql/opensearch/response/agg/Utils.java diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/data/value/OpenSearchExprValueFactory.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/data/value/OpenSearchExprValueFactory.java index 313347aec1..001363b476 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/data/value/OpenSearchExprValueFactory.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/data/value/OpenSearchExprValueFactory.java @@ -63,7 +63,7 @@ import java.util.List; import java.util.Map; import java.util.function.Function; -import lombok.AllArgsConstructor; +import lombok.Getter; import lombok.Setter; import org.opensearch.common.time.DateFormatters; import org.opensearch.sql.data.model.ExprBooleanValue; @@ -86,11 +86,11 @@ import org.opensearch.sql.opensearch.data.utils.Content; import org.opensearch.sql.opensearch.data.utils.ObjectContent; import org.opensearch.sql.opensearch.data.utils.OpenSearchJsonContent; +import org.opensearch.sql.opensearch.response.agg.OpenSearchAggregationResponseParser; /** * Construct ExprValue from OpenSearch response. */ -@AllArgsConstructor public class OpenSearchExprValueFactory { /** * The Mapping of Field and ExprType. @@ -98,6 +98,10 @@ public class OpenSearchExprValueFactory { @Setter private Map typeMapping; + @Getter + @Setter + private OpenSearchAggregationResponseParser parser; + private static final DateTimeFormatter DATE_TIME_FORMATTER = new DateTimeFormatterBuilder() .appendOptional(SQL_LITERAL_DATE_TIME_FORMAT) @@ -131,6 +135,14 @@ public class OpenSearchExprValueFactory { .put(OPENSEARCH_BINARY, c -> new OpenSearchExprBinaryValue(c.stringValue())) .build(); + /** + * Constructor of OpenSearchExprValueFactory. + */ + public OpenSearchExprValueFactory( + Map typeMapping) { + this.typeMapping = typeMapping; + } + /** * The struct construction has the following assumption. 1. The field has OpenSearch Object * data type. https://www.elastic.co/guide/en/elasticsearch/reference/current/object.html 2. The diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/response/OpenSearchAggregationResponseParser.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/response/OpenSearchAggregationResponseParser.java deleted file mode 100644 index bb029cddb0..0000000000 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/response/OpenSearchAggregationResponseParser.java +++ /dev/null @@ -1,114 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - * - * Modifications Copyright OpenSearch Contributors. See - * GitHub history for details. - */ - -/* - * - * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. - * - */ - -package org.opensearch.sql.opensearch.response; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.ImmutableList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import lombok.experimental.UtilityClass; -import org.opensearch.search.aggregations.Aggregation; -import org.opensearch.search.aggregations.Aggregations; -import org.opensearch.search.aggregations.bucket.composite.CompositeAggregation; -import org.opensearch.search.aggregations.bucket.filter.Filter; -import org.opensearch.search.aggregations.metrics.NumericMetricsAggregation; - -/** - * AggregationResponseParser. - */ -@UtilityClass -public class OpenSearchAggregationResponseParser { - - /** - * Parse Aggregations as a list of field and value map. - * - * @param aggregations aggregations - * @return a list of field and value map - */ - public static List> parse(Aggregations aggregations) { - List aggregationList = aggregations.asList(); - ImmutableList.Builder> builder = new ImmutableList.Builder<>(); - Map noBucketMap = new HashMap<>(); - - for (Aggregation aggregation : aggregationList) { - if (aggregation instanceof CompositeAggregation) { - for (CompositeAggregation.Bucket bucket : - ((CompositeAggregation) aggregation).getBuckets()) { - builder.add(parse(bucket)); - } - } else { - noBucketMap.putAll(parseInternal(aggregation)); - } - - } - // Todo, there is no better way to difference the with/without bucket from aggregations result. - return noBucketMap.isEmpty() ? builder.build() : Collections.singletonList(noBucketMap); - } - - private static Map parse(CompositeAggregation.Bucket bucket) { - Map resultMap = new HashMap<>(); - // The NodeClient return InternalComposite - - // build pair - resultMap.putAll(bucket.getKey()); - - // build pair - for (Aggregation aggregation : bucket.getAggregations()) { - resultMap.putAll(parseInternal(aggregation)); - } - - return resultMap; - } - - private static Map parseInternal(Aggregation aggregation) { - Map resultMap = new HashMap<>(); - if (aggregation instanceof NumericMetricsAggregation.SingleValue) { - resultMap.put( - aggregation.getName(), - handleNanValue(((NumericMetricsAggregation.SingleValue) aggregation).value())); - } else if (aggregation instanceof Filter) { - // parse sub-aggregations for FilterAggregation response - List aggList = ((Filter) aggregation).getAggregations().asList(); - aggList.forEach(internalAgg -> { - Map intermediateMap = parseInternal(internalAgg); - resultMap.put(internalAgg.getName(), intermediateMap.get(internalAgg.getName())); - }); - } else { - throw new IllegalStateException("unsupported aggregation type " + aggregation.getType()); - } - return resultMap; - } - - @VisibleForTesting - protected static Object handleNanValue(double value) { - return Double.isNaN(value) ? null : value; - } -} diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/response/OpenSearchResponse.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/response/OpenSearchResponse.java index fc7421aec3..156490d93a 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/response/OpenSearchResponse.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/response/OpenSearchResponse.java @@ -103,7 +103,7 @@ public boolean isAggregationResponse() { */ public Iterator iterator() { if (isAggregationResponse()) { - return OpenSearchAggregationResponseParser.parse(aggregations).stream().map(entry -> { + return exprValueFactory.getParser().parse(aggregations).stream().map(entry -> { ImmutableMap.Builder builder = new ImmutableMap.Builder<>(); for (Map.Entry value : entry.entrySet()) { builder.put(value.getKey(), exprValueFactory.construct(value.getKey(), value.getValue())); diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/response/agg/CompositeAggregationParser.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/response/agg/CompositeAggregationParser.java new file mode 100644 index 0000000000..00e8a5154c --- /dev/null +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/response/agg/CompositeAggregationParser.java @@ -0,0 +1,51 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.opensearch.sql.opensearch.response.agg; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import org.opensearch.search.aggregations.Aggregations; +import org.opensearch.search.aggregations.bucket.composite.CompositeAggregation; + +/** + * Composite Aggregation Parser which include composite aggregation and metric parsers. + */ +public class CompositeAggregationParser implements OpenSearchAggregationResponseParser { + + private final MetricParserHelper metricsParser; + + public CompositeAggregationParser(MetricParser... metricParserList) { + metricsParser = new MetricParserHelper(Arrays.asList(metricParserList)); + } + + public CompositeAggregationParser(List metricParserList) { + metricsParser = new MetricParserHelper(metricParserList); + } + + @Override + public List> parse(Aggregations aggregations) { + return ((CompositeAggregation) aggregations.asList().get(0)) + .getBuckets().stream().map(this::parse).collect(Collectors.toList()); + } + + private Map parse(CompositeAggregation.Bucket bucket) { + Map resultMap = new HashMap<>(); + resultMap.putAll(bucket.getKey()); + resultMap.putAll(metricsParser.parse(bucket.getAggregations())); + return resultMap; + } +} diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/response/agg/FilterParser.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/response/agg/FilterParser.java new file mode 100644 index 0000000000..cfcba82c18 --- /dev/null +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/response/agg/FilterParser.java @@ -0,0 +1,38 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.opensearch.sql.opensearch.response.agg; + +import java.util.Map; +import lombok.Builder; +import lombok.Getter; +import org.opensearch.search.aggregations.Aggregation; +import org.opensearch.search.aggregations.bucket.filter.Filter; + +/** + * {@link Filter} Parser. + * The current use case is filter aggregation, e.g. avg(age) filter(balance>0). The filter parser + * do nothing and return the result from metricsParser. + */ +@Builder +public class FilterParser implements MetricParser { + + private final MetricParser metricsParser; + + @Getter private final String name; + + @Override + public Map parse(Aggregation aggregations) { + return metricsParser.parse(((Filter) aggregations).getAggregations().asList().get(0)); + } +} diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/response/agg/MetricParser.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/response/agg/MetricParser.java new file mode 100644 index 0000000000..15f05e5b05 --- /dev/null +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/response/agg/MetricParser.java @@ -0,0 +1,36 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.opensearch.sql.opensearch.response.agg; + +import java.util.Map; +import org.opensearch.search.aggregations.Aggregation; + +/** + * Metric Aggregation Parser. + */ +public interface MetricParser { + + /** + * Get the name of metric parser. + */ + String getName(); + + /** + * Parse the {@link Aggregation}. + * + * @param aggregation {@link Aggregation} + * @return the map between metric name and metric value. + */ + Map parse(Aggregation aggregation); +} diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/response/agg/MetricParserHelper.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/response/agg/MetricParserHelper.java new file mode 100644 index 0000000000..54b9305f49 --- /dev/null +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/response/agg/MetricParserHelper.java @@ -0,0 +1,56 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.opensearch.sql.opensearch.response.agg; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import lombok.RequiredArgsConstructor; +import org.opensearch.search.aggregations.Aggregation; +import org.opensearch.search.aggregations.Aggregations; +import org.opensearch.sql.common.utils.StringUtils; + +/** + * Parse multiple metrics in one bucket. + */ +@RequiredArgsConstructor +public class MetricParserHelper { + + private final Map metricParserMap; + + public MetricParserHelper(List metricParserList) { + metricParserMap = + metricParserList.stream().collect(Collectors.toMap(MetricParser::getName, m -> m)); + } + + /** + * Parse {@link Aggregations}. + * + * @param aggregations {@link Aggregations} + * @return the map between metric name and metric value. + */ + public Map parse(Aggregations aggregations) { + Map resultMap = new HashMap<>(); + for (Aggregation aggregation : aggregations) { + if (metricParserMap.containsKey(aggregation.getName())) { + resultMap.putAll(metricParserMap.get(aggregation.getName()).parse(aggregation)); + } else { + throw new RuntimeException(StringUtils.format("couldn't parse field %s in aggregation " + + "response", aggregation.getName())); + } + } + return resultMap; + } +} diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/response/agg/NoBucketAggregationParser.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/response/agg/NoBucketAggregationParser.java new file mode 100644 index 0000000000..5756003523 --- /dev/null +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/response/agg/NoBucketAggregationParser.java @@ -0,0 +1,41 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.opensearch.sql.opensearch.response.agg; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import org.opensearch.search.aggregations.Aggregations; + +/** + * No Bucket Aggregation Parser which include only metric parsers. + */ +public class NoBucketAggregationParser implements OpenSearchAggregationResponseParser { + + private final MetricParserHelper metricsParser; + + public NoBucketAggregationParser(MetricParser... metricParserList) { + metricsParser = new MetricParserHelper(Arrays.asList(metricParserList)); + } + + public NoBucketAggregationParser(List metricParserList) { + metricsParser = new MetricParserHelper(metricParserList); + } + + @Override + public List> parse(Aggregations aggregations) { + return Collections.singletonList(metricsParser.parse(aggregations)); + } +} diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/response/agg/OpenSearchAggregationResponseParser.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/response/agg/OpenSearchAggregationResponseParser.java new file mode 100644 index 0000000000..3a19747ef3 --- /dev/null +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/response/agg/OpenSearchAggregationResponseParser.java @@ -0,0 +1,31 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.opensearch.sql.opensearch.response.agg; + +import java.util.List; +import java.util.Map; +import org.opensearch.search.aggregations.Aggregations; + +/** + * OpenSearch Aggregation Response Parser. + */ +public interface OpenSearchAggregationResponseParser { + + /** + * Parse the OpenSearch Aggregation Response. + * @param aggregations Aggregations. + * @return aggregation result. + */ + List> parse(Aggregations aggregations); +} diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/response/agg/SingleValueParser.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/response/agg/SingleValueParser.java new file mode 100644 index 0000000000..7536a24661 --- /dev/null +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/response/agg/SingleValueParser.java @@ -0,0 +1,39 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.opensearch.sql.opensearch.response.agg; + +import static org.opensearch.sql.opensearch.response.agg.Utils.handleNanValue; + +import java.util.Collections; +import java.util.Map; +import lombok.Getter; +import lombok.RequiredArgsConstructor; +import org.opensearch.search.aggregations.Aggregation; +import org.opensearch.search.aggregations.metrics.NumericMetricsAggregation; + +/** + * {@link NumericMetricsAggregation.SingleValue} metric parser. + */ +@RequiredArgsConstructor +public class SingleValueParser implements MetricParser { + + @Getter private final String name; + + @Override + public Map parse(Aggregation agg) { + return Collections.singletonMap( + agg.getName(), + handleNanValue(((NumericMetricsAggregation.SingleValue) agg).value())); + } +} diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/response/agg/StatsParser.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/response/agg/StatsParser.java new file mode 100644 index 0000000000..6cac2fbdc9 --- /dev/null +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/response/agg/StatsParser.java @@ -0,0 +1,41 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.opensearch.sql.opensearch.response.agg; + +import static org.opensearch.sql.opensearch.response.agg.Utils.handleNanValue; + +import java.util.Collections; +import java.util.Map; +import java.util.function.Function; +import lombok.Getter; +import lombok.RequiredArgsConstructor; +import org.opensearch.search.aggregations.Aggregation; +import org.opensearch.search.aggregations.metrics.ExtendedStats; + +/** + * {@link ExtendedStats} metric parser. + */ +@RequiredArgsConstructor +public class StatsParser implements MetricParser { + + private final Function valueExtractor; + + @Getter private final String name; + + @Override + public Map parse(Aggregation agg) { + return Collections.singletonMap( + agg.getName(), handleNanValue(valueExtractor.apply((ExtendedStats) agg))); + } +} diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/response/agg/Utils.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/response/agg/Utils.java new file mode 100644 index 0000000000..28b9d41e83 --- /dev/null +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/response/agg/Utils.java @@ -0,0 +1,27 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.opensearch.sql.opensearch.response.agg; + +import lombok.experimental.UtilityClass; + +@UtilityClass +public class Utils { + /** + * Utils to handle Nan Value. + * @return null if is Nan. + */ + public static Object handleNanValue(double value) { + return Double.isNaN(value) ? null : value; + } +} diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndex.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndex.java index 74e966637f..0198abe7a1 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndex.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndex.java @@ -32,6 +32,7 @@ import java.util.Map; import java.util.stream.Collectors; import lombok.RequiredArgsConstructor; +import org.apache.commons.lang3.tuple.Pair; import org.opensearch.index.query.QueryBuilder; import org.opensearch.search.aggregations.AggregationBuilder; import org.opensearch.sql.common.setting.Settings; @@ -43,6 +44,7 @@ import org.opensearch.sql.opensearch.planner.logical.OpenSearchLogicalIndexScan; import org.opensearch.sql.opensearch.planner.logical.OpenSearchLogicalPlanOptimizerFactory; import org.opensearch.sql.opensearch.request.system.OpenSearchDescribeIndexRequest; +import org.opensearch.sql.opensearch.response.agg.OpenSearchAggregationResponseParser; import org.opensearch.sql.opensearch.storage.script.aggregation.AggregationQueryBuilder; import org.opensearch.sql.opensearch.storage.script.filter.FilterQueryBuilder; import org.opensearch.sql.opensearch.storage.script.sort.SortQueryBuilder; @@ -163,7 +165,7 @@ public PhysicalPlan visitIndexAggregation(OpenSearchLogicalIndexAgg node, } AggregationQueryBuilder builder = new AggregationQueryBuilder(new DefaultExpressionSerializer()); - List aggregationBuilder = + Pair, OpenSearchAggregationResponseParser> aggregationBuilder = builder.buildAggregationBuilder(node.getAggregatorList(), node.getGroupByList(), node.getSortList()); context.pushDownAggregation(aggregationBuilder); diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndexScan.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndexScan.java index 99b11c21a4..57980f23b9 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndexScan.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndexScan.java @@ -40,6 +40,7 @@ import lombok.EqualsAndHashCode; import lombok.Getter; import lombok.ToString; +import org.apache.commons.lang3.tuple.Pair; import org.opensearch.index.query.BoolQueryBuilder; import org.opensearch.index.query.QueryBuilder; import org.opensearch.index.query.QueryBuilders; @@ -55,6 +56,7 @@ import org.opensearch.sql.opensearch.request.OpenSearchQueryRequest; import org.opensearch.sql.opensearch.request.OpenSearchRequest; import org.opensearch.sql.opensearch.response.OpenSearchResponse; +import org.opensearch.sql.opensearch.response.agg.OpenSearchAggregationResponseParser; import org.opensearch.sql.storage.TableScanOperator; /** @@ -138,12 +140,14 @@ public void pushDown(QueryBuilder query) { /** * Push down aggregation to DSL request. - * @param aggregationBuilderList aggregation query. + * @param aggregationBuilder pair of aggregation query and aggregation parser. */ - public void pushDownAggregation(List aggregationBuilderList) { + public void pushDownAggregation( + Pair, OpenSearchAggregationResponseParser> aggregationBuilder) { SearchSourceBuilder source = request.getSourceBuilder(); - aggregationBuilderList.forEach(aggregationBuilder -> source.aggregation(aggregationBuilder)); + aggregationBuilder.getLeft().forEach(builder -> source.aggregation(builder)); source.size(0); + request.getExprValueFactory().setParser(aggregationBuilder.getRight()); } /** diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/script/aggregation/AggregationQueryBuilder.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/script/aggregation/AggregationQueryBuilder.java index a89ba042ee..403f99e593 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/script/aggregation/AggregationQueryBuilder.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/script/aggregation/AggregationQueryBuilder.java @@ -42,6 +42,7 @@ import org.apache.commons.lang3.tuple.Pair; import org.opensearch.search.aggregations.AggregationBuilder; import org.opensearch.search.aggregations.AggregationBuilders; +import org.opensearch.search.aggregations.AggregatorFactories; import org.opensearch.search.sort.SortOrder; import org.opensearch.sql.ast.tree.Sort; import org.opensearch.sql.data.type.ExprType; @@ -50,6 +51,10 @@ import org.opensearch.sql.expression.NamedExpression; import org.opensearch.sql.expression.ReferenceExpression; import org.opensearch.sql.expression.aggregation.NamedAggregator; +import org.opensearch.sql.opensearch.response.agg.CompositeAggregationParser; +import org.opensearch.sql.opensearch.response.agg.MetricParser; +import org.opensearch.sql.opensearch.response.agg.NoBucketAggregationParser; +import org.opensearch.sql.opensearch.response.agg.OpenSearchAggregationResponseParser; import org.opensearch.sql.opensearch.storage.script.aggregation.dsl.BucketAggregationBuilder; import org.opensearch.sql.opensearch.storage.script.aggregation.dsl.MetricAggregationBuilder; import org.opensearch.sql.opensearch.storage.serialization.ExpressionSerializer; @@ -82,25 +87,35 @@ public AggregationQueryBuilder( this.metricBuilder = new MetricAggregationBuilder(serializer); } - /** - * Build AggregationBuilder. - */ - public List buildAggregationBuilder( - List namedAggregatorList, - List groupByList, - List> sortList) { + /** Build AggregationBuilder. */ + public Pair, OpenSearchAggregationResponseParser> + buildAggregationBuilder( + List namedAggregatorList, + List groupByList, + List> sortList) { + + final Pair> metrics = + metricBuilder.build(namedAggregatorList); + if (groupByList.isEmpty()) { // no bucket - return ImmutableList - .copyOf(metricBuilder.build(namedAggregatorList).getAggregatorFactories()); + return Pair.of( + ImmutableList.copyOf(metrics.getLeft().getAggregatorFactories()), + new NoBucketAggregationParser(metrics.getRight())); } else { - final GroupSortOrder groupSortOrder = new GroupSortOrder(sortList); - return Collections.singletonList(AggregationBuilders.composite("composite_buckets", - bucketBuilder - .build(groupByList.stream().sorted(groupSortOrder).map(expr -> Pair.of(expr, - groupSortOrder.apply(expr))).collect(Collectors.toList()))) - .subAggregations(metricBuilder.build(namedAggregatorList)) - .size(AGGREGATION_BUCKET_SIZE)); + GroupSortOrder groupSortOrder = new GroupSortOrder(sortList); + return Pair.of( + Collections.singletonList( + AggregationBuilders.composite( + "composite_buckets", + bucketBuilder.build( + groupByList.stream() + .sorted(groupSortOrder) + .map(expr -> Pair.of(expr, groupSortOrder.apply(expr))) + .collect(Collectors.toList()))) + .subAggregations(metrics.getLeft()) + .size(AGGREGATION_BUCKET_SIZE)), + new CompositeAggregationParser(metrics.getRight())); } } diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/script/aggregation/dsl/MetricAggregationBuilder.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/script/aggregation/dsl/MetricAggregationBuilder.java index f3807ae662..0dbfec02c1 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/script/aggregation/dsl/MetricAggregationBuilder.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/script/aggregation/dsl/MetricAggregationBuilder.java @@ -30,7 +30,9 @@ import static org.opensearch.sql.data.type.ExprCoreType.INTEGER; +import java.util.ArrayList; import java.util.List; +import org.apache.commons.lang3.tuple.Pair; import org.opensearch.search.aggregations.AggregationBuilder; import org.opensearch.search.aggregations.AggregationBuilders; import org.opensearch.search.aggregations.AggregatorFactories; @@ -41,20 +43,22 @@ import org.opensearch.sql.expression.LiteralExpression; import org.opensearch.sql.expression.ReferenceExpression; import org.opensearch.sql.expression.aggregation.NamedAggregator; +import org.opensearch.sql.opensearch.response.agg.FilterParser; +import org.opensearch.sql.opensearch.response.agg.MetricParser; +import org.opensearch.sql.opensearch.response.agg.SingleValueParser; import org.opensearch.sql.opensearch.storage.script.filter.FilterQueryBuilder; import org.opensearch.sql.opensearch.storage.serialization.ExpressionSerializer; /** - * Build the Metric Aggregation from {@link NamedAggregator}. + * Build the Metric Aggregation and List of {@link MetricParser} from {@link NamedAggregator}. */ public class MetricAggregationBuilder - extends ExpressionNodeVisitor { + extends ExpressionNodeVisitor, Object> { private final AggregationBuilderHelper> helper; private final FilterQueryBuilder filterBuilder; - public MetricAggregationBuilder( - ExpressionSerializer serializer) { + public MetricAggregationBuilder(ExpressionSerializer serializer) { this.helper = new AggregationBuilderHelper<>(serializer); this.filterBuilder = new FilterQueryBuilder(serializer); } @@ -65,55 +69,89 @@ public MetricAggregationBuilder( * @param aggregatorList aggregator list * @return AggregatorFactories.Builder */ - public AggregatorFactories.Builder build(List aggregatorList) { + public Pair> build( + List aggregatorList) { AggregatorFactories.Builder builder = new AggregatorFactories.Builder(); + List metricParserList = new ArrayList<>(); for (NamedAggregator aggregator : aggregatorList) { - builder.addAggregator(aggregator.accept(this, null)); + Pair pair = aggregator.accept(this, null); + builder.addAggregator(pair.getLeft()); + metricParserList.add(pair.getRight()); } - return builder; + return Pair.of(builder, metricParserList); } @Override - public AggregationBuilder visitNamedAggregator(NamedAggregator node, - Object context) { + public Pair visitNamedAggregator( + NamedAggregator node, Object context) { Expression expression = node.getArguments().get(0); Expression condition = node.getDelegated().condition(); String name = node.getName(); switch (node.getFunctionName().getFunctionName()) { case "avg": - return make(AggregationBuilders.avg(name), expression, condition, name); + return make( + AggregationBuilders.avg(name), + expression, + condition, + name, + new SingleValueParser(name)); case "sum": - return make(AggregationBuilders.sum(name), expression, condition, name); + return make( + AggregationBuilders.sum(name), + expression, + condition, + name, + new SingleValueParser(name)); case "count": return make( - AggregationBuilders.count(name), replaceStarOrLiteral(expression), condition, name); + AggregationBuilders.count(name), + replaceStarOrLiteral(expression), + condition, + name, + new SingleValueParser(name)); case "min": - return make(AggregationBuilders.min(name), expression, condition, name); + return make( + AggregationBuilders.min(name), + expression, + condition, + name, + new SingleValueParser(name)); case "max": - return make(AggregationBuilders.max(name), expression, condition, name); + return make( + AggregationBuilders.max(name), + expression, + condition, + name, + new SingleValueParser(name)); default: throw new IllegalStateException( String.format("unsupported aggregator %s", node.getFunctionName().getFunctionName())); } } - private AggregationBuilder make(ValuesSourceAggregationBuilder builder, - Expression expression, Expression condition, String name) { + private Pair make( + ValuesSourceAggregationBuilder builder, + Expression expression, + Expression condition, + String name, + MetricParser parser) { ValuesSourceAggregationBuilder aggregationBuilder = helper.build(expression, builder::field, builder::script); if (condition != null) { - return makeFilterAggregation(aggregationBuilder, condition, name); + return Pair.of( + makeFilterAggregation(aggregationBuilder, condition, name), + FilterParser.builder().name(name).metricsParser(parser).build()); } - return aggregationBuilder; + return Pair.of(aggregationBuilder, parser); } /** - * Replace star or literal with OpenSearch metadata field "_index". Because: - * 1) Analyzer already converts * to string literal, literal check here can handle - * both COUNT(*) and COUNT(1). - * 2) Value count aggregation on _index counts all docs (after filter), therefore - * it has same semantics as COUNT(*) or COUNT(1). + * Replace star or literal with OpenSearch metadata field "_index". Because: 1) Analyzer already + * converts * to string literal, literal check here can handle both COUNT(*) and COUNT(1). 2) + * Value count aggregation on _index counts all docs (after filter), therefore it has same + * semantics as COUNT(*) or COUNT(1). + * * @param countArg count function argument * @return Reference to _index if literal, otherwise return original argument expression */ @@ -126,16 +164,15 @@ private Expression replaceStarOrLiteral(Expression countArg) { /** * Make builder to build FilterAggregation for aggregations with filter in the bucket. + * * @param subAggBuilder AggregationBuilder instance which the filter is applied to. * @param condition Condition expression in the filter. * @param name Name of the FilterAggregation instance to build. * @return {@link FilterAggregationBuilder}. */ - private FilterAggregationBuilder makeFilterAggregation(AggregationBuilder subAggBuilder, - Expression condition, String name) { - return AggregationBuilders - .filter(name, filterBuilder.build(condition)) + private FilterAggregationBuilder makeFilterAggregation( + AggregationBuilder subAggBuilder, Expression condition, String name) { + return AggregationBuilders.filter(name, filterBuilder.build(condition)) .subAggregation(subAggBuilder); } - } diff --git a/opensearch/src/test/java/org/opensearch/sql/opensearch/response/AggregationResponseUtils.java b/opensearch/src/test/java/org/opensearch/sql/opensearch/response/AggregationResponseUtils.java index c8ef830635..173b33575c 100644 --- a/opensearch/src/test/java/org/opensearch/sql/opensearch/response/AggregationResponseUtils.java +++ b/opensearch/src/test/java/org/opensearch/sql/opensearch/response/AggregationResponseUtils.java @@ -55,9 +55,11 @@ import org.opensearch.search.aggregations.bucket.terms.ParsedStringTerms; import org.opensearch.search.aggregations.bucket.terms.StringTerms; import org.opensearch.search.aggregations.metrics.AvgAggregationBuilder; +import org.opensearch.search.aggregations.metrics.ExtendedStatsAggregationBuilder; import org.opensearch.search.aggregations.metrics.MaxAggregationBuilder; import org.opensearch.search.aggregations.metrics.MinAggregationBuilder; import org.opensearch.search.aggregations.metrics.ParsedAvg; +import org.opensearch.search.aggregations.metrics.ParsedExtendedStats; import org.opensearch.search.aggregations.metrics.ParsedMax; import org.opensearch.search.aggregations.metrics.ParsedMin; import org.opensearch.search.aggregations.metrics.ParsedSum; @@ -74,6 +76,8 @@ public class AggregationResponseUtils { .put(MaxAggregationBuilder.NAME, (p, c) -> ParsedMax.fromXContent(p, (String) c)) .put(SumAggregationBuilder.NAME, (p, c) -> ParsedSum.fromXContent(p, (String) c)) .put(AvgAggregationBuilder.NAME, (p, c) -> ParsedAvg.fromXContent(p, (String) c)) + .put(ExtendedStatsAggregationBuilder.NAME, + (p, c) -> ParsedExtendedStats.fromXContent(p, (String) c)) .put(StringTerms.NAME, (p, c) -> ParsedStringTerms.fromXContent(p, (String) c)) .put(LongTerms.NAME, (p, c) -> ParsedLongTerms.fromXContent(p, (String) c)) .put(DoubleTerms.NAME, (p, c) -> ParsedDoubleTerms.fromXContent(p, (String) c)) diff --git a/opensearch/src/test/java/org/opensearch/sql/opensearch/response/OpenSearchAggregationResponseParserTest.java b/opensearch/src/test/java/org/opensearch/sql/opensearch/response/OpenSearchAggregationResponseParserTest.java index b49bec4d44..120d48b601 100644 --- a/opensearch/src/test/java/org/opensearch/sql/opensearch/response/OpenSearchAggregationResponseParserTest.java +++ b/opensearch/src/test/java/org/opensearch/sql/opensearch/response/OpenSearchAggregationResponseParserTest.java @@ -34,6 +34,8 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.opensearch.sql.opensearch.response.AggregationResponseUtils.fromJson; +import static org.opensearch.sql.opensearch.response.agg.Utils.handleNanValue; import com.google.common.collect.ImmutableMap; import java.util.List; @@ -41,6 +43,13 @@ import org.junit.jupiter.api.DisplayNameGeneration; import org.junit.jupiter.api.DisplayNameGenerator; import org.junit.jupiter.api.Test; +import org.opensearch.search.aggregations.metrics.ExtendedStats; +import org.opensearch.sql.opensearch.response.agg.CompositeAggregationParser; +import org.opensearch.sql.opensearch.response.agg.FilterParser; +import org.opensearch.sql.opensearch.response.agg.NoBucketAggregationParser; +import org.opensearch.sql.opensearch.response.agg.OpenSearchAggregationResponseParser; +import org.opensearch.sql.opensearch.response.agg.SingleValueParser; +import org.opensearch.sql.opensearch.response.agg.StatsParser; @DisplayNameGeneration(DisplayNameGenerator.ReplaceUnderscores.class) class OpenSearchAggregationResponseParserTest { @@ -55,7 +64,10 @@ void no_bucket_one_metric_should_pass() { + " \"value\": 40\n" + " }\n" + "}"; - assertThat(parse(response), contains(entry("max", 40d))); + NoBucketAggregationParser parser = new NoBucketAggregationParser( + new SingleValueParser("max") + ); + assertThat(parse(parser, response), contains(entry("max", 40d))); } /** @@ -71,7 +83,11 @@ void no_bucket_two_metric_should_pass() { + " \"value\": 20\n" + " }\n" + "}"; - assertThat(parse(response), + NoBucketAggregationParser parser = new NoBucketAggregationParser( + new SingleValueParser("max"), + new SingleValueParser("min") + ); + assertThat(parse(parser, response), contains(entry("max", 40d,"min", 20d))); } @@ -104,7 +120,10 @@ void one_bucket_one_metric_should_pass() { + " ]\n" + " }\n" + "}"; - assertThat(parse(response), + + OpenSearchAggregationResponseParser parser = new CompositeAggregationParser( + new SingleValueParser("avg")); + assertThat(parse(parser, response), containsInAnyOrder(ImmutableMap.of("type", "cost", "avg", 20d), ImmutableMap.of("type", "sale", "avg", 105d))); } @@ -139,7 +158,9 @@ void two_bucket_one_metric_should_pass() { + " ]\n" + " }\n" + "}"; - assertThat(parse(response), + OpenSearchAggregationResponseParser parser = new CompositeAggregationParser( + new SingleValueParser("avg")); + assertThat(parse(parser, response), containsInAnyOrder(ImmutableMap.of("type", "cost", "region", "us", "avg", 20d), ImmutableMap.of("type", "sale", "region", "uk", "avg", 130d))); } @@ -147,81 +168,132 @@ void two_bucket_one_metric_should_pass() { @Test void unsupported_aggregation_should_fail() { String response = "{\n" - + " \"date_histogram#max\": {\n" + + " \"date_histogram#date_histogram\": {\n" + " \"value\": 40\n" + " }\n" + "}"; - IllegalStateException exception = - assertThrows(IllegalStateException.class, () -> parse(response)); - assertEquals("unsupported aggregation type date_histogram", exception.getMessage()); + NoBucketAggregationParser parser = new NoBucketAggregationParser( + new SingleValueParser("max") + ); + RuntimeException exception = + assertThrows(RuntimeException.class, () -> parse(parser, response)); + assertEquals( + "couldn't parse field date_histogram in aggregation response", exception.getMessage()); } @Test void nan_value_should_return_null() { - assertNull(OpenSearchAggregationResponseParser.handleNanValue(Double.NaN)); + assertNull(handleNanValue(Double.NaN)); } - /** - * SELECT AVG(age) FILTER(WHERE age > 37) as filtered FROM accounts. - */ @Test void filter_aggregation_should_pass() { - String response = "{\n" - + " \"filter#filtered\" : {\n" - + " \"doc_count\" : 3,\n" - + " \"avg#filtered\" : {\n" - + " \"value\" : 37.0\n" - + " }\n" - + " }\n" - + " }"; - assertThat(parse(response), contains(entry("filtered", 37.0))); + String response = "{\n" + + " \"filter#filtered\" : {\n" + + " \"doc_count\" : 3,\n" + + " \"avg#filtered\" : {\n" + + " \"value\" : 37.0\n" + + " }\n" + + " }\n" + + " }"; + OpenSearchAggregationResponseParser parser = + new NoBucketAggregationParser( + FilterParser.builder() + .name("filtered") + .metricsParser(new SingleValueParser("filtered")) + .build()); + assertThat(parse(parser, response), contains(entry("filtered", 37.0))); } - /** - * SELECT AVG(age) FILTER(WHERE age > 37) as filtered FROM accounts GROUP BY gender. - */ @Test void filter_aggregation_group_by_should_pass() { - String response = "{\n" - + " \"composite#composite_buckets\":{\n" - + " \"after_key\":{\n" - + " \"gender\":\"m\"\n" - + " },\n" - + " \"buckets\":[\n" - + " {\n" - + " \"key\":{\n" - + " \"gender\":\"f\"\n" - + " },\n" - + " \"doc_count\":3,\n" - + " \"filter#filter\":{\n" - + " \"doc_count\":1,\n" - + " \"avg#avg\":{\n" - + " \"value\":39.0\n" - + " }\n" - + " }\n" - + " },\n" - + " {\n" - + " \"key\":{\n" - + " \"gender\":\"m\"\n" - + " },\n" - + " \"doc_count\":4,\n" - + " \"filter#filter\":{\n" - + " \"doc_count\":2,\n" - + " \"avg#avg\":{\n" - + " \"value\":36.0\n" - + " }\n" - + " }\n" - + " }\n" - + " ]\n" - + " }\n" - + "}"; - assertThat(parse(response), containsInAnyOrder( + String response = "{\n" + + " \"composite#composite_buckets\":{\n" + + " \"after_key\":{\n" + + " \"gender\":\"m\"\n" + + " },\n" + + " \"buckets\":[\n" + + " {\n" + + " \"key\":{\n" + + " \"gender\":\"f\"\n" + + " },\n" + + " \"doc_count\":3,\n" + + " \"filter#filter\":{\n" + + " \"doc_count\":1,\n" + + " \"avg#avg\":{\n" + + " \"value\":39.0\n" + + " }\n" + + " }\n" + + " },\n" + + " {\n" + + " \"key\":{\n" + + " \"gender\":\"m\"\n" + + " },\n" + + " \"doc_count\":4,\n" + + " \"filter#filter\":{\n" + + " \"doc_count\":2,\n" + + " \"avg#avg\":{\n" + + " \"value\":36.0\n" + + " }\n" + + " }\n" + + " }\n" + + " ]\n" + + " }\n" + + "}"; + OpenSearchAggregationResponseParser parser = new CompositeAggregationParser( + FilterParser.builder() + .name("filter") + .metricsParser(new SingleValueParser("avg")) + .build() + ); + assertThat(parse(parser, response), containsInAnyOrder( entry("gender", "f", "avg", 39.0), entry("gender", "m", "avg", 36.0))); } - public List> parse(String json) { - return OpenSearchAggregationResponseParser.parse(AggregationResponseUtils.fromJson(json)); + /** + * SELECT MAX(age) as max, STDDEV(age) as min FROM accounts. + */ + @Test + void no_bucket_max_and_extended_stats() { + String response = "{\n" + + " \"extended_stats#esField\": {\n" + + " \"count\": 2033,\n" + + " \"min\": 0,\n" + + " \"max\": 360,\n" + + " \"avg\": 45.47958681751107,\n" + + " \"sum\": 92460,\n" + + " \"sum_of_squares\": 22059450,\n" + + " \"variance\": 8782.295820390027,\n" + + " \"variance_population\": 8782.295820390027,\n" + + " \"variance_sampling\": 8786.61781636463,\n" + + " \"std_deviation\": 93.71390409320287,\n" + + " \"std_deviation_population\": 93.71390409320287,\n" + + " \"std_deviation_sampling\": 93.73696078049805,\n" + + " \"std_deviation_bounds\": {\n" + + " \"upper\": 232.9073950039168,\n" + + " \"lower\": -141.94822136889468,\n" + + " \"upper_population\": 232.9073950039168,\n" + + " \"lower_population\": -141.94822136889468,\n" + + " \"upper_sampling\": 232.95350837850717,\n" + + " \"lower_sampling\": -141.99433474348504\n" + + " }\n" + + " },\n" + + " \"max#maxField\": {\n" + + " \"value\": 360\n" + + " }\n" + + "}"; + + NoBucketAggregationParser parser = new NoBucketAggregationParser( + new SingleValueParser("maxField"), + new StatsParser(ExtendedStats::getStdDeviation, "esField") + ); + assertThat(parse(parser, response), + contains(entry("esField", 93.71390409320287, "maxField", 360D))); + } + + public List> parse(OpenSearchAggregationResponseParser parser, String json) { + return parser.parse(fromJson(json)); } public Map entry(String name, Object value) { diff --git a/opensearch/src/test/java/org/opensearch/sql/opensearch/response/OpenSearchResponseTest.java b/opensearch/src/test/java/org/opensearch/sql/opensearch/response/OpenSearchResponseTest.java index 184312afa1..c9cde4f634 100644 --- a/opensearch/src/test/java/org/opensearch/sql/opensearch/response/OpenSearchResponseTest.java +++ b/opensearch/src/test/java/org/opensearch/sql/opensearch/response/OpenSearchResponseTest.java @@ -42,8 +42,6 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; -import org.mockito.MockedStatic; -import org.mockito.Mockito; import org.mockito.junit.jupiter.MockitoExtension; import org.opensearch.action.search.SearchResponse; import org.opensearch.search.SearchHit; @@ -53,6 +51,7 @@ import org.opensearch.sql.data.model.ExprTupleValue; import org.opensearch.sql.data.model.ExprValue; import org.opensearch.sql.opensearch.data.value.OpenSearchExprValueFactory; +import org.opensearch.sql.opensearch.response.agg.OpenSearchAggregationResponseParser; @ExtendWith(MockitoExtension.class) class OpenSearchResponseTest { @@ -72,6 +71,9 @@ class OpenSearchResponseTest { @Mock private Aggregations aggregations; + @Mock + private OpenSearchAggregationResponseParser parser; + private ExprTupleValue exprTupleValue1 = ExprTupleValue.fromExprValueMap(ImmutableMap.of("id1", new ExprIntegerValue(1))); @@ -147,26 +149,24 @@ void response_isnot_aggregation_when_aggregation_is_empty() { @Test void aggregation_iterator() { - try ( - MockedStatic mockedStatic = Mockito - .mockStatic(OpenSearchAggregationResponseParser.class)) { - when(OpenSearchAggregationResponseParser.parse(any())) - .thenReturn(Arrays.asList(ImmutableMap.of("id1", 1), ImmutableMap.of("id2", 2))); - when(searchResponse.getAggregations()).thenReturn(aggregations); - when(factory.construct(anyString(), any())).thenReturn(new ExprIntegerValue(1)) - .thenReturn(new ExprIntegerValue(2)); - - int i = 0; - for (ExprValue hit : new OpenSearchResponse(searchResponse, factory)) { - if (i == 0) { - assertEquals(exprTupleValue1, hit); - } else if (i == 1) { - assertEquals(exprTupleValue2, hit); - } else { - fail("More search hits returned than expected"); - } - i++; + when(parser.parse(any())) + .thenReturn(Arrays.asList(ImmutableMap.of("id1", 1), ImmutableMap.of("id2", 2))); + when(searchResponse.getAggregations()).thenReturn(aggregations); + when(factory.getParser()).thenReturn(parser); + when(factory.construct(anyString(), any())) + .thenReturn(new ExprIntegerValue(1)) + .thenReturn(new ExprIntegerValue(2)); + + int i = 0; + for (ExprValue hit : new OpenSearchResponse(searchResponse, factory)) { + if (i == 0) { + assertEquals(exprTupleValue1, hit); + } else if (i == 1) { + assertEquals(exprTupleValue2, hit); + } else { + fail("More search hits returned than expected"); } + i++; } } } diff --git a/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/script/aggregation/AggregationQueryBuilderTest.java b/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/script/aggregation/AggregationQueryBuilderTest.java index 2242298bed..62643baad2 100644 --- a/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/script/aggregation/AggregationQueryBuilderTest.java +++ b/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/script/aggregation/AggregationQueryBuilderTest.java @@ -423,13 +423,18 @@ private String buildQuery(List namedAggregatorList, } @SneakyThrows - private String buildQuery(List namedAggregatorList, - List groupByList, - List> sortList) { + private String buildQuery( + List namedAggregatorList, + List groupByList, + List> sortList) { ObjectMapper objectMapper = new ObjectMapper(); - return objectMapper.readTree( - queryBuilder.buildAggregationBuilder(namedAggregatorList, groupByList, sortList).get(0) - .toString()) + return objectMapper + .readTree( + queryBuilder + .buildAggregationBuilder(namedAggregatorList, groupByList, sortList) + .getLeft() + .get(0) + .toString()) .toPrettyString(); } diff --git a/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/script/aggregation/dsl/MetricAggregationBuilderTest.java b/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/script/aggregation/dsl/MetricAggregationBuilderTest.java index b956a2f5a0..85b3bd5a65 100644 --- a/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/script/aggregation/dsl/MetricAggregationBuilderTest.java +++ b/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/script/aggregation/dsl/MetricAggregationBuilderTest.java @@ -211,7 +211,7 @@ void should_throw_exception_for_unsupported_exception() { private String buildQuery(List namedAggregatorList) { ObjectMapper objectMapper = new ObjectMapper(); return objectMapper.readTree( - aggregationBuilder.build(namedAggregatorList).toString()) + aggregationBuilder.build(namedAggregatorList).getLeft().toString()) .toPrettyString(); } } From ba965b9ce764e22c82000e19b3c112c8348dd9f5 Mon Sep 17 00:00:00 2001 From: penghuo Date: Fri, 4 Jun 2021 15:47:41 -0700 Subject: [PATCH 2/2] modify the doc Signed-off-by: penghuo --- .../java/org/opensearch/sql/opensearch/response/agg/Utils.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/response/agg/Utils.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/response/agg/Utils.java index 28b9d41e83..53fd66ceef 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/response/agg/Utils.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/response/agg/Utils.java @@ -19,7 +19,7 @@ public class Utils { /** * Utils to handle Nan Value. - * @return null if is Nan. + * @return null if is Nan value. */ public static Object handleNanValue(double value) { return Double.isNaN(value) ? null : value;