-
Notifications
You must be signed in to change notification settings - Fork 3.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
First and Last Aggregator #3566
Merged
+3,389
−28
Merged
Changes from all commits
Commits
Show all changes
11 commits
Select commit
Hold shift + click to select a range
dbc1069
add first and last aggregator
acslk 759d902
add test and fix
acslk 4d96703
moving around
acslk 237b552
separate aggregator valueType
acslk 78bcfae
address PR comment
acslk 890c827
add finalize inner query and adjust v1 inner indexing
acslk 3c069a9
better test and fixes
acslk 481b526
java-util import fixes
jon-wei fa528cb
Merge remote-tracking branch 'upstream/master' into acslk-feature-fir…
jon-wei cc25586
PR comments
jon-wei 11e50b6
Add first/last aggs to ITWikipediaQueryTest
jon-wei File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
45 changes: 45 additions & 0 deletions
45
common/src/main/java/io/druid/collections/SerializablePair.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,45 @@ | ||
/* | ||
* Licensed to Metamarkets Group Inc. (Metamarkets) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. Metamarkets licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
|
||
package io.druid.collections; | ||
|
||
import com.fasterxml.jackson.annotation.JsonCreator; | ||
import com.fasterxml.jackson.annotation.JsonProperty; | ||
import io.druid.java.util.common.Pair; | ||
|
||
public class SerializablePair<T1, T2> extends Pair<T1, T2> | ||
{ | ||
@JsonCreator | ||
public SerializablePair(@JsonProperty("lhs") T1 lhs, @JsonProperty("rhs") T2 rhs) | ||
{ | ||
super(lhs, rhs); | ||
} | ||
|
||
@JsonProperty | ||
public T1 getLhs() | ||
{ | ||
return lhs; | ||
} | ||
|
||
@JsonProperty | ||
public T2 getRhs() | ||
{ | ||
return rhs; | ||
} | ||
} |
51 changes: 51 additions & 0 deletions
51
common/src/test/java/io/druid/collections/SerializablePairTest.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,51 @@ | ||
/* | ||
* Licensed to Metamarkets Group Inc. (Metamarkets) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. Metamarkets licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
|
||
package io.druid.collections; | ||
|
||
import com.fasterxml.jackson.databind.ObjectMapper; | ||
import org.junit.Assert; | ||
import org.junit.Test; | ||
|
||
import java.io.IOException; | ||
|
||
public class SerializablePairTest | ||
{ | ||
private static final ObjectMapper jsonMapper = new ObjectMapper(); | ||
|
||
@Test | ||
public void testBytesSerde() throws IOException | ||
{ | ||
SerializablePair pair = new SerializablePair<>(5L, 9L); | ||
byte[] bytes = jsonMapper.writeValueAsBytes(pair); | ||
SerializablePair<Number, Number> deserializedPair = jsonMapper.readValue(bytes, SerializablePair.class); | ||
Assert.assertEquals(pair.lhs, deserializedPair.lhs.longValue()); | ||
Assert.assertEquals(pair.rhs, deserializedPair.rhs.longValue()); | ||
} | ||
|
||
@Test | ||
public void testStringSerde() throws IOException | ||
{ | ||
SerializablePair pair = new SerializablePair<>(5L, 9L); | ||
String str = jsonMapper.writeValueAsString(pair); | ||
SerializablePair<Number, Number> deserializedPair = jsonMapper.readValue(str, SerializablePair.class); | ||
Assert.assertEquals(pair.lhs, deserializedPair.lhs.longValue()); | ||
Assert.assertEquals(pair.rhs, deserializedPair.rhs.longValue()); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -171,7 +171,7 @@ public List<AggregatorFactory> getRequiredColumns() | |
@Override | ||
public AggregatorFactory apply(String input) | ||
{ | ||
return new JavaScriptAggregatorFactory(input, fieldNames, fnAggregate, fnReset, fnCombine, config); | ||
return new JavaScriptAggregatorFactory(input, Lists.newArrayList(input), fnCombine, fnReset, fnCombine, config); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. seems unrelated to this PR ? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. it's needed due to the changes within groupby in the PR |
||
} | ||
} | ||
) | ||
|
97 changes: 97 additions & 0 deletions
97
processing/src/main/java/io/druid/query/aggregation/first/DoubleFirstAggregator.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,97 @@ | ||
/* | ||
* Licensed to Metamarkets Group Inc. (Metamarkets) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. Metamarkets licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
|
||
package io.druid.query.aggregation.first; | ||
|
||
import io.druid.collections.SerializablePair; | ||
import io.druid.query.aggregation.Aggregator; | ||
import io.druid.segment.FloatColumnSelector; | ||
import io.druid.segment.LongColumnSelector; | ||
|
||
public class DoubleFirstAggregator implements Aggregator | ||
{ | ||
|
||
private final FloatColumnSelector valueSelector; | ||
private final LongColumnSelector timeSelector; | ||
private final String name; | ||
|
||
protected long firstTime; | ||
protected double firstValue; | ||
|
||
public DoubleFirstAggregator( | ||
String name, | ||
LongColumnSelector timeSelector, | ||
FloatColumnSelector valueSelector | ||
) | ||
{ | ||
this.name = name; | ||
this.valueSelector = valueSelector; | ||
this.timeSelector = timeSelector; | ||
|
||
reset(); | ||
} | ||
|
||
@Override | ||
public void aggregate() | ||
{ | ||
long time = timeSelector.get(); | ||
if (time < firstTime) { | ||
firstTime = time; | ||
firstValue = valueSelector.get(); | ||
} | ||
} | ||
|
||
@Override | ||
public void reset() | ||
{ | ||
firstTime = Long.MAX_VALUE; | ||
firstValue = 0; | ||
} | ||
|
||
@Override | ||
public Object get() | ||
{ | ||
return new SerializablePair<>(firstTime, firstValue); | ||
} | ||
|
||
@Override | ||
public float getFloat() | ||
{ | ||
return (float) firstValue; | ||
} | ||
|
||
@Override | ||
public String getName() | ||
{ | ||
return name; | ||
} | ||
|
||
@Override | ||
public void close() | ||
{ | ||
|
||
} | ||
|
||
@Override | ||
public long getLong() | ||
{ | ||
return (long) firstValue; | ||
} | ||
} | ||
|
265 changes: 265 additions & 0 deletions
265
processing/src/main/java/io/druid/query/aggregation/first/DoubleFirstAggregatorFactory.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,265 @@ | ||
/* | ||
* Licensed to Metamarkets Group Inc. (Metamarkets) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. Metamarkets licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
|
||
package io.druid.query.aggregation.first; | ||
|
||
import com.fasterxml.jackson.annotation.JsonCreator; | ||
import com.fasterxml.jackson.annotation.JsonProperty; | ||
import com.google.common.base.Preconditions; | ||
import com.google.common.primitives.Doubles; | ||
import com.google.common.primitives.Longs; | ||
import com.metamx.common.StringUtils; | ||
import io.druid.collections.SerializablePair; | ||
import io.druid.query.aggregation.Aggregator; | ||
import io.druid.query.aggregation.AggregatorFactory; | ||
import io.druid.query.aggregation.AggregatorFactoryNotMergeableException; | ||
import io.druid.query.aggregation.BufferAggregator; | ||
import io.druid.segment.ColumnSelectorFactory; | ||
import io.druid.segment.ObjectColumnSelector; | ||
import io.druid.segment.column.Column; | ||
|
||
import java.nio.ByteBuffer; | ||
import java.util.Arrays; | ||
import java.util.Comparator; | ||
import java.util.List; | ||
import java.util.Map; | ||
|
||
public class DoubleFirstAggregatorFactory extends AggregatorFactory | ||
{ | ||
public static final Comparator VALUE_COMPARATOR = new Comparator() | ||
{ | ||
@Override | ||
public int compare(Object o1, Object o2) | ||
{ | ||
return Doubles.compare(((SerializablePair<Long, Double>) o1).rhs, ((SerializablePair<Long, Double>) o2).rhs); | ||
} | ||
}; | ||
|
||
public static final Comparator TIME_COMPARATOR = new Comparator() | ||
{ | ||
@Override | ||
public int compare(Object o1, Object o2) | ||
{ | ||
return Longs.compare(((SerializablePair<Long, Object>) o1).lhs, ((SerializablePair<Long, Object>) o2).lhs); | ||
} | ||
}; | ||
|
||
private static final byte CACHE_TYPE_ID = 16; | ||
|
||
private final String fieldName; | ||
private final String name; | ||
|
||
@JsonCreator | ||
public DoubleFirstAggregatorFactory( | ||
@JsonProperty("name") String name, | ||
@JsonProperty("fieldName") final String fieldName | ||
) | ||
{ | ||
Preconditions.checkNotNull(name, "Must have a valid, non-null aggregator name"); | ||
Preconditions.checkNotNull(fieldName, "Must have a valid, non-null fieldName"); | ||
|
||
this.name = name; | ||
this.fieldName = fieldName; | ||
} | ||
|
||
@Override | ||
public Aggregator factorize(ColumnSelectorFactory metricFactory) | ||
{ | ||
return new DoubleFirstAggregator( | ||
name, | ||
metricFactory.makeLongColumnSelector(Column.TIME_COLUMN_NAME), | ||
metricFactory.makeFloatColumnSelector(fieldName) | ||
); | ||
} | ||
|
||
@Override | ||
public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory) | ||
{ | ||
return new DoubleFirstBufferAggregator( | ||
metricFactory.makeLongColumnSelector(Column.TIME_COLUMN_NAME), | ||
metricFactory.makeFloatColumnSelector(fieldName) | ||
); | ||
} | ||
|
||
@Override | ||
public Comparator getComparator() | ||
{ | ||
return VALUE_COMPARATOR; | ||
} | ||
|
||
@Override | ||
public Object combine(Object lhs, Object rhs) | ||
{ | ||
return TIME_COMPARATOR.compare(lhs, rhs) <= 0 ? lhs : rhs; | ||
} | ||
|
||
@Override | ||
public AggregatorFactory getCombiningFactory() | ||
{ | ||
return new DoubleFirstAggregatorFactory(name, name) | ||
{ | ||
@Override | ||
public Aggregator factorize(ColumnSelectorFactory metricFactory) | ||
{ | ||
final ObjectColumnSelector selector = metricFactory.makeObjectColumnSelector(name); | ||
return new DoubleFirstAggregator(name, null, null) | ||
{ | ||
@Override | ||
public void aggregate() | ||
{ | ||
SerializablePair<Long, Double> pair = (SerializablePair<Long, Double>) selector.get(); | ||
if (pair.lhs < firstTime) { | ||
firstTime = pair.lhs; | ||
firstValue = pair.rhs; | ||
} | ||
} | ||
}; | ||
} | ||
|
||
@Override | ||
public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory) | ||
{ | ||
final ObjectColumnSelector selector = metricFactory.makeObjectColumnSelector(name); | ||
return new DoubleFirstBufferAggregator(null, null) | ||
{ | ||
@Override | ||
public void aggregate(ByteBuffer buf, int position) | ||
{ | ||
SerializablePair<Long, Double> pair = (SerializablePair<Long, Double>) selector.get(); | ||
long firstTime = buf.getLong(position); | ||
if (pair.lhs < firstTime) { | ||
buf.putLong(position, pair.lhs); | ||
buf.putDouble(position + Longs.BYTES, pair.rhs); | ||
} | ||
} | ||
}; | ||
} | ||
}; | ||
} | ||
|
||
@Override | ||
public AggregatorFactory getMergingFactory(AggregatorFactory other) throws AggregatorFactoryNotMergeableException | ||
{ | ||
if (other.getName().equals(this.getName()) && this.getClass() == other.getClass()) { | ||
return getCombiningFactory(); | ||
} else { | ||
throw new AggregatorFactoryNotMergeableException(this, other); | ||
} | ||
} | ||
|
||
@Override | ||
public List<AggregatorFactory> getRequiredColumns() | ||
{ | ||
return Arrays.<AggregatorFactory>asList(new DoubleFirstAggregatorFactory(fieldName, fieldName)); | ||
} | ||
|
||
@Override | ||
public Object deserialize(Object object) | ||
{ | ||
Map map = (Map) object; | ||
return new SerializablePair<>(((Number) map.get("lhs")).longValue(), ((Number) map.get("rhs")).doubleValue()); | ||
} | ||
|
||
@Override | ||
public Object finalizeComputation(Object object) | ||
{ | ||
return ((SerializablePair<Long, Double>) object).rhs; | ||
} | ||
|
||
@Override | ||
@JsonProperty | ||
public String getName() | ||
{ | ||
return name; | ||
} | ||
|
||
@JsonProperty | ||
public String getFieldName() | ||
{ | ||
return fieldName; | ||
} | ||
|
||
@Override | ||
public List<String> requiredFields() | ||
{ | ||
return Arrays.asList(Column.TIME_COLUMN_NAME, fieldName); | ||
} | ||
|
||
@Override | ||
public byte[] getCacheKey() | ||
{ | ||
byte[] fieldNameBytes = StringUtils.toUtf8(fieldName); | ||
|
||
return ByteBuffer.allocate(2 + fieldNameBytes.length) | ||
.put(CACHE_TYPE_ID) | ||
.put(fieldNameBytes) | ||
.put((byte)0xff) | ||
.array(); | ||
} | ||
|
||
@Override | ||
public String getTypeName() | ||
{ | ||
return "float"; | ||
} | ||
|
||
@Override | ||
public int getMaxIntermediateSize() | ||
{ | ||
return Longs.BYTES + Doubles.BYTES; | ||
} | ||
|
||
@Override | ||
public Object getAggregatorStartValue() | ||
{ | ||
throw new UnsupportedOperationException(); | ||
} | ||
|
||
@Override | ||
public boolean equals(Object o) | ||
{ | ||
if (this == o) { | ||
return true; | ||
} | ||
if (o == null || getClass() != o.getClass()) { | ||
return false; | ||
} | ||
|
||
DoubleFirstAggregatorFactory that = (DoubleFirstAggregatorFactory) o; | ||
|
||
return fieldName.equals(that.fieldName) && name.equals(that.name); | ||
} | ||
|
||
@Override | ||
public int hashCode() | ||
{ | ||
int result = name.hashCode(); | ||
result = 31 * result + fieldName.hashCode(); | ||
return result; | ||
} | ||
|
||
@Override | ||
public String toString() | ||
{ | ||
return "DoubleFirstAggregatorFactory{" + | ||
"name='" + name + '\'' + | ||
", fieldName='" + fieldName + '\'' + | ||
'}'; | ||
} | ||
} |
82 changes: 82 additions & 0 deletions
82
processing/src/main/java/io/druid/query/aggregation/first/DoubleFirstBufferAggregator.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,82 @@ | ||
/* | ||
* Licensed to Metamarkets Group Inc. (Metamarkets) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. Metamarkets licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
|
||
package io.druid.query.aggregation.first; | ||
|
||
import com.google.common.primitives.Longs; | ||
import io.druid.collections.SerializablePair; | ||
import io.druid.query.aggregation.BufferAggregator; | ||
import io.druid.segment.FloatColumnSelector; | ||
import io.druid.segment.LongColumnSelector; | ||
|
||
import java.nio.ByteBuffer; | ||
|
||
public class DoubleFirstBufferAggregator implements BufferAggregator | ||
{ | ||
private final LongColumnSelector timeSelector; | ||
private final FloatColumnSelector valueSelector; | ||
|
||
public DoubleFirstBufferAggregator(LongColumnSelector timeSelector, FloatColumnSelector valueSelector) | ||
{ | ||
this.timeSelector = timeSelector; | ||
this.valueSelector = valueSelector; | ||
} | ||
|
||
@Override | ||
public void init(ByteBuffer buf, int position) | ||
{ | ||
buf.putLong(position, Long.MAX_VALUE); | ||
buf.putDouble(position + Longs.BYTES, 0); | ||
} | ||
|
||
@Override | ||
public void aggregate(ByteBuffer buf, int position) | ||
{ | ||
long time = timeSelector.get(); | ||
long firstTime = buf.getLong(position); | ||
if (time < firstTime) { | ||
buf.putLong(position, time); | ||
buf.putDouble(position + Longs.BYTES, valueSelector.get()); | ||
} | ||
} | ||
|
||
@Override | ||
public Object get(ByteBuffer buf, int position) | ||
{ | ||
return new SerializablePair<>(buf.getLong(position), buf.getDouble(position + Longs.BYTES)); | ||
} | ||
|
||
@Override | ||
public float getFloat(ByteBuffer buf, int position) | ||
{ | ||
return (float) buf.getDouble(position + Longs.BYTES); | ||
} | ||
|
||
@Override | ||
public long getLong(ByteBuffer buf, int position) | ||
{ | ||
return (long) buf.getDouble(position + Longs.BYTES); | ||
} | ||
|
||
@Override | ||
public void close() | ||
{ | ||
// no resources to cleanup | ||
} | ||
} |
95 changes: 95 additions & 0 deletions
95
processing/src/main/java/io/druid/query/aggregation/first/LongFirstAggregator.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,95 @@ | ||
/* | ||
* Licensed to Metamarkets Group Inc. (Metamarkets) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. Metamarkets licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
|
||
package io.druid.query.aggregation.first; | ||
|
||
import io.druid.collections.SerializablePair; | ||
import io.druid.query.aggregation.Aggregator; | ||
import io.druid.segment.LongColumnSelector; | ||
|
||
public class LongFirstAggregator implements Aggregator | ||
{ | ||
|
||
private final LongColumnSelector valueSelector; | ||
private final LongColumnSelector timeSelector; | ||
private final String name; | ||
|
||
protected long firstTime; | ||
protected long firstValue; | ||
|
||
public LongFirstAggregator( | ||
String name, | ||
LongColumnSelector timeSelector, | ||
LongColumnSelector valueSelector | ||
) | ||
{ | ||
this.name = name; | ||
this.valueSelector = valueSelector; | ||
this.timeSelector = timeSelector; | ||
|
||
reset(); | ||
} | ||
|
||
@Override | ||
public void aggregate() | ||
{ | ||
long time = timeSelector.get(); | ||
if (time < firstTime) { | ||
firstTime = time; | ||
firstValue = valueSelector.get(); | ||
} | ||
} | ||
|
||
@Override | ||
public void reset() | ||
{ | ||
firstTime = Long.MAX_VALUE; | ||
firstValue = 0; | ||
} | ||
|
||
@Override | ||
public Object get() | ||
{ | ||
return new SerializablePair<>(firstTime, firstValue); | ||
} | ||
|
||
@Override | ||
public float getFloat() | ||
{ | ||
return (float) firstValue; | ||
} | ||
|
||
@Override | ||
public String getName() | ||
{ | ||
return name; | ||
} | ||
|
||
@Override | ||
public void close() | ||
{ | ||
|
||
} | ||
|
||
@Override | ||
public long getLong() | ||
{ | ||
return firstValue; | ||
} | ||
} |
255 changes: 255 additions & 0 deletions
255
processing/src/main/java/io/druid/query/aggregation/first/LongFirstAggregatorFactory.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,255 @@ | ||
/* | ||
* Licensed to Metamarkets Group Inc. (Metamarkets) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. Metamarkets licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
|
||
package io.druid.query.aggregation.first; | ||
|
||
import com.fasterxml.jackson.annotation.JsonCreator; | ||
import com.fasterxml.jackson.annotation.JsonProperty; | ||
import com.google.common.base.Preconditions; | ||
import com.google.common.primitives.Longs; | ||
import com.metamx.common.StringUtils; | ||
import io.druid.collections.SerializablePair; | ||
import io.druid.query.aggregation.Aggregator; | ||
import io.druid.query.aggregation.AggregatorFactory; | ||
import io.druid.query.aggregation.AggregatorFactoryNotMergeableException; | ||
import io.druid.query.aggregation.BufferAggregator; | ||
import io.druid.segment.ColumnSelectorFactory; | ||
import io.druid.segment.ObjectColumnSelector; | ||
import io.druid.segment.column.Column; | ||
|
||
import java.nio.ByteBuffer; | ||
import java.util.Arrays; | ||
import java.util.Comparator; | ||
import java.util.List; | ||
import java.util.Map; | ||
|
||
public class LongFirstAggregatorFactory extends AggregatorFactory | ||
{ | ||
public static final Comparator VALUE_COMPARATOR = new Comparator() | ||
{ | ||
@Override | ||
public int compare(Object o1, Object o2) | ||
{ | ||
return Longs.compare(((SerializablePair<Long, Long>) o1).rhs, ((SerializablePair<Long, Long>) o2).rhs); | ||
} | ||
}; | ||
|
||
private static final byte CACHE_TYPE_ID = 17; | ||
|
||
private final String fieldName; | ||
private final String name; | ||
|
||
@JsonCreator | ||
public LongFirstAggregatorFactory( | ||
@JsonProperty("name") String name, | ||
@JsonProperty("fieldName") final String fieldName | ||
) | ||
{ | ||
Preconditions.checkNotNull(name, "Must have a valid, non-null aggregator name"); | ||
Preconditions.checkNotNull(fieldName, "Must have a valid, non-null fieldName"); | ||
|
||
this.name = name; | ||
this.fieldName = fieldName; | ||
} | ||
|
||
@Override | ||
public Aggregator factorize(ColumnSelectorFactory metricFactory) | ||
{ | ||
return new LongFirstAggregator( | ||
name, | ||
metricFactory.makeLongColumnSelector(Column.TIME_COLUMN_NAME), | ||
metricFactory.makeLongColumnSelector(fieldName) | ||
); | ||
} | ||
|
||
@Override | ||
public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory) | ||
{ | ||
return new LongFirstBufferAggregator( | ||
metricFactory.makeLongColumnSelector(Column.TIME_COLUMN_NAME), | ||
metricFactory.makeLongColumnSelector(fieldName) | ||
); | ||
} | ||
|
||
@Override | ||
public Comparator getComparator() | ||
{ | ||
return VALUE_COMPARATOR; | ||
} | ||
|
||
@Override | ||
public Object combine(Object lhs, Object rhs) | ||
{ | ||
return DoubleFirstAggregatorFactory.TIME_COMPARATOR.compare(lhs, rhs) <= 0 ? lhs : rhs; | ||
} | ||
|
||
@Override | ||
public AggregatorFactory getCombiningFactory() | ||
{ | ||
return new LongFirstAggregatorFactory(name, name) | ||
{ | ||
@Override | ||
public Aggregator factorize(ColumnSelectorFactory metricFactory) | ||
{ | ||
final ObjectColumnSelector selector = metricFactory.makeObjectColumnSelector(name); | ||
return new LongFirstAggregator(name, null, null) | ||
{ | ||
@Override | ||
public void aggregate() | ||
{ | ||
SerializablePair<Long, Long> pair = (SerializablePair<Long, Long>) selector.get(); | ||
if (pair.lhs < firstTime) { | ||
firstTime = pair.lhs; | ||
firstValue = pair.rhs; | ||
} | ||
} | ||
}; | ||
} | ||
|
||
@Override | ||
public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory) | ||
{ | ||
final ObjectColumnSelector selector = metricFactory.makeObjectColumnSelector(name); | ||
return new LongFirstBufferAggregator(null, null) | ||
{ | ||
@Override | ||
public void aggregate(ByteBuffer buf, int position) | ||
{ | ||
SerializablePair<Long, Long> pair = (SerializablePair<Long, Long>) selector.get(); | ||
long firstTime = buf.getLong(position); | ||
if (pair.lhs < firstTime) { | ||
buf.putLong(position, pair.lhs); | ||
buf.putLong(position + Longs.BYTES, pair.rhs); | ||
} | ||
} | ||
}; | ||
} | ||
}; | ||
} | ||
|
||
@Override | ||
public AggregatorFactory getMergingFactory(AggregatorFactory other) throws AggregatorFactoryNotMergeableException | ||
{ | ||
if (other.getName().equals(this.getName()) && this.getClass() == other.getClass()) { | ||
return getCombiningFactory(); | ||
} else { | ||
throw new AggregatorFactoryNotMergeableException(this, other); | ||
} | ||
} | ||
|
||
@Override | ||
public List<AggregatorFactory> getRequiredColumns() | ||
{ | ||
return Arrays.<AggregatorFactory>asList(new LongFirstAggregatorFactory(fieldName, fieldName)); | ||
} | ||
|
||
@Override | ||
public Object deserialize(Object object) | ||
{ | ||
Map map = (Map) object; | ||
return new SerializablePair<>(((Number) map.get("lhs")).longValue(), ((Number) map.get("rhs")).longValue()); | ||
} | ||
|
||
@Override | ||
public Object finalizeComputation(Object object) | ||
{ | ||
return ((SerializablePair<Long, Long>) object).rhs; | ||
} | ||
|
||
@Override | ||
@JsonProperty | ||
public String getName() | ||
{ | ||
return name; | ||
} | ||
|
||
@JsonProperty | ||
public String getFieldName() | ||
{ | ||
return fieldName; | ||
} | ||
|
||
@Override | ||
public List<String> requiredFields() | ||
{ | ||
return Arrays.asList(Column.TIME_COLUMN_NAME, fieldName); | ||
} | ||
|
||
@Override | ||
public byte[] getCacheKey() | ||
{ | ||
byte[] fieldNameBytes = StringUtils.toUtf8(fieldName); | ||
|
||
return ByteBuffer.allocate(2 + fieldNameBytes.length) | ||
.put(CACHE_TYPE_ID) | ||
.put(fieldNameBytes) | ||
.put((byte)0xff) | ||
.array(); | ||
} | ||
|
||
@Override | ||
public String getTypeName() | ||
{ | ||
return "long"; | ||
} | ||
|
||
@Override | ||
public int getMaxIntermediateSize() | ||
{ | ||
return Longs.BYTES * 2; | ||
} | ||
|
||
@Override | ||
public Object getAggregatorStartValue() | ||
{ | ||
throw new UnsupportedOperationException(); | ||
} | ||
|
||
@Override | ||
public boolean equals(Object o) | ||
{ | ||
if (this == o) { | ||
return true; | ||
} | ||
if (o == null || getClass() != o.getClass()) { | ||
return false; | ||
} | ||
|
||
LongFirstAggregatorFactory that = (LongFirstAggregatorFactory) o; | ||
|
||
return fieldName.equals(that.fieldName) && name.equals(that.name); | ||
} | ||
|
||
@Override | ||
public int hashCode() | ||
{ | ||
int result = name.hashCode(); | ||
result = 31 * result + fieldName.hashCode(); | ||
return result; | ||
} | ||
|
||
@Override | ||
public String toString() | ||
{ | ||
return "LongFirstAggregatorFactory{" + | ||
"name='" + name + '\'' + | ||
", fieldName='" + fieldName + '\'' + | ||
'}'; | ||
} | ||
} |
81 changes: 81 additions & 0 deletions
81
processing/src/main/java/io/druid/query/aggregation/first/LongFirstBufferAggregator.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,81 @@ | ||
/* | ||
* Licensed to Metamarkets Group Inc. (Metamarkets) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. Metamarkets licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
|
||
package io.druid.query.aggregation.first; | ||
|
||
import com.google.common.primitives.Longs; | ||
import io.druid.collections.SerializablePair; | ||
import io.druid.query.aggregation.BufferAggregator; | ||
import io.druid.segment.LongColumnSelector; | ||
|
||
import java.nio.ByteBuffer; | ||
|
||
public class LongFirstBufferAggregator implements BufferAggregator | ||
{ | ||
private final LongColumnSelector timeSelector; | ||
private final LongColumnSelector valueSelector; | ||
|
||
public LongFirstBufferAggregator(LongColumnSelector timeSelector, LongColumnSelector valueSelector) | ||
{ | ||
this.timeSelector = timeSelector; | ||
this.valueSelector = valueSelector; | ||
} | ||
|
||
@Override | ||
public void init(ByteBuffer buf, int position) | ||
{ | ||
buf.putLong(position, Long.MAX_VALUE); | ||
buf.putLong(position + Longs.BYTES, 0); | ||
} | ||
|
||
@Override | ||
public void aggregate(ByteBuffer buf, int position) | ||
{ | ||
long time = timeSelector.get(); | ||
long firstTime = buf.getLong(position); | ||
if (time < firstTime) { | ||
buf.putLong(position, time); | ||
buf.putLong(position + Longs.BYTES, valueSelector.get()); | ||
} | ||
} | ||
|
||
@Override | ||
public Object get(ByteBuffer buf, int position) | ||
{ | ||
return new SerializablePair<>(buf.getLong(position), buf.getLong(position + Longs.BYTES)); | ||
} | ||
|
||
@Override | ||
public float getFloat(ByteBuffer buf, int position) | ||
{ | ||
return (float) buf.getLong(position + Longs.BYTES); | ||
} | ||
|
||
@Override | ||
public long getLong(ByteBuffer buf, int position) | ||
{ | ||
return buf.getLong(position + Longs.BYTES); | ||
} | ||
|
||
@Override | ||
public void close() | ||
{ | ||
// no resources to cleanup | ||
} | ||
} |
96 changes: 96 additions & 0 deletions
96
processing/src/main/java/io/druid/query/aggregation/last/DoubleLastAggregator.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,96 @@ | ||
/* | ||
* Licensed to Metamarkets Group Inc. (Metamarkets) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. Metamarkets licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
|
||
package io.druid.query.aggregation.last; | ||
|
||
import io.druid.collections.SerializablePair; | ||
import io.druid.query.aggregation.Aggregator; | ||
import io.druid.segment.FloatColumnSelector; | ||
import io.druid.segment.LongColumnSelector; | ||
|
||
public class DoubleLastAggregator implements Aggregator | ||
{ | ||
|
||
private final FloatColumnSelector valueSelector; | ||
private final LongColumnSelector timeSelector; | ||
private final String name; | ||
|
||
protected long lastTime; | ||
protected double lastValue; | ||
|
||
public DoubleLastAggregator( | ||
String name, | ||
LongColumnSelector timeSelector, | ||
FloatColumnSelector valueSelector | ||
) | ||
{ | ||
this.name = name; | ||
this.valueSelector = valueSelector; | ||
this.timeSelector = timeSelector; | ||
|
||
reset(); | ||
} | ||
|
||
@Override | ||
public void aggregate() | ||
{ | ||
long time = timeSelector.get(); | ||
if (time >= lastTime) { | ||
lastTime = timeSelector.get(); | ||
lastValue = valueSelector.get(); | ||
} | ||
} | ||
|
||
@Override | ||
public void reset() | ||
{ | ||
lastTime = Long.MIN_VALUE; | ||
lastValue = 0; | ||
} | ||
|
||
@Override | ||
public Object get() | ||
{ | ||
return new SerializablePair<>(lastTime, lastValue); | ||
} | ||
|
||
@Override | ||
public float getFloat() | ||
{ | ||
return (float) lastValue; | ||
} | ||
|
||
@Override | ||
public String getName() | ||
{ | ||
return name; | ||
} | ||
|
||
@Override | ||
public void close() | ||
{ | ||
|
||
} | ||
|
||
@Override | ||
public long getLong() | ||
{ | ||
return (long) lastValue; | ||
} | ||
} |
248 changes: 248 additions & 0 deletions
248
processing/src/main/java/io/druid/query/aggregation/last/DoubleLastAggregatorFactory.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,248 @@ | ||
/* | ||
* Licensed to Metamarkets Group Inc. (Metamarkets) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. Metamarkets licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
|
||
package io.druid.query.aggregation.last; | ||
|
||
import com.fasterxml.jackson.annotation.JsonCreator; | ||
import com.fasterxml.jackson.annotation.JsonProperty; | ||
import com.google.common.base.Preconditions; | ||
import com.google.common.primitives.Doubles; | ||
import com.google.common.primitives.Longs; | ||
import com.metamx.common.StringUtils; | ||
import io.druid.collections.SerializablePair; | ||
import io.druid.query.aggregation.Aggregator; | ||
import io.druid.query.aggregation.AggregatorFactory; | ||
import io.druid.query.aggregation.AggregatorFactoryNotMergeableException; | ||
import io.druid.query.aggregation.BufferAggregator; | ||
import io.druid.query.aggregation.first.DoubleFirstAggregatorFactory; | ||
import io.druid.query.aggregation.first.LongFirstAggregatorFactory; | ||
import io.druid.segment.ColumnSelectorFactory; | ||
import io.druid.segment.ObjectColumnSelector; | ||
import io.druid.segment.column.Column; | ||
|
||
import java.nio.ByteBuffer; | ||
import java.util.Arrays; | ||
import java.util.Comparator; | ||
import java.util.List; | ||
import java.util.Map; | ||
|
||
public class DoubleLastAggregatorFactory extends AggregatorFactory | ||
{ | ||
private static final byte CACHE_TYPE_ID = 18; | ||
|
||
private final String fieldName; | ||
private final String name; | ||
|
||
@JsonCreator | ||
public DoubleLastAggregatorFactory( | ||
@JsonProperty("name") String name, | ||
@JsonProperty("fieldName") final String fieldName | ||
) | ||
{ | ||
Preconditions.checkNotNull(name, "Must have a valid, non-null aggregator name"); | ||
Preconditions.checkNotNull(fieldName, "Must have a valid, non-null fieldName"); | ||
this.name = name; | ||
this.fieldName = fieldName; | ||
} | ||
|
||
@Override | ||
public Aggregator factorize(ColumnSelectorFactory metricFactory) | ||
{ | ||
return new DoubleLastAggregator( | ||
name, | ||
metricFactory.makeLongColumnSelector(Column.TIME_COLUMN_NAME), | ||
metricFactory.makeFloatColumnSelector(fieldName) | ||
); | ||
} | ||
|
||
@Override | ||
public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory) | ||
{ | ||
return new DoubleLastBufferAggregator( | ||
metricFactory.makeLongColumnSelector(Column.TIME_COLUMN_NAME), | ||
metricFactory.makeFloatColumnSelector(fieldName) | ||
); | ||
} | ||
|
||
@Override | ||
public Comparator getComparator() | ||
{ | ||
return DoubleFirstAggregatorFactory.VALUE_COMPARATOR; | ||
} | ||
|
||
@Override | ||
public Object combine(Object lhs, Object rhs) | ||
{ | ||
return DoubleFirstAggregatorFactory.TIME_COMPARATOR.compare(lhs, rhs) > 0 ? lhs : rhs; | ||
} | ||
|
||
@Override | ||
public AggregatorFactory getCombiningFactory() | ||
{ | ||
return new DoubleLastAggregatorFactory(name, name) | ||
{ | ||
@Override | ||
public Aggregator factorize(ColumnSelectorFactory metricFactory) | ||
{ | ||
final ObjectColumnSelector selector = metricFactory.makeObjectColumnSelector(name); | ||
return new DoubleLastAggregator(name, null, null) | ||
{ | ||
@Override | ||
public void aggregate() | ||
{ | ||
SerializablePair<Long, Double> pair = (SerializablePair<Long, Double>) selector.get(); | ||
if (pair.lhs >= lastTime) { | ||
lastTime = pair.lhs; | ||
lastValue = pair.rhs; | ||
} | ||
} | ||
}; | ||
} | ||
|
||
@Override | ||
public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory) | ||
{ | ||
final ObjectColumnSelector selector = metricFactory.makeObjectColumnSelector(name); | ||
return new DoubleLastBufferAggregator(null, null) | ||
{ | ||
@Override | ||
public void aggregate(ByteBuffer buf, int position) | ||
{ | ||
SerializablePair<Long, Double> pair = (SerializablePair<Long, Double>) selector.get(); | ||
long lastTime = buf.getLong(position); | ||
if (pair.lhs >= lastTime) { | ||
buf.putLong(position, pair.lhs); | ||
buf.putDouble(position + Longs.BYTES, pair.rhs); | ||
} | ||
} | ||
}; | ||
} | ||
}; | ||
} | ||
|
||
@Override | ||
public AggregatorFactory getMergingFactory(AggregatorFactory other) throws AggregatorFactoryNotMergeableException | ||
{ | ||
if (other.getName().equals(this.getName()) && this.getClass() == other.getClass()) { | ||
return getCombiningFactory(); | ||
} else { | ||
throw new AggregatorFactoryNotMergeableException(this, other); | ||
} | ||
} | ||
|
||
@Override | ||
public List<AggregatorFactory> getRequiredColumns() | ||
{ | ||
return Arrays.<AggregatorFactory>asList(new LongFirstAggregatorFactory(fieldName, fieldName)); | ||
} | ||
|
||
@Override | ||
public Object deserialize(Object object) | ||
{ | ||
Map map = (Map) object; | ||
return new SerializablePair<>(((Number) map.get("lhs")).longValue(), ((Number) map.get("rhs")).doubleValue()); | ||
} | ||
|
||
@Override | ||
public Object finalizeComputation(Object object) | ||
{ | ||
return ((SerializablePair<Long, Double>) object).rhs; | ||
} | ||
|
||
@Override | ||
@JsonProperty | ||
public String getName() | ||
{ | ||
return name; | ||
} | ||
|
||
@JsonProperty | ||
public String getFieldName() | ||
{ | ||
return fieldName; | ||
} | ||
|
||
@Override | ||
public List<String> requiredFields() | ||
{ | ||
return Arrays.asList(Column.TIME_COLUMN_NAME, fieldName); | ||
} | ||
|
||
@Override | ||
public byte[] getCacheKey() | ||
{ | ||
byte[] fieldNameBytes = StringUtils.toUtf8(fieldName); | ||
|
||
return ByteBuffer.allocate(2 + fieldNameBytes.length) | ||
.put(CACHE_TYPE_ID) | ||
.put(fieldNameBytes) | ||
.put((byte)0xff) | ||
.array(); | ||
} | ||
|
||
@Override | ||
public String getTypeName() | ||
{ | ||
return "float"; | ||
} | ||
|
||
@Override | ||
public int getMaxIntermediateSize() | ||
{ | ||
return Longs.BYTES + Doubles.BYTES; | ||
} | ||
|
||
@Override | ||
public Object getAggregatorStartValue() | ||
{ | ||
throw new UnsupportedOperationException(); | ||
} | ||
|
||
@Override | ||
public boolean equals(Object o) | ||
{ | ||
if (this == o) { | ||
return true; | ||
} | ||
if (o == null || getClass() != o.getClass()) { | ||
return false; | ||
} | ||
|
||
DoubleLastAggregatorFactory that = (DoubleLastAggregatorFactory) o; | ||
|
||
return fieldName.equals(that.fieldName) && name.equals(that.name); | ||
} | ||
|
||
@Override | ||
public int hashCode() | ||
{ | ||
int result = name.hashCode(); | ||
result = 31 * result + fieldName.hashCode(); | ||
return result; | ||
} | ||
|
||
@Override | ||
public String toString() | ||
{ | ||
return "DoubleLastAggregatorFactory{" + | ||
"name='" + name + '\'' + | ||
", fieldName='" + fieldName + '\'' + | ||
'}'; | ||
} | ||
} |
82 changes: 82 additions & 0 deletions
82
processing/src/main/java/io/druid/query/aggregation/last/DoubleLastBufferAggregator.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,82 @@ | ||
/* | ||
* Licensed to Metamarkets Group Inc. (Metamarkets) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. Metamarkets licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
|
||
package io.druid.query.aggregation.last; | ||
|
||
import com.google.common.primitives.Longs; | ||
import io.druid.collections.SerializablePair; | ||
import io.druid.query.aggregation.BufferAggregator; | ||
import io.druid.segment.FloatColumnSelector; | ||
import io.druid.segment.LongColumnSelector; | ||
|
||
import java.nio.ByteBuffer; | ||
|
||
public class DoubleLastBufferAggregator implements BufferAggregator | ||
{ | ||
private final LongColumnSelector timeSelector; | ||
private final FloatColumnSelector valueSelector; | ||
|
||
public DoubleLastBufferAggregator(LongColumnSelector timeSelector, FloatColumnSelector valueSelector) | ||
{ | ||
this.timeSelector = timeSelector; | ||
this.valueSelector = valueSelector; | ||
} | ||
|
||
@Override | ||
public void init(ByteBuffer buf, int position) | ||
{ | ||
buf.putLong(position, Long.MIN_VALUE); | ||
buf.putDouble(position + Longs.BYTES, 0); | ||
} | ||
|
||
@Override | ||
public void aggregate(ByteBuffer buf, int position) | ||
{ | ||
long time = timeSelector.get(); | ||
long lastTime = buf.getLong(position); | ||
if (time >= lastTime) { | ||
buf.putLong(position, time); | ||
buf.putDouble(position + Longs.BYTES, valueSelector.get()); | ||
} | ||
} | ||
|
||
@Override | ||
public Object get(ByteBuffer buf, int position) | ||
{ | ||
return new SerializablePair<>(buf.getLong(position), buf.getDouble(position + Longs.BYTES)); | ||
} | ||
|
||
@Override | ||
public float getFloat(ByteBuffer buf, int position) | ||
{ | ||
return (float) buf.getDouble(position + Longs.BYTES); | ||
} | ||
|
||
@Override | ||
public long getLong(ByteBuffer buf, int position) | ||
{ | ||
return (long) buf.getDouble(position + Longs.BYTES); | ||
} | ||
|
||
@Override | ||
public void close() | ||
{ | ||
// no resources to cleanup | ||
} | ||
} |
94 changes: 94 additions & 0 deletions
94
processing/src/main/java/io/druid/query/aggregation/last/LongLastAggregator.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,94 @@ | ||
/* | ||
* Licensed to Metamarkets Group Inc. (Metamarkets) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. Metamarkets licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
|
||
package io.druid.query.aggregation.last; | ||
|
||
import io.druid.collections.SerializablePair; | ||
import io.druid.query.aggregation.Aggregator; | ||
import io.druid.segment.LongColumnSelector; | ||
|
||
public class LongLastAggregator implements Aggregator | ||
{ | ||
private final LongColumnSelector valueSelector; | ||
private final LongColumnSelector timeSelector; | ||
private final String name; | ||
|
||
protected long lastTime; | ||
protected long lastValue; | ||
|
||
public LongLastAggregator( | ||
String name, | ||
LongColumnSelector timeSelector, | ||
LongColumnSelector valueSelector | ||
) | ||
{ | ||
this.name = name; | ||
this.valueSelector = valueSelector; | ||
this.timeSelector = timeSelector; | ||
|
||
reset(); | ||
} | ||
|
||
@Override | ||
public void aggregate() | ||
{ | ||
long time = timeSelector.get(); | ||
if (time >= lastTime) { | ||
lastTime = timeSelector.get(); | ||
lastValue = valueSelector.get(); | ||
} | ||
} | ||
|
||
@Override | ||
public void reset() | ||
{ | ||
lastTime = Long.MIN_VALUE; | ||
lastValue = 0; | ||
} | ||
|
||
@Override | ||
public Object get() | ||
{ | ||
return new SerializablePair<>(lastTime, lastValue); | ||
} | ||
|
||
@Override | ||
public float getFloat() | ||
{ | ||
return (float) lastValue; | ||
} | ||
|
||
@Override | ||
public String getName() | ||
{ | ||
return name; | ||
} | ||
|
||
@Override | ||
public void close() | ||
{ | ||
|
||
} | ||
|
||
@Override | ||
public long getLong() | ||
{ | ||
return lastValue; | ||
} | ||
} |
248 changes: 248 additions & 0 deletions
248
processing/src/main/java/io/druid/query/aggregation/last/LongLastAggregatorFactory.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,248 @@ | ||
/* | ||
* Licensed to Metamarkets Group Inc. (Metamarkets) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. Metamarkets licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
|
||
package io.druid.query.aggregation.last; | ||
|
||
import com.fasterxml.jackson.annotation.JsonCreator; | ||
import com.fasterxml.jackson.annotation.JsonProperty; | ||
import com.google.common.base.Preconditions; | ||
import com.google.common.primitives.Longs; | ||
import com.metamx.common.StringUtils; | ||
import io.druid.collections.SerializablePair; | ||
import io.druid.query.aggregation.Aggregator; | ||
import io.druid.query.aggregation.AggregatorFactory; | ||
import io.druid.query.aggregation.AggregatorFactoryNotMergeableException; | ||
import io.druid.query.aggregation.BufferAggregator; | ||
import io.druid.query.aggregation.first.DoubleFirstAggregatorFactory; | ||
import io.druid.query.aggregation.first.LongFirstAggregatorFactory; | ||
import io.druid.segment.ColumnSelectorFactory; | ||
import io.druid.segment.ObjectColumnSelector; | ||
import io.druid.segment.column.Column; | ||
|
||
import java.nio.ByteBuffer; | ||
import java.util.Arrays; | ||
import java.util.Comparator; | ||
import java.util.List; | ||
import java.util.Map; | ||
|
||
public class LongLastAggregatorFactory extends AggregatorFactory | ||
{ | ||
private static final byte CACHE_TYPE_ID = 19; | ||
|
||
private final String fieldName; | ||
private final String name; | ||
|
||
@JsonCreator | ||
public LongLastAggregatorFactory( | ||
@JsonProperty("name") String name, | ||
@JsonProperty("fieldName") final String fieldName | ||
) | ||
{ | ||
Preconditions.checkNotNull(name, "Must have a valid, non-null aggregator name"); | ||
Preconditions.checkNotNull(fieldName, "Must have a valid, non-null fieldName"); | ||
this.name = name; | ||
this.fieldName = fieldName; | ||
} | ||
|
||
@Override | ||
public Aggregator factorize(ColumnSelectorFactory metricFactory) | ||
{ | ||
return new LongLastAggregator( | ||
name, | ||
metricFactory.makeLongColumnSelector(Column.TIME_COLUMN_NAME), | ||
metricFactory.makeLongColumnSelector(fieldName) | ||
); | ||
} | ||
|
||
@Override | ||
public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory) | ||
{ | ||
return new LongLastBufferAggregator( | ||
metricFactory.makeLongColumnSelector(Column.TIME_COLUMN_NAME), | ||
metricFactory.makeLongColumnSelector(fieldName) | ||
); | ||
} | ||
|
||
@Override | ||
public Comparator getComparator() | ||
{ | ||
return LongFirstAggregatorFactory.VALUE_COMPARATOR; | ||
} | ||
|
||
@Override | ||
public Object combine(Object lhs, Object rhs) | ||
{ | ||
return DoubleFirstAggregatorFactory.TIME_COMPARATOR.compare(lhs, rhs) > 0 ? lhs : rhs; | ||
} | ||
|
||
|
||
@Override | ||
public AggregatorFactory getCombiningFactory() | ||
{ | ||
return new LongLastAggregatorFactory(name, name) | ||
{ | ||
@Override | ||
public Aggregator factorize(ColumnSelectorFactory metricFactory) | ||
{ | ||
final ObjectColumnSelector selector = metricFactory.makeObjectColumnSelector(name); | ||
return new LongLastAggregator(name, null, null) | ||
{ | ||
@Override | ||
public void aggregate() | ||
{ | ||
SerializablePair<Long, Long> pair = (SerializablePair<Long, Long>) selector.get(); | ||
if (pair.lhs >= lastTime) { | ||
lastTime = pair.lhs; | ||
lastValue = pair.rhs; | ||
} | ||
} | ||
}; | ||
} | ||
|
||
@Override | ||
public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory) | ||
{ | ||
final ObjectColumnSelector selector = metricFactory.makeObjectColumnSelector(name); | ||
return new LongLastBufferAggregator(null, null) | ||
{ | ||
@Override | ||
public void aggregate(ByteBuffer buf, int position) | ||
{ | ||
SerializablePair<Long, Long> pair = (SerializablePair<Long, Long>) selector.get(); | ||
long lastTime = buf.getLong(position); | ||
if (pair.lhs >= lastTime) { | ||
buf.putLong(position, pair.lhs); | ||
buf.putLong(position + Longs.BYTES, pair.rhs); | ||
} | ||
} | ||
}; | ||
} | ||
}; | ||
} | ||
|
||
@Override | ||
public AggregatorFactory getMergingFactory(AggregatorFactory other) throws AggregatorFactoryNotMergeableException | ||
{ | ||
if (other.getName().equals(this.getName()) && this.getClass() == other.getClass()) { | ||
return getCombiningFactory(); | ||
} else { | ||
throw new AggregatorFactoryNotMergeableException(this, other); | ||
} | ||
} | ||
|
||
@Override | ||
public List<AggregatorFactory> getRequiredColumns() | ||
{ | ||
return Arrays.<AggregatorFactory>asList(new LongLastAggregatorFactory(fieldName, fieldName)); | ||
} | ||
|
||
@Override | ||
public Object deserialize(Object object) | ||
{ | ||
Map map = (Map) object; | ||
return new SerializablePair<>(((Number) map.get("lhs")).longValue(), ((Number) map.get("rhs")).longValue()); | ||
} | ||
|
||
@Override | ||
public Object finalizeComputation(Object object) | ||
{ | ||
return ((SerializablePair<Long, Long>) object).rhs; | ||
} | ||
|
||
@Override | ||
@JsonProperty | ||
public String getName() | ||
{ | ||
return name; | ||
} | ||
|
||
@JsonProperty | ||
public String getFieldName() | ||
{ | ||
return fieldName; | ||
} | ||
|
||
@Override | ||
public List<String> requiredFields() | ||
{ | ||
return Arrays.asList(Column.TIME_COLUMN_NAME, fieldName); | ||
} | ||
|
||
@Override | ||
public byte[] getCacheKey() | ||
{ | ||
byte[] fieldNameBytes = StringUtils.toUtf8(fieldName); | ||
|
||
return ByteBuffer.allocate(2 + fieldNameBytes.length) | ||
.put(CACHE_TYPE_ID) | ||
.put(fieldNameBytes) | ||
.put((byte)0xff) | ||
.array(); | ||
} | ||
|
||
@Override | ||
public String getTypeName() | ||
{ | ||
return "long"; | ||
} | ||
|
||
@Override | ||
public int getMaxIntermediateSize() | ||
{ | ||
return Longs.BYTES * 2; | ||
} | ||
|
||
@Override | ||
public Object getAggregatorStartValue() | ||
{ | ||
throw new UnsupportedOperationException(); | ||
} | ||
|
||
@Override | ||
public boolean equals(Object o) | ||
{ | ||
if (this == o) { | ||
return true; | ||
} | ||
if (o == null || getClass() != o.getClass()) { | ||
return false; | ||
} | ||
|
||
LongLastAggregatorFactory that = (LongLastAggregatorFactory) o; | ||
|
||
return name.equals(that.name) && fieldName.equals(that.fieldName); | ||
} | ||
|
||
@Override | ||
public int hashCode() | ||
{ | ||
int result = name.hashCode(); | ||
result = 31 * result + fieldName.hashCode(); | ||
return result; | ||
} | ||
|
||
@Override | ||
public String toString() | ||
{ | ||
return "LongLastAggregatorFactory{" + | ||
"name='" + name + '\'' + | ||
", fieldName='" + fieldName + '\'' + | ||
'}'; | ||
} | ||
} |
81 changes: 81 additions & 0 deletions
81
processing/src/main/java/io/druid/query/aggregation/last/LongLastBufferAggregator.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,81 @@ | ||
/* | ||
* Licensed to Metamarkets Group Inc. (Metamarkets) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. Metamarkets licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
|
||
package io.druid.query.aggregation.last; | ||
|
||
import com.google.common.primitives.Longs; | ||
import io.druid.collections.SerializablePair; | ||
import io.druid.query.aggregation.BufferAggregator; | ||
import io.druid.segment.LongColumnSelector; | ||
|
||
import java.nio.ByteBuffer; | ||
|
||
public class LongLastBufferAggregator implements BufferAggregator | ||
{ | ||
private final LongColumnSelector timeSelector; | ||
private final LongColumnSelector valueSelector; | ||
|
||
public LongLastBufferAggregator(LongColumnSelector timeSelector, LongColumnSelector valueSelector) | ||
{ | ||
this.timeSelector = timeSelector; | ||
this.valueSelector = valueSelector; | ||
} | ||
|
||
@Override | ||
public void init(ByteBuffer buf, int position) | ||
{ | ||
buf.putLong(position, Long.MIN_VALUE); | ||
buf.putLong(position + Longs.BYTES, 0); | ||
} | ||
|
||
@Override | ||
public void aggregate(ByteBuffer buf, int position) | ||
{ | ||
long time = timeSelector.get(); | ||
long lastTime = buf.getLong(position); | ||
if (time >= lastTime) { | ||
buf.putLong(position, time); | ||
buf.putLong(position + Longs.BYTES, valueSelector.get()); | ||
} | ||
} | ||
|
||
@Override | ||
public Object get(ByteBuffer buf, int position) | ||
{ | ||
return new SerializablePair<>(buf.getLong(position), buf.getLong(position + Longs.BYTES)); | ||
} | ||
|
||
@Override | ||
public float getFloat(ByteBuffer buf, int position) | ||
{ | ||
return (float) buf.getLong(position + Longs.BYTES); | ||
} | ||
|
||
@Override | ||
public long getLong(ByteBuffer buf, int position) | ||
{ | ||
return buf.getLong(position + Longs.BYTES); | ||
} | ||
|
||
@Override | ||
public void close() | ||
{ | ||
// no resources to cleanup | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
201 changes: 201 additions & 0 deletions
201
processing/src/test/java/io/druid/query/aggregation/first/DoubleFirstAggregationTest.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,201 @@ | ||
/* | ||
* Licensed to Metamarkets Group Inc. (Metamarkets) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. Metamarkets licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
|
||
package io.druid.query.aggregation.first; | ||
|
||
import io.druid.collections.SerializablePair; | ||
import io.druid.jackson.DefaultObjectMapper; | ||
import io.druid.java.util.common.Pair; | ||
import io.druid.query.aggregation.AggregatorFactory; | ||
import io.druid.query.aggregation.TestFloatColumnSelector; | ||
import io.druid.query.aggregation.TestLongColumnSelector; | ||
import io.druid.query.aggregation.TestObjectColumnSelector; | ||
import io.druid.segment.ColumnSelectorFactory; | ||
import io.druid.segment.column.Column; | ||
import org.easymock.EasyMock; | ||
import org.junit.Assert; | ||
import org.junit.Before; | ||
import org.junit.Test; | ||
|
||
import java.nio.ByteBuffer; | ||
|
||
public class DoubleFirstAggregationTest | ||
{ | ||
private DoubleFirstAggregatorFactory doubleFirstAggFactory; | ||
private DoubleFirstAggregatorFactory combiningAggFactory; | ||
private ColumnSelectorFactory colSelectorFactory; | ||
private TestLongColumnSelector timeSelector; | ||
private TestFloatColumnSelector valueSelector; | ||
private TestObjectColumnSelector objectSelector; | ||
|
||
private float[] floatValues = {1.1f, 2.7f, 3.5f, 1.3f}; | ||
private long[] times = {12, 10, 5344, 7899999}; | ||
private SerializablePair[] pairs = { | ||
new SerializablePair<>(1467225096L, 134.3d), | ||
new SerializablePair<>(23163L, 1232.212d), | ||
new SerializablePair<>(742L, 18d), | ||
new SerializablePair<>(111111L, 233.5232d) | ||
}; | ||
|
||
@Before | ||
public void setup() | ||
{ | ||
doubleFirstAggFactory = new DoubleFirstAggregatorFactory("billy", "nilly"); | ||
combiningAggFactory = (DoubleFirstAggregatorFactory) doubleFirstAggFactory.getCombiningFactory(); | ||
timeSelector = new TestLongColumnSelector(times); | ||
valueSelector = new TestFloatColumnSelector(floatValues); | ||
objectSelector = new TestObjectColumnSelector(pairs); | ||
colSelectorFactory = EasyMock.createMock(ColumnSelectorFactory.class); | ||
EasyMock.expect(colSelectorFactory.makeLongColumnSelector(Column.TIME_COLUMN_NAME)).andReturn(timeSelector); | ||
EasyMock.expect(colSelectorFactory.makeFloatColumnSelector("nilly")).andReturn(valueSelector); | ||
EasyMock.expect(colSelectorFactory.makeObjectColumnSelector("billy")).andReturn(objectSelector); | ||
EasyMock.replay(colSelectorFactory); | ||
} | ||
|
||
@Test | ||
public void testDoubleFirstAggregator() | ||
{ | ||
DoubleFirstAggregator agg = (DoubleFirstAggregator) doubleFirstAggFactory.factorize(colSelectorFactory); | ||
|
||
Assert.assertEquals("billy", agg.getName()); | ||
|
||
aggregate(agg); | ||
aggregate(agg); | ||
aggregate(agg); | ||
aggregate(agg); | ||
|
||
Pair<Long, Double> result = (Pair<Long, Double>) agg.get(); | ||
|
||
Assert.assertEquals(times[1], result.lhs.longValue()); | ||
Assert.assertEquals(floatValues[1], result.rhs, 0.0001); | ||
Assert.assertEquals((long) floatValues[1], agg.getLong()); | ||
Assert.assertEquals(floatValues[1], agg.getFloat(), 0.0001); | ||
|
||
agg.reset(); | ||
Assert.assertEquals(0, ((Pair<Long, Double>) agg.get()).rhs, 0.0001); | ||
} | ||
|
||
@Test | ||
public void testDoubleFirstBufferAggregator() | ||
{ | ||
DoubleFirstBufferAggregator agg = (DoubleFirstBufferAggregator) doubleFirstAggFactory.factorizeBuffered( | ||
colSelectorFactory); | ||
|
||
ByteBuffer buffer = ByteBuffer.wrap(new byte[doubleFirstAggFactory.getMaxIntermediateSize()]); | ||
agg.init(buffer, 0); | ||
|
||
aggregate(agg, buffer, 0); | ||
aggregate(agg, buffer, 0); | ||
aggregate(agg, buffer, 0); | ||
aggregate(agg, buffer, 0); | ||
|
||
Pair<Long, Double> result = (Pair<Long, Double>) agg.get(buffer, 0); | ||
|
||
Assert.assertEquals(times[1], result.lhs.longValue()); | ||
Assert.assertEquals(floatValues[1], result.rhs, 0.0001); | ||
Assert.assertEquals((long) floatValues[1], agg.getLong(buffer, 0)); | ||
Assert.assertEquals(floatValues[1], agg.getFloat(buffer, 0), 0.0001); | ||
} | ||
|
||
@Test | ||
public void testCombine() | ||
{ | ||
SerializablePair pair1 = new SerializablePair<>(1467225000L, 3.621); | ||
SerializablePair pair2 = new SerializablePair<>(1467240000L, 785.4); | ||
Assert.assertEquals(pair1, doubleFirstAggFactory.combine(pair1, pair2)); | ||
} | ||
|
||
@Test | ||
public void testDoubleFirstCombiningAggregator() | ||
{ | ||
DoubleFirstAggregator agg = (DoubleFirstAggregator) combiningAggFactory.factorize(colSelectorFactory); | ||
|
||
Assert.assertEquals("billy", agg.getName()); | ||
|
||
aggregate(agg); | ||
aggregate(agg); | ||
aggregate(agg); | ||
aggregate(agg); | ||
|
||
Pair<Long, Double> result = (Pair<Long, Double>) agg.get(); | ||
Pair<Long, Double> expected = (Pair<Long, Double>)pairs[2]; | ||
|
||
Assert.assertEquals(expected.lhs, result.lhs); | ||
Assert.assertEquals(expected.rhs, result.rhs, 0.0001); | ||
Assert.assertEquals(expected.rhs.longValue(), agg.getLong()); | ||
Assert.assertEquals(expected.rhs, agg.getFloat(), 0.0001); | ||
|
||
agg.reset(); | ||
Assert.assertEquals(0, ((Pair<Long, Double>) agg.get()).rhs, 0.0001); | ||
} | ||
|
||
@Test | ||
public void testDoubleFirstCombiningBufferAggregator() | ||
{ | ||
DoubleFirstBufferAggregator agg = (DoubleFirstBufferAggregator) combiningAggFactory.factorizeBuffered( | ||
colSelectorFactory); | ||
|
||
ByteBuffer buffer = ByteBuffer.wrap(new byte[doubleFirstAggFactory.getMaxIntermediateSize()]); | ||
agg.init(buffer, 0); | ||
|
||
aggregate(agg, buffer, 0); | ||
aggregate(agg, buffer, 0); | ||
aggregate(agg, buffer, 0); | ||
aggregate(agg, buffer, 0); | ||
|
||
Pair<Long, Double> result = (Pair<Long, Double>) agg.get(buffer, 0); | ||
Pair<Long, Double> expected = (Pair<Long, Double>)pairs[2]; | ||
|
||
Assert.assertEquals(expected.lhs, result.lhs); | ||
Assert.assertEquals(expected.rhs, result.rhs, 0.0001); | ||
Assert.assertEquals(expected.rhs.longValue(), agg.getLong(buffer, 0)); | ||
Assert.assertEquals(expected.rhs, agg.getFloat(buffer, 0), 0.0001); | ||
} | ||
|
||
|
||
@Test | ||
public void testSerde() throws Exception | ||
{ | ||
DefaultObjectMapper mapper = new DefaultObjectMapper(); | ||
String doubleSpecJson = "{\"type\":\"doubleFirst\",\"name\":\"billy\",\"fieldName\":\"nilly\"}"; | ||
Assert.assertEquals(doubleFirstAggFactory, mapper.readValue(doubleSpecJson, AggregatorFactory.class)); | ||
} | ||
|
||
private void aggregate( | ||
DoubleFirstAggregator agg | ||
) | ||
{ | ||
agg.aggregate(); | ||
timeSelector.increment(); | ||
valueSelector.increment(); | ||
objectSelector.increment(); | ||
} | ||
|
||
private void aggregate( | ||
DoubleFirstBufferAggregator agg, | ||
ByteBuffer buff, | ||
int position | ||
) | ||
{ | ||
agg.aggregate(buff, position); | ||
timeSelector.increment(); | ||
valueSelector.increment(); | ||
objectSelector.increment(); | ||
} | ||
} |
200 changes: 200 additions & 0 deletions
200
processing/src/test/java/io/druid/query/aggregation/first/LongFirstAggregationTest.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,200 @@ | ||
/* | ||
* Licensed to Metamarkets Group Inc. (Metamarkets) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. Metamarkets licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
|
||
package io.druid.query.aggregation.first; | ||
|
||
import io.druid.collections.SerializablePair; | ||
import io.druid.jackson.DefaultObjectMapper; | ||
import io.druid.java.util.common.Pair; | ||
import io.druid.query.aggregation.AggregatorFactory; | ||
import io.druid.query.aggregation.TestLongColumnSelector; | ||
import io.druid.query.aggregation.TestObjectColumnSelector; | ||
import io.druid.segment.ColumnSelectorFactory; | ||
import io.druid.segment.column.Column; | ||
import org.easymock.EasyMock; | ||
import org.junit.Assert; | ||
import org.junit.Before; | ||
import org.junit.Test; | ||
|
||
import java.nio.ByteBuffer; | ||
|
||
public class LongFirstAggregationTest | ||
{ | ||
private LongFirstAggregatorFactory longFirstAggFactory; | ||
private LongFirstAggregatorFactory combiningAggFactory; | ||
private ColumnSelectorFactory colSelectorFactory; | ||
private TestLongColumnSelector timeSelector; | ||
private TestLongColumnSelector valueSelector; | ||
private TestObjectColumnSelector objectSelector; | ||
|
||
private long[] longValues = {185, -216, -128751132, Long.MIN_VALUE}; | ||
private long[] times = {1123126751, 1784247991, 1854329816, 1000000000}; | ||
private SerializablePair[] pairs = { | ||
new SerializablePair<>(1L, 113267L), | ||
new SerializablePair<>(1L, 5437384L), | ||
new SerializablePair<>(6L, 34583458L), | ||
new SerializablePair<>(88L, 34583452L) | ||
}; | ||
|
||
@Before | ||
public void setup() | ||
{ | ||
longFirstAggFactory = new LongFirstAggregatorFactory("billy", "nilly"); | ||
combiningAggFactory = (LongFirstAggregatorFactory) longFirstAggFactory.getCombiningFactory(); | ||
timeSelector = new TestLongColumnSelector(times); | ||
valueSelector = new TestLongColumnSelector(longValues); | ||
objectSelector = new TestObjectColumnSelector(pairs); | ||
colSelectorFactory = EasyMock.createMock(ColumnSelectorFactory.class); | ||
EasyMock.expect(colSelectorFactory.makeLongColumnSelector(Column.TIME_COLUMN_NAME)).andReturn(timeSelector); | ||
EasyMock.expect(colSelectorFactory.makeLongColumnSelector("nilly")).andReturn(valueSelector); | ||
EasyMock.expect(colSelectorFactory.makeObjectColumnSelector("billy")).andReturn(objectSelector); | ||
EasyMock.replay(colSelectorFactory); | ||
} | ||
|
||
@Test | ||
public void testLongFirstAggregator() | ||
{ | ||
LongFirstAggregator agg = (LongFirstAggregator) longFirstAggFactory.factorize(colSelectorFactory); | ||
|
||
Assert.assertEquals("billy", agg.getName()); | ||
|
||
aggregate(agg); | ||
aggregate(agg); | ||
aggregate(agg); | ||
aggregate(agg); | ||
|
||
Pair<Long, Long> result = (Pair<Long, Long>) agg.get(); | ||
|
||
Assert.assertEquals(times[3], result.lhs.longValue()); | ||
Assert.assertEquals(longValues[3], result.rhs.longValue()); | ||
Assert.assertEquals(longValues[3], agg.getLong()); | ||
Assert.assertEquals(longValues[3], agg.getFloat(), 0.0001); | ||
|
||
agg.reset(); | ||
Assert.assertEquals(0, ((Pair<Long, Long>) agg.get()).rhs, 0.0001); | ||
} | ||
|
||
@Test | ||
public void testLongFirstBufferAggregator() | ||
{ | ||
LongFirstBufferAggregator agg = (LongFirstBufferAggregator) longFirstAggFactory.factorizeBuffered( | ||
colSelectorFactory); | ||
|
||
ByteBuffer buffer = ByteBuffer.wrap(new byte[longFirstAggFactory.getMaxIntermediateSize()]); | ||
agg.init(buffer, 0); | ||
|
||
aggregate(agg, buffer, 0); | ||
aggregate(agg, buffer, 0); | ||
aggregate(agg, buffer, 0); | ||
aggregate(agg, buffer, 0); | ||
|
||
Pair<Long, Long> result = (Pair<Long, Long>) agg.get(buffer, 0); | ||
|
||
Assert.assertEquals(times[3], result.lhs.longValue()); | ||
Assert.assertEquals(longValues[3], result.rhs.longValue()); | ||
Assert.assertEquals(longValues[3], agg.getLong(buffer, 0)); | ||
Assert.assertEquals(longValues[3], agg.getFloat(buffer, 0), 0.0001); | ||
} | ||
|
||
@Test | ||
public void testCombine() | ||
{ | ||
SerializablePair pair1 = new SerializablePair<>(1467225000L, 1263L); | ||
SerializablePair pair2 = new SerializablePair<>(1467240000L, 752713L); | ||
Assert.assertEquals(pair1, longFirstAggFactory.combine(pair1, pair2)); | ||
} | ||
|
||
@Test | ||
public void testLongFirstCombiningAggregator() | ||
{ | ||
LongFirstAggregator agg = (LongFirstAggregator) combiningAggFactory.factorize(colSelectorFactory); | ||
|
||
Assert.assertEquals("billy", agg.getName()); | ||
|
||
aggregate(agg); | ||
aggregate(agg); | ||
aggregate(agg); | ||
aggregate(agg); | ||
|
||
Pair<Long, Long> result = (Pair<Long, Long>) agg.get(); | ||
Pair<Long, Long> expected = (Pair<Long, Long>)pairs[0]; | ||
|
||
Assert.assertEquals(expected.lhs, result.lhs); | ||
Assert.assertEquals(expected.rhs, result.rhs); | ||
Assert.assertEquals(expected.rhs.longValue(), agg.getLong()); | ||
Assert.assertEquals(expected.rhs, agg.getFloat(), 0.0001); | ||
|
||
agg.reset(); | ||
Assert.assertEquals(0, ((Pair<Long, Long>) agg.get()).rhs, 0.0001); | ||
} | ||
|
||
@Test | ||
public void testLongFirstCombiningBufferAggregator() | ||
{ | ||
LongFirstBufferAggregator agg = (LongFirstBufferAggregator) combiningAggFactory.factorizeBuffered( | ||
colSelectorFactory); | ||
|
||
ByteBuffer buffer = ByteBuffer.wrap(new byte[longFirstAggFactory.getMaxIntermediateSize()]); | ||
agg.init(buffer, 0); | ||
|
||
aggregate(agg, buffer, 0); | ||
aggregate(agg, buffer, 0); | ||
aggregate(agg, buffer, 0); | ||
aggregate(agg, buffer, 0); | ||
|
||
Pair<Long, Long> result = (Pair<Long, Long>) agg.get(buffer, 0); | ||
Pair<Long, Long> expected = (Pair<Long, Long>)pairs[0]; | ||
|
||
Assert.assertEquals(expected.lhs, result.lhs); | ||
Assert.assertEquals(expected.rhs, result.rhs); | ||
Assert.assertEquals(expected.rhs.longValue(), agg.getLong(buffer, 0)); | ||
Assert.assertEquals(expected.rhs, agg.getFloat(buffer, 0), 0.0001); | ||
} | ||
|
||
|
||
@Test | ||
public void testSerde() throws Exception | ||
{ | ||
DefaultObjectMapper mapper = new DefaultObjectMapper(); | ||
String longSpecJson = "{\"type\":\"longFirst\",\"name\":\"billy\",\"fieldName\":\"nilly\"}"; | ||
Assert.assertEquals(longFirstAggFactory, mapper.readValue(longSpecJson, AggregatorFactory.class)); | ||
} | ||
|
||
private void aggregate( | ||
LongFirstAggregator agg | ||
) | ||
{ | ||
agg.aggregate(); | ||
timeSelector.increment(); | ||
valueSelector.increment(); | ||
objectSelector.increment(); | ||
} | ||
|
||
private void aggregate( | ||
LongFirstBufferAggregator agg, | ||
ByteBuffer buff, | ||
int position | ||
) | ||
{ | ||
agg.aggregate(buff, position); | ||
timeSelector.increment(); | ||
valueSelector.increment(); | ||
objectSelector.increment(); | ||
} | ||
} |
201 changes: 201 additions & 0 deletions
201
processing/src/test/java/io/druid/query/aggregation/last/DoubleLastAggregationTest.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,201 @@ | ||
/* | ||
* Licensed to Metamarkets Group Inc. (Metamarkets) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. Metamarkets licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
|
||
package io.druid.query.aggregation.last; | ||
|
||
import io.druid.collections.SerializablePair; | ||
import io.druid.jackson.DefaultObjectMapper; | ||
import io.druid.java.util.common.Pair; | ||
import io.druid.query.aggregation.AggregatorFactory; | ||
import io.druid.query.aggregation.TestFloatColumnSelector; | ||
import io.druid.query.aggregation.TestLongColumnSelector; | ||
import io.druid.query.aggregation.TestObjectColumnSelector; | ||
import io.druid.segment.ColumnSelectorFactory; | ||
import io.druid.segment.column.Column; | ||
import org.easymock.EasyMock; | ||
import org.junit.Assert; | ||
import org.junit.Before; | ||
import org.junit.Test; | ||
|
||
import java.nio.ByteBuffer; | ||
|
||
public class DoubleLastAggregationTest | ||
{ | ||
private DoubleLastAggregatorFactory doubleLastAggFactory; | ||
private DoubleLastAggregatorFactory combiningAggFactory; | ||
private ColumnSelectorFactory colSelectorFactory; | ||
private TestLongColumnSelector timeSelector; | ||
private TestFloatColumnSelector valueSelector; | ||
private TestObjectColumnSelector objectSelector; | ||
|
||
private float[] floatValues = {1.1897f, 0.001f, 86.23f, 166.228f}; | ||
private long[] times = {8224, 6879, 2436, 7888}; | ||
private SerializablePair[] pairs = { | ||
new SerializablePair<>(52782L, 134.3d), | ||
new SerializablePair<>(65492L, 1232.212d), | ||
new SerializablePair<>(69134L, 18.1233d), | ||
new SerializablePair<>(11111L, 233.5232d) | ||
}; | ||
|
||
@Before | ||
public void setup() | ||
{ | ||
doubleLastAggFactory = new DoubleLastAggregatorFactory("billy", "nilly"); | ||
combiningAggFactory = (DoubleLastAggregatorFactory) doubleLastAggFactory.getCombiningFactory(); | ||
timeSelector = new TestLongColumnSelector(times); | ||
valueSelector = new TestFloatColumnSelector(floatValues); | ||
objectSelector = new TestObjectColumnSelector(pairs); | ||
colSelectorFactory = EasyMock.createMock(ColumnSelectorFactory.class); | ||
EasyMock.expect(colSelectorFactory.makeLongColumnSelector(Column.TIME_COLUMN_NAME)).andReturn(timeSelector); | ||
EasyMock.expect(colSelectorFactory.makeFloatColumnSelector("nilly")).andReturn(valueSelector); | ||
EasyMock.expect(colSelectorFactory.makeObjectColumnSelector("billy")).andReturn(objectSelector); | ||
EasyMock.replay(colSelectorFactory); | ||
} | ||
|
||
@Test | ||
public void testDoubleLastAggregator() | ||
{ | ||
DoubleLastAggregator agg = (DoubleLastAggregator) doubleLastAggFactory.factorize(colSelectorFactory); | ||
|
||
Assert.assertEquals("billy", agg.getName()); | ||
|
||
aggregate(agg); | ||
aggregate(agg); | ||
aggregate(agg); | ||
aggregate(agg); | ||
|
||
Pair<Long, Double> result = (Pair<Long, Double>) agg.get(); | ||
|
||
Assert.assertEquals(times[0], result.lhs.longValue()); | ||
Assert.assertEquals(floatValues[0], result.rhs, 0.0001); | ||
Assert.assertEquals((long) floatValues[0], agg.getLong()); | ||
Assert.assertEquals(floatValues[0], agg.getFloat(), 0.0001); | ||
|
||
agg.reset(); | ||
Assert.assertEquals(0, ((Pair<Long, Double>) agg.get()).rhs, 0.0001); | ||
} | ||
|
||
@Test | ||
public void testDoubleLastBufferAggregator() | ||
{ | ||
DoubleLastBufferAggregator agg = (DoubleLastBufferAggregator) doubleLastAggFactory.factorizeBuffered( | ||
colSelectorFactory); | ||
|
||
ByteBuffer buffer = ByteBuffer.wrap(new byte[doubleLastAggFactory.getMaxIntermediateSize()]); | ||
agg.init(buffer, 0); | ||
|
||
aggregate(agg, buffer, 0); | ||
aggregate(agg, buffer, 0); | ||
aggregate(agg, buffer, 0); | ||
aggregate(agg, buffer, 0); | ||
|
||
Pair<Long, Double> result = (Pair<Long, Double>) agg.get(buffer, 0); | ||
|
||
Assert.assertEquals(times[0], result.lhs.longValue()); | ||
Assert.assertEquals(floatValues[0], result.rhs, 0.0001); | ||
Assert.assertEquals((long) floatValues[0], agg.getLong(buffer, 0)); | ||
Assert.assertEquals(floatValues[0], agg.getFloat(buffer, 0), 0.0001); | ||
} | ||
|
||
@Test | ||
public void testCombine() | ||
{ | ||
SerializablePair pair1 = new SerializablePair<>(1467225000L, 3.621); | ||
SerializablePair pair2 = new SerializablePair<>(1467240000L, 785.4); | ||
Assert.assertEquals(pair2, doubleLastAggFactory.combine(pair1, pair2)); | ||
} | ||
|
||
@Test | ||
public void testDoubleLastCombiningAggregator() | ||
{ | ||
DoubleLastAggregator agg = (DoubleLastAggregator) combiningAggFactory.factorize(colSelectorFactory); | ||
|
||
Assert.assertEquals("billy", agg.getName()); | ||
|
||
aggregate(agg); | ||
aggregate(agg); | ||
aggregate(agg); | ||
aggregate(agg); | ||
|
||
Pair<Long, Double> result = (Pair<Long, Double>) agg.get(); | ||
Pair<Long, Double> expected = (Pair<Long, Double>)pairs[2]; | ||
|
||
Assert.assertEquals(expected.lhs, result.lhs); | ||
Assert.assertEquals(expected.rhs, result.rhs, 0.0001); | ||
Assert.assertEquals(expected.rhs.longValue(), agg.getLong()); | ||
Assert.assertEquals(expected.rhs, agg.getFloat(), 0.0001); | ||
|
||
agg.reset(); | ||
Assert.assertEquals(0, ((Pair<Long, Double>) agg.get()).rhs, 0.0001); | ||
} | ||
|
||
@Test | ||
public void testDoubleLastCombiningBufferAggregator() | ||
{ | ||
DoubleLastBufferAggregator agg = (DoubleLastBufferAggregator) combiningAggFactory.factorizeBuffered( | ||
colSelectorFactory); | ||
|
||
ByteBuffer buffer = ByteBuffer.wrap(new byte[doubleLastAggFactory.getMaxIntermediateSize()]); | ||
agg.init(buffer, 0); | ||
|
||
aggregate(agg, buffer, 0); | ||
aggregate(agg, buffer, 0); | ||
aggregate(agg, buffer, 0); | ||
aggregate(agg, buffer, 0); | ||
|
||
Pair<Long, Double> result = (Pair<Long, Double>) agg.get(buffer, 0); | ||
Pair<Long, Double> expected = (Pair<Long, Double>)pairs[2]; | ||
|
||
Assert.assertEquals(expected.lhs, result.lhs); | ||
Assert.assertEquals(expected.rhs, result.rhs, 0.0001); | ||
Assert.assertEquals(expected.rhs.longValue(), agg.getLong(buffer, 0)); | ||
Assert.assertEquals(expected.rhs, agg.getFloat(buffer, 0), 0.0001); | ||
} | ||
|
||
|
||
@Test | ||
public void testSerde() throws Exception | ||
{ | ||
DefaultObjectMapper mapper = new DefaultObjectMapper(); | ||
String doubleSpecJson = "{\"type\":\"doubleLast\",\"name\":\"billy\",\"fieldName\":\"nilly\"}"; | ||
Assert.assertEquals(doubleLastAggFactory, mapper.readValue(doubleSpecJson, AggregatorFactory.class)); | ||
} | ||
|
||
private void aggregate( | ||
DoubleLastAggregator agg | ||
) | ||
{ | ||
agg.aggregate(); | ||
timeSelector.increment(); | ||
valueSelector.increment(); | ||
objectSelector.increment(); | ||
} | ||
|
||
private void aggregate( | ||
DoubleLastBufferAggregator agg, | ||
ByteBuffer buff, | ||
int position | ||
) | ||
{ | ||
agg.aggregate(buff, position); | ||
timeSelector.increment(); | ||
valueSelector.increment(); | ||
objectSelector.increment(); | ||
} | ||
} |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we also document that if the rows are rolled up, the first/last value will be a the first/last value in druid row and not the raw data being ingested.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added note about rollup behavior