Skip to content

Commit

Permalink
Backport of apache#3471 (return Union for SketchAggregatorFactory.com…
Browse files Browse the repository at this point in the history
…bine)
  • Loading branch information
navis committed Oct 1, 2018
1 parent ad9fd5e commit 5fc2f8b
Show file tree
Hide file tree
Showing 7 changed files with 87 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ static void updateUnion(Union union, Object update)
union.update((Memory) update);
} else if (update instanceof Sketch) {
union.update((Sketch) update);
} else if (update instanceof Union) {
union.update(((Union) update).getResult());
} else if (update instanceof String) {
union.update((String) update);
} else if (update instanceof byte[]) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,10 +117,19 @@ public Comparator<Sketch> getComparator()
@Override
public Object combine(Object lhs, Object rhs)
{
Union union = (Union) SetOperation.builder().setNominalEntries(size).build(Family.UNION);
updateUnion(union, lhs);
updateUnion(union, rhs);
return union.getResult(false, null);
final Union union;
if (lhs instanceof Union) {
union = (Union) lhs;
updateUnion(union, rhs);
} else if (rhs instanceof Union) {
union = (Union) rhs;
updateUnion(union, lhs);
} else {
union = (Union) SetOperation.builder().setNominalEntries(size).build(Family.UNION);
updateUnion(union, lhs);
updateUnion(union, rhs);
}
return union;
}

private void updateUnion(Union union, Object obj)
Expand All @@ -131,6 +140,8 @@ private void updateUnion(Union union, Object obj)
union.update((Memory) obj);
} else if (obj instanceof Sketch) {
union.update((Sketch) obj);
} else if (obj instanceof Union) {
union.update(((Union) obj).getResult(false, null));
} else {
throw new IAE("Object of type [%s] can not be unioned", obj.getClass().getName());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@

import java.util.Comparator;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

public class SketchEstimatePostAggregator implements PostAggregator
Expand Down Expand Up @@ -91,14 +92,13 @@ public ValueDesc resolve(TypeResolver bindings)
@Override
public Object compute(DateTime timestamp, Map<String, Object> combinedAggregators)
{
Sketch sketch = (Sketch) field.compute(timestamp, combinedAggregators);
Sketch sketch = SketchSetPostAggregator.toSketch(field.compute(timestamp, combinedAggregators));
if (errorBoundsStdDev != null) {
SketchEstimateWithErrorBounds result = new SketchEstimateWithErrorBounds(
return new SketchEstimateWithErrorBounds(
sketch.getEstimate(),
sketch.getUpperBound(errorBoundsStdDev),
sketch.getLowerBound(errorBoundsStdDev),
errorBoundsStdDev);
return result;
} else {
return sketch.getEstimate();
}
Expand Down Expand Up @@ -148,7 +148,7 @@ public boolean equals(Object o)
if (!name.equals(that.name)) {
return false;
}
if (errorBoundsStdDev != that.errorBoundsStdDev) {
if (!Objects.equals(errorBoundsStdDev, that.errorBoundsStdDev)) {
return false;
}
return field.equals(that.field);
Expand All @@ -160,7 +160,7 @@ public int hashCode()
{
int result = name.hashCode();
result = 31 * result + field.hashCode();
result = 31 * result + (errorBoundsStdDev != null ? errorBoundsStdDev.hashCode() : 0);
result = 31 * result + Objects.hashCode(errorBoundsStdDev);
return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.yahoo.sketches.quantiles.ItemsSketch;
import com.yahoo.sketches.sampling.ReservoirItemsSketch;
import com.yahoo.sketches.theta.Sketch;
import com.yahoo.sketches.theta.Union;
import io.druid.guice.LazySingleton;
import io.druid.guice.QueryToolBinders;
import io.druid.initialization.DruidModule;
Expand Down Expand Up @@ -114,6 +115,9 @@ public List<? extends Module> getJacksonModules()
.addSerializer(
Sketch.class, new SketchJsonSerializer()
)
.addSerializer(
Union.class, new UnionJsonSerializer()
)
.addSerializer(
ItemsSketch.class, new ItemsSketchJsonSerializer()
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.yahoo.memory.Memory;
import com.yahoo.sketches.theta.Sketch;
import com.yahoo.sketches.theta.Sketches;
import com.yahoo.sketches.theta.Union;
import io.druid.segment.data.ObjectStrategy;

import java.nio.ByteBuffer;
Expand All @@ -44,6 +45,13 @@ public int compare(Object s1, Object s2)
return -1;
}
}
if (s1 instanceof Union) {
if (s2 instanceof Union) {
return SketchAggregatorFactory.COMPARATOR.compare(((Union) s1).getResult(), ((Union) s2).getResult());
} else {
return -1;
}
}
if (s1 instanceof Memory) {
if (s2 instanceof Memory) {
Memory s1Mem = (Memory) s1;
Expand All @@ -62,7 +70,7 @@ public int compare(Object s1, Object s2)
return 1;
}
}
throw new IAE("Unknwon class[%s], toString[%s]", s1.getClass(), s1);
throw new IAE("Unknown class[%s], toString[%s]", s1.getClass(), s1);

}

Expand Down Expand Up @@ -96,6 +104,8 @@ public byte[] toBytes(Object obj)
byte[] retVal = new byte[(int) mem.getCapacity()];
mem.getByteArray(0, retVal, 0, (int) mem.getCapacity());
return retVal;
} else if (obj instanceof Union) {
return toBytes(((Union) obj).getResult(true, null));
} else if (obj == null) {
return EMPTY_BYTES;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.metamx.common.logger.Logger;
import com.yahoo.sketches.Util;
import com.yahoo.sketches.theta.Sketch;
import com.yahoo.sketches.theta.Union;
import io.druid.data.TypeResolver;
import io.druid.data.ValueDesc;
import io.druid.query.aggregation.PostAggregator;
Expand Down Expand Up @@ -92,12 +93,23 @@ public Object compute(DateTime timestamp, final Map<String, Object> combinedAggr
{
Sketch[] sketches = new Sketch[fields.size()];
for (int i = 0; i < sketches.length; i++) {
sketches[i] = (Sketch) fields.get(i).compute(timestamp, combinedAggregators);
sketches[i] = toSketch(fields.get(i).compute(timestamp, combinedAggregators));
}

return SketchOperations.sketchSetOperation(func, maxSketchSize, sketches);
}

static Sketch toSketch(Object obj)
{
if (obj instanceof Sketch) {
return (Sketch) obj;
} else if (obj instanceof Union) {
return ((Union) obj).getResult(true, null);
} else {
throw new IAE("Can't convert to Sketch object [%s]", obj.getClass());
}
}

@Override
@JsonProperty
public String getName()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.druid.query.aggregation.datasketches.theta;

import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.JsonSerializer;
import com.fasterxml.jackson.databind.SerializerProvider;
import com.yahoo.sketches.theta.Union;

import java.io.IOException;

public class UnionJsonSerializer extends JsonSerializer<Union>
{
@Override
public void serialize(Union union, JsonGenerator jgen, SerializerProvider provider)
throws IOException
{
jgen.writeBinary(union.getResult(true, null).toByteArray());
}
}

0 comments on commit 5fc2f8b

Please sign in to comment.