diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchAggregator.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchAggregator.java index 8e1e7643f321..b9a369c33351 100644 --- a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchAggregator.java +++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchAggregator.java @@ -107,6 +107,8 @@ static void updateUnion(Union union, Object update) union.update((Memory) update); } else if (update instanceof Sketch) { union.update((Sketch) update); + } else if (update instanceof Union) { + union.update(((Union) update).getResult(false, null)); } else if (update instanceof String) { union.update((String) update); } else if (update instanceof byte[]) { diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchAggregatorFactory.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchAggregatorFactory.java index 457d5f6b12b5..f45d08e607e8 100644 --- a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchAggregatorFactory.java +++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchAggregatorFactory.java @@ -110,10 +110,21 @@ public Comparator getComparator() @Override public Object combine(Object lhs, Object rhs) { - Union union = (Union) SetOperation.builder().build(size, Family.UNION); - updateUnion(union, lhs); - updateUnion(union, rhs); - return union.getResult(false, null); + final Union union; + if (lhs instanceof Union) { + union = (Union) lhs; + updateUnion(union, rhs); + } else if (rhs instanceof Union) { + union = (Union) rhs; + updateUnion(union, lhs); + } else { + union = (Union) SetOperation.builder().build(size, Family.UNION); + updateUnion(union, lhs); + updateUnion(union, rhs); + } + + + return union; } private void updateUnion(Union union, Object obj) @@ -124,6 +135,8 @@ private void updateUnion(Union union, Object obj) union.update((Memory) obj); } else if (obj instanceof Sketch) { union.update((Sketch) obj); + } else if (obj instanceof Union) { + union.update(((Union) obj).getResult(false, null)); } else { throw new IAE("Object of type [%s] can not be unioned", obj.getClass().getName()); } diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchEstimatePostAggregator.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchEstimatePostAggregator.java index 34bb7e1e2aa4..4c5156c58f49 100644 --- a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchEstimatePostAggregator.java +++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchEstimatePostAggregator.java @@ -82,7 +82,7 @@ public int compare(Object o1, Object o2) @Override public Object compute(Map combinedAggregators) { - Sketch sketch = (Sketch) field.compute(combinedAggregators); + Sketch sketch = SketchSetPostAggregator.toSketch(field.compute(combinedAggregators)); if (errorBoundsStdDev != null) { SketchEstimateWithErrorBounds result = new SketchEstimateWithErrorBounds( sketch.getEstimate(), diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchModule.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchModule.java index 84b0853cf227..4daea218a59b 100644 --- a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchModule.java +++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchModule.java @@ -25,6 +25,7 @@ import com.google.inject.Binder; import com.yahoo.sketches.memory.Memory; import com.yahoo.sketches.theta.Sketch; +import com.yahoo.sketches.theta.Union; import io.druid.initialization.DruidModule; import io.druid.segment.serde.ComplexMetrics; @@ -71,7 +72,11 @@ public List getJacksonModules() Sketch.class, new SketchJsonSerializer() ) .addSerializer( - Memory.class, new MemoryJsonSerializer()) + Memory.class, new MemoryJsonSerializer() + ) + .addSerializer( + Union.class, new UnionJsonSerializer() + ) ); } } diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchObjectStrategy.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchObjectStrategy.java index a09146855a21..99aae90d9740 100644 --- a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchObjectStrategy.java +++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchObjectStrategy.java @@ -26,6 +26,7 @@ import com.yahoo.sketches.memory.NativeMemory; import com.yahoo.sketches.theta.Sketch; import com.yahoo.sketches.theta.Sketches; +import com.yahoo.sketches.theta.Union; import io.druid.segment.data.ObjectStrategy; import java.nio.ByteBuffer; @@ -98,6 +99,8 @@ public byte[] toBytes(Object obj) byte[] retVal = new byte[(int) mem.getCapacity()]; mem.getByteArray(0, retVal, 0, (int) mem.getCapacity()); return retVal; + } else if (obj instanceof Union) { + return toBytes(((Union) obj).getResult(true, null)); } else if (obj == null) { return EMPTY_BYTES; } else { diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchSetPostAggregator.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchSetPostAggregator.java index a57e87f0833f..d96abcad4d1e 100644 --- a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchSetPostAggregator.java +++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchSetPostAggregator.java @@ -26,6 +26,7 @@ import com.metamx.common.logger.Logger; import com.yahoo.sketches.Util; import com.yahoo.sketches.theta.Sketch; +import com.yahoo.sketches.theta.Union; import io.druid.query.aggregation.PostAggregator; import java.util.Comparator; @@ -83,12 +84,23 @@ public Object compute(final Map combinedAggregators) { Sketch[] sketches = new Sketch[fields.size()]; for (int i = 0; i < sketches.length; i++) { - sketches[i] = (Sketch) fields.get(i).compute(combinedAggregators); + sketches[i] = toSketch(fields.get(i).compute(combinedAggregators)); } return SketchOperations.sketchSetOperation(func, maxSketchSize, sketches); } + public final static Sketch toSketch(Object obj) + { + if (obj instanceof Sketch) { + return (Sketch) obj; + } else if (obj instanceof Union) { + return ((Union) obj).getResult(true, null); + } else { + throw new IAE("Can't convert to Sketch object [%s]", obj.getClass()); + } + } + @Override @JsonProperty public String getName() diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/UnionJsonSerializer.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/UnionJsonSerializer.java new file mode 100644 index 000000000000..865d268ec5bf --- /dev/null +++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/UnionJsonSerializer.java @@ -0,0 +1,40 @@ +/* +* Licensed to Metamarkets Group Inc. (Metamarkets) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. Metamarkets licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package io.druid.query.aggregation.datasketches.theta; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonSerializer; +import com.fasterxml.jackson.databind.SerializerProvider; +import com.yahoo.sketches.theta.Union; + +import java.io.IOException; + +/** + */ +public class UnionJsonSerializer extends JsonSerializer +{ + @Override + public void serialize(Union union, JsonGenerator jgen, SerializerProvider provider) + throws IOException, JsonProcessingException + { + jgen.writeBinary(union.getResult(true, null).toByteArray()); + } +}