diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchAggregator.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchAggregator.java index 34e04ee071f2..ad46182262e9 100644 --- a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchAggregator.java +++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchAggregator.java @@ -89,6 +89,8 @@ static void updateUnion(Union union, Object update) union.update((Memory) update); } else if (update instanceof Sketch) { union.update((Sketch) update); + } else if (update instanceof Union) { + union.update(((Union) update).getResult()); } else if (update instanceof String) { union.update((String) update); } else if (update instanceof byte[]) { diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchAggregatorFactory.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchAggregatorFactory.java index b8e8bf21d3d9..87374de1e4ec 100644 --- a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchAggregatorFactory.java +++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchAggregatorFactory.java @@ -117,10 +117,19 @@ public Comparator getComparator() @Override public Object combine(Object lhs, Object rhs) { - Union union = (Union) SetOperation.builder().setNominalEntries(size).build(Family.UNION); - updateUnion(union, lhs); - updateUnion(union, rhs); - return union.getResult(false, null); + final Union union; + if (lhs instanceof Union) { + union = (Union) lhs; + updateUnion(union, rhs); + } else if (rhs instanceof Union) { + union = (Union) rhs; + updateUnion(union, lhs); + } else { + union = (Union) SetOperation.builder().setNominalEntries(size).build(Family.UNION); + updateUnion(union, lhs); + updateUnion(union, rhs); + } + return union; } private void updateUnion(Union union, Object obj) @@ -131,6 +140,8 @@ private void updateUnion(Union union, Object obj) union.update((Memory) obj); } else if (obj instanceof Sketch) { union.update((Sketch) obj); + } else if (obj instanceof Union) { + union.update(((Union) obj).getResult(false, null)); } else { throw new IAE("Object of type [%s] can not be unioned", obj.getClass().getName()); } diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchEstimatePostAggregator.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchEstimatePostAggregator.java index 357f6b84d6bd..5831f1af2cf6 100644 --- a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchEstimatePostAggregator.java +++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchEstimatePostAggregator.java @@ -33,6 +33,7 @@ import java.util.Comparator; import java.util.Map; +import java.util.Objects; import java.util.Set; public class SketchEstimatePostAggregator implements PostAggregator @@ -91,14 +92,13 @@ public ValueDesc resolve(TypeResolver bindings) @Override public Object compute(DateTime timestamp, Map combinedAggregators) { - Sketch sketch = (Sketch) field.compute(timestamp, combinedAggregators); + Sketch sketch = SketchSetPostAggregator.toSketch(field.compute(timestamp, combinedAggregators)); if (errorBoundsStdDev != null) { - SketchEstimateWithErrorBounds result = new SketchEstimateWithErrorBounds( + return new SketchEstimateWithErrorBounds( sketch.getEstimate(), sketch.getUpperBound(errorBoundsStdDev), sketch.getLowerBound(errorBoundsStdDev), errorBoundsStdDev); - return result; } else { return sketch.getEstimate(); } @@ -148,7 +148,7 @@ public boolean equals(Object o) if (!name.equals(that.name)) { return false; } - if (errorBoundsStdDev != that.errorBoundsStdDev) { + if (!Objects.equals(errorBoundsStdDev, that.errorBoundsStdDev)) { return false; } return field.equals(that.field); @@ -160,7 +160,7 @@ public int hashCode() { int result = name.hashCode(); result = 31 * result + field.hashCode(); - result = 31 * result + (errorBoundsStdDev != null ? errorBoundsStdDev.hashCode() : 0); + result = 31 * result + Objects.hashCode(errorBoundsStdDev); return result; } } diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchModule.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchModule.java index e49d364a220a..6aac82b89a37 100644 --- a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchModule.java +++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchModule.java @@ -28,6 +28,7 @@ import com.yahoo.sketches.quantiles.ItemsSketch; import com.yahoo.sketches.sampling.ReservoirItemsSketch; import com.yahoo.sketches.theta.Sketch; +import com.yahoo.sketches.theta.Union; import io.druid.guice.LazySingleton; import io.druid.guice.QueryToolBinders; import io.druid.initialization.DruidModule; @@ -114,6 +115,9 @@ public List getJacksonModules() .addSerializer( Sketch.class, new SketchJsonSerializer() ) + .addSerializer( + Union.class, new UnionJsonSerializer() + ) .addSerializer( ItemsSketch.class, new ItemsSketchJsonSerializer() ) diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchObjectStrategy.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchObjectStrategy.java index a512e8714213..35ce482ce36e 100644 --- a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchObjectStrategy.java +++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchObjectStrategy.java @@ -24,6 +24,7 @@ import com.yahoo.memory.Memory; import com.yahoo.sketches.theta.Sketch; import com.yahoo.sketches.theta.Sketches; +import com.yahoo.sketches.theta.Union; import io.druid.segment.data.ObjectStrategy; import java.nio.ByteBuffer; @@ -44,6 +45,13 @@ public int compare(Object s1, Object s2) return -1; } } + if (s1 instanceof Union) { + if (s2 instanceof Union) { + return SketchAggregatorFactory.COMPARATOR.compare(((Union) s1).getResult(), ((Union) s2).getResult()); + } else { + return -1; + } + } if (s1 instanceof Memory) { if (s2 instanceof Memory) { Memory s1Mem = (Memory) s1; @@ -62,7 +70,7 @@ public int compare(Object s1, Object s2) return 1; } } - throw new IAE("Unknwon class[%s], toString[%s]", s1.getClass(), s1); + throw new IAE("Unknown class[%s], toString[%s]", s1.getClass(), s1); } @@ -96,6 +104,8 @@ public byte[] toBytes(Object obj) byte[] retVal = new byte[(int) mem.getCapacity()]; mem.getByteArray(0, retVal, 0, (int) mem.getCapacity()); return retVal; + } else if (obj instanceof Union) { + return toBytes(((Union) obj).getResult(true, null)); } else if (obj == null) { return EMPTY_BYTES; } else { diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchSetPostAggregator.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchSetPostAggregator.java index dd140be875e2..c124f23f6406 100644 --- a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchSetPostAggregator.java +++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchSetPostAggregator.java @@ -26,6 +26,7 @@ import com.metamx.common.logger.Logger; import com.yahoo.sketches.Util; import com.yahoo.sketches.theta.Sketch; +import com.yahoo.sketches.theta.Union; import io.druid.data.TypeResolver; import io.druid.data.ValueDesc; import io.druid.query.aggregation.PostAggregator; @@ -92,12 +93,23 @@ public Object compute(DateTime timestamp, final Map combinedAggr { Sketch[] sketches = new Sketch[fields.size()]; for (int i = 0; i < sketches.length; i++) { - sketches[i] = (Sketch) fields.get(i).compute(timestamp, combinedAggregators); + sketches[i] = toSketch(fields.get(i).compute(timestamp, combinedAggregators)); } return SketchOperations.sketchSetOperation(func, maxSketchSize, sketches); } + static Sketch toSketch(Object obj) + { + if (obj instanceof Sketch) { + return (Sketch) obj; + } else if (obj instanceof Union) { + return ((Union) obj).getResult(true, null); + } else { + throw new IAE("Can't convert to Sketch object [%s]", obj.getClass()); + } + } + @Override @JsonProperty public String getName() diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/UnionJsonSerializer.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/UnionJsonSerializer.java new file mode 100644 index 000000000000..4be8e33b6a8b --- /dev/null +++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/UnionJsonSerializer.java @@ -0,0 +1,37 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query.aggregation.datasketches.theta; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonSerializer; +import com.fasterxml.jackson.databind.SerializerProvider; +import com.yahoo.sketches.theta.Union; + +import java.io.IOException; + +public class UnionJsonSerializer extends JsonSerializer +{ + @Override + public void serialize(Union union, JsonGenerator jgen, SerializerProvider provider) + throws IOException + { + jgen.writeBinary(union.getResult(true, null).toByteArray()); + } +}