Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Finalize fields in postaggs #2883

Closed
wants to merge 13 commits into from
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.google.common.collect.Ordering;
import com.google.common.collect.Sets;
import com.google.common.primitives.Doubles;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.PostAggregator;

import java.util.Comparator;
Expand Down Expand Up @@ -95,6 +96,12 @@ public String getName()
return name;
}

@Override
public PostAggregator decorate(Map<String, AggregatorFactory> aggregators)
{
return this;
}

@JsonProperty
public PostAggregator getField()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.yahoo.sketches.Util;
import io.druid.java.util.common.IAE;
import io.druid.java.util.common.logger.Logger;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.PostAggregator;

import java.util.Comparator;
Expand Down Expand Up @@ -95,6 +96,12 @@ public String getName()
return name;
}

@Override
public PostAggregator decorate(Map<String, AggregatorFactory> aggregators)
{
return this;
}

@JsonProperty
public String getFunc()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import com.google.common.collect.Sets;

import io.druid.java.util.common.IAE;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.PostAggregator;

import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -67,6 +69,12 @@ public Object compute(Map<String, Object> values)
return ah.toHistogram(bucketSize, offset);
}

@Override
public PostAggregator decorate(Map<String, AggregatorFactory> aggregators)
{
return this;
}

@JsonProperty
public float getBucketSize()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.collect.Sets;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.PostAggregator;

import java.util.Arrays;
import java.util.Map;
Expand Down Expand Up @@ -59,6 +61,12 @@ public Object compute(Map<String, Object> values)
return ah.toHistogram(breaks);
}

@Override
public PostAggregator decorate(Map<String, AggregatorFactory> aggregators)
{
return this;
}

@JsonProperty
public float[] getBreaks()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import com.google.common.collect.Sets;

import io.druid.java.util.common.IAE;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.PostAggregator;

import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -63,6 +65,12 @@ public Object compute(Map<String, Object> values)
return ah.toHistogram(numBuckets);
}

@Override
public PostAggregator decorate(Map<String, AggregatorFactory> aggregators)
{
return this;
}

@JsonProperty
public int getNumBuckets()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.collect.Sets;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.PostAggregator;

import java.util.Comparator;
import java.util.Map;
Expand Down Expand Up @@ -71,6 +73,12 @@ public Object compute(Map<String, Object> values)
return ah.getMax();
}

@Override
public PostAggregator decorate(Map<String, AggregatorFactory> aggregators)
{
return this;
}

@Override
public String toString()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.collect.Sets;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.PostAggregator;

import java.util.Comparator;
import java.util.Map;
Expand Down Expand Up @@ -71,6 +73,12 @@ public Object compute(Map<String, Object> values)
return ah.getMin();
}

@Override
public PostAggregator decorate(Map<String, AggregatorFactory> aggregators)
{
return this;
}

@Override
public String toString()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import com.google.common.collect.Sets;

import io.druid.java.util.common.IAE;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.PostAggregator;

import java.util.Comparator;
import java.util.Map;
Expand Down Expand Up @@ -80,6 +82,12 @@ public Object compute(Map<String, Object> values)
return ah.getQuantiles(new float[]{this.getProbability()})[0];
}

@Override
public PostAggregator decorate(Map<String, AggregatorFactory> aggregators)
{
return this;
}

@JsonProperty
public float getProbability()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import com.google.common.collect.Sets;

import io.druid.java.util.common.IAE;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.PostAggregator;

import java.util.Arrays;
import java.util.Comparator;
Expand Down Expand Up @@ -75,6 +77,12 @@ public Object compute(Map<String, Object> values)
return new Quantiles(this.getProbabilities(), ah.getQuantiles(this.getProbabilities()), ah.getMin(), ah.getMax());
}

@Override
public PostAggregator decorate(Map<String, AggregatorFactory> aggregators)
{
return this;
}

@JsonProperty
public float[] getProbabilities()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.base.Preconditions;
import com.google.common.collect.Sets;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.PostAggregator;
import io.druid.query.aggregation.post.ArithmeticPostAggregator;

Expand Down Expand Up @@ -80,6 +81,12 @@ public String getName()
return name;
}

@Override
public PostAggregator decorate(Map<String, AggregatorFactory> aggregators)
{
return this;
}

@JsonProperty("fieldName")
public String getFieldName()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import io.druid.query.aggregation.post.DoubleGreatestPostAggregator;
import io.druid.query.aggregation.post.DoubleLeastPostAggregator;
import io.druid.query.aggregation.post.FieldAccessPostAggregator;
import io.druid.query.aggregation.post.FinalizingFieldAccessPostAggregator;
import io.druid.query.aggregation.post.JavaScriptPostAggregator;
import io.druid.query.aggregation.post.ExpressionPostAggregator;
import io.druid.query.aggregation.post.LongGreatestPostAggregator;
Expand Down Expand Up @@ -98,6 +99,7 @@ public static interface AggregatorFactoryMixin
@JsonSubTypes.Type(name = "expression", value = ExpressionPostAggregator.class),
@JsonSubTypes.Type(name = "arithmetic", value = ArithmeticPostAggregator.class),
@JsonSubTypes.Type(name = "fieldAccess", value = FieldAccessPostAggregator.class),
@JsonSubTypes.Type(name = "finalizingFieldAccess", value = FinalizingFieldAccessPostAggregator.class),
@JsonSubTypes.Type(name = "constant", value = ConstantPostAggregator.class),
@JsonSubTypes.Type(name = "javascript", value = JavaScriptPostAggregator.class),
@JsonSubTypes.Type(name = "hyperUniqueCardinality", value = HyperUniqueFinalizingPostAggregator.class),
Expand Down
34 changes: 28 additions & 6 deletions processing/src/main/java/io/druid/query/Queries.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,42 +20,64 @@
package io.druid.query;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.PostAggregator;

