diff --git a/common/src/main/java/io/druid/collections/SerializablePair.java b/common/src/main/java/io/druid/collections/SerializablePair.java new file mode 100644 index 000000000000..d02108ca7b73 --- /dev/null +++ b/common/src/main/java/io/druid/collections/SerializablePair.java @@ -0,0 +1,45 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.collections; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import io.druid.java.util.common.Pair; + +public class SerializablePair<T1, T2> extends Pair<T1, T2> +{ + @JsonCreator + public SerializablePair(@JsonProperty("lhs") T1 lhs, @JsonProperty("rhs") T2 rhs) + { + super(lhs, rhs); + } + + @JsonProperty + public T1 getLhs() + { + return lhs; + } + + @JsonProperty + public T2 getRhs() + { + return rhs; + } +} diff --git a/common/src/test/java/io/druid/collections/SerializablePairTest.java b/common/src/test/java/io/druid/collections/SerializablePairTest.java new file mode 100644 index 000000000000..94586820862f --- /dev/null +++ b/common/src/test/java/io/druid/collections/SerializablePairTest.java @@ -0,0 +1,51 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +package io.druid.collections; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; + +public class SerializablePairTest +{ + private static final ObjectMapper jsonMapper = new ObjectMapper(); + + @Test + public void testBytesSerde() throws IOException + { + SerializablePair<Long, Long> pair = new SerializablePair<>(5L, 9L); + byte[] bytes = jsonMapper.writeValueAsBytes(pair); + SerializablePair<Number, Number> deserializedPair = jsonMapper.readValue(bytes, SerializablePair.class); + Assert.assertEquals(pair.lhs, deserializedPair.lhs.longValue()); + Assert.assertEquals(pair.rhs, deserializedPair.rhs.longValue()); + } + + @Test + public void testStringSerde() throws IOException + { + SerializablePair<Long, Long> pair = new SerializablePair<>(5L, 9L); + String str = jsonMapper.writeValueAsString(pair); + SerializablePair<Number, Number> deserializedPair = jsonMapper.readValue(str, SerializablePair.class); + Assert.assertEquals(pair.lhs, deserializedPair.lhs.longValue()); + Assert.assertEquals(pair.rhs, deserializedPair.rhs.longValue()); + } +} diff --git a/docs/content/querying/aggregations.md b/docs/content/querying/aggregations.md index 507f9da12cef..2382e76aa2b2 100644 --- a/docs/content/querying/aggregations.md +++ b/docs/content/querying/aggregations.md @@ -76,6 +76,60 @@ Computes the sum of values as 64-bit floating point value. Similar to `longSum` { "type" : "longMax", "name" : <output_name>, "fieldName" : <metric_name> } ``` +### First / Last aggregator + +First and Last aggregators cannot be used in an ingestion spec; they should only be specified as part of queries. + +Note that queries with first/last aggregators on a segment created with rollup enabled will return the rolled-up value, not the last value within the raw ingested data. + +#### `doubleFirst` aggregator + +`doubleFirst` computes the metric value with the minimum timestamp, or 0 if no rows exist. + +```json +{ + "type" : "doubleFirst", + "name" : <output_name>, + "fieldName" : <metric_name> +} +``` + +#### `doubleLast` aggregator + +`doubleLast` computes the metric value with the maximum timestamp, or 0 if no rows exist. + +```json +{ + "type" : "doubleLast", + "name" : <output_name>, + "fieldName" : <metric_name> +} +``` + +#### `longFirst` aggregator + +`longFirst` computes the metric value with the minimum timestamp, or 0 if no rows exist. + +```json +{ + "type" : "longFirst", + "name" : <output_name>, + "fieldName" : <metric_name> +} +``` + +#### `longLast` aggregator + +`longLast` computes the metric value with the maximum timestamp, or 0 if no rows exist. + +```json +{ + "type" : "longLast", + "name" : <output_name>, + "fieldName" : <metric_name> +} +``` + ### JavaScript aggregator Computes an arbitrary JavaScript function over a set of columns (both metrics and dimensions are allowed).
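For a concrete sense of how the documented aggregators above are used, they drop into the `aggregations` list of an ordinary query like any other aggregator. A minimal sketch of a timeseries query (the datasource, interval, and field names here are hypothetical, not taken from the patch):

```json
{
  "queryType": "timeseries",
  "dataSource": "sample_datasource",
  "granularity": "all",
  "intervals": [ "2013-01-01/2013-01-02" ],
  "aggregations": [
    { "type": "doubleFirst", "name": "firstPrice", "fieldName": "price" },
    { "type": "longLast", "name": "lastCount", "fieldName": "count" }
  ]
}
```

Internally each of these aggregators carries a (timestamp, value) pair as its intermediate state, which is why the patch introduces `SerializablePair`; `finalizeComputation` then unwraps the pair so the query result contains only the value.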
diff --git a/integration-tests/src/test/resources/queries/wikipedia_editstream_queries.json b/integration-tests/src/test/resources/queries/wikipedia_editstream_queries.json index 5873a87d95b1..791d296d2500 --- a/integration-tests/src/test/resources/queries/wikipedia_editstream_queries.json +++ b/integration-tests/src/test/resources/queries/wikipedia_editstream_queries.json @@ -73,6 +73,26 @@ "type": "hyperUnique", "fieldName": "unique_users", "name": "unique_users" + }, + { + "type" : "doubleFirst", + "name" : "firstAdded", + "fieldName" : "added" + }, + { + "type" : "doubleLast", + "name" : "lastAdded", + "fieldName" : "added" + }, + { + "type" : "longFirst", + "name" : "firstCount", + "fieldName" : "count" + }, + { + "type" : "longLast", + "name" : "lastCount", + "fieldName" : "count" } ], "context": { @@ -87,6 +107,10 @@ "result": { "added": 9.11526338E8, "count": 2815650, + "firstAdded": 39.0, + "lastAdded": 210.0, + "firstCount": 1, + "lastCount": 1, "delta": 5.48967603E8, "variation": 1.274085073E9, "delta_hist": { @@ -174,6 +198,26 @@ "type": "hyperUnique", "fieldName": "unique_users", "name": "unique_users" + }, + { + "type" : "doubleFirst", + "name" : "firstAdded", + "fieldName" : "added" + }, + { + "type" : "doubleLast", + "name" : "lastAdded", + "fieldName" : "added" + }, + { + "type" : "longFirst", + "name" : "firstCount", + "fieldName" : "count" + }, + { + "type" : "longLast", + "name" : "lastCount", + "fieldName" : "count" } ], "context": { @@ -188,6 +232,10 @@ "result": { "added": 3.49393993E8, "count": 1829240, + "firstAdded": 39.0, + "lastAdded": 210.0, + "firstCount": 1, + "lastCount": 1, "delta": 2.24089868E8, "variation": 4.74698118E8, "delta_hist": { @@ -365,6 +413,26 @@ "type": "hyperUnique", "fieldName": "unique_users", "name": "unique_users" + }, + { + "type" : "doubleFirst", + "name" : "firstAdded", + "fieldName" : "added" + }, + { + "type" : "doubleLast", + "name" : "lastAdded", + "fieldName" : "added" + }, + { + "type" : "longFirst", + "name" : "firstCount", + "fieldName" : "count" + }, + { + "type" : "longLast", + "name" : "lastCount", + "fieldName" : "count" } ], "dimension": "page", @@ -383,6 +451,10 @@ { "added": 1812960.0, "count": 1697, + "firstCount": 2, + "lastCount": 3, + "firstAdded": 462.0, + "lastAdded": 1871.0, "page": "Wikipedia:Administrators'_noticeboard/Incidents", "delta": 770071.0, "variation": 2855849.0, @@ -393,6 +465,10 @@ { "added": 70162.0, "count": 967, + "firstCount": 1, + "lastCount": 1, + "firstAdded": 12.0, + "lastAdded": 129.0, "page": "2013", "delta": 40872.0, "variation": 99452.0, @@ -403,6 +479,10 @@ { "added": 519152.0, "count": 1700, + "firstCount": 1, + "lastCount": 5, + "firstAdded": 0.0, + "lastAdded": 2399.0, "page": "Wikipedia:Vandalismusmeldung", "delta": -5446.0, "variation": 1043750.0, @@ -480,6 +560,26 @@ "type": "hyperUnique", "fieldName": "unique_users", "name": "unique_users" + }, + { + "type" : "doubleFirst", + "name" : "firstAdded", + "fieldName" : "added" + }, + { + "type" : "doubleLast", + "name" : "lastAdded", + "fieldName" : "added" + }, + { + "type" : "longFirst", + "name" : "firstCount", + "fieldName" : "count" + }, + { + "type" : "longLast", + "name" : "lastCount", + "fieldName" : "count" } ], "dimension": "page", @@ -498,6 +598,10 @@ { "added": 61739.0, "count": 852, + "firstCount": 1, + "lastCount": 1, + "firstAdded": 12.0, + "lastAdded": 129.0, "page": "2013", "delta": 35313.0, "variation": 88165.0, @@ -508,6 +612,10 @@ { "added": 28288.0, "count": 513, + "firstCount": 1, + "lastCount": 1, +
"firstAdded": 29.0, + "lastAdded": 37.0, "page": "Gérard_Depardieu", "delta": 7027.0, "variation": 49549.0, @@ -518,6 +626,10 @@ { "added": 10951.0, "count": 459, + "firstCount": 1, + "lastCount": 1, + "firstAdded": 29.0, + "lastAdded": 35.0, "page": "Zichyújfalu", "delta": 9030.0, "variation": 12872.0, @@ -570,6 +682,26 @@ "type": "hyperUnique", "fieldName": "unique_users", "name": "unique_users" + }, + { + "type" : "doubleFirst", + "name" : "firstAdded", + "fieldName" : "added" + }, + { + "type" : "doubleLast", + "name" : "lastAdded", + "fieldName" : "added" + }, + { + "type" : "longFirst", + "name" : "firstCount", + "fieldName" : "count" + }, + { + "type" : "longLast", + "name" : "lastCount", + "fieldName" : "count" } ], "postAggregations": [ @@ -619,6 +751,10 @@ { "added": 151409.0, "count": 1770, + "firstCount": 9, + "lastCount": 9, + "firstAdded": 1612.0, + "lastAdded": 560.0, "page": "User:Cyde/List_of_candidates_for_speedy_deletion/Subpage", "delta": 670.0, "variation": 302148.0, @@ -630,6 +766,10 @@ { "added": 519152.0, "count": 1700, + "firstCount": 1, + "lastCount": 5, + "firstAdded": 0.0, + "lastAdded": 2399.0, "page": "Wikipedia:Vandalismusmeldung", "delta": -5446.0, "variation": 1043750.0, @@ -641,6 +781,10 @@ { "added": 1812960.0, "count": 1697, + "firstCount": 2, + "lastCount": 3, + "firstAdded": 462.0, + "lastAdded": 1871.0, "page": "Wikipedia:Administrators'_noticeboard/Incidents", "delta": 770071.0, "variation": 2855849.0, @@ -865,7 +1009,7 @@ ] }, { - "description": "groupBy, two aggs, namespace + robot dim, postAggs", + "description": "groupBy, six aggs, namespace + robot dim, postAggs", "query": { "queryType": "groupBy", "dataSource": "wikipedia_editstream", @@ -880,6 +1024,26 @@ "type": "longSum", "fieldName": "count", "name": "count" + }, + { + "type" : "doubleFirst", + "name" : "firstAdded", + "fieldName" : "added" + }, + { + "type" : "doubleLast", + "name" : "lastAdded", + "fieldName" : "added" + }, + { + "type" : "longFirst", + "name" : "firstCount", + "fieldName" : "count" + }, + { + "type" : "longLast", + "name" : "lastCount", + "fieldName" : "count" } ], "postAggregations": [ @@ -920,6 +1084,10 @@ "event": { "sumOfRowsAndCount": 2268154.0, "count": 1286354, + "firstCount": 1, + "lastCount": 1, + "firstAdded": 70.0, + "lastAdded": 210.0, "robot": "0", "rows": 981800, "namespace": "article" @@ -931,6 +1099,10 @@ "event": { "sumOfRowsAndCount": 1385233.0, "count": 693711, + "firstCount": 1, + "lastCount": 1, + "firstAdded": 39.0, + "lastAdded": 0.0, "robot": "1", "rows": 691522, "namespace": "article" @@ -942,6 +1114,10 @@ "event": { "sumOfRowsAndCount": 878393.0, "count": 492643, + "firstCount": 2, + "lastCount": 1, + "firstAdded": 431.0, + "lastAdded": 43.0, "robot": "0", "rows": 385750, "namespace": "wikipedia" diff --git a/processing/src/main/java/io/druid/jackson/AggregatorsModule.java b/processing/src/main/java/io/druid/jackson/AggregatorsModule.java index 353f9ddc5cc3..57fc798a6f40 100644 --- a/processing/src/main/java/io/druid/jackson/AggregatorsModule.java +++ b/processing/src/main/java/io/druid/jackson/AggregatorsModule.java @@ -29,8 +29,11 @@ import io.druid.query.aggregation.DoubleMinAggregatorFactory; import io.druid.query.aggregation.DoubleSumAggregatorFactory; import io.druid.query.aggregation.FilteredAggregatorFactory; +import io.druid.query.aggregation.first.DoubleFirstAggregatorFactory; import io.druid.query.aggregation.HistogramAggregatorFactory; import io.druid.query.aggregation.JavaScriptAggregatorFactory; +import 
io.druid.query.aggregation.first.LongFirstAggregatorFactory; +import io.druid.query.aggregation.last.DoubleLastAggregatorFactory; import io.druid.query.aggregation.LongMaxAggregatorFactory; import io.druid.query.aggregation.LongMinAggregatorFactory; import io.druid.query.aggregation.LongSumAggregatorFactory; @@ -39,6 +42,7 @@ import io.druid.query.aggregation.hyperloglog.HyperUniqueFinalizingPostAggregator; import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory; import io.druid.query.aggregation.hyperloglog.HyperUniquesSerde; +import io.druid.query.aggregation.last.LongLastAggregatorFactory; import io.druid.query.aggregation.post.ArithmeticPostAggregator; import io.druid.query.aggregation.post.ConstantPostAggregator; import io.druid.query.aggregation.post.DoubleGreatestPostAggregator; @@ -79,7 +83,11 @@ public AggregatorsModule() @JsonSubTypes.Type(name = "histogram", value = HistogramAggregatorFactory.class), @JsonSubTypes.Type(name = "hyperUnique", value = HyperUniquesAggregatorFactory.class), @JsonSubTypes.Type(name = "cardinality", value = CardinalityAggregatorFactory.class), - @JsonSubTypes.Type(name = "filtered", value = FilteredAggregatorFactory.class) + @JsonSubTypes.Type(name = "filtered", value = FilteredAggregatorFactory.class), + @JsonSubTypes.Type(name = "longFirst", value = LongFirstAggregatorFactory.class), + @JsonSubTypes.Type(name = "doubleFirst", value = DoubleFirstAggregatorFactory.class), + @JsonSubTypes.Type(name = "longLast", value = LongLastAggregatorFactory.class), + @JsonSubTypes.Type(name = "doubleLast", value = DoubleLastAggregatorFactory.class) }) public static interface AggregatorFactoryMixin { diff --git a/processing/src/main/java/io/druid/query/GroupByMergedQueryRunner.java b/processing/src/main/java/io/druid/query/GroupByMergedQueryRunner.java index f1e710193751..f299c12ee864 100644 --- a/processing/src/main/java/io/druid/query/GroupByMergedQueryRunner.java +++ b/processing/src/main/java/io/druid/query/GroupByMergedQueryRunner.java @@ -87,7 +87,8 @@ public Sequence<T> run(final Query<T> queryParam, final Map<String, Object> resp final Pair<IncrementalIndex, Accumulator<IncrementalIndex, T>> indexAccumulatorPair = GroupByQueryHelper.createIndexAccumulatorPair( query, querySpecificConfig, - bufferPool + bufferPool, + true ); final Pair<Queue, Accumulator<Queue, T>> bySegmentAccumulatorPair = GroupByQueryHelper.createBySegmentAccumulatorPair(); final boolean bySegment = BaseQuery.getContextBySegment(query, false); diff --git a/processing/src/main/java/io/druid/query/aggregation/JavaScriptAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/JavaScriptAggregatorFactory.java index ff835abaaac9..288e8a23789c 100644 --- a/processing/src/main/java/io/druid/query/aggregation/JavaScriptAggregatorFactory.java +++ b/processing/src/main/java/io/druid/query/aggregation/JavaScriptAggregatorFactory.java @@ -171,7 +171,7 @@ public List<AggregatorFactory> getRequiredColumns() @Override public AggregatorFactory apply(String input) { - return new JavaScriptAggregatorFactory(input, fieldNames, fnAggregate, fnReset, fnCombine, config); + return new JavaScriptAggregatorFactory(input, Lists.newArrayList(input), fnCombine, fnReset, fnCombine, config); } } ) diff --git a/processing/src/main/java/io/druid/query/aggregation/first/DoubleFirstAggregator.java b/processing/src/main/java/io/druid/query/aggregation/first/DoubleFirstAggregator.java new file mode 100644 index 000000000000..bc709de39138 --- /dev/null +++ b/processing/src/main/java/io/druid/query/aggregation/first/DoubleFirstAggregator.java @@ -0,0 +1,97 @@ +/* + * Licensed to Metamarkets Group Inc.
(Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query.aggregation.first; + +import io.druid.collections.SerializablePair; +import io.druid.query.aggregation.Aggregator; +import io.druid.segment.FloatColumnSelector; +import io.druid.segment.LongColumnSelector; + +public class DoubleFirstAggregator implements Aggregator +{ + + private final FloatColumnSelector valueSelector; + private final LongColumnSelector timeSelector; + private final String name; + + protected long firstTime; + protected double firstValue; + + public DoubleFirstAggregator( + String name, + LongColumnSelector timeSelector, + FloatColumnSelector valueSelector + ) + { + this.name = name; + this.valueSelector = valueSelector; + this.timeSelector = timeSelector; + + reset(); + } + + @Override + public void aggregate() + { + long time = timeSelector.get(); + if (time < firstTime) { + firstTime = time; + firstValue = valueSelector.get(); + } + } + + @Override + public void reset() + { + firstTime = Long.MAX_VALUE; + firstValue = 0; + } + + @Override + public Object get() + { + return new SerializablePair<>(firstTime, firstValue); + } + + @Override + public float getFloat() + { + return (float) firstValue; + } + + @Override + public String getName() + { + return name; + } + + @Override + public void close() + { + + } + + @Override + public long getLong() + { + return (long) firstValue; + } +} + diff --git a/processing/src/main/java/io/druid/query/aggregation/first/DoubleFirstAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/first/DoubleFirstAggregatorFactory.java new file mode 100644 index 000000000000..192f8c363301 --- /dev/null +++ b/processing/src/main/java/io/druid/query/aggregation/first/DoubleFirstAggregatorFactory.java @@ -0,0 +1,265 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.query.aggregation.first; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Preconditions; +import com.google.common.primitives.Doubles; +import com.google.common.primitives.Longs; +import com.metamx.common.StringUtils; +import io.druid.collections.SerializablePair; +import io.druid.query.aggregation.Aggregator; +import io.druid.query.aggregation.AggregatorFactory; +import io.druid.query.aggregation.AggregatorFactoryNotMergeableException; +import io.druid.query.aggregation.BufferAggregator; +import io.druid.segment.ColumnSelectorFactory; +import io.druid.segment.ObjectColumnSelector; +import io.druid.segment.column.Column; + +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Comparator; +import java.util.List; +import java.util.Map; + +public class DoubleFirstAggregatorFactory extends AggregatorFactory +{ + public static final Comparator VALUE_COMPARATOR = new Comparator() + { + @Override + public int compare(Object o1, Object o2) + { + return Doubles.compare(((SerializablePair<Long, Double>) o1).rhs, ((SerializablePair<Long, Double>) o2).rhs); + } + }; + + public static final Comparator TIME_COMPARATOR = new Comparator() + { + @Override + public int compare(Object o1, Object o2) + { + return Longs.compare(((SerializablePair<Long, Object>) o1).lhs, ((SerializablePair<Long, Object>) o2).lhs); + } + }; + + private static final byte CACHE_TYPE_ID = 16; + + private final String fieldName; + private final String name; + + @JsonCreator + public DoubleFirstAggregatorFactory( + @JsonProperty("name") String name, + @JsonProperty("fieldName") final String fieldName + ) + { + Preconditions.checkNotNull(name, "Must have a valid, non-null aggregator name"); + Preconditions.checkNotNull(fieldName, "Must have a valid, non-null fieldName"); + + this.name = name; + this.fieldName = fieldName; + } + + @Override + public Aggregator factorize(ColumnSelectorFactory metricFactory) + { + return new DoubleFirstAggregator( + name, + metricFactory.makeLongColumnSelector(Column.TIME_COLUMN_NAME), + metricFactory.makeFloatColumnSelector(fieldName) + ); + } + + @Override + public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory) + { + return new DoubleFirstBufferAggregator( + metricFactory.makeLongColumnSelector(Column.TIME_COLUMN_NAME), + metricFactory.makeFloatColumnSelector(fieldName) + ); + } + + @Override + public Comparator getComparator() + { + return VALUE_COMPARATOR; + } + + @Override + public Object combine(Object lhs, Object rhs) + { + return TIME_COMPARATOR.compare(lhs, rhs) <= 0 ?
lhs : rhs; + } + + @Override + public AggregatorFactory getCombiningFactory() + { + return new DoubleFirstAggregatorFactory(name, name) + { + @Override + public Aggregator factorize(ColumnSelectorFactory metricFactory) + { + final ObjectColumnSelector selector = metricFactory.makeObjectColumnSelector(name); + return new DoubleFirstAggregator(name, null, null) + { + @Override + public void aggregate() + { + SerializablePair<Long, Double> pair = (SerializablePair<Long, Double>) selector.get(); + if (pair.lhs < firstTime) { + firstTime = pair.lhs; + firstValue = pair.rhs; + } + } + }; + } + + @Override + public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory) + { + final ObjectColumnSelector selector = metricFactory.makeObjectColumnSelector(name); + return new DoubleFirstBufferAggregator(null, null) + { + @Override + public void aggregate(ByteBuffer buf, int position) + { + SerializablePair<Long, Double> pair = (SerializablePair<Long, Double>) selector.get(); + long firstTime = buf.getLong(position); + if (pair.lhs < firstTime) { + buf.putLong(position, pair.lhs); + buf.putDouble(position + Longs.BYTES, pair.rhs); + } + } + }; + } + }; + } + + @Override + public AggregatorFactory getMergingFactory(AggregatorFactory other) throws AggregatorFactoryNotMergeableException + { + if (other.getName().equals(this.getName()) && this.getClass() == other.getClass()) { + return getCombiningFactory(); + } else { + throw new AggregatorFactoryNotMergeableException(this, other); + } + } + + @Override + public List<AggregatorFactory> getRequiredColumns() + { + return Arrays.<AggregatorFactory>asList(new DoubleFirstAggregatorFactory(fieldName, fieldName)); + } + + @Override + public Object deserialize(Object object) + { + Map map = (Map) object; + return new SerializablePair<>(((Number) map.get("lhs")).longValue(), ((Number) map.get("rhs")).doubleValue()); + } + + @Override + public Object finalizeComputation(Object object) + { + return ((SerializablePair) object).rhs; + } + + @Override + @JsonProperty + public String getName() + { + return name; + } + + @JsonProperty + public String getFieldName() + { + return fieldName; + } + + @Override + public List<String> requiredFields() + { + return Arrays.asList(Column.TIME_COLUMN_NAME, fieldName); + } + + @Override + public byte[] getCacheKey() + { + byte[] fieldNameBytes = StringUtils.toUtf8(fieldName); + + return ByteBuffer.allocate(2 + fieldNameBytes.length) + .put(CACHE_TYPE_ID) + .put(fieldNameBytes) + .put((byte) 0xff) + .array(); + } + + @Override + public String getTypeName() + { + return "float"; + } + + @Override + public int getMaxIntermediateSize() + { + return Longs.BYTES + Doubles.BYTES; + } + + @Override + public Object getAggregatorStartValue() + { + throw new UnsupportedOperationException(); + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + DoubleFirstAggregatorFactory that = (DoubleFirstAggregatorFactory) o; + + return fieldName.equals(that.fieldName) && name.equals(that.name); + } + + @Override + public int hashCode() + { + int result = name.hashCode(); + result = 31 * result + fieldName.hashCode(); + return result; + } + + @Override + public String toString() + { + return "DoubleFirstAggregatorFactory{" + + "name='" + name + '\'' + + ", fieldName='" + fieldName + '\'' + + '}'; + } +} diff --git a/processing/src/main/java/io/druid/query/aggregation/first/DoubleFirstBufferAggregator.java b/processing/src/main/java/io/druid/query/aggregation/first/DoubleFirstBufferAggregator.java new file mode 100644 index
000000000000..8349dad65f1d --- /dev/null +++ b/processing/src/main/java/io/druid/query/aggregation/first/DoubleFirstBufferAggregator.java @@ -0,0 +1,82 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query.aggregation.first; + +import com.google.common.primitives.Longs; +import io.druid.collections.SerializablePair; +import io.druid.query.aggregation.BufferAggregator; +import io.druid.segment.FloatColumnSelector; +import io.druid.segment.LongColumnSelector; + +import java.nio.ByteBuffer; + +public class DoubleFirstBufferAggregator implements BufferAggregator +{ + private final LongColumnSelector timeSelector; + private final FloatColumnSelector valueSelector; + + public DoubleFirstBufferAggregator(LongColumnSelector timeSelector, FloatColumnSelector valueSelector) + { + this.timeSelector = timeSelector; + this.valueSelector = valueSelector; + } + + @Override + public void init(ByteBuffer buf, int position) + { + buf.putLong(position, Long.MAX_VALUE); + buf.putDouble(position + Longs.BYTES, 0); + } + + @Override + public void aggregate(ByteBuffer buf, int position) + { + long time = timeSelector.get(); + long firstTime = buf.getLong(position); + if (time < firstTime) { + buf.putLong(position, time); + buf.putDouble(position + Longs.BYTES, valueSelector.get()); + } + } + + @Override + public Object get(ByteBuffer buf, int position) + { + return new SerializablePair<>(buf.getLong(position), buf.getDouble(position + Longs.BYTES)); + } + + @Override + public float getFloat(ByteBuffer buf, int position) + { + return (float) buf.getDouble(position + Longs.BYTES); + } + + @Override + public long getLong(ByteBuffer buf, int position) + { + return (long) buf.getDouble(position + Longs.BYTES); + } + + @Override + public void close() + { + // no resources to cleanup + } +} diff --git a/processing/src/main/java/io/druid/query/aggregation/first/LongFirstAggregator.java b/processing/src/main/java/io/druid/query/aggregation/first/LongFirstAggregator.java new file mode 100644 index 000000000000..51a5309c73a8 --- /dev/null +++ b/processing/src/main/java/io/druid/query/aggregation/first/LongFirstAggregator.java @@ -0,0 +1,95 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query.aggregation.first; + +import io.druid.collections.SerializablePair; +import io.druid.query.aggregation.Aggregator; +import io.druid.segment.LongColumnSelector; + +public class LongFirstAggregator implements Aggregator +{ + + private final LongColumnSelector valueSelector; + private final LongColumnSelector timeSelector; + private final String name; + + protected long firstTime; + protected long firstValue; + + public LongFirstAggregator( + String name, + LongColumnSelector timeSelector, + LongColumnSelector valueSelector + ) + { + this.name = name; + this.valueSelector = valueSelector; + this.timeSelector = timeSelector; + + reset(); + } + + @Override + public void aggregate() + { + long time = timeSelector.get(); + if (time < firstTime) { + firstTime = time; + firstValue = valueSelector.get(); + } + } + + @Override + public void reset() + { + firstTime = Long.MAX_VALUE; + firstValue = 0; + } + + @Override + public Object get() + { + return new SerializablePair<>(firstTime, firstValue); + } + + @Override + public float getFloat() + { + return (float) firstValue; + } + + @Override + public String getName() + { + return name; + } + + @Override + public void close() + { + + } + + @Override + public long getLong() + { + return firstValue; + } +} diff --git a/processing/src/main/java/io/druid/query/aggregation/first/LongFirstAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/first/LongFirstAggregatorFactory.java new file mode 100644 index 000000000000..459cf4ac2fba --- /dev/null +++ b/processing/src/main/java/io/druid/query/aggregation/first/LongFirstAggregatorFactory.java @@ -0,0 +1,255 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.query.aggregation.first; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Preconditions; +import com.google.common.primitives.Longs; +import com.metamx.common.StringUtils; +import io.druid.collections.SerializablePair; +import io.druid.query.aggregation.Aggregator; +import io.druid.query.aggregation.AggregatorFactory; +import io.druid.query.aggregation.AggregatorFactoryNotMergeableException; +import io.druid.query.aggregation.BufferAggregator; +import io.druid.segment.ColumnSelectorFactory; +import io.druid.segment.ObjectColumnSelector; +import io.druid.segment.column.Column; + +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Comparator; +import java.util.List; +import java.util.Map; + +public class LongFirstAggregatorFactory extends AggregatorFactory +{ + public static final Comparator VALUE_COMPARATOR = new Comparator() + { + @Override + public int compare(Object o1, Object o2) + { + return Longs.compare(((SerializablePair<Long, Long>) o1).rhs, ((SerializablePair<Long, Long>) o2).rhs); + } + }; + + private static final byte CACHE_TYPE_ID = 17; + + private final String fieldName; + private final String name; + + @JsonCreator + public LongFirstAggregatorFactory( + @JsonProperty("name") String name, + @JsonProperty("fieldName") final String fieldName + ) + { + Preconditions.checkNotNull(name, "Must have a valid, non-null aggregator name"); + Preconditions.checkNotNull(fieldName, "Must have a valid, non-null fieldName"); + + this.name = name; + this.fieldName = fieldName; + } + + @Override + public Aggregator factorize(ColumnSelectorFactory metricFactory) + { + return new LongFirstAggregator( + name, + metricFactory.makeLongColumnSelector(Column.TIME_COLUMN_NAME), + metricFactory.makeLongColumnSelector(fieldName) + ); + } + + @Override + public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory) + { + return new LongFirstBufferAggregator( + metricFactory.makeLongColumnSelector(Column.TIME_COLUMN_NAME), + metricFactory.makeLongColumnSelector(fieldName) + ); + } + + @Override + public Comparator getComparator() + { + return VALUE_COMPARATOR; + } + + @Override + public Object combine(Object lhs, Object rhs) + { + return DoubleFirstAggregatorFactory.TIME_COMPARATOR.compare(lhs, rhs) <= 0 ?
lhs : rhs; + } + + @Override + public AggregatorFactory getCombiningFactory() + { + return new LongFirstAggregatorFactory(name, name) + { + @Override + public Aggregator factorize(ColumnSelectorFactory metricFactory) + { + final ObjectColumnSelector selector = metricFactory.makeObjectColumnSelector(name); + return new LongFirstAggregator(name, null, null) + { + @Override + public void aggregate() + { + SerializablePair<Long, Long> pair = (SerializablePair<Long, Long>) selector.get(); + if (pair.lhs < firstTime) { + firstTime = pair.lhs; + firstValue = pair.rhs; + } + } + }; + } + + @Override + public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory) + { + final ObjectColumnSelector selector = metricFactory.makeObjectColumnSelector(name); + return new LongFirstBufferAggregator(null, null) + { + @Override + public void aggregate(ByteBuffer buf, int position) + { + SerializablePair<Long, Long> pair = (SerializablePair<Long, Long>) selector.get(); + long firstTime = buf.getLong(position); + if (pair.lhs < firstTime) { + buf.putLong(position, pair.lhs); + buf.putLong(position + Longs.BYTES, pair.rhs); + } + } + }; + } + }; + } + + @Override + public AggregatorFactory getMergingFactory(AggregatorFactory other) throws AggregatorFactoryNotMergeableException + { + if (other.getName().equals(this.getName()) && this.getClass() == other.getClass()) { + return getCombiningFactory(); + } else { + throw new AggregatorFactoryNotMergeableException(this, other); + } + } + + @Override + public List<AggregatorFactory> getRequiredColumns() + { + return Arrays.<AggregatorFactory>asList(new LongFirstAggregatorFactory(fieldName, fieldName)); + } + + @Override + public Object deserialize(Object object) + { + Map map = (Map) object; + return new SerializablePair<>(((Number) map.get("lhs")).longValue(), ((Number) map.get("rhs")).longValue()); + } + + @Override + public Object finalizeComputation(Object object) + { + return ((SerializablePair) object).rhs; + } + + @Override + @JsonProperty + public String getName() + { + return name; + } + + @JsonProperty + public String getFieldName() + { + return fieldName; + } + + @Override + public List<String> requiredFields() + { + return Arrays.asList(Column.TIME_COLUMN_NAME, fieldName); + } + + @Override + public byte[] getCacheKey() + { + byte[] fieldNameBytes = StringUtils.toUtf8(fieldName); + + return ByteBuffer.allocate(2 + fieldNameBytes.length) + .put(CACHE_TYPE_ID) + .put(fieldNameBytes) + .put((byte) 0xff) + .array(); + } + + @Override + public String getTypeName() + { + return "long"; + } + + @Override + public int getMaxIntermediateSize() + { + return Longs.BYTES * 2; + } + + @Override + public Object getAggregatorStartValue() + { + throw new UnsupportedOperationException(); + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + LongFirstAggregatorFactory that = (LongFirstAggregatorFactory) o; + + return fieldName.equals(that.fieldName) && name.equals(that.name); + } + + @Override + public int hashCode() + { + int result = name.hashCode(); + result = 31 * result + fieldName.hashCode(); + return result; + } + + @Override + public String toString() + { + return "LongFirstAggregatorFactory{" + + "name='" + name + '\'' + + ", fieldName='" + fieldName + '\'' + + '}'; + } +} diff --git a/processing/src/main/java/io/druid/query/aggregation/first/LongFirstBufferAggregator.java b/processing/src/main/java/io/druid/query/aggregation/first/LongFirstBufferAggregator.java new file mode 100644 index 000000000000..e7f750660f42 --- /dev/null
+++ b/processing/src/main/java/io/druid/query/aggregation/first/LongFirstBufferAggregator.java @@ -0,0 +1,81 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query.aggregation.first; + +import com.google.common.primitives.Longs; +import io.druid.collections.SerializablePair; +import io.druid.query.aggregation.BufferAggregator; +import io.druid.segment.LongColumnSelector; + +import java.nio.ByteBuffer; + +public class LongFirstBufferAggregator implements BufferAggregator +{ + private final LongColumnSelector timeSelector; + private final LongColumnSelector valueSelector; + + public LongFirstBufferAggregator(LongColumnSelector timeSelector, LongColumnSelector valueSelector) + { + this.timeSelector = timeSelector; + this.valueSelector = valueSelector; + } + + @Override + public void init(ByteBuffer buf, int position) + { + buf.putLong(position, Long.MAX_VALUE); + buf.putLong(position + Longs.BYTES, 0); + } + + @Override + public void aggregate(ByteBuffer buf, int position) + { + long time = timeSelector.get(); + long firstTime = buf.getLong(position); + if (time < firstTime) { + buf.putLong(position, time); + buf.putLong(position + Longs.BYTES, valueSelector.get()); + } + } + + @Override + public Object get(ByteBuffer buf, int position) + { + return new SerializablePair<>(buf.getLong(position), buf.getLong(position + Longs.BYTES)); + } + + @Override + public float getFloat(ByteBuffer buf, int position) + { + return (float) buf.getLong(position + Longs.BYTES); + } + + @Override + public long getLong(ByteBuffer buf, int position) + { + return buf.getLong(position + Longs.BYTES); + } + + @Override + public void close() + { + // no resources to cleanup + } +} diff --git a/processing/src/main/java/io/druid/query/aggregation/last/DoubleLastAggregator.java b/processing/src/main/java/io/druid/query/aggregation/last/DoubleLastAggregator.java new file mode 100644 index 000000000000..3e72387ea401 --- /dev/null +++ b/processing/src/main/java/io/druid/query/aggregation/last/DoubleLastAggregator.java @@ -0,0 +1,96 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query.aggregation.last; + +import io.druid.collections.SerializablePair; +import io.druid.query.aggregation.Aggregator; +import io.druid.segment.FloatColumnSelector; +import io.druid.segment.LongColumnSelector; + +public class DoubleLastAggregator implements Aggregator +{ + + private final FloatColumnSelector valueSelector; + private final LongColumnSelector timeSelector; + private final String name; + + protected long lastTime; + protected double lastValue; + + public DoubleLastAggregator( + String name, + LongColumnSelector timeSelector, + FloatColumnSelector valueSelector + ) + { + this.name = name; + this.valueSelector = valueSelector; + this.timeSelector = timeSelector; + + reset(); + } + + @Override + public void aggregate() + { + long time = timeSelector.get(); + if (time >= lastTime) { + lastTime = time; + lastValue = valueSelector.get(); + } + } + + @Override + public void reset() + { + lastTime = Long.MIN_VALUE; + lastValue = 0; + } + + @Override + public Object get() + { + return new SerializablePair<>(lastTime, lastValue); + } + + @Override + public float getFloat() + { + return (float) lastValue; + } + + @Override + public String getName() + { + return name; + } + + @Override + public void close() + { + + } + + @Override + public long getLong() + { + return (long) lastValue; + } +} diff --git a/processing/src/main/java/io/druid/query/aggregation/last/DoubleLastAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/last/DoubleLastAggregatorFactory.java new file mode 100644 index 000000000000..302a1cfa791c --- /dev/null +++ b/processing/src/main/java/io/druid/query/aggregation/last/DoubleLastAggregatorFactory.java @@ -0,0 +1,248 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +package io.druid.query.aggregation.last; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Preconditions; +import com.google.common.primitives.Doubles; +import com.google.common.primitives.Longs; +import com.metamx.common.StringUtils; +import io.druid.collections.SerializablePair; +import io.druid.query.aggregation.Aggregator; +import io.druid.query.aggregation.AggregatorFactory; +import io.druid.query.aggregation.AggregatorFactoryNotMergeableException; +import io.druid.query.aggregation.BufferAggregator; +import io.druid.query.aggregation.first.DoubleFirstAggregatorFactory; +import io.druid.query.aggregation.first.LongFirstAggregatorFactory; +import io.druid.segment.ColumnSelectorFactory; +import io.druid.segment.ObjectColumnSelector; +import io.druid.segment.column.Column; + +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Comparator; +import java.util.List; +import java.util.Map; + +public class DoubleLastAggregatorFactory extends AggregatorFactory +{ + private static final byte CACHE_TYPE_ID = 18; + + private final String fieldName; + private final String name; + + @JsonCreator + public DoubleLastAggregatorFactory( + @JsonProperty("name") String name, + @JsonProperty("fieldName") final String fieldName + ) + { + Preconditions.checkNotNull(name, "Must have a valid, non-null aggregator name"); + Preconditions.checkNotNull(fieldName, "Must have a valid, non-null fieldName"); + this.name = name; + this.fieldName = fieldName; + } + + @Override + public Aggregator factorize(ColumnSelectorFactory metricFactory) + { + return new DoubleLastAggregator( + name, + metricFactory.makeLongColumnSelector(Column.TIME_COLUMN_NAME), + metricFactory.makeFloatColumnSelector(fieldName) + ); + } + + @Override + public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory) + { + return new DoubleLastBufferAggregator( + metricFactory.makeLongColumnSelector(Column.TIME_COLUMN_NAME), + metricFactory.makeFloatColumnSelector(fieldName) + ); + } + + @Override + public Comparator getComparator() + { + return DoubleFirstAggregatorFactory.VALUE_COMPARATOR; + } + + @Override + public Object combine(Object lhs, Object rhs) + { + return DoubleFirstAggregatorFactory.TIME_COMPARATOR.compare(lhs, rhs) > 0 ? 
lhs : rhs; + } + + @Override + public AggregatorFactory getCombiningFactory() + { + return new DoubleLastAggregatorFactory(name, name) + { + @Override + public Aggregator factorize(ColumnSelectorFactory metricFactory) + { + final ObjectColumnSelector selector = metricFactory.makeObjectColumnSelector(name); + return new DoubleLastAggregator(name, null, null) + { + @Override + public void aggregate() + { + SerializablePair<Long, Double> pair = (SerializablePair<Long, Double>) selector.get(); + if (pair.lhs >= lastTime) { + lastTime = pair.lhs; + lastValue = pair.rhs; + } + } + }; + } + + @Override + public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory) + { + final ObjectColumnSelector selector = metricFactory.makeObjectColumnSelector(name); + return new DoubleLastBufferAggregator(null, null) + { + @Override + public void aggregate(ByteBuffer buf, int position) + { + SerializablePair<Long, Double> pair = (SerializablePair<Long, Double>) selector.get(); + long lastTime = buf.getLong(position); + if (pair.lhs >= lastTime) { + buf.putLong(position, pair.lhs); + buf.putDouble(position + Longs.BYTES, pair.rhs); + } + } + }; + } + }; + } + + @Override + public AggregatorFactory getMergingFactory(AggregatorFactory other) throws AggregatorFactoryNotMergeableException + { + if (other.getName().equals(this.getName()) && this.getClass() == other.getClass()) { + return getCombiningFactory(); + } else { + throw new AggregatorFactoryNotMergeableException(this, other); + } + } + + @Override + public List<AggregatorFactory> getRequiredColumns() + { + return Arrays.<AggregatorFactory>asList(new DoubleLastAggregatorFactory(fieldName, fieldName)); + } + + @Override + public Object deserialize(Object object) + { + Map map = (Map) object; + return new SerializablePair<>(((Number) map.get("lhs")).longValue(), ((Number) map.get("rhs")).doubleValue()); + } + + @Override + public Object finalizeComputation(Object object) + { + return ((SerializablePair) object).rhs; + } + + @Override + @JsonProperty + public String getName() + { + return name; + } + + @JsonProperty + public String getFieldName() + { + return fieldName; + } + + @Override + public List<String> requiredFields() + { + return Arrays.asList(Column.TIME_COLUMN_NAME, fieldName); + } + + @Override + public byte[] getCacheKey() + { + byte[] fieldNameBytes = StringUtils.toUtf8(fieldName); + + return ByteBuffer.allocate(2 + fieldNameBytes.length) + .put(CACHE_TYPE_ID) + .put(fieldNameBytes) + .put((byte) 0xff) + .array(); + } + + @Override + public String getTypeName() + { + return "float"; + } + + @Override + public int getMaxIntermediateSize() + { + return Longs.BYTES + Doubles.BYTES; + } + + @Override + public Object getAggregatorStartValue() + { + throw new UnsupportedOperationException(); + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + DoubleLastAggregatorFactory that = (DoubleLastAggregatorFactory) o; + + return fieldName.equals(that.fieldName) && name.equals(that.name); + } + + @Override + public int hashCode() + { + int result = name.hashCode(); + result = 31 * result + fieldName.hashCode(); + return result; + } + + @Override + public String toString() + { + return "DoubleLastAggregatorFactory{" + + "name='" + name + '\'' + + ", fieldName='" + fieldName + '\'' + + '}'; + } +} diff --git a/processing/src/main/java/io/druid/query/aggregation/last/DoubleLastBufferAggregator.java b/processing/src/main/java/io/druid/query/aggregation/last/DoubleLastBufferAggregator.java new file mode 100644 index
000000000000..e02118e4ae00 --- /dev/null +++ b/processing/src/main/java/io/druid/query/aggregation/last/DoubleLastBufferAggregator.java @@ -0,0 +1,82 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query.aggregation.last; + +import com.google.common.primitives.Longs; +import io.druid.collections.SerializablePair; +import io.druid.query.aggregation.BufferAggregator; +import io.druid.segment.FloatColumnSelector; +import io.druid.segment.LongColumnSelector; + +import java.nio.ByteBuffer; + +public class DoubleLastBufferAggregator implements BufferAggregator +{ + private final LongColumnSelector timeSelector; + private final FloatColumnSelector valueSelector; + + public DoubleLastBufferAggregator(LongColumnSelector timeSelector, FloatColumnSelector valueSelector) + { + this.timeSelector = timeSelector; + this.valueSelector = valueSelector; + } + + @Override + public void init(ByteBuffer buf, int position) + { + buf.putLong(position, Long.MIN_VALUE); + buf.putDouble(position + Longs.BYTES, 0); + } + + @Override + public void aggregate(ByteBuffer buf, int position) + { + long time = timeSelector.get(); + long lastTime = buf.getLong(position); + if (time >= lastTime) { + buf.putLong(position, time); + buf.putDouble(position + Longs.BYTES, valueSelector.get()); + } + } + + @Override + public Object get(ByteBuffer buf, int position) + { + return new SerializablePair<>(buf.getLong(position), buf.getDouble(position + Longs.BYTES)); + } + + @Override + public float getFloat(ByteBuffer buf, int position) + { + return (float) buf.getDouble(position + Longs.BYTES); + } + + @Override + public long getLong(ByteBuffer buf, int position) + { + return (long) buf.getDouble(position + Longs.BYTES); + } + + @Override + public void close() + { + // no resources to cleanup + } +} diff --git a/processing/src/main/java/io/druid/query/aggregation/last/LongLastAggregator.java b/processing/src/main/java/io/druid/query/aggregation/last/LongLastAggregator.java new file mode 100644 index 000000000000..bc312918eeb5 --- /dev/null +++ b/processing/src/main/java/io/druid/query/aggregation/last/LongLastAggregator.java @@ -0,0 +1,94 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query.aggregation.last; + +import io.druid.collections.SerializablePair; +import io.druid.query.aggregation.Aggregator; +import io.druid.segment.LongColumnSelector; + +public class LongLastAggregator implements Aggregator +{ + private final LongColumnSelector valueSelector; + private final LongColumnSelector timeSelector; + private final String name; + + protected long lastTime; + protected long lastValue; + + public LongLastAggregator( + String name, + LongColumnSelector timeSelector, + LongColumnSelector valueSelector + ) + { + this.name = name; + this.valueSelector = valueSelector; + this.timeSelector = timeSelector; + + reset(); + } + + @Override + public void aggregate() + { + long time = timeSelector.get(); + if (time >= lastTime) { + lastTime = time; + lastValue = valueSelector.get(); + } + } + + @Override + public void reset() + { + lastTime = Long.MIN_VALUE; + lastValue = 0; + } + + @Override + public Object get() + { + return new SerializablePair<>(lastTime, lastValue); + } + + @Override + public float getFloat() + { + return (float) lastValue; + } + + @Override + public String getName() + { + return name; + } + + @Override + public void close() + { + + } + + @Override + public long getLong() + { + return lastValue; + } +} diff --git a/processing/src/main/java/io/druid/query/aggregation/last/LongLastAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/last/LongLastAggregatorFactory.java new file mode 100644 index 000000000000..481e888bbaa1 --- /dev/null +++ b/processing/src/main/java/io/druid/query/aggregation/last/LongLastAggregatorFactory.java @@ -0,0 +1,248 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +package io.druid.query.aggregation.last; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Preconditions; +import com.google.common.primitives.Longs; +import com.metamx.common.StringUtils; +import io.druid.collections.SerializablePair; +import io.druid.query.aggregation.Aggregator; +import io.druid.query.aggregation.AggregatorFactory; +import io.druid.query.aggregation.AggregatorFactoryNotMergeableException; +import io.druid.query.aggregation.BufferAggregator; +import io.druid.query.aggregation.first.DoubleFirstAggregatorFactory; +import io.druid.query.aggregation.first.LongFirstAggregatorFactory; +import io.druid.segment.ColumnSelectorFactory; +import io.druid.segment.ObjectColumnSelector; +import io.druid.segment.column.Column; + +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Comparator; +import java.util.List; +import java.util.Map; + +public class LongLastAggregatorFactory extends AggregatorFactory +{ + private static final byte CACHE_TYPE_ID = 19; + + private final String fieldName; + private final String name; + + @JsonCreator + public LongLastAggregatorFactory( + @JsonProperty("name") String name, + @JsonProperty("fieldName") final String fieldName + ) + { + Preconditions.checkNotNull(name, "Must have a valid, non-null aggregator name"); + Preconditions.checkNotNull(fieldName, "Must have a valid, non-null fieldName"); + this.name = name; + this.fieldName = fieldName; + } + + @Override + public Aggregator factorize(ColumnSelectorFactory metricFactory) + { + return new LongLastAggregator( + name, + metricFactory.makeLongColumnSelector(Column.TIME_COLUMN_NAME), + metricFactory.makeLongColumnSelector(fieldName) + ); + } + + @Override + public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory) + { + return new LongLastBufferAggregator( + metricFactory.makeLongColumnSelector(Column.TIME_COLUMN_NAME), + metricFactory.makeLongColumnSelector(fieldName) + ); + } + + @Override + public Comparator getComparator() + { + return LongFirstAggregatorFactory.VALUE_COMPARATOR; + } + + @Override + public Object combine(Object lhs, Object rhs) + { + return DoubleFirstAggregatorFactory.TIME_COMPARATOR.compare(lhs, rhs) > 0 ? 
lhs : rhs; } + + @Override + public AggregatorFactory getCombiningFactory() + { + // The combining factory merges partially-aggregated (timestamp, value) pairs read back + // as objects, so the time/value column selectors are unused (hence the nulls below). + return new LongLastAggregatorFactory(name, name) + { + @Override + public Aggregator factorize(ColumnSelectorFactory metricFactory) + { + final ObjectColumnSelector selector = metricFactory.makeObjectColumnSelector(name); + return new LongLastAggregator(name, null, null) + { + @Override + public void aggregate() + { + SerializablePair<Long, Long> pair = (SerializablePair<Long, Long>) selector.get(); + if (pair.lhs >= lastTime) { + lastTime = pair.lhs; + lastValue = pair.rhs; + } + } + }; + } + + @Override + public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory) + { + final ObjectColumnSelector selector = metricFactory.makeObjectColumnSelector(name); + return new LongLastBufferAggregator(null, null) + { + @Override + public void aggregate(ByteBuffer buf, int position) + { + SerializablePair<Long, Long> pair = (SerializablePair<Long, Long>) selector.get(); + long lastTime = buf.getLong(position); + if (pair.lhs >= lastTime) { + buf.putLong(position, pair.lhs); + buf.putLong(position + Longs.BYTES, pair.rhs); + } + } + }; + } + }; + } + + @Override + public AggregatorFactory getMergingFactory(AggregatorFactory other) throws AggregatorFactoryNotMergeableException + { + if (other.getName().equals(this.getName()) && this.getClass() == other.getClass()) { + return getCombiningFactory(); + } else { + throw new AggregatorFactoryNotMergeableException(this, other); + } + } + + @Override + public List<AggregatorFactory> getRequiredColumns() + { + return Arrays.<AggregatorFactory>asList(new LongLastAggregatorFactory(fieldName, fieldName)); + } + + @Override + public Object deserialize(Object object) + { + Map map = (Map) object; + return new SerializablePair<>(((Number) map.get("lhs")).longValue(), ((Number) map.get("rhs")).longValue()); + } + + @Override + public Object finalizeComputation(Object object) + { + return ((SerializablePair) object).rhs; + } + + @Override + @JsonProperty + public String getName() + { + return name; + } + + @JsonProperty + public String getFieldName() + { + return fieldName; + } + + @Override + public List<String> requiredFields() + { + return Arrays.asList(Column.TIME_COLUMN_NAME, fieldName); + } + + @Override + public byte[] getCacheKey() + { + byte[] fieldNameBytes = StringUtils.toUtf8(fieldName); + + return ByteBuffer.allocate(2 + fieldNameBytes.length) + .put(CACHE_TYPE_ID) + .put(fieldNameBytes) + .put((byte) 0xff) + .array(); + } + + @Override + public String getTypeName() + { + return "long"; + } + + @Override + public int getMaxIntermediateSize() + { + return Longs.BYTES * 2; + } + + @Override + public Object getAggregatorStartValue() + { + throw new UnsupportedOperationException(); + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + LongLastAggregatorFactory that = (LongLastAggregatorFactory) o; + + return name.equals(that.name) && fieldName.equals(that.fieldName); + } + + @Override + public int hashCode() + { + int result = name.hashCode(); + result = 31 * result + fieldName.hashCode(); + return result; + } + + @Override + public String toString() + { + return "LongLastAggregatorFactory{" + + "name='" + name + '\'' + + ", fieldName='" + fieldName + '\'' + + '}'; + } +} diff --git a/processing/src/main/java/io/druid/query/aggregation/last/LongLastBufferAggregator.java b/processing/src/main/java/io/druid/query/aggregation/last/LongLastBufferAggregator.java new file mode 100644 index 000000000000..8bc09828eeb4 --- /dev/null +++ 
b/processing/src/main/java/io/druid/query/aggregation/last/LongLastBufferAggregator.java @@ -0,0 +1,81 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query.aggregation.last; + +import com.google.common.primitives.Longs; +import io.druid.collections.SerializablePair; +import io.druid.query.aggregation.BufferAggregator; +import io.druid.segment.LongColumnSelector; + +import java.nio.ByteBuffer; + +public class LongLastBufferAggregator implements BufferAggregator +{ + private final LongColumnSelector timeSelector; + private final LongColumnSelector valueSelector; + + public LongLastBufferAggregator(LongColumnSelector timeSelector, LongColumnSelector valueSelector) + { + this.timeSelector = timeSelector; + this.valueSelector = valueSelector; + } + + @Override + public void init(ByteBuffer buf, int position) + { + buf.putLong(position, Long.MIN_VALUE); + buf.putLong(position + Longs.BYTES, 0); + } + + @Override + public void aggregate(ByteBuffer buf, int position) + { + long time = timeSelector.get(); + long lastTime = buf.getLong(position); + if (time >= lastTime) { + buf.putLong(position, time); + buf.putLong(position + Longs.BYTES, valueSelector.get()); + } + } + + @Override + public Object get(ByteBuffer buf, int position) + { + return new SerializablePair<>(buf.getLong(position), buf.getLong(position + Longs.BYTES)); + } + + @Override + public float getFloat(ByteBuffer buf, int position) + { + return (float) buf.getLong(position + Longs.BYTES); + } + + @Override + public long getLong(ByteBuffer buf, int position) + { + return buf.getLong(position + Longs.BYTES); + } + + @Override + public void close() + { + // no resources to cleanup + } +} diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQueryHelper.java b/processing/src/main/java/io/druid/query/groupby/GroupByQueryHelper.java index 880150bd7d13..1fda3890aa5c 100644 --- a/processing/src/main/java/io/druid/query/groupby/GroupByQueryHelper.java +++ b/processing/src/main/java/io/druid/query/groupby/GroupByQueryHelper.java @@ -55,7 +55,8 @@ public class GroupByQueryHelper public static Pair> createIndexAccumulatorPair( final GroupByQuery query, final GroupByQueryConfig config, - StupidPool bufferPool + StupidPool bufferPool, + final boolean combine ) { final GroupByQueryConfig querySpecificConfig = config.withOverrides(query); @@ -66,17 +67,23 @@ public static Pair> creat // AllGranularity returns timeStart instead of Long.MIN_VALUE final long granTimeStart = gran.iterable(timeStart, timeStart + 1).iterator().next(); - final List aggs = Lists.transform( - query.getAggregatorSpecs(), - new Function() - { - @Override - public AggregatorFactory apply(AggregatorFactory input) + final List aggs; + if (combine) { + aggs = Lists.transform( + 
query.getAggregatorSpecs(), + new Function() { - return input.getCombiningFactory(); + @Override + public AggregatorFactory apply(AggregatorFactory input) + { + return input.getCombiningFactory(); + } } - } - ); + ); + } else { + aggs = query.getAggregatorSpecs(); + } + final List dimensions = Lists.transform( query.getDimensions(), new Function() @@ -178,13 +185,15 @@ public static IncrementalIndex makeIncrementalIndex( GroupByQuery query, GroupByQueryConfig config, StupidPool bufferPool, - Sequence rows + Sequence rows, + boolean combine ) { Pair> indexAccumulatorPair = GroupByQueryHelper.createIndexAccumulatorPair( query, config, - bufferPool + bufferPool, + combine ); return rows.accumulate(indexAccumulatorPair.lhs, indexAccumulatorPair.rhs); diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java index 436e9d31ce5a..b9c9e8ba13bd 100644 --- a/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java @@ -38,6 +38,7 @@ import io.druid.granularity.QueryGranularity; import io.druid.guice.annotations.Global; import io.druid.java.util.common.ISE; +import io.druid.java.util.common.guava.MappedSequence; import io.druid.java.util.common.guava.Sequence; import io.druid.query.BaseQuery; import io.druid.query.CacheStrategy; @@ -172,7 +173,21 @@ private Sequence mergeGroupByResults( runner, context ); - return strategySelector.strategize(query).processSubqueryResult(subquery, query, subqueryResult); + + final Sequence finalizingResults; + if (GroupByQuery.getContextFinalize(subquery, false)) { + finalizingResults = new MappedSequence<>( + subqueryResult, + makePreComputeManipulatorFn( + subquery, + MetricManipulatorFns.finalizing() + ) + ); + } else { + finalizingResults = subqueryResult; + } + + return strategySelector.strategize(query).processSubqueryResult(subquery, query, finalizingResults); } else { return strategySelector.strategize(query).mergeResults(runner, query, context); } diff --git a/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV1.java b/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV1.java index 9660a8c2bb7a..f01365f2a3d3 100644 --- a/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV1.java +++ b/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV1.java @@ -116,7 +116,8 @@ public Sequence mergeResults( ) ), responseContext - ) + ), + true ); return new ResourceClosingSequence<>(query.applyLimit(GroupByQueryHelper.postAggregate(query, index)), index); @@ -178,21 +179,26 @@ public boolean apply(AggregatorFactory agg) .setLimitSpec(query.getLimitSpec().merge(subquery.getLimitSpec())) .build(); - final IncrementalIndex innerQueryResultIndex = makeIncrementalIndex( + final IncrementalIndex innerQueryResultIndex = GroupByQueryHelper.makeIncrementalIndex( innerQuery.withOverriddenContext( ImmutableMap.of( GroupByQueryHelper.CTX_KEY_SORT_RESULTS, true ) ), - subqueryResult + configSupplier.get(), + bufferPool, + subqueryResult, + false ); //Outer query might have multiple intervals, but they are expected to be non-overlapping and sorted which //is ensured by QuerySegmentSpec. //GroupByQueryEngine can only process one interval at a time, so we need to call it once per interval //and concatenate the results. 
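// The new "combine" flag threaded through makeIncrementalIndex controls whether each
// aggregator is swapped for its combining factory: the inner index below ingests raw
// subquery rows, so it passes false, while the outer index merges already-aggregated
// rows and passes true (as does mergeResults above).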
- final IncrementalIndex outerQueryResultIndex = makeIncrementalIndex( + final IncrementalIndex outerQueryResultIndex = GroupByQueryHelper.makeIncrementalIndex( outerQuery, + configSupplier.get(), + bufferPool, Sequences.concat( Sequences.map( Sequences.simple(outerQuery.getIntervals()), @@ -210,7 +216,8 @@ public Sequence apply(Interval interval) } } ) - ) + ), + true ); innerQueryResultIndex.close(); @@ -221,11 +228,6 @@ public Sequence apply(Interval interval) ); } - private IncrementalIndex makeIncrementalIndex(GroupByQuery query, Sequence rows) - { - return GroupByQueryHelper.makeIncrementalIndex(query, configSupplier.get(), bufferPool, rows); - } - @Override public QueryRunner mergeRunners( final ListeningExecutorService exec, diff --git a/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java b/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java index 8cfe775c897e..86c033357bc2 100644 --- a/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java +++ b/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java @@ -106,6 +106,7 @@ public TableDataSource apply(@Nullable String input) public static final DateTime minTime = new DateTime("2011-01-12T00:00:00.000Z"); public static final QueryGranularity dayGran = QueryGranularities.DAY; + public static final QueryGranularity monthGran = QueryGranularities.MONTH; public static final QueryGranularity allGran = QueryGranularities.ALL; public static final String timeDimension = "__time"; public static final String marketDimension = "market"; diff --git a/processing/src/test/java/io/druid/query/aggregation/first/DoubleFirstAggregationTest.java b/processing/src/test/java/io/druid/query/aggregation/first/DoubleFirstAggregationTest.java new file mode 100644 index 000000000000..1e31600cbd14 --- /dev/null +++ b/processing/src/test/java/io/druid/query/aggregation/first/DoubleFirstAggregationTest.java @@ -0,0 +1,201 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.query.aggregation.first; + +import io.druid.collections.SerializablePair; +import io.druid.jackson.DefaultObjectMapper; +import io.druid.java.util.common.Pair; +import io.druid.query.aggregation.AggregatorFactory; +import io.druid.query.aggregation.TestFloatColumnSelector; +import io.druid.query.aggregation.TestLongColumnSelector; +import io.druid.query.aggregation.TestObjectColumnSelector; +import io.druid.segment.ColumnSelectorFactory; +import io.druid.segment.column.Column; +import org.easymock.EasyMock; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.nio.ByteBuffer; + +public class DoubleFirstAggregationTest +{ + private DoubleFirstAggregatorFactory doubleFirstAggFactory; + private DoubleFirstAggregatorFactory combiningAggFactory; + private ColumnSelectorFactory colSelectorFactory; + private TestLongColumnSelector timeSelector; + private TestFloatColumnSelector valueSelector; + private TestObjectColumnSelector objectSelector; + + private float[] floatValues = {1.1f, 2.7f, 3.5f, 1.3f}; + private long[] times = {12, 10, 5344, 7899999}; + private SerializablePair[] pairs = { + new SerializablePair<>(1467225096L, 134.3d), + new SerializablePair<>(23163L, 1232.212d), + new SerializablePair<>(742L, 18d), + new SerializablePair<>(111111L, 233.5232d) + }; + + @Before + public void setup() + { + doubleFirstAggFactory = new DoubleFirstAggregatorFactory("billy", "nilly"); + combiningAggFactory = (DoubleFirstAggregatorFactory) doubleFirstAggFactory.getCombiningFactory(); + timeSelector = new TestLongColumnSelector(times); + valueSelector = new TestFloatColumnSelector(floatValues); + objectSelector = new TestObjectColumnSelector(pairs); + colSelectorFactory = EasyMock.createMock(ColumnSelectorFactory.class); + EasyMock.expect(colSelectorFactory.makeLongColumnSelector(Column.TIME_COLUMN_NAME)).andReturn(timeSelector); + EasyMock.expect(colSelectorFactory.makeFloatColumnSelector("nilly")).andReturn(valueSelector); + EasyMock.expect(colSelectorFactory.makeObjectColumnSelector("billy")).andReturn(objectSelector); + EasyMock.replay(colSelectorFactory); + } + + @Test + public void testDoubleFirstAggregator() + { + DoubleFirstAggregator agg = (DoubleFirstAggregator) doubleFirstAggFactory.factorize(colSelectorFactory); + + Assert.assertEquals("billy", agg.getName()); + + aggregate(agg); + aggregate(agg); + aggregate(agg); + aggregate(agg); + + Pair result = (Pair) agg.get(); + + Assert.assertEquals(times[1], result.lhs.longValue()); + Assert.assertEquals(floatValues[1], result.rhs, 0.0001); + Assert.assertEquals((long) floatValues[1], agg.getLong()); + Assert.assertEquals(floatValues[1], agg.getFloat(), 0.0001); + + agg.reset(); + Assert.assertEquals(0, ((Pair) agg.get()).rhs, 0.0001); + } + + @Test + public void testDoubleFirstBufferAggregator() + { + DoubleFirstBufferAggregator agg = (DoubleFirstBufferAggregator) doubleFirstAggFactory.factorizeBuffered( + colSelectorFactory); + + ByteBuffer buffer = ByteBuffer.wrap(new byte[doubleFirstAggFactory.getMaxIntermediateSize()]); + agg.init(buffer, 0); + + aggregate(agg, buffer, 0); + aggregate(agg, buffer, 0); + aggregate(agg, buffer, 0); + aggregate(agg, buffer, 0); + + Pair result = (Pair) agg.get(buffer, 0); + + Assert.assertEquals(times[1], result.lhs.longValue()); + Assert.assertEquals(floatValues[1], result.rhs, 0.0001); + Assert.assertEquals((long) floatValues[1], agg.getLong(buffer, 0)); + Assert.assertEquals(floatValues[1], agg.getFloat(buffer, 0), 0.0001); + } + + @Test + public void 
testCombine() + { + SerializablePair pair1 = new SerializablePair<>(1467225000L, 3.621); + SerializablePair pair2 = new SerializablePair<>(1467240000L, 785.4); + Assert.assertEquals(pair1, doubleFirstAggFactory.combine(pair1, pair2)); + } + + @Test + public void testDoubleFirstCombiningAggregator() + { + DoubleFirstAggregator agg = (DoubleFirstAggregator) combiningAggFactory.factorize(colSelectorFactory); + + Assert.assertEquals("billy", agg.getName()); + + aggregate(agg); + aggregate(agg); + aggregate(agg); + aggregate(agg); + + Pair result = (Pair) agg.get(); + Pair expected = (Pair)pairs[2]; + + Assert.assertEquals(expected.lhs, result.lhs); + Assert.assertEquals(expected.rhs, result.rhs, 0.0001); + Assert.assertEquals(expected.rhs.longValue(), agg.getLong()); + Assert.assertEquals(expected.rhs, agg.getFloat(), 0.0001); + + agg.reset(); + Assert.assertEquals(0, ((Pair) agg.get()).rhs, 0.0001); + } + + @Test + public void testDoubleFirstCombiningBufferAggregator() + { + DoubleFirstBufferAggregator agg = (DoubleFirstBufferAggregator) combiningAggFactory.factorizeBuffered( + colSelectorFactory); + + ByteBuffer buffer = ByteBuffer.wrap(new byte[doubleFirstAggFactory.getMaxIntermediateSize()]); + agg.init(buffer, 0); + + aggregate(agg, buffer, 0); + aggregate(agg, buffer, 0); + aggregate(agg, buffer, 0); + aggregate(agg, buffer, 0); + + Pair result = (Pair) agg.get(buffer, 0); + Pair expected = (Pair)pairs[2]; + + Assert.assertEquals(expected.lhs, result.lhs); + Assert.assertEquals(expected.rhs, result.rhs, 0.0001); + Assert.assertEquals(expected.rhs.longValue(), agg.getLong(buffer, 0)); + Assert.assertEquals(expected.rhs, agg.getFloat(buffer, 0), 0.0001); + } + + + @Test + public void testSerde() throws Exception + { + DefaultObjectMapper mapper = new DefaultObjectMapper(); + String doubleSpecJson = "{\"type\":\"doubleFirst\",\"name\":\"billy\",\"fieldName\":\"nilly\"}"; + Assert.assertEquals(doubleFirstAggFactory, mapper.readValue(doubleSpecJson, AggregatorFactory.class)); + } + + private void aggregate( + DoubleFirstAggregator agg + ) + { + agg.aggregate(); + timeSelector.increment(); + valueSelector.increment(); + objectSelector.increment(); + } + + private void aggregate( + DoubleFirstBufferAggregator agg, + ByteBuffer buff, + int position + ) + { + agg.aggregate(buff, position); + timeSelector.increment(); + valueSelector.increment(); + objectSelector.increment(); + } +} diff --git a/processing/src/test/java/io/druid/query/aggregation/first/LongFirstAggregationTest.java b/processing/src/test/java/io/druid/query/aggregation/first/LongFirstAggregationTest.java new file mode 100644 index 000000000000..d95b75f5a991 --- /dev/null +++ b/processing/src/test/java/io/druid/query/aggregation/first/LongFirstAggregationTest.java @@ -0,0 +1,200 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query.aggregation.first; + +import io.druid.collections.SerializablePair; +import io.druid.jackson.DefaultObjectMapper; +import io.druid.java.util.common.Pair; +import io.druid.query.aggregation.AggregatorFactory; +import io.druid.query.aggregation.TestLongColumnSelector; +import io.druid.query.aggregation.TestObjectColumnSelector; +import io.druid.segment.ColumnSelectorFactory; +import io.druid.segment.column.Column; +import org.easymock.EasyMock; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.nio.ByteBuffer; + +public class LongFirstAggregationTest +{ + private LongFirstAggregatorFactory longFirstAggFactory; + private LongFirstAggregatorFactory combiningAggFactory; + private ColumnSelectorFactory colSelectorFactory; + private TestLongColumnSelector timeSelector; + private TestLongColumnSelector valueSelector; + private TestObjectColumnSelector objectSelector; + + private long[] longValues = {185, -216, -128751132, Long.MIN_VALUE}; + private long[] times = {1123126751, 1784247991, 1854329816, 1000000000}; + private SerializablePair[] pairs = { + new SerializablePair<>(1L, 113267L), + new SerializablePair<>(1L, 5437384L), + new SerializablePair<>(6L, 34583458L), + new SerializablePair<>(88L, 34583452L) + }; + + @Before + public void setup() + { + longFirstAggFactory = new LongFirstAggregatorFactory("billy", "nilly"); + combiningAggFactory = (LongFirstAggregatorFactory) longFirstAggFactory.getCombiningFactory(); + timeSelector = new TestLongColumnSelector(times); + valueSelector = new TestLongColumnSelector(longValues); + objectSelector = new TestObjectColumnSelector(pairs); + colSelectorFactory = EasyMock.createMock(ColumnSelectorFactory.class); + EasyMock.expect(colSelectorFactory.makeLongColumnSelector(Column.TIME_COLUMN_NAME)).andReturn(timeSelector); + EasyMock.expect(colSelectorFactory.makeLongColumnSelector("nilly")).andReturn(valueSelector); + EasyMock.expect(colSelectorFactory.makeObjectColumnSelector("billy")).andReturn(objectSelector); + EasyMock.replay(colSelectorFactory); + } + + @Test + public void testLongFirstAggregator() + { + LongFirstAggregator agg = (LongFirstAggregator) longFirstAggFactory.factorize(colSelectorFactory); + + Assert.assertEquals("billy", agg.getName()); + + aggregate(agg); + aggregate(agg); + aggregate(agg); + aggregate(agg); + + Pair result = (Pair) agg.get(); + + Assert.assertEquals(times[3], result.lhs.longValue()); + Assert.assertEquals(longValues[3], result.rhs.longValue()); + Assert.assertEquals(longValues[3], agg.getLong()); + Assert.assertEquals(longValues[3], agg.getFloat(), 0.0001); + + agg.reset(); + Assert.assertEquals(0, ((Pair) agg.get()).rhs, 0.0001); + } + + @Test + public void testLongFirstBufferAggregator() + { + LongFirstBufferAggregator agg = (LongFirstBufferAggregator) longFirstAggFactory.factorizeBuffered( + colSelectorFactory); + + ByteBuffer buffer = ByteBuffer.wrap(new byte[longFirstAggFactory.getMaxIntermediateSize()]); + agg.init(buffer, 0); + + aggregate(agg, buffer, 0); + aggregate(agg, buffer, 0); + aggregate(agg, buffer, 0); + aggregate(agg, buffer, 0); + + Pair result = (Pair) agg.get(buffer, 0); + + Assert.assertEquals(times[3], result.lhs.longValue()); + Assert.assertEquals(longValues[3], result.rhs.longValue()); + Assert.assertEquals(longValues[3], agg.getLong(buffer, 0)); + Assert.assertEquals(longValues[3], agg.getFloat(buffer, 0), 0.0001); + } + 
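// combine() on a "first" factory keeps whichever operand carries the smaller timestamp,
// mirroring the "last" factories, which keep the larger one. A minimal sketch of the
// contract, assuming lhs is the timestamp and rhs the value (values illustrative only):
//
//   SerializablePair earlier = new SerializablePair<>(1L, 10L);
//   SerializablePair later = new SerializablePair<>(2L, 20L);
//   longFirstAggFactory.combine(earlier, later);   // returns earlier
//   // a "last" factory would return later instead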
+ @Test + public void testCombine() + { + SerializablePair pair1 = new SerializablePair<>(1467225000L, 1263L); + SerializablePair pair2 = new SerializablePair<>(1467240000L, 752713L); + Assert.assertEquals(pair1, longFirstAggFactory.combine(pair1, pair2)); + } + + @Test + public void testLongFirstCombiningAggregator() + { + LongFirstAggregator agg = (LongFirstAggregator) combiningAggFactory.factorize(colSelectorFactory); + + Assert.assertEquals("billy", agg.getName()); + + aggregate(agg); + aggregate(agg); + aggregate(agg); + aggregate(agg); + + Pair result = (Pair) agg.get(); + Pair expected = (Pair)pairs[0]; + + Assert.assertEquals(expected.lhs, result.lhs); + Assert.assertEquals(expected.rhs, result.rhs); + Assert.assertEquals(expected.rhs.longValue(), agg.getLong()); + Assert.assertEquals(expected.rhs, agg.getFloat(), 0.0001); + + agg.reset(); + Assert.assertEquals(0, ((Pair) agg.get()).rhs, 0.0001); + } + + @Test + public void testLongFirstCombiningBufferAggregator() + { + LongFirstBufferAggregator agg = (LongFirstBufferAggregator) combiningAggFactory.factorizeBuffered( + colSelectorFactory); + + ByteBuffer buffer = ByteBuffer.wrap(new byte[longFirstAggFactory.getMaxIntermediateSize()]); + agg.init(buffer, 0); + + aggregate(agg, buffer, 0); + aggregate(agg, buffer, 0); + aggregate(agg, buffer, 0); + aggregate(agg, buffer, 0); + + Pair result = (Pair) agg.get(buffer, 0); + Pair expected = (Pair)pairs[0]; + + Assert.assertEquals(expected.lhs, result.lhs); + Assert.assertEquals(expected.rhs, result.rhs); + Assert.assertEquals(expected.rhs.longValue(), agg.getLong(buffer, 0)); + Assert.assertEquals(expected.rhs, agg.getFloat(buffer, 0), 0.0001); + } + + + @Test + public void testSerde() throws Exception + { + DefaultObjectMapper mapper = new DefaultObjectMapper(); + String longSpecJson = "{\"type\":\"longFirst\",\"name\":\"billy\",\"fieldName\":\"nilly\"}"; + Assert.assertEquals(longFirstAggFactory, mapper.readValue(longSpecJson, AggregatorFactory.class)); + } + + private void aggregate( + LongFirstAggregator agg + ) + { + agg.aggregate(); + timeSelector.increment(); + valueSelector.increment(); + objectSelector.increment(); + } + + private void aggregate( + LongFirstBufferAggregator agg, + ByteBuffer buff, + int position + ) + { + agg.aggregate(buff, position); + timeSelector.increment(); + valueSelector.increment(); + objectSelector.increment(); + } +} diff --git a/processing/src/test/java/io/druid/query/aggregation/last/DoubleLastAggregationTest.java b/processing/src/test/java/io/druid/query/aggregation/last/DoubleLastAggregationTest.java new file mode 100644 index 000000000000..ffc1e553a1b9 --- /dev/null +++ b/processing/src/test/java/io/druid/query/aggregation/last/DoubleLastAggregationTest.java @@ -0,0 +1,201 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query.aggregation.last; + +import io.druid.collections.SerializablePair; +import io.druid.jackson.DefaultObjectMapper; +import io.druid.java.util.common.Pair; +import io.druid.query.aggregation.AggregatorFactory; +import io.druid.query.aggregation.TestFloatColumnSelector; +import io.druid.query.aggregation.TestLongColumnSelector; +import io.druid.query.aggregation.TestObjectColumnSelector; +import io.druid.segment.ColumnSelectorFactory; +import io.druid.segment.column.Column; +import org.easymock.EasyMock; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.nio.ByteBuffer; + +public class DoubleLastAggregationTest +{ + private DoubleLastAggregatorFactory doubleLastAggFactory; + private DoubleLastAggregatorFactory combiningAggFactory; + private ColumnSelectorFactory colSelectorFactory; + private TestLongColumnSelector timeSelector; + private TestFloatColumnSelector valueSelector; + private TestObjectColumnSelector objectSelector; + + private float[] floatValues = {1.1897f, 0.001f, 86.23f, 166.228f}; + private long[] times = {8224, 6879, 2436, 7888}; + private SerializablePair[] pairs = { + new SerializablePair<>(52782L, 134.3d), + new SerializablePair<>(65492L, 1232.212d), + new SerializablePair<>(69134L, 18.1233d), + new SerializablePair<>(11111L, 233.5232d) + }; + + @Before + public void setup() + { + doubleLastAggFactory = new DoubleLastAggregatorFactory("billy", "nilly"); + combiningAggFactory = (DoubleLastAggregatorFactory) doubleLastAggFactory.getCombiningFactory(); + timeSelector = new TestLongColumnSelector(times); + valueSelector = new TestFloatColumnSelector(floatValues); + objectSelector = new TestObjectColumnSelector(pairs); + colSelectorFactory = EasyMock.createMock(ColumnSelectorFactory.class); + EasyMock.expect(colSelectorFactory.makeLongColumnSelector(Column.TIME_COLUMN_NAME)).andReturn(timeSelector); + EasyMock.expect(colSelectorFactory.makeFloatColumnSelector("nilly")).andReturn(valueSelector); + EasyMock.expect(colSelectorFactory.makeObjectColumnSelector("billy")).andReturn(objectSelector); + EasyMock.replay(colSelectorFactory); + } + + @Test + public void testDoubleLastAggregator() + { + DoubleLastAggregator agg = (DoubleLastAggregator) doubleLastAggFactory.factorize(colSelectorFactory); + + Assert.assertEquals("billy", agg.getName()); + + aggregate(agg); + aggregate(agg); + aggregate(agg); + aggregate(agg); + + Pair result = (Pair) agg.get(); + + Assert.assertEquals(times[0], result.lhs.longValue()); + Assert.assertEquals(floatValues[0], result.rhs, 0.0001); + Assert.assertEquals((long) floatValues[0], agg.getLong()); + Assert.assertEquals(floatValues[0], agg.getFloat(), 0.0001); + + agg.reset(); + Assert.assertEquals(0, ((Pair) agg.get()).rhs, 0.0001); + } + + @Test + public void testDoubleLastBufferAggregator() + { + DoubleLastBufferAggregator agg = (DoubleLastBufferAggregator) doubleLastAggFactory.factorizeBuffered( + colSelectorFactory); + + ByteBuffer buffer = ByteBuffer.wrap(new byte[doubleLastAggFactory.getMaxIntermediateSize()]); + agg.init(buffer, 0); + + aggregate(agg, buffer, 0); + aggregate(agg, buffer, 0); + aggregate(agg, buffer, 0); + aggregate(agg, buffer, 0); + + Pair result = (Pair) agg.get(buffer, 0); + + Assert.assertEquals(times[0], result.lhs.longValue()); + Assert.assertEquals(floatValues[0], result.rhs, 0.0001); + Assert.assertEquals((long) floatValues[0], agg.getLong(buffer, 
0)); + Assert.assertEquals(floatValues[0], agg.getFloat(buffer, 0), 0.0001); + } + + @Test + public void testCombine() + { + SerializablePair pair1 = new SerializablePair<>(1467225000L, 3.621); + SerializablePair pair2 = new SerializablePair<>(1467240000L, 785.4); + Assert.assertEquals(pair2, doubleLastAggFactory.combine(pair1, pair2)); + } + + @Test + public void testDoubleLastCombiningAggregator() + { + DoubleLastAggregator agg = (DoubleLastAggregator) combiningAggFactory.factorize(colSelectorFactory); + + Assert.assertEquals("billy", agg.getName()); + + aggregate(agg); + aggregate(agg); + aggregate(agg); + aggregate(agg); + + Pair result = (Pair) agg.get(); + Pair expected = (Pair)pairs[2]; + + Assert.assertEquals(expected.lhs, result.lhs); + Assert.assertEquals(expected.rhs, result.rhs, 0.0001); + Assert.assertEquals(expected.rhs.longValue(), agg.getLong()); + Assert.assertEquals(expected.rhs, agg.getFloat(), 0.0001); + + agg.reset(); + Assert.assertEquals(0, ((Pair) agg.get()).rhs, 0.0001); + } + + @Test + public void testDoubleLastCombiningBufferAggregator() + { + DoubleLastBufferAggregator agg = (DoubleLastBufferAggregator) combiningAggFactory.factorizeBuffered( + colSelectorFactory); + + ByteBuffer buffer = ByteBuffer.wrap(new byte[doubleLastAggFactory.getMaxIntermediateSize()]); + agg.init(buffer, 0); + + aggregate(agg, buffer, 0); + aggregate(agg, buffer, 0); + aggregate(agg, buffer, 0); + aggregate(agg, buffer, 0); + + Pair result = (Pair) agg.get(buffer, 0); + Pair expected = (Pair)pairs[2]; + + Assert.assertEquals(expected.lhs, result.lhs); + Assert.assertEquals(expected.rhs, result.rhs, 0.0001); + Assert.assertEquals(expected.rhs.longValue(), agg.getLong(buffer, 0)); + Assert.assertEquals(expected.rhs, agg.getFloat(buffer, 0), 0.0001); + } + + + @Test + public void testSerde() throws Exception + { + DefaultObjectMapper mapper = new DefaultObjectMapper(); + String doubleSpecJson = "{\"type\":\"doubleLast\",\"name\":\"billy\",\"fieldName\":\"nilly\"}"; + Assert.assertEquals(doubleLastAggFactory, mapper.readValue(doubleSpecJson, AggregatorFactory.class)); + } + + private void aggregate( + DoubleLastAggregator agg + ) + { + agg.aggregate(); + timeSelector.increment(); + valueSelector.increment(); + objectSelector.increment(); + } + + private void aggregate( + DoubleLastBufferAggregator agg, + ByteBuffer buff, + int position + ) + { + agg.aggregate(buff, position); + timeSelector.increment(); + valueSelector.increment(); + objectSelector.increment(); + } +} diff --git a/processing/src/test/java/io/druid/query/aggregation/last/LongLastAggregationTest.java b/processing/src/test/java/io/druid/query/aggregation/last/LongLastAggregationTest.java new file mode 100644 index 000000000000..3c95b9fd9817 --- /dev/null +++ b/processing/src/test/java/io/druid/query/aggregation/last/LongLastAggregationTest.java @@ -0,0 +1,200 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query.aggregation.last; + +import io.druid.collections.SerializablePair; +import io.druid.jackson.DefaultObjectMapper; +import io.druid.java.util.common.Pair; +import io.druid.query.aggregation.AggregatorFactory; +import io.druid.query.aggregation.TestLongColumnSelector; +import io.druid.query.aggregation.TestObjectColumnSelector; +import io.druid.segment.ColumnSelectorFactory; +import io.druid.segment.column.Column; +import org.easymock.EasyMock; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.nio.ByteBuffer; + +public class LongLastAggregationTest +{ + private LongLastAggregatorFactory longLastAggFactory; + private LongLastAggregatorFactory combiningAggFactory; + private ColumnSelectorFactory colSelectorFactory; + private TestLongColumnSelector timeSelector; + private TestLongColumnSelector valueSelector; + private TestObjectColumnSelector objectSelector; + + private long[] longValues = {23216, 8635, 1547123, Long.MAX_VALUE}; + private long[] times = {1467935723, 1467225653, 1601848932, 72515}; + private SerializablePair[] pairs = { + new SerializablePair<>(12531L, 113267L), + new SerializablePair<>(123L, 5437384L), + new SerializablePair<>(125755L, 34583458L), + new SerializablePair<>(124L, 34283452L) + }; + + @Before + public void setup() + { + longLastAggFactory = new LongLastAggregatorFactory("billy", "nilly"); + combiningAggFactory = (LongLastAggregatorFactory) longLastAggFactory.getCombiningFactory(); + timeSelector = new TestLongColumnSelector(times); + valueSelector = new TestLongColumnSelector(longValues); + objectSelector = new TestObjectColumnSelector(pairs); + colSelectorFactory = EasyMock.createMock(ColumnSelectorFactory.class); + EasyMock.expect(colSelectorFactory.makeLongColumnSelector(Column.TIME_COLUMN_NAME)).andReturn(timeSelector); + EasyMock.expect(colSelectorFactory.makeLongColumnSelector("nilly")).andReturn(valueSelector); + EasyMock.expect(colSelectorFactory.makeObjectColumnSelector("billy")).andReturn(objectSelector); + EasyMock.replay(colSelectorFactory); + } + + @Test + public void testLongLastAggregator() + { + LongLastAggregator agg = (LongLastAggregator) longLastAggFactory.factorize(colSelectorFactory); + + Assert.assertEquals("billy", agg.getName()); + + aggregate(agg); + aggregate(agg); + aggregate(agg); + aggregate(agg); + + Pair result = (Pair) agg.get(); + + Assert.assertEquals(times[2], result.lhs.longValue()); + Assert.assertEquals(longValues[2], result.rhs.longValue()); + Assert.assertEquals(longValues[2], agg.getLong()); + Assert.assertEquals(longValues[2], agg.getFloat(), 1); + + agg.reset(); + Assert.assertEquals(0, ((Pair) agg.get()).rhs.longValue()); + } + + @Test + public void testLongLastBufferAggregator() + { + LongLastBufferAggregator agg = (LongLastBufferAggregator) longLastAggFactory.factorizeBuffered( + colSelectorFactory); + + ByteBuffer buffer = ByteBuffer.wrap(new byte[longLastAggFactory.getMaxIntermediateSize()]); + agg.init(buffer, 0); + + aggregate(agg, buffer, 0); + aggregate(agg, buffer, 0); + aggregate(agg, buffer, 0); + aggregate(agg, buffer, 0); 
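// The maximum entry in times is times[2] (1601848932), so after all four rows the
// buffer should hold the pair (times[2], longValues[2]).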
+ + Pair result = (Pair) agg.get(buffer, 0); + + Assert.assertEquals(times[2], result.lhs.longValue()); + Assert.assertEquals(longValues[2], result.rhs.longValue()); + Assert.assertEquals(longValues[2], agg.getLong(buffer, 0)); + Assert.assertEquals(longValues[2], agg.getFloat(buffer, 0), 1); + } + + @Test + public void testCombine() + { + SerializablePair pair1 = new SerializablePair<>(1467225000L, 64432L); + SerializablePair pair2 = new SerializablePair<>(1467240000L, 99999L); + Assert.assertEquals(pair2, longLastAggFactory.combine(pair1, pair2)); + } + + @Test + public void testLongLastCombiningAggregator() + { + LongLastAggregator agg = (LongLastAggregator) combiningAggFactory.factorize(colSelectorFactory); + + Assert.assertEquals("billy", agg.getName()); + + aggregate(agg); + aggregate(agg); + aggregate(agg); + aggregate(agg); + + Pair result = (Pair) agg.get(); + Pair expected = (Pair)pairs[2]; + + Assert.assertEquals(expected.lhs, result.lhs); + Assert.assertEquals(expected.rhs, result.rhs); + Assert.assertEquals(expected.rhs.longValue(), agg.getLong()); + Assert.assertEquals(expected.rhs, agg.getFloat(), 1); + + agg.reset(); + Assert.assertEquals(0, ((Pair) agg.get()).rhs.longValue()); + } + + @Test + public void testLongLastCombiningBufferAggregator() + { + LongLastBufferAggregator agg = (LongLastBufferAggregator) combiningAggFactory.factorizeBuffered( + colSelectorFactory); + + ByteBuffer buffer = ByteBuffer.wrap(new byte[longLastAggFactory.getMaxIntermediateSize()]); + agg.init(buffer, 0); + + aggregate(agg, buffer, 0); + aggregate(agg, buffer, 0); + aggregate(agg, buffer, 0); + aggregate(agg, buffer, 0); + + Pair result = (Pair) agg.get(buffer, 0); + Pair expected = (Pair)pairs[2]; + + Assert.assertEquals(expected.lhs, result.lhs); + Assert.assertEquals(expected.rhs, result.rhs); + Assert.assertEquals(expected.rhs.longValue(), agg.getLong(buffer, 0)); + Assert.assertEquals(expected.rhs, agg.getFloat(buffer, 0), 1); + } + + + @Test + public void testSerde() throws Exception + { + DefaultObjectMapper mapper = new DefaultObjectMapper(); + String longSpecJson = "{\"type\":\"longLast\",\"name\":\"billy\",\"fieldName\":\"nilly\"}"; + Assert.assertEquals(longLastAggFactory, mapper.readValue(longSpecJson, AggregatorFactory.class)); + } + + private void aggregate( + LongLastAggregator agg + ) + { + agg.aggregate(); + timeSelector.increment(); + valueSelector.increment(); + objectSelector.increment(); + } + + private void aggregate( + LongLastBufferAggregator agg, + ByteBuffer buff, + int position + ) + { + agg.aggregate(buff, position); + timeSelector.increment(); + valueSelector.increment(); + objectSelector.increment(); + } +} diff --git a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java index 0c36dea67a3d..9949041c27f8 100644 --- a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java @@ -60,11 +60,13 @@ import io.druid.query.aggregation.DoubleSumAggregatorFactory; import io.druid.query.aggregation.FilteredAggregatorFactory; import io.druid.query.aggregation.JavaScriptAggregatorFactory; +import io.druid.query.aggregation.first.LongFirstAggregatorFactory; import io.druid.query.aggregation.LongSumAggregatorFactory; import io.druid.query.aggregation.PostAggregator; import io.druid.query.aggregation.cardinality.CardinalityAggregatorFactory; import 
io.druid.query.aggregation.hyperloglog.HyperUniqueFinalizingPostAggregator; import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory; +import io.druid.query.aggregation.last.LongLastAggregatorFactory; import io.druid.query.aggregation.post.ArithmeticPostAggregator; import io.druid.query.aggregation.post.ConstantPostAggregator; import io.druid.query.aggregation.post.ExpressionPostAggregator; @@ -1703,6 +1705,67 @@ public void testGroupByWithCardinality() TestHelper.assertExpectedObjects(expectedResults, results, ""); } + @Test + public void testGroupByWithFirstLast() + { + GroupByQuery query = GroupByQuery + .builder() + .setDataSource(QueryRunnerTestHelper.dataSource) + .setQuerySegmentSpec(QueryRunnerTestHelper.fullOnInterval) + .setDimensions(Lists.newArrayList(new DefaultDimensionSpec("market", "market"))) + .setAggregatorSpecs( + Arrays.asList( + new LongFirstAggregatorFactory("first", "index"), + new LongLastAggregatorFactory("last", "index") + ) + ) + .setGranularity(QueryRunnerTestHelper.monthGran) + .build(); + + List expectedResults = Arrays.asList( + GroupByQueryRunnerTestHelper.createExpectedRow("2011-01-01", "market", "spot", "first", 100L, "last", 155L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-01-01", "market", "total_market", "first", 1000L, "last", 1127L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-01-01", "market", "upfront", "first", 800L, "last", 943L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-02-01", "market", "spot", "first", 132L, "last", 114L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-02-01", "market", "total_market", "first", 1203L, "last", 1292L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-02-01", "market", "upfront", "first", 1667L, "last", 1101L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-01", "market", "spot", "first", 153L, "last", 125L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-01", "market", "total_market", "first", 1124L, "last", 1366L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-01", "market", "upfront", "first", 1166L, "last", 1063L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "market", "spot", "first", 135L, "last", 120L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "market", "total_market", "first", 1314L, "last", 1029L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "market", "upfront", "first", 1447L, "last", 780L) + ); + + Iterable results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); + TestHelper.assertExpectedObjects(expectedResults, results, ""); + } + + @Test + public void testGroupByWithNoResult() + { + GroupByQuery query = GroupByQuery + .builder() + .setDataSource(QueryRunnerTestHelper.dataSource) + .setQuerySegmentSpec(QueryRunnerTestHelper.emptyInterval) + .setDimensions(Lists.newArrayList(new DefaultDimensionSpec("market", "market"))) + .setAggregatorSpecs( + Arrays.asList( + QueryRunnerTestHelper.rowsCount, + QueryRunnerTestHelper.indexLongSum, + QueryRunnerTestHelper.qualityCardinality, + new LongFirstAggregatorFactory("first", "index"), + new LongLastAggregatorFactory("last", "index") + ) + ) + .setGranularity(QueryRunnerTestHelper.dayGran) + .build(); + + List expectedResults = ImmutableList.of(); + Iterable results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); + Assert.assertEquals(expectedResults, results); + } + @Test public void testGroupByWithNullProducingDimExtractionFn() { @@ -5195,7 +5258,7 @@ public void 
testSubqueryWithOuterCountAggregator() } @Test - public void testSubqueryWithOuterJavascriptAggregators() + public void testSubqueryWithOuterDimJavascriptAggregators() { final GroupByQuery subquery = GroupByQuery .builder() @@ -5257,6 +5320,69 @@ public void testSubqueryWithOuterJavascriptAggregators() TestHelper.assertExpectedObjects(expectedResults, results, ""); } + @Test + public void testSubqueryWithOuterJavascriptAggregators() + { + final GroupByQuery subquery = GroupByQuery + .builder() + .setDataSource(QueryRunnerTestHelper.dataSource) + .setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird) + .setDimensions(Lists.newArrayList(new DefaultDimensionSpec("market", "market"), + new DefaultDimensionSpec("quality", "quality"))) + .setAggregatorSpecs( + Arrays.asList( + QueryRunnerTestHelper.rowsCount, + new LongSumAggregatorFactory("index", "index") + ) + ) + .setGranularity(QueryRunnerTestHelper.dayGran) + .build(); + + final GroupByQuery query = GroupByQuery + .builder() + .setDataSource(subquery) + .setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird) + .setDimensions(Lists.newArrayList(new DefaultDimensionSpec("quality", "quality"))) + .setAggregatorSpecs( + Arrays.asList( + new JavaScriptAggregatorFactory( + "js_agg", + Arrays.asList("index", "rows"), + "function(current, index, rows){return current + index + rows;}", + "function(){return 0;}", + "function(a,b){return a + b;}", + JavaScriptConfig.getDefault() + ) + ) + ) + .setGranularity(QueryRunnerTestHelper.dayGran) + .build(); + + List expectedResults = Arrays.asList( + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "quality", "automotive", "js_agg", 136D), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "quality", "business", "js_agg", 119D), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "quality", "entertainment", "js_agg", 159D), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "quality", "health", "js_agg", 121D), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "quality", "mezzanine", "js_agg", 2873D), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "quality", "news", "js_agg", 122D), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "quality", "premium", "js_agg", 2903D), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "quality", "technology", "js_agg", 79D), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "quality", "travel", "js_agg", 120D), + + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "quality", "automotive", "js_agg", 148D), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "quality", "business", "js_agg", 113D), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "quality", "entertainment", "js_agg", 167D), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "quality", "health", "js_agg", 114D), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "quality", "mezzanine", "js_agg", 2450D), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "quality", "news", "js_agg", 115D), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "quality", "premium", "js_agg", 2508D), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "quality", "technology", "js_agg", 98D), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "quality", "travel", "js_agg", 127D) + ); + Iterable results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); + 
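// Each expected js_agg value above folds (index + rows) over the subquery rows for a
// quality: a quality backed by a single market row comes out to its summed index + 1,
// while "mezzanine" and "premium" span three markets and accumulate three such terms.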
TestHelper.assertExpectedObjects(expectedResults, results, ""); + } + @Test public void testSubqueryWithHyperUniques() { @@ -5459,6 +5585,50 @@ public void testSubqueryWithHyperUniquesPostAggregator() TestHelper.assertExpectedObjects(expectedResults, results, ""); } + @Test + public void testSubqueryWithFirstLast() + { + GroupByQuery subquery = GroupByQuery + .builder() + .setDataSource(QueryRunnerTestHelper.dataSource) + .setQuerySegmentSpec(QueryRunnerTestHelper.fullOnInterval) + .setDimensions(ImmutableList.of(new DefaultDimensionSpec("market", "market"))) + .setAggregatorSpecs( + ImmutableList.of( + QueryRunnerTestHelper.rowsCount, + new LongFirstAggregatorFactory("innerfirst", "index"), + new LongLastAggregatorFactory("innerlast", "index") + ) + ) + .setGranularity(QueryRunnerTestHelper.dayGran) + .setContext(ImmutableMap.of("finalize", true)) + .build(); + + GroupByQuery query = GroupByQuery + .builder() + .setDataSource(subquery) + .setQuerySegmentSpec(QueryRunnerTestHelper.fullOnInterval) + .setDimensions(Lists.newArrayList()) + .setAggregatorSpecs( + ImmutableList.of( + new LongFirstAggregatorFactory("first", "innerfirst"), + new LongLastAggregatorFactory("last", "innerlast") + ) + ) + .setGranularity(QueryRunnerTestHelper.monthGran) + .build(); + + List expectedResults = Arrays.asList( + GroupByQueryRunnerTestHelper.createExpectedRow("2011-01-01", "first", 100L, "last", 943L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-02-01", "first", 132L, "last", 1101L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-01", "first", 153L, "last", 1063L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "first", 135L, "last", 780L) + ); + + Iterable results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); + TestHelper.assertExpectedObjects(expectedResults, results, ""); + } + @Test public void testGroupByWithTimeColumn() { diff --git a/processing/src/test/java/io/druid/query/groupby/GroupByTimeseriesQueryRunnerTest.java b/processing/src/test/java/io/druid/query/groupby/GroupByTimeseriesQueryRunnerTest.java index 38b71f23e367..1b74bbb5332f 100644 --- a/processing/src/test/java/io/druid/query/groupby/GroupByTimeseriesQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/groupby/GroupByTimeseriesQueryRunnerTest.java @@ -112,6 +112,13 @@ public GroupByTimeseriesQueryRunnerTest(QueryRunner runner) super(runner, false); } + @Override + public void testEmptyTimeseries() + { + // Skip this test because the timeseries test expects the empty range to have one entry, but group by + // does not expect anything + } + @Override public void testFullOnTimeseries() { diff --git a/processing/src/test/java/io/druid/query/timeseries/TimeseriesQueryRunnerTest.java b/processing/src/test/java/io/druid/query/timeseries/TimeseriesQueryRunnerTest.java index 274c4acaa21e..3f3327e7a7a2 100644 --- a/processing/src/test/java/io/druid/query/timeseries/TimeseriesQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/timeseries/TimeseriesQueryRunnerTest.java @@ -36,6 +36,8 @@ import io.druid.query.aggregation.DoubleMaxAggregatorFactory; import io.druid.query.aggregation.DoubleMinAggregatorFactory; import io.druid.query.aggregation.FilteredAggregatorFactory; +import io.druid.query.aggregation.first.DoubleFirstAggregatorFactory; +import io.druid.query.aggregation.last.DoubleLastAggregatorFactory; import io.druid.query.aggregation.LongSumAggregatorFactory; import io.druid.query.aggregation.PostAggregator; import 
io.druid.query.extraction.MapLookupExtractor; @@ -111,6 +113,44 @@ public TimeseriesQueryRunnerTest( this.descending = descending; } + @Test + public void testEmptyTimeseries() + { + TimeseriesQuery query = Druids.newTimeseriesQueryBuilder() + .dataSource(QueryRunnerTestHelper.dataSource) + .granularity(QueryRunnerTestHelper.allGran) + .intervals(QueryRunnerTestHelper.emptyInterval) + .aggregators( + Arrays.asList( + QueryRunnerTestHelper.rowsCount, + QueryRunnerTestHelper.indexDoubleSum, + new DoubleFirstAggregatorFactory("first", "index") + + ) + ) + .descending(descending) + .build(); + + List> expectedResults = ImmutableList.of( + new Result<>( + new DateTime("2020-04-02"), + new TimeseriesResultValue( + ImmutableMap.of( + "rows", 0L, + "index", 0D, + "first", 0D + ) + ) + ) + ); + + Iterable> actualResults = Sequences.toList( + runner.run(query, CONTEXT), + Lists.>newArrayList() + ); + TestHelper.assertExpectedResults(expectedResults, actualResults); + } + @Test public void testFullOnTimeseries() { @@ -1732,6 +1772,114 @@ public void testTimeseriesWithMultiValueFilteringJavascriptAggregatorAndAlsoRegu assertExpectedResults(expectedResults, actualResults); } + @Test + public void testTimeseriesWithFirstLastAggregator() + { + TimeseriesQuery query = Druids.newTimeseriesQueryBuilder() + .dataSource(QueryRunnerTestHelper.dataSource) + .granularity(QueryRunnerTestHelper.monthGran) + .intervals(QueryRunnerTestHelper.fullOnInterval) + .aggregators( + ImmutableList.of( + new DoubleFirstAggregatorFactory("first", "index"), + new DoubleLastAggregatorFactory("last", "index") + ) + ) + .descending(descending) + .build(); + + // There's a difference between ascending and descending results since granularity of druid.sample.tsv is days, + // with multiple first and last times. 
The traversal order difference causes the first and last aggregators + // to select different values from the list of first and last dates. + List> expectedAscendingResults = ImmutableList.of( + new Result<>( + new DateTime("2011-01-01"), + new TimeseriesResultValue( + ImmutableMap.of( + "first", new Float(100.000000).doubleValue(), + "last", new Float(943.497198).doubleValue() + ) + ) + ), + new Result<>( + new DateTime("2011-02-01"), + new TimeseriesResultValue( + ImmutableMap.of( + "first", new Float(132.123776).doubleValue(), + "last", new Float(1101.918270).doubleValue() + ) + ) + ), + new Result<>( + new DateTime("2011-03-01"), + new TimeseriesResultValue( + ImmutableMap.of( + "first", new Float(153.059937).doubleValue(), + "last", new Float(1063.201156).doubleValue() + ) + ) + ), + new Result<>( + new DateTime("2011-04-01"), + new TimeseriesResultValue( + ImmutableMap.of( + "first", new Float(135.885094).doubleValue(), + "last", new Float(780.271977).doubleValue() + ) + ) + ) + ); + + List> expectedDescendingResults = ImmutableList.of( + new Result<>( + new DateTime("2011-04-01"), + new TimeseriesResultValue( + ImmutableMap.of( + "first", new Float(1234.247546).doubleValue(), + "last", new Float(106.793700).doubleValue() + ) + ) + ), + new Result<>( + new DateTime("2011-03-01"), + new TimeseriesResultValue( + ImmutableMap.of( + "first", new Float(1004.940887).doubleValue(), + "last", new Float(151.752485).doubleValue() + ) + ) + ), + new Result<>( + new DateTime("2011-02-01"), + new TimeseriesResultValue( + ImmutableMap.of( + "first", new Float(913.561076).doubleValue(), + "last", new Float(122.258195).doubleValue() + ) + ) + ), + new Result<>( + new DateTime("2011-01-01"), + new TimeseriesResultValue( + ImmutableMap.of( + "first", new Float(800.000000).doubleValue(), + "last", new Float(133.740047).doubleValue() + ) + ) + ) + ); + + Iterable> actualResults = Sequences.toList( + runner.run(query, CONTEXT), + Lists.>newArrayList() + ); + if (descending) { + TestHelper.assertExpectedResults(expectedDescendingResults, actualResults); + } else { + TestHelper.assertExpectedResults(expectedAscendingResults, actualResults); + } + } + @Test public void testTimeseriesWithMultiValueDimFilter1() { diff --git a/processing/src/test/java/io/druid/query/topn/TopNQueryRunnerTest.java b/processing/src/test/java/io/druid/query/topn/TopNQueryRunnerTest.java index cd12aff33f5d..ce1a2e4c9c16 100644 --- a/processing/src/test/java/io/druid/query/topn/TopNQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/topn/TopNQueryRunnerTest.java @@ -47,12 +47,15 @@ import io.druid.query.aggregation.DoubleMinAggregatorFactory; import io.druid.query.aggregation.DoubleSumAggregatorFactory; import io.druid.query.aggregation.FilteredAggregatorFactory; +import io.druid.query.aggregation.first.DoubleFirstAggregatorFactory; +import io.druid.query.aggregation.first.LongFirstAggregatorFactory; import io.druid.query.aggregation.PostAggregator; import io.druid.query.aggregation.cardinality.CardinalityAggregatorFactory; import io.druid.query.aggregation.hyperloglog.HyperUniqueFinalizingPostAggregator; import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory; +import io.druid.query.aggregation.last.LongLastAggregatorFactory; import io.druid.query.dimension.DefaultDimensionSpec; import io.druid.query.dimension.DimensionSpec; import io.druid.query.dimension.ExtractionDimensionSpec; import io.druid.query.extraction.DimExtractionFn; import io.druid.query.extraction.ExtractionFn; @@ -172,6 +175,40 @@ 
diff --git a/processing/src/test/java/io/druid/query/topn/TopNQueryRunnerTest.java b/processing/src/test/java/io/druid/query/topn/TopNQueryRunnerTest.java
index cd12aff33f5d..ce1a2e4c9c16 100644
--- a/processing/src/test/java/io/druid/query/topn/TopNQueryRunnerTest.java
+++ b/processing/src/test/java/io/druid/query/topn/TopNQueryRunnerTest.java
@@ -47,12 +47,15 @@
 import io.druid.query.aggregation.DoubleMinAggregatorFactory;
 import io.druid.query.aggregation.DoubleSumAggregatorFactory;
 import io.druid.query.aggregation.FilteredAggregatorFactory;
+import io.druid.query.aggregation.first.DoubleFirstAggregatorFactory;
+import io.druid.query.aggregation.first.LongFirstAggregatorFactory;
 import io.druid.query.aggregation.PostAggregator;
 import io.druid.query.aggregation.cardinality.CardinalityAggregatorFactory;
 import io.druid.query.aggregation.hyperloglog.HyperUniqueFinalizingPostAggregator;
 import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
 import io.druid.query.dimension.DefaultDimensionSpec;
 import io.druid.query.dimension.DimensionSpec;
+import io.druid.query.aggregation.last.LongLastAggregatorFactory;
 import io.druid.query.dimension.ExtractionDimensionSpec;
 import io.druid.query.extraction.DimExtractionFn;
 import io.druid.query.extraction.ExtractionFn;
@@ -172,6 +175,40 @@ private Sequence<Result<TopNResultValue>> runWithMerge(
     return mergeRunner.run(query, context);
   }
 
+  @Test
+  public void testEmptyTopN()
+  {
+    TopNQuery query = new TopNQueryBuilder()
+        .dataSource(QueryRunnerTestHelper.dataSource)
+        .granularity(QueryRunnerTestHelper.allGran)
+        .dimension(QueryRunnerTestHelper.marketDimension)
+        .metric(QueryRunnerTestHelper.indexMetric)
+        .threshold(4)
+        .intervals(QueryRunnerTestHelper.emptyInterval)
+        .aggregators(
+            Lists.newArrayList(
+                Iterables.concat(
+                    QueryRunnerTestHelper.commonAggregators,
+                    Lists.newArrayList(
+                        new DoubleMaxAggregatorFactory("maxIndex", "index"),
+                        new DoubleMinAggregatorFactory("minIndex", "index"),
+                        new DoubleFirstAggregatorFactory("first", "index")
+                    )
+                )
+            )
+        )
+        .postAggregators(Arrays.asList(QueryRunnerTestHelper.addRowsIndexConstant))
+        .build();
+
+    List<Result<TopNResultValue>> expectedResults = ImmutableList.of(
+        new Result<>(
+            new DateTime("2020-04-02T00:00:00.000Z"),
+            new TopNResultValue(ImmutableList.of())
+        )
+    );
+    assertExpectedResults(expectedResults, query);
+  }
+
   @Test
   public void testFullOnTopN()
   {
@@ -449,6 +486,117 @@ public void testTopNOverHyperUniqueFinalizingPostAggregator()
     assertExpectedResults(expectedResults, query);
   }
 
+  @Test
+  public void testTopNOverFirstLastAggregator()
+  {
+    TopNQuery query = new TopNQueryBuilder()
+        .dataSource(QueryRunnerTestHelper.dataSource)
+        .granularity(QueryRunnerTestHelper.monthGran)
+        .dimension(QueryRunnerTestHelper.marketDimension)
+        .metric("last")
+        .threshold(3)
+        .intervals(QueryRunnerTestHelper.fullOnInterval)
+        .aggregators(
+            Arrays.asList(
+                new LongFirstAggregatorFactory("first", "index"),
+                new LongLastAggregatorFactory("last", "index")
+            )
+        )
+        .build();
+
+    List<Result<TopNResultValue>> expectedResults = Arrays.asList(
+        new Result<>(
+            new DateTime("2011-01-01T00:00:00.000Z"),
+            new TopNResultValue(
+                Arrays.<Map<String, Object>>asList(
+                    ImmutableMap.<String, Object>builder()
+                        .put("market", "total_market")
+                        .put("first", 1000L)
+                        .put("last", 1127L)
+                        .build(),
+                    ImmutableMap.<String, Object>builder()
+                        .put("market", "upfront")
+                        .put("first", 800L)
+                        .put("last", 943L)
+                        .build(),
+                    ImmutableMap.<String, Object>builder()
+                        .put("market", "spot")
+                        .put("first", 100L)
+                        .put("last", 155L)
+                        .build()
+                )
+            )
+        ),
+        new Result<>(
+            new DateTime("2011-02-01T00:00:00.000Z"),
+            new TopNResultValue(
+                Arrays.<Map<String, Object>>asList(
+                    ImmutableMap.<String, Object>builder()
+                        .put("market", "total_market")
+                        .put("first", 1203L)
+                        .put("last", 1292L)
+                        .build(),
+                    ImmutableMap.<String, Object>builder()
+                        .put("market", "upfront")
+                        .put("first", 1667L)
+                        .put("last", 1101L)
+                        .build(),
+                    ImmutableMap.<String, Object>builder()
+                        .put("market", "spot")
+                        .put("first", 132L)
+                        .put("last", 114L)
+                        .build()
+                )
+            )
+        ),
+        new Result<>(
+            new DateTime("2011-03-01T00:00:00.000Z"),
+            new TopNResultValue(
+                Arrays.<Map<String, Object>>asList(
+                    ImmutableMap.<String, Object>builder()
+                        .put("market", "total_market")
+                        .put("first", 1124L)
+                        .put("last", 1366L)
+                        .build(),
+                    ImmutableMap.<String, Object>builder()
+                        .put("market", "upfront")
+                        .put("first", 1166L)
+                        .put("last", 1063L)
+                        .build(),
+                    ImmutableMap.<String, Object>builder()
+                        .put("market", "spot")
+                        .put("first", 153L)
+                        .put("last", 125L)
+                        .build()
+                )
+            )
+        ),
+        new Result<>(
+            new DateTime("2011-04-01T00:00:00.000Z"),
+            new TopNResultValue(
+                Arrays.<Map<String, Object>>asList(
+                    ImmutableMap.<String, Object>builder()
+                        .put("market", "total_market")
+                        .put("first", 1314L)
+                        .put("last", 1029L)
+                        .build(),
+                    ImmutableMap.<String, Object>builder()
+                        .put("market", "upfront")
+                        .put("first", 1447L)
+                        .put("last", 780L)
+                        .build(),
+                    ImmutableMap.<String, Object>builder()
+                        .put("market", "spot")
+                        .put("first", 135L)
+                        .put("last", 120L)
+                        .build()
+                )
+            )
+        )
+    );
+    assertExpectedResults(expectedResults, query);
+  }
+
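Reading the expectations above: `.metric("last")` with `.threshold(3)` ranks each month's dimension values by the finalized `last` aggregate and keeps the top three, which is why the rows in every bucket appear in descending order of `last`. The following standalone sketch of that ranking step is a simplification under that assumption, not the TopN engine; `TopNRankSketch`, `topN`, and `row` are illustrative names.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TopNRankSketch
{
  // Keep the `threshold` rows with the largest value under `metric`,
  // mirroring .metric("last").threshold(3) in the query above.
  static List<Map<String, Object>> topN(List<Map<String, Object>> rows, String metric, int threshold)
  {
    List<Map<String, Object>> sorted = new ArrayList<>(rows);
    sorted.sort((a, b) -> Long.compare((Long) b.get(metric), (Long) a.get(metric)));
    return sorted.subList(0, Math.min(threshold, sorted.size()));
  }

  public static void main(String[] args)
  {
    List<Map<String, Object>> rows = new ArrayList<>();
    rows.add(row("spot", 155L));
    rows.add(row("total_market", 1127L));
    rows.add(row("upfront", 943L));
    // Prints total_market, upfront, spot: the order the January bucket asserts.
    for (Map<String, Object> r : topN(rows, "last", 3)) {
      System.out.println(r.get("market") + " -> " + r.get("last"));
    }
  }

  static Map<String, Object> row(String market, long last)
  {
    Map<String, Object> m = new HashMap<>();
    m.put("market", market);
    m.put("last", last);
    return m;
  }
}
```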
   @Test
   public void testTopNBySegment()
   {