First and Last Aggregator #3566

Merged
merged 11 commits on Dec 16, 2016
45 changes: 45 additions & 0 deletions common/src/main/java/io/druid/collections/SerializablePair.java
@@ -0,0 +1,45 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.druid.collections;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.druid.java.util.common.Pair;

public class SerializablePair<T1, T2> extends Pair<T1, T2>
{
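// Exposes lhs/rhs as JSON properties so the pair can round-trip through Jackson;
// the first/last aggregators use it as their (timestamp, value) intermediate result.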
@JsonCreator
public SerializablePair(@JsonProperty("lhs") T1 lhs, @JsonProperty("rhs") T2 rhs)
{
super(lhs, rhs);
}

@JsonProperty
public T1 getLhs()
{
return lhs;
}

@JsonProperty
public T2 getRhs()
{
return rhs;
}
}
@@ -0,0 +1,51 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.druid.collections;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.junit.Assert;
import org.junit.Test;

import java.io.IOException;

public class SerializablePairTest
{
private static final ObjectMapper jsonMapper = new ObjectMapper();

@Test
public void testBytesSerde() throws IOException
{
SerializablePair pair = new SerializablePair<>(5L, 9L);
byte[] bytes = jsonMapper.writeValueAsBytes(pair);
SerializablePair<Number, Number> deserializedPair = jsonMapper.readValue(bytes, SerializablePair.class);
Assert.assertEquals(pair.lhs, deserializedPair.lhs.longValue());
Assert.assertEquals(pair.rhs, deserializedPair.rhs.longValue());
}

@Test
public void testStringSerde() throws IOException
{
SerializablePair pair = new SerializablePair<>(5L, 9L);
String str = jsonMapper.writeValueAsString(pair);
SerializablePair<Number, Number> deserializedPair = jsonMapper.readValue(str, SerializablePair.class);
Assert.assertEquals(pair.lhs, deserializedPair.lhs.longValue());
Assert.assertEquals(pair.rhs, deserializedPair.rhs.longValue());
}
}
54 changes: 54 additions & 0 deletions docs/content/querying/aggregations.md
@@ -76,6 +76,60 @@ Computes the sum of values as 64-bit floating point value. Similar to `longSum`
{ "type" : "longMax", "name" : <output_name>, "fieldName" : <metric_name> }
```

### First / Last aggregator

First and last aggregators cannot be used in an ingestion spec; they may only be specified as part of queries.
Member:

Can we also document that if the rows are rolled up, the first/last value will be the first/last value in the Druid row and not in the raw data being ingested?

Contributor Author:

Added a note about rollup behavior.


Note that queries with first/last aggregators on a segment created with rollup enabled will return the rolled-up value, not the first/last value from the raw ingested data.

#### `doubleFirst` aggregator

`doubleFirst` computes the metric value with the minimum timestamp, or 0 if no rows exist

```json
{
"type" : "doubleFirst",
"name" : <output_name>,
"fieldName" : <metric_name>
}
```

#### `doubleLast` aggregator

`doubleLast` computes the metric value with the maximum timestamp, or 0 if no rows exist

```json
{
"type" : "doubleLast",
"name" : <output_name>,
"fieldName" : <metric_name>
}
```

#### `longFirst` aggregator

`longFirst` computes the metric value with the minimum timestamp, or 0 if no rows exist

```json
{
"type" : "longFirst",
"name" : <output_name>,
"fieldName" : <metric_name>
}
```

#### `longLast` aggregator

`longLast` computes the metric value with the maximum timestamp, or 0 if no rows exist

```json
{
"type" : "longLast",
"name" : <output_name>,
"fieldName" : <metric_name>,
}
```
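
For example, a timeseries query using these aggregators might look like the following sketch (the datasource, interval, and granularity here are placeholders):

```json
{
  "queryType" : "timeseries",
  "dataSource" : "wikipedia_editstream",
  "granularity" : "all",
  "intervals" : [ "2013-01-01/2013-01-08" ],
  "aggregations" : [
    { "type" : "doubleFirst", "name" : "firstAdded", "fieldName" : "added" },
    { "type" : "doubleLast", "name" : "lastAdded", "fieldName" : "added" },
    { "type" : "longFirst", "name" : "firstCount", "fieldName" : "count" },
    { "type" : "longLast", "name" : "lastCount", "fieldName" : "count" }
  ]
}
```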

### JavaScript aggregator

Computes an arbitrary JavaScript function over a set of columns (both metrics and dimensions are allowed). Your
@@ -73,6 +73,26 @@
"type": "hyperUnique",
"fieldName": "unique_users",
"name": "unique_users"
},
{
"type" : "doubleFirst",
"name" : "firstAdded",
"fieldName" : "added"
},
{
"type" : "doubleLast",
"name" : "lastAdded",
"fieldName" : "added"
},
{
"type" : "longFirst",
"name" : "firstCount",
"fieldName" : "count"
},
{
"type" : "longLast",
"name" : "lastCount",
"fieldName" : "count"
}
],
"context": {
@@ -87,6 +107,10 @@
"result": {
"added": 9.11526338E8,
"count": 2815650,
"firstAdded": 39.0,
"lastAdded": 210.0,
"firstCount": 1,
"lastCount": 1,
"delta": 5.48967603E8,
"variation": 1.274085073E9,
"delta_hist": {
@@ -174,6 +198,26 @@
"type": "hyperUnique",
"fieldName": "unique_users",
"name": "unique_users"
},
{
"type" : "doubleFirst",
"name" : "firstAdded",
"fieldName" : "added"
},
{
"type" : "doubleLast",
"name" : "lastAdded",
"fieldName" : "added"
},
{
"type" : "longFirst",
"name" : "firstCount",
"fieldName" : "count"
},
{
"type" : "longLast",
"name" : "lastCount",
"fieldName" : "count"
}
],
"context": {
@@ -188,6 +232,10 @@
"result": {
"added": 3.49393993E8,
"count": 1829240,
"firstAdded": 39.0,
"lastAdded": 210.0,
"firstCount": 1,
"lastCount": 1,
"delta": 2.24089868E8,
"variation": 4.74698118E8,
"delta_hist": {
@@ -365,6 +413,26 @@
"type": "hyperUnique",
"fieldName": "unique_users",
"name": "unique_users"
},
{
"type" : "doubleFirst",
"name" : "firstAdded",
"fieldName" : "added"
},
{
"type" : "doubleLast",
"name" : "lastAdded",
"fieldName" : "added"
},
{
"type" : "longFirst",
"name" : "firstCount",
"fieldName" : "count"
},
{
"type" : "longLast",
"name" : "lastCount",
"fieldName" : "count"
}
],
"dimension": "page",
@@ -383,6 +451,10 @@
{
"added": 1812960.0,
"count": 1697,
"firstCount": 2,
"lastCount": 3,
"firstAdded": 462.0,
"lastAdded": 1871.0,
"page": "Wikipedia:Administrators'_noticeboard/Incidents",
"delta": 770071.0,
"variation": 2855849.0,
@@ -393,6 +465,10 @@
{
"added": 70162.0,
"count": 967,
"firstCount": 1,
"lastCount": 1,
"firstAdded": 12.0,
"lastAdded": 129.0,
"page": "2013",
"delta": 40872.0,
"variation": 99452.0,
@@ -403,6 +479,10 @@
{
"added": 519152.0,
"count": 1700,
"firstCount": 1,
"lastCount": 5,
"firstAdded": 0.0,
"lastAdded": 2399.0,
"page": "Wikipedia:Vandalismusmeldung",
"delta": -5446.0,
"variation": 1043750.0,
@@ -480,6 +560,26 @@
"type": "hyperUnique",
"fieldName": "unique_users",
"name": "unique_users"
},
{
"type" : "doubleFirst",
"name" : "firstAdded",
"fieldName" : "added"
},
{
"type" : "doubleLast",
"name" : "lastAdded",
"fieldName" : "added"
},
{
"type" : "longFirst",
"name" : "firstCount",
"fieldName" : "count"
},
{
"type" : "longLast",
"name" : "lastCount",
"fieldName" : "count"
}
],
"dimension": "page",
@@ -498,6 +598,10 @@
{
"added": 61739.0,
"count": 852,
"firstCount": 1,
"lastCount": 1,
"firstAdded": 12.0,
"lastAdded": 129.0,
"page": "2013",
"delta": 35313.0,
"variation": 88165.0,
@@ -508,6 +612,10 @@
{
"added": 28288.0,
"count": 513,
"firstCount": 1,
"lastCount": 1,
"firstAdded": 29.0,
"lastAdded": 37.0,
"page": "Gérard_Depardieu",
"delta": 7027.0,
"variation": 49549.0,
@@ -518,6 +626,10 @@
{
"added": 10951.0,
"count": 459,
"firstCount": 1,
"lastCount": 1,
"firstAdded": 29.0,
"lastAdded": 35.0,
"page": "Zichyújfalu",
"delta": 9030.0,
"variation": 12872.0,
@@ -570,6 +682,26 @@
"type": "hyperUnique",
"fieldName": "unique_users",
"name": "unique_users"
},
{
"type" : "doubleFirst",
"name" : "firstAdded",
"fieldName" : "added"
},
{
"type" : "doubleLast",
"name" : "lastAdded",
"fieldName" : "added"
},
{
"type" : "longFirst",
"name" : "firstCount",
"fieldName" : "count"
},
{
"type" : "longLast",
"name" : "lastCount",
"fieldName" : "count"
}
],
"postAggregations": [
@@ -619,6 +751,10 @@
{
"added": 151409.0,
"count": 1770,
"firstCount": 9,
"lastCount": 9,
"firstAdded": 1612.0,
"lastAdded": 560.0,
"page": "User:Cyde/List_of_candidates_for_speedy_deletion/Subpage",
"delta": 670.0,
"variation": 302148.0,
@@ -630,6 +766,10 @@
{
"added": 519152.0,
"count": 1700,
"firstCount": 1,
"lastCount": 5,
"firstAdded": 0.0,
"lastAdded": 2399.0,
"page": "Wikipedia:Vandalismusmeldung",
"delta": -5446.0,
"variation": 1043750.0,
@@ -641,6 +781,10 @@
{
"added": 1812960.0,
"count": 1697,
"firstCount": 2,
"lastCount": 3,
"firstAdded": 462.0,
"lastAdded": 1871.0,
"page": "Wikipedia:Administrators'_noticeboard/Incidents",
"delta": 770071.0,
"variation": 2855849.0,
@@ -865,7 +1009,7 @@
]
},
{
"description": "groupBy, two aggs, namespace + robot dim, postAggs",
"description": "groupBy, six aggs, namespace + robot dim, postAggs",
"query": {
"queryType": "groupBy",
"dataSource": "wikipedia_editstream",
@@ -880,6 +1024,26 @@
"type": "longSum",
"fieldName": "count",
"name": "count"
},
{
"type" : "doubleFirst",
"name" : "firstAdded",
"fieldName" : "added"
},
{
"type" : "doubleLast",
"name" : "lastAdded",
"fieldName" : "added"
},
{
"type" : "longFirst",
"name" : "firstCount",
"fieldName" : "count"
},
{
"type" : "longLast",
"name" : "lastCount",
"fieldName" : "count"
}
],
"postAggregations": [
@@ -920,6 +1084,10 @@
"event": {
"sumOfRowsAndCount": 2268154.0,
"count": 1286354,
"firstCount": 1,
"lastCount": 1,
"firstAdded": 70.0,
"lastAdded": 210.0,
"robot": "0",
"rows": 981800,
"namespace": "article"
@@ -931,6 +1099,10 @@
"event": {
"sumOfRowsAndCount": 1385233.0,
"count": 693711,
"firstCount": 1,
"lastCount": 1,
"firstAdded": 39.0,
"lastAdded": 0.0,
"robot": "1",
"rows": 691522,
"namespace": "article"
@@ -942,6 +1114,10 @@
"event": {
"sumOfRowsAndCount": 878393.0,
"count": 492643,
"firstCount": 2,
"lastCount": 1,
"firstAdded": 431.0,
"lastAdded": 43.0,
"robot": "0",
"rows": 385750,
"namespace": "wikipedia"
@@ -29,8 +29,11 @@
import io.druid.query.aggregation.DoubleMinAggregatorFactory;
import io.druid.query.aggregation.DoubleSumAggregatorFactory;
import io.druid.query.aggregation.FilteredAggregatorFactory;
import io.druid.query.aggregation.first.DoubleFirstAggregatorFactory;
import io.druid.query.aggregation.HistogramAggregatorFactory;
import io.druid.query.aggregation.JavaScriptAggregatorFactory;
import io.druid.query.aggregation.first.LongFirstAggregatorFactory;
import io.druid.query.aggregation.last.DoubleLastAggregatorFactory;
import io.druid.query.aggregation.LongMaxAggregatorFactory;
import io.druid.query.aggregation.LongMinAggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
@@ -39,6 +42,7 @@
import io.druid.query.aggregation.hyperloglog.HyperUniqueFinalizingPostAggregator;
import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
import io.druid.query.aggregation.hyperloglog.HyperUniquesSerde;
import io.druid.query.aggregation.last.LongLastAggregatorFactory;
import io.druid.query.aggregation.post.ArithmeticPostAggregator;
import io.druid.query.aggregation.post.ConstantPostAggregator;
import io.druid.query.aggregation.post.DoubleGreatestPostAggregator;
@@ -79,7 +83,11 @@ public AggregatorsModule()
@JsonSubTypes.Type(name = "histogram", value = HistogramAggregatorFactory.class),
@JsonSubTypes.Type(name = "hyperUnique", value = HyperUniquesAggregatorFactory.class),
@JsonSubTypes.Type(name = "cardinality", value = CardinalityAggregatorFactory.class),
@JsonSubTypes.Type(name = "filtered", value = FilteredAggregatorFactory.class)
@JsonSubTypes.Type(name = "filtered", value = FilteredAggregatorFactory.class),
@JsonSubTypes.Type(name = "longFirst", value = LongFirstAggregatorFactory.class),
@JsonSubTypes.Type(name = "doubleFirst", value = DoubleFirstAggregatorFactory.class),
@JsonSubTypes.Type(name = "longLast", value = LongLastAggregatorFactory.class),
@JsonSubTypes.Type(name = "doubleLast", value = DoubleLastAggregatorFactory.class)
})
public static interface AggregatorFactoryMixin
{
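
For context, this subtype table is what lets Jackson resolve an aggregator's JSON `type` field to the matching factory class. A minimal sketch of the round trip, assuming the import paths and class name below and a plain ObjectMapper with the module registered by hand (Druid normally wires this up through Guice):

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import io.druid.jackson.AggregatorsModule;
import io.druid.query.aggregation.AggregatorFactory;

public class AggregatorSerdeSketch
{
  public static void main(String[] args) throws Exception
  {
    // A plain mapper with the module registered; the module applies the mixin above.
    ObjectMapper mapper = new ObjectMapper();
    mapper.registerModule(new AggregatorsModule());

    // "longFirst" resolves to LongFirstAggregatorFactory via the @JsonSubTypes entry above.
    AggregatorFactory factory = mapper.readValue(
        "{\"type\":\"longFirst\",\"name\":\"firstCount\",\"fieldName\":\"count\"}",
        AggregatorFactory.class
    );
    System.out.println(factory); // LongFirstAggregatorFactory{name='firstCount', fieldName='count'}
  }
}
```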
@@ -87,7 +87,8 @@ public Sequence<T> run(final Query<T> queryParam, final Map<String, Object> resp
final Pair<IncrementalIndex, Accumulator<IncrementalIndex, T>> indexAccumulatorPair = GroupByQueryHelper.createIndexAccumulatorPair(
query,
querySpecificConfig,
bufferPool,
true
);
final Pair<Queue, Accumulator<Queue, T>> bySegmentAccumulatorPair = GroupByQueryHelper.createBySegmentAccumulatorPair();
final boolean bySegment = BaseQuery.getContextBySegment(query, false);
@@ -171,7 +171,7 @@ public List<AggregatorFactory> getRequiredColumns()
@Override
public AggregatorFactory apply(String input)
{
// before: return new JavaScriptAggregatorFactory(input, fieldNames, fnAggregate, fnReset, fnCombine, config);
return new JavaScriptAggregatorFactory(input, Lists.newArrayList(input), fnCombine, fnReset, fnCombine, config);
Member:

Seems unrelated to this PR?

Contributor Author:

It's needed due to the changes within groupBy in this PR.

}
}
)
@@ -0,0 +1,97 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.druid.query.aggregation.first;

import io.druid.collections.SerializablePair;
import io.druid.query.aggregation.Aggregator;
import io.druid.segment.FloatColumnSelector;
import io.druid.segment.LongColumnSelector;

public class DoubleFirstAggregator implements Aggregator
{

private final FloatColumnSelector valueSelector;
private final LongColumnSelector timeSelector;
private final String name;

protected long firstTime;
protected double firstValue;

public DoubleFirstAggregator(
String name,
LongColumnSelector timeSelector,
FloatColumnSelector valueSelector
)
{
this.name = name;
this.valueSelector = valueSelector;
this.timeSelector = timeSelector;

reset();
}

@Override
public void aggregate()
{
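// Keep the value from the row with the smallest timestamp seen so far;
// reset() seeds firstTime with Long.MAX_VALUE, so the first row always wins.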
long time = timeSelector.get();
if (time < firstTime) {
firstTime = time;
firstValue = valueSelector.get();
}
}

@Override
public void reset()
{
firstTime = Long.MAX_VALUE;
firstValue = 0;
}

@Override
public Object get()
{
return new SerializablePair<>(firstTime, firstValue);
}

@Override
public float getFloat()
{
return (float) firstValue;
}

@Override
public String getName()
{
return name;
}

@Override
public void close()
{

}

@Override
public long getLong()
{
return (long) firstValue;
}
}

@@ -0,0 +1,265 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.druid.query.aggregation.first;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.primitives.Doubles;
import com.google.common.primitives.Longs;
import com.metamx.common.StringUtils;
import io.druid.collections.SerializablePair;
import io.druid.query.aggregation.Aggregator;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.AggregatorFactoryNotMergeableException;
import io.druid.query.aggregation.BufferAggregator;
import io.druid.segment.ColumnSelectorFactory;
import io.druid.segment.ObjectColumnSelector;
import io.druid.segment.column.Column;

import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;

public class DoubleFirstAggregatorFactory extends AggregatorFactory
{
public static final Comparator VALUE_COMPARATOR = new Comparator()
{
@Override
public int compare(Object o1, Object o2)
{
return Doubles.compare(((SerializablePair<Long, Double>) o1).rhs, ((SerializablePair<Long, Double>) o2).rhs);
}
};

public static final Comparator TIME_COMPARATOR = new Comparator()
{
@Override
public int compare(Object o1, Object o2)
{
return Longs.compare(((SerializablePair<Long, Object>) o1).lhs, ((SerializablePair<Long, Object>) o2).lhs);
}
};

private static final byte CACHE_TYPE_ID = 16;

private final String fieldName;
private final String name;

@JsonCreator
public DoubleFirstAggregatorFactory(
@JsonProperty("name") String name,
@JsonProperty("fieldName") final String fieldName
)
{
Preconditions.checkNotNull(name, "Must have a valid, non-null aggregator name");
Preconditions.checkNotNull(fieldName, "Must have a valid, non-null fieldName");

this.name = name;
this.fieldName = fieldName;
}

@Override
public Aggregator factorize(ColumnSelectorFactory metricFactory)
{
return new DoubleFirstAggregator(
name,
metricFactory.makeLongColumnSelector(Column.TIME_COLUMN_NAME),
metricFactory.makeFloatColumnSelector(fieldName)
);
}

@Override
public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory)
{
return new DoubleFirstBufferAggregator(
metricFactory.makeLongColumnSelector(Column.TIME_COLUMN_NAME),
metricFactory.makeFloatColumnSelector(fieldName)
);
}

@Override
public Comparator getComparator()
{
return VALUE_COMPARATOR;
}

@Override
public Object combine(Object lhs, Object rhs)
{
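// The pair with the earlier timestamp wins; on a tie, lhs (the accumulated result) is kept.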
return TIME_COMPARATOR.compare(lhs, rhs) <= 0 ? lhs : rhs;
}

@Override
public AggregatorFactory getCombiningFactory()
{
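// At merge time the column holds SerializablePair<Long, Double> intermediates rather than
// raw floats, so the combining factory reads them back through an ObjectColumnSelector.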
return new DoubleFirstAggregatorFactory(name, name)
{
@Override
public Aggregator factorize(ColumnSelectorFactory metricFactory)
{
final ObjectColumnSelector selector = metricFactory.makeObjectColumnSelector(name);
return new DoubleFirstAggregator(name, null, null)
{
@Override
public void aggregate()
{
SerializablePair<Long, Double> pair = (SerializablePair<Long, Double>) selector.get();
if (pair.lhs < firstTime) {
firstTime = pair.lhs;
firstValue = pair.rhs;
}
}
};
}

@Override
public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory)
{
final ObjectColumnSelector selector = metricFactory.makeObjectColumnSelector(name);
return new DoubleFirstBufferAggregator(null, null)
{
@Override
public void aggregate(ByteBuffer buf, int position)
{
SerializablePair<Long, Double> pair = (SerializablePair<Long, Double>) selector.get();
long firstTime = buf.getLong(position);
if (pair.lhs < firstTime) {
buf.putLong(position, pair.lhs);
buf.putDouble(position + Longs.BYTES, pair.rhs);
}
}
};
}
};
}

@Override
public AggregatorFactory getMergingFactory(AggregatorFactory other) throws AggregatorFactoryNotMergeableException
{
if (other.getName().equals(this.getName()) && this.getClass() == other.getClass()) {
return getCombiningFactory();
} else {
throw new AggregatorFactoryNotMergeableException(this, other);
}
}

@Override
public List<AggregatorFactory> getRequiredColumns()
{
return Arrays.<AggregatorFactory>asList(new DoubleFirstAggregatorFactory(fieldName, fieldName));
}

@Override
public Object deserialize(Object object)
{
Map map = (Map) object;
return new SerializablePair<>(((Number) map.get("lhs")).longValue(), ((Number) map.get("rhs")).doubleValue());
}

@Override
public Object finalizeComputation(Object object)
{
return ((SerializablePair<Long, Double>) object).rhs;
}

@Override
@JsonProperty
public String getName()
{
return name;
}

@JsonProperty
public String getFieldName()
{
return fieldName;
}

@Override
public List<String> requiredFields()
{
return Arrays.asList(Column.TIME_COLUMN_NAME, fieldName);
}

@Override
public byte[] getCacheKey()
{
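// Cache key layout: one type-id byte, the UTF-8 field name, then a 0xff terminator byte.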
byte[] fieldNameBytes = StringUtils.toUtf8(fieldName);

return ByteBuffer.allocate(2 + fieldNameBytes.length)
.put(CACHE_TYPE_ID)
.put(fieldNameBytes)
.put((byte)0xff)
.array();
}

@Override
public String getTypeName()
{
return "float";
}

@Override
public int getMaxIntermediateSize()
{
return Longs.BYTES + Doubles.BYTES;
}

@Override
public Object getAggregatorStartValue()
{
throw new UnsupportedOperationException();
}

@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}

DoubleFirstAggregatorFactory that = (DoubleFirstAggregatorFactory) o;

return fieldName.equals(that.fieldName) && name.equals(that.name);
}

@Override
public int hashCode()
{
int result = name.hashCode();
result = 31 * result + fieldName.hashCode();
return result;
}

@Override
public String toString()
{
return "DoubleFirstAggregatorFactory{" +
"name='" + name + '\'' +
", fieldName='" + fieldName + '\'' +
'}';
}
}
@@ -0,0 +1,82 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.druid.query.aggregation.first;

import com.google.common.primitives.Longs;
import io.druid.collections.SerializablePair;
import io.druid.query.aggregation.BufferAggregator;
import io.druid.segment.FloatColumnSelector;
import io.druid.segment.LongColumnSelector;

import java.nio.ByteBuffer;

public class DoubleFirstBufferAggregator implements BufferAggregator
{
private final LongColumnSelector timeSelector;
private final FloatColumnSelector valueSelector;

public DoubleFirstBufferAggregator(LongColumnSelector timeSelector, FloatColumnSelector valueSelector)
{
this.timeSelector = timeSelector;
this.valueSelector = valueSelector;
}

@Override
public void init(ByteBuffer buf, int position)
{
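// Buffer layout: the first timestamp (long) at position, then the first value (double)
// at position + Longs.BYTES.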
buf.putLong(position, Long.MAX_VALUE);
buf.putDouble(position + Longs.BYTES, 0);
}

@Override
public void aggregate(ByteBuffer buf, int position)
{
long time = timeSelector.get();
long firstTime = buf.getLong(position);
if (time < firstTime) {
buf.putLong(position, time);
buf.putDouble(position + Longs.BYTES, valueSelector.get());
}
}

@Override
public Object get(ByteBuffer buf, int position)
{
return new SerializablePair<>(buf.getLong(position), buf.getDouble(position + Longs.BYTES));
}

@Override
public float getFloat(ByteBuffer buf, int position)
{
return (float) buf.getDouble(position + Longs.BYTES);
}

@Override
public long getLong(ByteBuffer buf, int position)
{
return (long) buf.getDouble(position + Longs.BYTES);
}

@Override
public void close()
{
// no resources to cleanup
}
}
@@ -0,0 +1,95 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.druid.query.aggregation.first;

import io.druid.collections.SerializablePair;
import io.druid.query.aggregation.Aggregator;
import io.druid.segment.LongColumnSelector;

public class LongFirstAggregator implements Aggregator
{

private final LongColumnSelector valueSelector;
private final LongColumnSelector timeSelector;
private final String name;

protected long firstTime;
protected long firstValue;

public LongFirstAggregator(
String name,
LongColumnSelector timeSelector,
LongColumnSelector valueSelector
)
{
this.name = name;
this.valueSelector = valueSelector;
this.timeSelector = timeSelector;

reset();
}

@Override
public void aggregate()
{
long time = timeSelector.get();
if (time < firstTime) {
firstTime = time;
firstValue = valueSelector.get();
}
}

@Override
public void reset()
{
firstTime = Long.MAX_VALUE;
firstValue = 0;
}

@Override
public Object get()
{
return new SerializablePair<>(firstTime, firstValue);
}

@Override
public float getFloat()
{
return (float) firstValue;
}

@Override
public String getName()
{
return name;
}

@Override
public void close()
{

}

@Override
public long getLong()
{
return firstValue;
}
}
@@ -0,0 +1,255 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.druid.query.aggregation.first;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.primitives.Longs;
import com.metamx.common.StringUtils;
import io.druid.collections.SerializablePair;
import io.druid.query.aggregation.Aggregator;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.AggregatorFactoryNotMergeableException;
import io.druid.query.aggregation.BufferAggregator;
import io.druid.segment.ColumnSelectorFactory;
import io.druid.segment.ObjectColumnSelector;
import io.druid.segment.column.Column;

import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;

public class LongFirstAggregatorFactory extends AggregatorFactory
{
public static final Comparator VALUE_COMPARATOR = new Comparator()
{
@Override
public int compare(Object o1, Object o2)
{
return Longs.compare(((SerializablePair<Long, Long>) o1).rhs, ((SerializablePair<Long, Long>) o2).rhs);
}
};

private static final byte CACHE_TYPE_ID = 17;

private final String fieldName;
private final String name;

@JsonCreator
public LongFirstAggregatorFactory(
@JsonProperty("name") String name,
@JsonProperty("fieldName") final String fieldName
)
{
Preconditions.checkNotNull(name, "Must have a valid, non-null aggregator name");
Preconditions.checkNotNull(fieldName, "Must have a valid, non-null fieldName");

this.name = name;
this.fieldName = fieldName;
}

@Override
public Aggregator factorize(ColumnSelectorFactory metricFactory)
{
return new LongFirstAggregator(
name,
metricFactory.makeLongColumnSelector(Column.TIME_COLUMN_NAME),
metricFactory.makeLongColumnSelector(fieldName)
);
}

@Override
public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory)
{
return new LongFirstBufferAggregator(
metricFactory.makeLongColumnSelector(Column.TIME_COLUMN_NAME),
metricFactory.makeLongColumnSelector(fieldName)
);
}

@Override
public Comparator getComparator()
{
return VALUE_COMPARATOR;
}

@Override
public Object combine(Object lhs, Object rhs)
{
return DoubleFirstAggregatorFactory.TIME_COMPARATOR.compare(lhs, rhs) <= 0 ? lhs : rhs;
}

@Override
public AggregatorFactory getCombiningFactory()
{
return new LongFirstAggregatorFactory(name, name)
{
@Override
public Aggregator factorize(ColumnSelectorFactory metricFactory)
{
final ObjectColumnSelector selector = metricFactory.makeObjectColumnSelector(name);
return new LongFirstAggregator(name, null, null)
{
@Override
public void aggregate()
{
SerializablePair<Long, Long> pair = (SerializablePair<Long, Long>) selector.get();
if (pair.lhs < firstTime) {
firstTime = pair.lhs;
firstValue = pair.rhs;
}
}
};
}

@Override
public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory)
{
final ObjectColumnSelector selector = metricFactory.makeObjectColumnSelector(name);
return new LongFirstBufferAggregator(null, null)
{
@Override
public void aggregate(ByteBuffer buf, int position)
{
SerializablePair<Long, Long> pair = (SerializablePair<Long, Long>) selector.get();
long firstTime = buf.getLong(position);
if (pair.lhs < firstTime) {
buf.putLong(position, pair.lhs);
buf.putLong(position + Longs.BYTES, pair.rhs);
}
}
};
}
};
}

@Override
public AggregatorFactory getMergingFactory(AggregatorFactory other) throws AggregatorFactoryNotMergeableException
{
if (other.getName().equals(this.getName()) && this.getClass() == other.getClass()) {
return getCombiningFactory();
} else {
throw new AggregatorFactoryNotMergeableException(this, other);
}
}

@Override
public List<AggregatorFactory> getRequiredColumns()
{
return Arrays.<AggregatorFactory>asList(new LongFirstAggregatorFactory(fieldName, fieldName));
}

@Override
public Object deserialize(Object object)
{
Map map = (Map) object;
return new SerializablePair<>(((Number) map.get("lhs")).longValue(), ((Number) map.get("rhs")).longValue());
}

@Override
public Object finalizeComputation(Object object)
{
return ((SerializablePair<Long, Long>) object).rhs;
}

@Override
@JsonProperty
public String getName()
{
return name;
}

@JsonProperty
public String getFieldName()
{
return fieldName;
}

@Override
public List<String> requiredFields()
{
return Arrays.asList(Column.TIME_COLUMN_NAME, fieldName);
}

@Override
public byte[] getCacheKey()
{
byte[] fieldNameBytes = StringUtils.toUtf8(fieldName);

return ByteBuffer.allocate(2 + fieldNameBytes.length)
.put(CACHE_TYPE_ID)
.put(fieldNameBytes)
.put((byte)0xff)
.array();
}

@Override
public String getTypeName()
{
return "long";
}

@Override
public int getMaxIntermediateSize()
{
return Longs.BYTES * 2;
}

@Override
public Object getAggregatorStartValue()
{
throw new UnsupportedOperationException();
}

@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}

LongFirstAggregatorFactory that = (LongFirstAggregatorFactory) o;

return fieldName.equals(that.fieldName) && name.equals(that.name);
}

@Override
public int hashCode()
{
int result = name.hashCode();
result = 31 * result + fieldName.hashCode();
return result;
}

@Override
public String toString()
{
return "LongFirstAggregatorFactory{" +
"name='" + name + '\'' +
", fieldName='" + fieldName + '\'' +
'}';
}
}
@@ -0,0 +1,81 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.druid.query.aggregation.first;

import com.google.common.primitives.Longs;
import io.druid.collections.SerializablePair;
import io.druid.query.aggregation.BufferAggregator;
import io.druid.segment.LongColumnSelector;

import java.nio.ByteBuffer;

public class LongFirstBufferAggregator implements BufferAggregator
{
private final LongColumnSelector timeSelector;
private final LongColumnSelector valueSelector;

public LongFirstBufferAggregator(LongColumnSelector timeSelector, LongColumnSelector valueSelector)
{
this.timeSelector = timeSelector;
this.valueSelector = valueSelector;
}

@Override
public void init(ByteBuffer buf, int position)
{
buf.putLong(position, Long.MAX_VALUE);
buf.putLong(position + Longs.BYTES, 0);
}

@Override
public void aggregate(ByteBuffer buf, int position)
{
long time = timeSelector.get();
long firstTime = buf.getLong(position);
if (time < firstTime) {
buf.putLong(position, time);
buf.putLong(position + Longs.BYTES, valueSelector.get());
}
}

@Override
public Object get(ByteBuffer buf, int position)
{
return new SerializablePair<>(buf.getLong(position), buf.getLong(position + Longs.BYTES));
}

@Override
public float getFloat(ByteBuffer buf, int position)
{
return (float) buf.getLong(position + Longs.BYTES);
}

@Override
public long getLong(ByteBuffer buf, int position)
{
return buf.getLong(position + Longs.BYTES);
}

@Override
public void close()
{
// no resources to cleanup
}
}
@@ -0,0 +1,96 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.druid.query.aggregation.last;

import io.druid.collections.SerializablePair;
import io.druid.query.aggregation.Aggregator;
import io.druid.segment.FloatColumnSelector;
import io.druid.segment.LongColumnSelector;

public class DoubleLastAggregator implements Aggregator
{

private final FloatColumnSelector valueSelector;
private final LongColumnSelector timeSelector;
private final String name;

protected long lastTime;
protected double lastValue;

public DoubleLastAggregator(
String name,
LongColumnSelector timeSelector,
FloatColumnSelector valueSelector
)
{
this.name = name;
this.valueSelector = valueSelector;
this.timeSelector = timeSelector;

reset();
}

@Override
public void aggregate()
{
long time = timeSelector.get();
if (time >= lastTime) {
lastTime = time;
lastValue = valueSelector.get();
}
}

@Override
public void reset()
{
lastTime = Long.MIN_VALUE;
lastValue = 0;
}

@Override
public Object get()
{
return new SerializablePair<>(lastTime, lastValue);
}

@Override
public float getFloat()
{
return (float) lastValue;
}

@Override
public String getName()
{
return name;
}

@Override
public void close()
{

}

@Override
public long getLong()
{
return (long) lastValue;
}
}
@@ -0,0 +1,248 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.druid.query.aggregation.last;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.primitives.Doubles;
import com.google.common.primitives.Longs;
import com.metamx.common.StringUtils;
import io.druid.collections.SerializablePair;
import io.druid.query.aggregation.Aggregator;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.AggregatorFactoryNotMergeableException;
import io.druid.query.aggregation.BufferAggregator;
import io.druid.query.aggregation.first.DoubleFirstAggregatorFactory;
import io.druid.segment.ColumnSelectorFactory;
import io.druid.segment.ObjectColumnSelector;
import io.druid.segment.column.Column;

import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;

public class DoubleLastAggregatorFactory extends AggregatorFactory
{
private static final byte CACHE_TYPE_ID = 18;

private final String fieldName;
private final String name;

@JsonCreator
public DoubleLastAggregatorFactory(
@JsonProperty("name") String name,
@JsonProperty("fieldName") final String fieldName
)
{
Preconditions.checkNotNull(name, "Must have a valid, non-null aggregator name");
Preconditions.checkNotNull(fieldName, "Must have a valid, non-null fieldName");
this.name = name;
this.fieldName = fieldName;
}

@Override
public Aggregator factorize(ColumnSelectorFactory metricFactory)
{
return new DoubleLastAggregator(
name,
metricFactory.makeLongColumnSelector(Column.TIME_COLUMN_NAME),
metricFactory.makeFloatColumnSelector(fieldName)
);
}

@Override
public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory)
{
return new DoubleLastBufferAggregator(
metricFactory.makeLongColumnSelector(Column.TIME_COLUMN_NAME),
metricFactory.makeFloatColumnSelector(fieldName)
);
}

@Override
public Comparator getComparator()
{
return DoubleFirstAggregatorFactory.VALUE_COMPARATOR;
}

@Override
public Object combine(Object lhs, Object rhs)
{
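// The pair with the later timestamp wins; on a tie, rhs (the incoming value) replaces lhs.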
return DoubleFirstAggregatorFactory.TIME_COMPARATOR.compare(lhs, rhs) > 0 ? lhs : rhs;
}

@Override
public AggregatorFactory getCombiningFactory()
{
return new DoubleLastAggregatorFactory(name, name)
{
@Override
public Aggregator factorize(ColumnSelectorFactory metricFactory)
{
final ObjectColumnSelector selector = metricFactory.makeObjectColumnSelector(name);
return new DoubleLastAggregator(name, null, null)
{
@Override
public void aggregate()
{
SerializablePair<Long, Double> pair = (SerializablePair<Long, Double>) selector.get();
if (pair.lhs >= lastTime) {
lastTime = pair.lhs;
lastValue = pair.rhs;
}
}
};
}

@Override
public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory)
{
final ObjectColumnSelector selector = metricFactory.makeObjectColumnSelector(name);
return new DoubleLastBufferAggregator(null, null)
{
@Override
public void aggregate(ByteBuffer buf, int position)
{
SerializablePair<Long, Double> pair = (SerializablePair<Long, Double>) selector.get();
long lastTime = buf.getLong(position);
if (pair.lhs >= lastTime) {
buf.putLong(position, pair.lhs);
buf.putDouble(position + Longs.BYTES, pair.rhs);
}
}
};
}
};
}

@Override
public AggregatorFactory getMergingFactory(AggregatorFactory other) throws AggregatorFactoryNotMergeableException
{
if (other.getName().equals(this.getName()) && this.getClass() == other.getClass()) {
return getCombiningFactory();
} else {
throw new AggregatorFactoryNotMergeableException(this, other);
}
}

@Override
public List<AggregatorFactory> getRequiredColumns()
{
return Arrays.<AggregatorFactory>asList(new DoubleLastAggregatorFactory(fieldName, fieldName));
}

@Override
public Object deserialize(Object object)
{
Map map = (Map) object;
return new SerializablePair<>(((Number) map.get("lhs")).longValue(), ((Number) map.get("rhs")).doubleValue());
}

@Override
public Object finalizeComputation(Object object)
{
return ((SerializablePair<Long, Double>) object).rhs;
}

@Override
@JsonProperty
public String getName()
{
return name;
}

@JsonProperty
public String getFieldName()
{
return fieldName;
}

@Override
public List<String> requiredFields()
{
return Arrays.asList(Column.TIME_COLUMN_NAME, fieldName);
}

@Override
public byte[] getCacheKey()
{
byte[] fieldNameBytes = StringUtils.toUtf8(fieldName);

return ByteBuffer.allocate(2 + fieldNameBytes.length)
.put(CACHE_TYPE_ID)
.put(fieldNameBytes)
.put((byte)0xff)
.array();
}

@Override
public String getTypeName()
{
return "float";
}

@Override
public int getMaxIntermediateSize()
{
return Longs.BYTES + Doubles.BYTES;
}

@Override
public Object getAggregatorStartValue()
{
throw new UnsupportedOperationException();
}

@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}

DoubleLastAggregatorFactory that = (DoubleLastAggregatorFactory) o;

return fieldName.equals(that.fieldName) && name.equals(that.name);
}

@Override
public int hashCode()
{
int result = name.hashCode();
result = 31 * result + fieldName.hashCode();
return result;
}

@Override
public String toString()
{
return "DoubleLastAggregatorFactory{" +
"name='" + name + '\'' +
", fieldName='" + fieldName + '\'' +
'}';
}
}
@@ -0,0 +1,82 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.druid.query.aggregation.last;

import com.google.common.primitives.Longs;
import io.druid.collections.SerializablePair;
import io.druid.query.aggregation.BufferAggregator;
import io.druid.segment.FloatColumnSelector;
import io.druid.segment.LongColumnSelector;

import java.nio.ByteBuffer;

public class DoubleLastBufferAggregator implements BufferAggregator
{
private final LongColumnSelector timeSelector;
private final FloatColumnSelector valueSelector;

public DoubleLastBufferAggregator(LongColumnSelector timeSelector, FloatColumnSelector valueSelector)
{
this.timeSelector = timeSelector;
this.valueSelector = valueSelector;
}

@Override
public void init(ByteBuffer buf, int position)
{
buf.putLong(position, Long.MIN_VALUE);
buf.putDouble(position + Longs.BYTES, 0);
}

@Override
public void aggregate(ByteBuffer buf, int position)
{
long time = timeSelector.get();
long lastTime = buf.getLong(position);
if (time >= lastTime) {
buf.putLong(position, time);
buf.putDouble(position + Longs.BYTES, valueSelector.get());
}
}

@Override
public Object get(ByteBuffer buf, int position)
{
return new SerializablePair<>(buf.getLong(position), buf.getDouble(position + Longs.BYTES));
}

@Override
public float getFloat(ByteBuffer buf, int position)
{
return (float) buf.getDouble(position + Longs.BYTES);
}

@Override
public long getLong(ByteBuffer buf, int position)
{
return (long) buf.getDouble(position + Longs.BYTES);
}

@Override
public void close()
{
// no resources to cleanup
}
}
@@ -0,0 +1,94 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.druid.query.aggregation.last;

import io.druid.collections.SerializablePair;
import io.druid.query.aggregation.Aggregator;
import io.druid.segment.LongColumnSelector;

public class LongLastAggregator implements Aggregator
{
private final LongColumnSelector valueSelector;
private final LongColumnSelector timeSelector;
private final String name;

protected long lastTime;
protected long lastValue;

public LongLastAggregator(
String name,
LongColumnSelector timeSelector,
LongColumnSelector valueSelector
)
{
this.name = name;
this.valueSelector = valueSelector;
this.timeSelector = timeSelector;

reset();
}

@Override
public void aggregate()
{
long time = timeSelector.get();
if (time >= lastTime) {
lastTime = time;
lastValue = valueSelector.get();
}
}

@Override
public void reset()
{
lastTime = Long.MIN_VALUE;
lastValue = 0;
}

@Override
public Object get()
{
return new SerializablePair<>(lastTime, lastValue);
}

@Override
public float getFloat()
{
return (float) lastValue;
}

@Override
public String getName()
{
return name;
}

@Override
public void close()
{

}

@Override
public long getLong()
{
return lastValue;
}
}
@@ -0,0 +1,248 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.druid.query.aggregation.last;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.primitives.Longs;
import com.metamx.common.StringUtils;
import io.druid.collections.SerializablePair;
import io.druid.query.aggregation.Aggregator;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.AggregatorFactoryNotMergeableException;
import io.druid.query.aggregation.BufferAggregator;
import io.druid.query.aggregation.first.DoubleFirstAggregatorFactory;
import io.druid.query.aggregation.first.LongFirstAggregatorFactory;
import io.druid.segment.ColumnSelectorFactory;
import io.druid.segment.ObjectColumnSelector;
import io.druid.segment.column.Column;

import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;

public class LongLastAggregatorFactory extends AggregatorFactory
{
private static final byte CACHE_TYPE_ID = 19;

private final String fieldName;
private final String name;

@JsonCreator
public LongLastAggregatorFactory(
@JsonProperty("name") String name,
@JsonProperty("fieldName") final String fieldName
)
{
Preconditions.checkNotNull(name, "Must have a valid, non-null aggregator name");
Preconditions.checkNotNull(fieldName, "Must have a valid, non-null fieldName");
this.name = name;
this.fieldName = fieldName;
}

@Override
public Aggregator factorize(ColumnSelectorFactory metricFactory)
{
return new LongLastAggregator(
name,
metricFactory.makeLongColumnSelector(Column.TIME_COLUMN_NAME),
metricFactory.makeLongColumnSelector(fieldName)
);
}

@Override
public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory)
{
return new LongLastBufferAggregator(
metricFactory.makeLongColumnSelector(Column.TIME_COLUMN_NAME),
metricFactory.makeLongColumnSelector(fieldName)
);
}

@Override
public Comparator getComparator()
{
return LongFirstAggregatorFactory.VALUE_COMPARATOR;
}

@Override
public Object combine(Object lhs, Object rhs)
{
return DoubleFirstAggregatorFactory.TIME_COMPARATOR.compare(lhs, rhs) > 0 ? lhs : rhs;
}


@Override
public AggregatorFactory getCombiningFactory()
{
return new LongLastAggregatorFactory(name, name)
{
@Override
public Aggregator factorize(ColumnSelectorFactory metricFactory)
{
final ObjectColumnSelector selector = metricFactory.makeObjectColumnSelector(name);
return new LongLastAggregator(name, null, null)
{
@Override
public void aggregate()
{
SerializablePair<Long, Long> pair = (SerializablePair<Long, Long>) selector.get();
if (pair.lhs >= lastTime) {
lastTime = pair.lhs;
lastValue = pair.rhs;
}
}
};
}

@Override
public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory)
{
final ObjectColumnSelector selector = metricFactory.makeObjectColumnSelector(name);
return new LongLastBufferAggregator(null, null)
{
@Override
public void aggregate(ByteBuffer buf, int position)
{
SerializablePair<Long, Long> pair = (SerializablePair<Long, Long>) selector.get();
long lastTime = buf.getLong(position);
if (pair.lhs >= lastTime) {
buf.putLong(position, pair.lhs);
buf.putLong(position + Longs.BYTES, pair.rhs);
}
}
};
}
};
}

@Override
public AggregatorFactory getMergingFactory(AggregatorFactory other) throws AggregatorFactoryNotMergeableException
{
if (other.getName().equals(this.getName()) && this.getClass() == other.getClass()) {
return getCombiningFactory();
} else {
throw new AggregatorFactoryNotMergeableException(this, other);
}
}

@Override
public List<AggregatorFactory> getRequiredColumns()
{
return Arrays.<AggregatorFactory>asList(new LongLastAggregatorFactory(fieldName, fieldName));
}

@Override
public Object deserialize(Object object)
{
Map map = (Map) object;
return new SerializablePair<>(((Number) map.get("lhs")).longValue(), ((Number) map.get("rhs")).longValue());
}

@Override
public Object finalizeComputation(Object object)
{
return ((SerializablePair<Long, Long>) object).rhs;
}

@Override
@JsonProperty
public String getName()
{
return name;
}

@JsonProperty
public String getFieldName()
{
return fieldName;
}

@Override
public List<String> requiredFields()
{
return Arrays.asList(Column.TIME_COLUMN_NAME, fieldName);
}

@Override
public byte[] getCacheKey()
{
byte[] fieldNameBytes = StringUtils.toUtf8(fieldName);

return ByteBuffer.allocate(2 + fieldNameBytes.length)
.put(CACHE_TYPE_ID)
.put(fieldNameBytes)
.put((byte)0xff)
.array();
}

@Override
public String getTypeName()
{
return "long";
}

@Override
public int getMaxIntermediateSize()
{
return Longs.BYTES * 2;
}

@Override
public Object getAggregatorStartValue()
{
throw new UnsupportedOperationException();
}

@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}

LongLastAggregatorFactory that = (LongLastAggregatorFactory) o;

return name.equals(that.name) && fieldName.equals(that.fieldName);
}

@Override
public int hashCode()
{
int result = name.hashCode();
result = 31 * result + fieldName.hashCode();
return result;
}

@Override
public String toString()
{
return "LongLastAggregatorFactory{" +
"name='" + name + '\'' +
", fieldName='" + fieldName + '\'' +
'}';
}
}
@@ -0,0 +1,81 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.druid.query.aggregation.last;

import com.google.common.primitives.Longs;
import io.druid.collections.SerializablePair;
import io.druid.query.aggregation.BufferAggregator;
import io.druid.segment.LongColumnSelector;

import java.nio.ByteBuffer;

public class LongLastBufferAggregator implements BufferAggregator
{
private final LongColumnSelector timeSelector;
private final LongColumnSelector valueSelector;

public LongLastBufferAggregator(LongColumnSelector timeSelector, LongColumnSelector valueSelector)
{
this.timeSelector = timeSelector;
this.valueSelector = valueSelector;
}

@Override
public void init(ByteBuffer buf, int position)
{
buf.putLong(position, Long.MIN_VALUE);
buf.putLong(position + Longs.BYTES, 0);
}

@Override
public void aggregate(ByteBuffer buf, int position)
{
long time = timeSelector.get();
long lastTime = buf.getLong(position);
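    // ">=" lets rows with equal timestamps overwrite, so the most recently scanned row wins ties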
if (time >= lastTime) {
buf.putLong(position, time);
buf.putLong(position + Longs.BYTES, valueSelector.get());
}
}

@Override
public Object get(ByteBuffer buf, int position)
{
return new SerializablePair<>(buf.getLong(position), buf.getLong(position + Longs.BYTES));
}

@Override
public float getFloat(ByteBuffer buf, int position)
{
return (float) buf.getLong(position + Longs.BYTES);
}

@Override
public long getLong(ByteBuffer buf, int position)
{
return buf.getLong(position + Longs.BYTES);
}

@Override
public void close()
{
    // no resources to clean up
}
}
@@ -55,7 +55,8 @@ public class GroupByQueryHelper
public static <T> Pair<IncrementalIndex, Accumulator<IncrementalIndex, T>> createIndexAccumulatorPair(
final GroupByQuery query,
final GroupByQueryConfig config,
      StupidPool<ByteBuffer> bufferPool,
      final boolean combine
)
{
final GroupByQueryConfig querySpecificConfig = config.withOverrides(query);
@@ -66,17 +67,23 @@ public static <T> Pair<IncrementalIndex, Accumulator<IncrementalIndex, T>> creat
// AllGranularity returns timeStart instead of Long.MIN_VALUE
final long granTimeStart = gran.iterable(timeStart, timeStart + 1).iterator().next();

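    // When combining partial results, swap each aggregator for its combining factory;
    // otherwise keep the query's aggregator specs as-is.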
    final List<AggregatorFactory> aggs;
    if (combine) {
      aggs = Lists.transform(
          query.getAggregatorSpecs(),
          new Function<AggregatorFactory, AggregatorFactory>()
          {
            @Override
            public AggregatorFactory apply(AggregatorFactory input)
            {
              return input.getCombiningFactory();
            }
          }
      );
    } else {
      aggs = query.getAggregatorSpecs();
    }

final List<String> dimensions = Lists.transform(
query.getDimensions(),
new Function<DimensionSpec, String>()
@@ -178,13 +185,15 @@ public static IncrementalIndex makeIncrementalIndex(
GroupByQuery query,
GroupByQueryConfig config,
StupidPool<ByteBuffer> bufferPool,
      Sequence<Row> rows,
      boolean combine
)
{
Pair<IncrementalIndex, Accumulator<IncrementalIndex, Row>> indexAccumulatorPair = GroupByQueryHelper.createIndexAccumulatorPair(
query,
config,
        bufferPool,
        combine
);

return rows.accumulate(indexAccumulatorPair.lhs, indexAccumulatorPair.rhs);
@@ -38,6 +38,7 @@
import io.druid.granularity.QueryGranularity;
import io.druid.guice.annotations.Global;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.guava.MappedSequence;
import io.druid.java.util.common.guava.Sequence;
import io.druid.query.BaseQuery;
import io.druid.query.CacheStrategy;
@@ -172,7 +173,21 @@ private Sequence<Row> mergeGroupByResults(
runner,
context
);

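      // If the subquery's context asks for finalized results, apply the finalizing
      // metric manipulator before the outer query consumes the rows.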
final Sequence<Row> finalizingResults;
if (GroupByQuery.getContextFinalize(subquery, false)) {
finalizingResults = new MappedSequence<>(
subqueryResult,
makePreComputeManipulatorFn(
subquery,
MetricManipulatorFns.finalizing()
)
);
} else {
finalizingResults = subqueryResult;
}

return strategySelector.strategize(query).processSubqueryResult(subquery, query, finalizingResults);
} else {
return strategySelector.strategize(query).mergeResults(runner, query, context);
}
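
The finalize flag checked above comes from the subquery's context; assuming the standard "finalize" context key, a subquery would opt in with something like:

"context" : { "finalize" : true }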
@@ -116,7 +116,8 @@ public Sequence<Row> mergeResults(
)
),
responseContext
),
true
);

return new ResourceClosingSequence<>(query.applyLimit(GroupByQueryHelper.postAggregate(query, index)), index);
@@ -178,21 +179,26 @@ public boolean apply(AggregatorFactory agg)
.setLimitSpec(query.getLimitSpec().merge(subquery.getLimitSpec()))
.build();

final IncrementalIndex innerQueryResultIndex = GroupByQueryHelper.makeIncrementalIndex(
innerQuery.withOverriddenContext(
ImmutableMap.<String, Object>of(
GroupByQueryHelper.CTX_KEY_SORT_RESULTS, true
)
),
configSupplier.get(),
bufferPool,
subqueryResult,
false
);

//Outer query might have multiple intervals, but they are expected to be non-overlapping and sorted which
//is ensured by QuerySegmentSpec.
//GroupByQueryEngine can only process one interval at a time, so we need to call it once per interval
//and concatenate the results.
final IncrementalIndex outerQueryResultIndex = GroupByQueryHelper.makeIncrementalIndex(
outerQuery,
configSupplier.get(),
bufferPool,
Sequences.concat(
Sequences.map(
Sequences.simple(outerQuery.getIntervals()),
@@ -210,7 +216,8 @@ public Sequence<Row> apply(Interval interval)
}
}
)
),
true
);

innerQueryResultIndex.close();
@@ -221,11 +228,6 @@ public Sequence<Row> apply(Interval interval)
);
}

@Override
public QueryRunner<Row> mergeRunners(
final ListeningExecutorService exec,
@@ -106,6 +106,7 @@ public TableDataSource apply(@Nullable String input)
public static final DateTime minTime = new DateTime("2011-01-12T00:00:00.000Z");

public static final QueryGranularity dayGran = QueryGranularities.DAY;
public static final QueryGranularity monthGran = QueryGranularities.MONTH;
public static final QueryGranularity allGran = QueryGranularities.ALL;
public static final String timeDimension = "__time";
public static final String marketDimension = "market";
@@ -0,0 +1,201 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.druid.query.aggregation.first;

import io.druid.collections.SerializablePair;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.java.util.common.Pair;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.TestFloatColumnSelector;
import io.druid.query.aggregation.TestLongColumnSelector;
import io.druid.query.aggregation.TestObjectColumnSelector;
import io.druid.segment.ColumnSelectorFactory;
import io.druid.segment.column.Column;
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import java.nio.ByteBuffer;

public class DoubleFirstAggregationTest
{
private DoubleFirstAggregatorFactory doubleFirstAggFactory;
private DoubleFirstAggregatorFactory combiningAggFactory;
private ColumnSelectorFactory colSelectorFactory;
private TestLongColumnSelector timeSelector;
private TestFloatColumnSelector valueSelector;
private TestObjectColumnSelector objectSelector;

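  // times[1] is the earliest timestamp, so the non-combining "first" tests expect row 1;
  // pairs[2] has the smallest lhs, so the combining tests expect that pair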
private float[] floatValues = {1.1f, 2.7f, 3.5f, 1.3f};
private long[] times = {12, 10, 5344, 7899999};
private SerializablePair[] pairs = {
new SerializablePair<>(1467225096L, 134.3d),
new SerializablePair<>(23163L, 1232.212d),
new SerializablePair<>(742L, 18d),
new SerializablePair<>(111111L, 233.5232d)
};

@Before
public void setup()
{
doubleFirstAggFactory = new DoubleFirstAggregatorFactory("billy", "nilly");
combiningAggFactory = (DoubleFirstAggregatorFactory) doubleFirstAggFactory.getCombiningFactory();
timeSelector = new TestLongColumnSelector(times);
valueSelector = new TestFloatColumnSelector(floatValues);
objectSelector = new TestObjectColumnSelector(pairs);
colSelectorFactory = EasyMock.createMock(ColumnSelectorFactory.class);
EasyMock.expect(colSelectorFactory.makeLongColumnSelector(Column.TIME_COLUMN_NAME)).andReturn(timeSelector);
EasyMock.expect(colSelectorFactory.makeFloatColumnSelector("nilly")).andReturn(valueSelector);
EasyMock.expect(colSelectorFactory.makeObjectColumnSelector("billy")).andReturn(objectSelector);
EasyMock.replay(colSelectorFactory);
}

@Test
public void testDoubleFirstAggregator()
{
DoubleFirstAggregator agg = (DoubleFirstAggregator) doubleFirstAggFactory.factorize(colSelectorFactory);

Assert.assertEquals("billy", agg.getName());

aggregate(agg);
aggregate(agg);
aggregate(agg);
aggregate(agg);

Pair<Long, Double> result = (Pair<Long, Double>) agg.get();

Assert.assertEquals(times[1], result.lhs.longValue());
Assert.assertEquals(floatValues[1], result.rhs, 0.0001);
Assert.assertEquals((long) floatValues[1], agg.getLong());
Assert.assertEquals(floatValues[1], agg.getFloat(), 0.0001);

agg.reset();
Assert.assertEquals(0, ((Pair<Long, Double>) agg.get()).rhs, 0.0001);
}

@Test
public void testDoubleFirstBufferAggregator()
{
DoubleFirstBufferAggregator agg = (DoubleFirstBufferAggregator) doubleFirstAggFactory.factorizeBuffered(
colSelectorFactory);

ByteBuffer buffer = ByteBuffer.wrap(new byte[doubleFirstAggFactory.getMaxIntermediateSize()]);
agg.init(buffer, 0);

aggregate(agg, buffer, 0);
aggregate(agg, buffer, 0);
aggregate(agg, buffer, 0);
aggregate(agg, buffer, 0);

Pair<Long, Double> result = (Pair<Long, Double>) agg.get(buffer, 0);

Assert.assertEquals(times[1], result.lhs.longValue());
Assert.assertEquals(floatValues[1], result.rhs, 0.0001);
Assert.assertEquals((long) floatValues[1], agg.getLong(buffer, 0));
Assert.assertEquals(floatValues[1], agg.getFloat(buffer, 0), 0.0001);
}

@Test
public void testCombine()
{
SerializablePair pair1 = new SerializablePair<>(1467225000L, 3.621);
SerializablePair pair2 = new SerializablePair<>(1467240000L, 785.4);
Assert.assertEquals(pair1, doubleFirstAggFactory.combine(pair1, pair2));
}

@Test
public void testDoubleFirstCombiningAggregator()
{
DoubleFirstAggregator agg = (DoubleFirstAggregator) combiningAggFactory.factorize(colSelectorFactory);

Assert.assertEquals("billy", agg.getName());

aggregate(agg);
aggregate(agg);
aggregate(agg);
aggregate(agg);

Pair<Long, Double> result = (Pair<Long, Double>) agg.get();
    Pair<Long, Double> expected = (Pair<Long, Double>) pairs[2];

Assert.assertEquals(expected.lhs, result.lhs);
Assert.assertEquals(expected.rhs, result.rhs, 0.0001);
Assert.assertEquals(expected.rhs.longValue(), agg.getLong());
Assert.assertEquals(expected.rhs, agg.getFloat(), 0.0001);

agg.reset();
Assert.assertEquals(0, ((Pair<Long, Double>) agg.get()).rhs, 0.0001);
}

@Test
public void testDoubleFirstCombiningBufferAggregator()
{
DoubleFirstBufferAggregator agg = (DoubleFirstBufferAggregator) combiningAggFactory.factorizeBuffered(
colSelectorFactory);

ByteBuffer buffer = ByteBuffer.wrap(new byte[doubleFirstAggFactory.getMaxIntermediateSize()]);
agg.init(buffer, 0);

aggregate(agg, buffer, 0);
aggregate(agg, buffer, 0);
aggregate(agg, buffer, 0);
aggregate(agg, buffer, 0);

Pair<Long, Double> result = (Pair<Long, Double>) agg.get(buffer, 0);
    Pair<Long, Double> expected = (Pair<Long, Double>) pairs[2];

Assert.assertEquals(expected.lhs, result.lhs);
Assert.assertEquals(expected.rhs, result.rhs, 0.0001);
Assert.assertEquals(expected.rhs.longValue(), agg.getLong(buffer, 0));
Assert.assertEquals(expected.rhs, agg.getFloat(buffer, 0), 0.0001);
}

@Test
public void testSerde() throws Exception
{
DefaultObjectMapper mapper = new DefaultObjectMapper();
String doubleSpecJson = "{\"type\":\"doubleFirst\",\"name\":\"billy\",\"fieldName\":\"nilly\"}";
Assert.assertEquals(doubleFirstAggFactory, mapper.readValue(doubleSpecJson, AggregatorFactory.class));
}

private void aggregate(
DoubleFirstAggregator agg
)
{
agg.aggregate();
timeSelector.increment();
valueSelector.increment();
objectSelector.increment();
}

private void aggregate(
DoubleFirstBufferAggregator agg,
ByteBuffer buff,
int position
)
{
agg.aggregate(buff, position);
timeSelector.increment();
valueSelector.increment();
objectSelector.increment();
}
}
@@ -0,0 +1,200 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.druid.query.aggregation.first;

import io.druid.collections.SerializablePair;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.java.util.common.Pair;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.TestLongColumnSelector;
import io.druid.query.aggregation.TestObjectColumnSelector;
import io.druid.segment.ColumnSelectorFactory;
import io.druid.segment.column.Column;
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import java.nio.ByteBuffer;

public class LongFirstAggregationTest
{
private LongFirstAggregatorFactory longFirstAggFactory;
private LongFirstAggregatorFactory combiningAggFactory;
private ColumnSelectorFactory colSelectorFactory;
private TestLongColumnSelector timeSelector;
private TestLongColumnSelector valueSelector;
private TestObjectColumnSelector objectSelector;

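  // times[3] is the earliest timestamp, so the non-combining "first" tests expect row 3;
  // pairs[0] and pairs[1] tie on lhs == 1L, and the tests expect pairs[0] to win the tie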
private long[] longValues = {185, -216, -128751132, Long.MIN_VALUE};
private long[] times = {1123126751, 1784247991, 1854329816, 1000000000};
private SerializablePair[] pairs = {
new SerializablePair<>(1L, 113267L),
new SerializablePair<>(1L, 5437384L),
new SerializablePair<>(6L, 34583458L),
new SerializablePair<>(88L, 34583452L)
};

@Before
public void setup()
{
longFirstAggFactory = new LongFirstAggregatorFactory("billy", "nilly");
combiningAggFactory = (LongFirstAggregatorFactory) longFirstAggFactory.getCombiningFactory();
timeSelector = new TestLongColumnSelector(times);
valueSelector = new TestLongColumnSelector(longValues);
objectSelector = new TestObjectColumnSelector(pairs);
colSelectorFactory = EasyMock.createMock(ColumnSelectorFactory.class);
EasyMock.expect(colSelectorFactory.makeLongColumnSelector(Column.TIME_COLUMN_NAME)).andReturn(timeSelector);
EasyMock.expect(colSelectorFactory.makeLongColumnSelector("nilly")).andReturn(valueSelector);
EasyMock.expect(colSelectorFactory.makeObjectColumnSelector("billy")).andReturn(objectSelector);
EasyMock.replay(colSelectorFactory);
}

@Test
public void testLongFirstAggregator()
{
LongFirstAggregator agg = (LongFirstAggregator) longFirstAggFactory.factorize(colSelectorFactory);

Assert.assertEquals("billy", agg.getName());

aggregate(agg);
aggregate(agg);
aggregate(agg);
aggregate(agg);

Pair<Long, Long> result = (Pair<Long, Long>) agg.get();

Assert.assertEquals(times[3], result.lhs.longValue());
Assert.assertEquals(longValues[3], result.rhs.longValue());
Assert.assertEquals(longValues[3], agg.getLong());
Assert.assertEquals(longValues[3], agg.getFloat(), 0.0001);

agg.reset();
Assert.assertEquals(0, ((Pair<Long, Long>) agg.get()).rhs, 0.0001);
}

@Test
public void testLongFirstBufferAggregator()
{
LongFirstBufferAggregator agg = (LongFirstBufferAggregator) longFirstAggFactory.factorizeBuffered(
colSelectorFactory);

ByteBuffer buffer = ByteBuffer.wrap(new byte[longFirstAggFactory.getMaxIntermediateSize()]);
agg.init(buffer, 0);

aggregate(agg, buffer, 0);
aggregate(agg, buffer, 0);
aggregate(agg, buffer, 0);
aggregate(agg, buffer, 0);

Pair<Long, Long> result = (Pair<Long, Long>) agg.get(buffer, 0);

Assert.assertEquals(times[3], result.lhs.longValue());
Assert.assertEquals(longValues[3], result.rhs.longValue());
Assert.assertEquals(longValues[3], agg.getLong(buffer, 0));
Assert.assertEquals(longValues[3], agg.getFloat(buffer, 0), 0.0001);
}

@Test
public void testCombine()
{
SerializablePair pair1 = new SerializablePair<>(1467225000L, 1263L);
SerializablePair pair2 = new SerializablePair<>(1467240000L, 752713L);
Assert.assertEquals(pair1, longFirstAggFactory.combine(pair1, pair2));
}

@Test
public void testLongFirstCombiningAggregator()
{
LongFirstAggregator agg = (LongFirstAggregator) combiningAggFactory.factorize(colSelectorFactory);

Assert.assertEquals("billy", agg.getName());

aggregate(agg);
aggregate(agg);
aggregate(agg);
aggregate(agg);

Pair<Long, Long> result = (Pair<Long, Long>) agg.get();
    Pair<Long, Long> expected = (Pair<Long, Long>) pairs[0];

Assert.assertEquals(expected.lhs, result.lhs);
Assert.assertEquals(expected.rhs, result.rhs);
Assert.assertEquals(expected.rhs.longValue(), agg.getLong());
Assert.assertEquals(expected.rhs, agg.getFloat(), 0.0001);

agg.reset();
Assert.assertEquals(0, ((Pair<Long, Long>) agg.get()).rhs, 0.0001);
}

@Test
public void testLongFirstCombiningBufferAggregator()
{
LongFirstBufferAggregator agg = (LongFirstBufferAggregator) combiningAggFactory.factorizeBuffered(
colSelectorFactory);

ByteBuffer buffer = ByteBuffer.wrap(new byte[longFirstAggFactory.getMaxIntermediateSize()]);
agg.init(buffer, 0);

aggregate(agg, buffer, 0);
aggregate(agg, buffer, 0);
aggregate(agg, buffer, 0);
aggregate(agg, buffer, 0);

Pair<Long, Long> result = (Pair<Long, Long>) agg.get(buffer, 0);
    Pair<Long, Long> expected = (Pair<Long, Long>) pairs[0];

Assert.assertEquals(expected.lhs, result.lhs);
Assert.assertEquals(expected.rhs, result.rhs);
Assert.assertEquals(expected.rhs.longValue(), agg.getLong(buffer, 0));
Assert.assertEquals(expected.rhs, agg.getFloat(buffer, 0), 0.0001);
}

@Test
public void testSerde() throws Exception
{
DefaultObjectMapper mapper = new DefaultObjectMapper();
String longSpecJson = "{\"type\":\"longFirst\",\"name\":\"billy\",\"fieldName\":\"nilly\"}";
Assert.assertEquals(longFirstAggFactory, mapper.readValue(longSpecJson, AggregatorFactory.class));
}

private void aggregate(
LongFirstAggregator agg
)
{
agg.aggregate();
timeSelector.increment();
valueSelector.increment();
objectSelector.increment();
}

private void aggregate(
LongFirstBufferAggregator agg,
ByteBuffer buff,
int position
)
{
agg.aggregate(buff, position);
timeSelector.increment();
valueSelector.increment();
objectSelector.increment();
}
}
@@ -0,0 +1,201 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.druid.query.aggregation.last;

import io.druid.collections.SerializablePair;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.java.util.common.Pair;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.TestFloatColumnSelector;
import io.druid.query.aggregation.TestLongColumnSelector;
import io.druid.query.aggregation.TestObjectColumnSelector;
import io.druid.segment.ColumnSelectorFactory;
import io.druid.segment.column.Column;
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import java.nio.ByteBuffer;

public class DoubleLastAggregationTest
{
private DoubleLastAggregatorFactory doubleLastAggFactory;
private DoubleLastAggregatorFactory combiningAggFactory;
private ColumnSelectorFactory colSelectorFactory;
private TestLongColumnSelector timeSelector;
private TestFloatColumnSelector valueSelector;
private TestObjectColumnSelector objectSelector;

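  // times[0] is the latest timestamp, so the non-combining "last" tests expect row 0;
  // pairs[2] has the largest lhs, so the combining tests expect that pair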
private float[] floatValues = {1.1897f, 0.001f, 86.23f, 166.228f};
private long[] times = {8224, 6879, 2436, 7888};
private SerializablePair[] pairs = {
new SerializablePair<>(52782L, 134.3d),
new SerializablePair<>(65492L, 1232.212d),
new SerializablePair<>(69134L, 18.1233d),
new SerializablePair<>(11111L, 233.5232d)
};

@Before
public void setup()
{
doubleLastAggFactory = new DoubleLastAggregatorFactory("billy", "nilly");
combiningAggFactory = (DoubleLastAggregatorFactory) doubleLastAggFactory.getCombiningFactory();
timeSelector = new TestLongColumnSelector(times);
valueSelector = new TestFloatColumnSelector(floatValues);
objectSelector = new TestObjectColumnSelector(pairs);
colSelectorFactory = EasyMock.createMock(ColumnSelectorFactory.class);
EasyMock.expect(colSelectorFactory.makeLongColumnSelector(Column.TIME_COLUMN_NAME)).andReturn(timeSelector);
EasyMock.expect(colSelectorFactory.makeFloatColumnSelector("nilly")).andReturn(valueSelector);
EasyMock.expect(colSelectorFactory.makeObjectColumnSelector("billy")).andReturn(objectSelector);
EasyMock.replay(colSelectorFactory);
}

@Test
public void testDoubleLastAggregator()
{
DoubleLastAggregator agg = (DoubleLastAggregator) doubleLastAggFactory.factorize(colSelectorFactory);

Assert.assertEquals("billy", agg.getName());

aggregate(agg);
aggregate(agg);
aggregate(agg);
aggregate(agg);

Pair<Long, Double> result = (Pair<Long, Double>) agg.get();

Assert.assertEquals(times[0], result.lhs.longValue());
Assert.assertEquals(floatValues[0], result.rhs, 0.0001);
Assert.assertEquals((long) floatValues[0], agg.getLong());
Assert.assertEquals(floatValues[0], agg.getFloat(), 0.0001);

agg.reset();
Assert.assertEquals(0, ((Pair<Long, Double>) agg.get()).rhs, 0.0001);
}

@Test
public void testDoubleLastBufferAggregator()
{
DoubleLastBufferAggregator agg = (DoubleLastBufferAggregator) doubleLastAggFactory.factorizeBuffered(
colSelectorFactory);

ByteBuffer buffer = ByteBuffer.wrap(new byte[doubleLastAggFactory.getMaxIntermediateSize()]);
agg.init(buffer, 0);

aggregate(agg, buffer, 0);
aggregate(agg, buffer, 0);
aggregate(agg, buffer, 0);
aggregate(agg, buffer, 0);

Pair<Long, Double> result = (Pair<Long, Double>) agg.get(buffer, 0);

Assert.assertEquals(times[0], result.lhs.longValue());
Assert.assertEquals(floatValues[0], result.rhs, 0.0001);
Assert.assertEquals((long) floatValues[0], agg.getLong(buffer, 0));
Assert.assertEquals(floatValues[0], agg.getFloat(buffer, 0), 0.0001);
}

@Test
public void testCombine()
{
SerializablePair pair1 = new SerializablePair<>(1467225000L, 3.621);
SerializablePair pair2 = new SerializablePair<>(1467240000L, 785.4);
Assert.assertEquals(pair2, doubleLastAggFactory.combine(pair1, pair2));
}

@Test
public void testDoubleLastCombiningAggregator()
{
DoubleLastAggregator agg = (DoubleLastAggregator) combiningAggFactory.factorize(colSelectorFactory);

Assert.assertEquals("billy", agg.getName());

aggregate(agg);
aggregate(agg);
aggregate(agg);
aggregate(agg);

Pair<Long, Double> result = (Pair<Long, Double>) agg.get();
    Pair<Long, Double> expected = (Pair<Long, Double>) pairs[2];

Assert.assertEquals(expected.lhs, result.lhs);
Assert.assertEquals(expected.rhs, result.rhs, 0.0001);
Assert.assertEquals(expected.rhs.longValue(), agg.getLong());
Assert.assertEquals(expected.rhs, agg.getFloat(), 0.0001);

agg.reset();
Assert.assertEquals(0, ((Pair<Long, Double>) agg.get()).rhs, 0.0001);
}

@Test
public void testDoubleLastCombiningBufferAggregator()
{
DoubleLastBufferAggregator agg = (DoubleLastBufferAggregator) combiningAggFactory.factorizeBuffered(
colSelectorFactory);

ByteBuffer buffer = ByteBuffer.wrap(new byte[doubleLastAggFactory.getMaxIntermediateSize()]);
agg.init(buffer, 0);

aggregate(agg, buffer, 0);
aggregate(agg, buffer, 0);
aggregate(agg, buffer, 0);
aggregate(agg, buffer, 0);

Pair<Long, Double> result = (Pair<Long, Double>) agg.get(buffer, 0);
    Pair<Long, Double> expected = (Pair<Long, Double>) pairs[2];

Assert.assertEquals(expected.lhs, result.lhs);
Assert.assertEquals(expected.rhs, result.rhs, 0.0001);
Assert.assertEquals(expected.rhs.longValue(), agg.getLong(buffer, 0));
Assert.assertEquals(expected.rhs, agg.getFloat(buffer, 0), 0.0001);
}

@Test
public void testSerde() throws Exception
{
DefaultObjectMapper mapper = new DefaultObjectMapper();
String doubleSpecJson = "{\"type\":\"doubleLast\",\"name\":\"billy\",\"fieldName\":\"nilly\"}";
Assert.assertEquals(doubleLastAggFactory, mapper.readValue(doubleSpecJson, AggregatorFactory.class));
}

private void aggregate(
DoubleLastAggregator agg
)
{
agg.aggregate();
timeSelector.increment();
valueSelector.increment();
objectSelector.increment();
}

private void aggregate(
DoubleLastBufferAggregator agg,
ByteBuffer buff,
int position
)
{
agg.aggregate(buff, position);
timeSelector.increment();
valueSelector.increment();
objectSelector.increment();
}
}