import java.util.List;
import java.util.Map;
import java.util.Set;

/**
*/
public class Queries
{
public static void verifyAggregations(
public static List<PostAggregator> decoratePostAggregators(List<PostAggregator> postAggs,
Map<String, AggregatorFactory> aggFactories)
{
List<PostAggregator> decorated = Lists.newArrayListWithExpectedSize(postAggs.size());
for (PostAggregator aggregator : postAggs) {
decorated.add(aggregator.decorate(aggFactories));
}
return decorated;
}

public static List<PostAggregator> prepareAggregations(
List<AggregatorFactory> aggFactories,
List<PostAggregator> postAggs
)
{
Preconditions.checkNotNull(aggFactories, "aggregations cannot be null");

final Set<String> aggNames = Sets.newHashSet();
final Map<String, AggregatorFactory> aggsFactoryMap = Maps.newHashMap();
for (AggregatorFactory aggFactory : aggFactories) {
Preconditions.checkArgument(aggNames.add(aggFactory.getName()), "[%s] already defined", aggFactory.getName());
Preconditions.checkArgument(!aggsFactoryMap.containsKey(aggFactory.getName()),
"[%s] already defined", aggFactory.getName());
aggsFactoryMap.put(aggFactory.getName(), aggFactory);
}

if (postAggs != null && !postAggs.isEmpty()) {
final Set<String> combinedAggNames = Sets.newHashSet(aggNames);
final Set<String> combinedAggNames = Sets.newHashSet(aggsFactoryMap.keySet());

for (PostAggregator postAgg : postAggs) {
List<PostAggregator> decorated = Lists.newArrayListWithExpectedSize(postAggs.size());
for (final PostAggregator postAgg : postAggs) {
final Set<String> dependencies = postAgg.getDependentFields();
final Set<String> missing = Sets.difference(dependencies, combinedAggNames);

Preconditions.checkArgument(
missing.isEmpty(),
"Missing fields [%s] for postAggregator [%s]", missing, postAgg.getName()
);
Preconditions.checkArgument(combinedAggNames.add(postAgg.getName()), "[%s] already defined", postAgg.getName());
Preconditions.checkArgument(combinedAggNames.add(postAgg.getName()),
"[%s] already defined", postAgg.getName());

decorated.add(postAgg.decorate(aggsFactoryMap));
}
return decorated;
}

return postAggs;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,13 @@ public interface PostAggregator
public Object compute(Map<String, Object> combinedAggregators);

public String getName();

/**
* Returns a richer post aggregator which are built from the given aggregators with their names and some accessible
* environmental variables such as ones in the object scope.
*
* @param aggregators A map of aggregator factories with their names.
*
*/
public PostAggregator decorate(Map<String, AggregatorFactory> aggregators);
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.google.common.base.Preconditions;
import com.google.common.collect.Ordering;
import com.google.common.collect.Sets;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.PostAggregator;

import java.util.Comparator;
Expand Down Expand Up @@ -84,6 +85,12 @@ public String getName()
return name;
}

@Override
public HyperUniqueFinalizingPostAggregator decorate(Map<String, AggregatorFactory> aggregators)
{
return this;
}

@JsonProperty("fieldName")
public String getFieldName()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import io.druid.java.util.common.IAE;
import io.druid.query.Queries;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.PostAggregator;

import java.util.Comparator;
Expand Down Expand Up @@ -52,6 +54,7 @@ public int compare(Object o, Object o1)
private final Ops op;
private final Comparator comparator;
private final String ordering;
private Map<String, AggregatorFactory> aggFactoryMap;

public ArithmeticPostAggregator(
String name,
Expand Down Expand Up @@ -123,6 +126,12 @@ public String getName()
return name;
}

@Override
public ArithmeticPostAggregator decorate(Map<String, AggregatorFactory> aggregators)
{
return new ArithmeticPostAggregator(name, fnName, Queries.decoratePostAggregators(fields, aggregators), ordering);
}

@JsonProperty("fn")
public String getFnName()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.collect.Sets;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.PostAggregator;

import java.util.Comparator;
Expand Down Expand Up @@ -79,6 +80,12 @@ public String getName()
return name;
}

@Override
public ConstantPostAggregator decorate(Map<String, AggregatorFactory> aggregators)
{
return this;
}

@JsonProperty("value")
public Number getConstantValue()
{
Expand Down
Loading