Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Finalize fields in postaggs #3957

Merged
merged 16 commits into from
Feb 22, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.google.common.collect.Ordering;
import com.google.common.collect.Sets;
import com.google.common.primitives.Doubles;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.PostAggregator;
import io.druid.query.aggregation.post.PostAggregatorIds;
import io.druid.query.cache.CacheKeyBuilder;
Expand Down Expand Up @@ -97,6 +98,12 @@ public String getName()
return name;
}

@Override
public PostAggregator decorate(Map<String, AggregatorFactory> aggregators)
{
return this;
}

@JsonProperty
public PostAggregator getField()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.google.common.collect.Sets;
import com.yahoo.sketches.Util;
import io.druid.java.util.common.IAE;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.PostAggregator;
import io.druid.query.aggregation.post.PostAggregatorIds;
import io.druid.query.cache.CacheKeyBuilder;
Expand Down Expand Up @@ -93,6 +94,12 @@ public String getName()
return name;
}

@Override
public PostAggregator decorate(Map<String, AggregatorFactory> aggregators)
{
return this;
}

@JsonProperty
public String getFunc()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.collect.Sets;
import io.druid.java.util.common.IAE;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.PostAggregator;
import io.druid.query.aggregation.post.PostAggregatorIds;
import io.druid.query.cache.CacheKeyBuilder;

Expand Down Expand Up @@ -65,6 +67,12 @@ public Object compute(Map<String, Object> values)
return ah.toHistogram(bucketSize, offset);
}

@Override
public PostAggregator decorate(Map<String, AggregatorFactory> aggregators)
{
return this;
}

@JsonProperty
public float getBucketSize()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.collect.Sets;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.PostAggregator;
import io.druid.query.aggregation.post.PostAggregatorIds;
import io.druid.query.cache.CacheKeyBuilder;

Expand Down Expand Up @@ -61,6 +63,12 @@ public Object compute(Map<String, Object> values)
return ah.toHistogram(breaks);
}

@Override
public PostAggregator decorate(Map<String, AggregatorFactory> aggregators)
{
return this;
}

@JsonProperty
public float[] getBreaks()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.collect.Sets;
import io.druid.java.util.common.IAE;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.PostAggregator;
import io.druid.query.aggregation.post.PostAggregatorIds;
import io.druid.query.cache.CacheKeyBuilder;

Expand Down Expand Up @@ -62,6 +64,12 @@ public Object compute(Map<String, Object> values)
return ah.toHistogram(numBuckets);
}

@Override
public PostAggregator decorate(Map<String, AggregatorFactory> aggregators)
{
return this;
}

@JsonProperty
public int getNumBuckets()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.collect.Sets;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.PostAggregator;
import io.druid.query.aggregation.post.PostAggregatorIds;
import io.druid.query.cache.CacheKeyBuilder;

Expand Down Expand Up @@ -70,6 +72,12 @@ public Object compute(Map<String, Object> values)
return ah.getMax();
}

@Override
public PostAggregator decorate(Map<String, AggregatorFactory> aggregators)
{
return this;
}

@Override
public String toString()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.collect.Sets;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.PostAggregator;
import io.druid.query.aggregation.post.PostAggregatorIds;
import io.druid.query.cache.CacheKeyBuilder;

Expand Down Expand Up @@ -70,6 +72,12 @@ public Object compute(Map<String, Object> values)
return ah.getMin();
}

@Override
public PostAggregator decorate(Map<String, AggregatorFactory> aggregators)
{
return this;
}

@Override
public String toString()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.collect.Sets;
import io.druid.java.util.common.IAE;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.PostAggregator;
import io.druid.query.aggregation.post.PostAggregatorIds;
import io.druid.query.cache.CacheKeyBuilder;

Expand Down Expand Up @@ -81,6 +83,12 @@ public Object compute(Map<String, Object> values)
return ah.getQuantiles(new float[]{probability})[0];
}

@Override
public PostAggregator decorate(Map<String, AggregatorFactory> aggregators)
{
return this;
}

@JsonProperty
public float getProbability()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.collect.Sets;
import io.druid.java.util.common.IAE;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.PostAggregator;
import io.druid.query.aggregation.post.PostAggregatorIds;
import io.druid.query.cache.CacheKeyBuilder;

Expand Down Expand Up @@ -74,6 +76,12 @@ public Object compute(Map<String, Object> values)
return new Quantiles(probabilities, ah.getQuantiles(probabilities), ah.getMin(), ah.getMax());
}

@Override
public PostAggregator decorate(Map<String, AggregatorFactory> aggregators)
{
return this;
}

@JsonProperty
public float[] getProbabilities()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.base.Preconditions;
import com.google.common.collect.Sets;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.PostAggregator;
import io.druid.query.aggregation.post.ArithmeticPostAggregator;
import io.druid.query.aggregation.post.PostAggregatorIds;
Expand Down Expand Up @@ -82,6 +83,12 @@ public String getName()
return name;
}

@Override
public PostAggregator decorate(Map<String, AggregatorFactory> aggregators)
{
return this;
}

@JsonProperty("fieldName")
public String getFieldName()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import io.druid.query.aggregation.post.DoubleLeastPostAggregator;
import io.druid.query.aggregation.post.ExpressionPostAggregator;
import io.druid.query.aggregation.post.FieldAccessPostAggregator;
import io.druid.query.aggregation.post.FinalizingFieldAccessPostAggregator;
import io.druid.query.aggregation.post.JavaScriptPostAggregator;
import io.druid.query.aggregation.post.LongGreatestPostAggregator;
import io.druid.query.aggregation.post.LongLeastPostAggregator;
Expand Down Expand Up @@ -103,6 +104,7 @@ public static interface AggregatorFactoryMixin
@JsonSubTypes.Type(name = "expression", value = ExpressionPostAggregator.class),
@JsonSubTypes.Type(name = "arithmetic", value = ArithmeticPostAggregator.class),
@JsonSubTypes.Type(name = "fieldAccess", value = FieldAccessPostAggregator.class),
@JsonSubTypes.Type(name = "finalizingFieldAccess", value = FinalizingFieldAccessPostAggregator.class),
@JsonSubTypes.Type(name = "constant", value = ConstantPostAggregator.class),
@JsonSubTypes.Type(name = "javascript", value = JavaScriptPostAggregator.class),
@JsonSubTypes.Type(name = "hyperUniqueCardinality", value = HyperUniqueFinalizingPostAggregator.class),
Expand Down
34 changes: 28 additions & 6 deletions processing/src/main/java/io/druid/query/Queries.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,42 +20,64 @@
package io.druid.query;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.PostAggregator;

import java.util.List;
import java.util.Map;
import java.util.Set;

/**
*/
public class Queries
{
public static void verifyAggregations(
public static List<PostAggregator> decoratePostAggregators(List<PostAggregator> postAggs,
Map<String, AggregatorFactory> aggFactories)
{
List<PostAggregator> decorated = Lists.newArrayListWithExpectedSize(postAggs.size());
for (PostAggregator aggregator : postAggs) {
decorated.add(aggregator.decorate(aggFactories));
}
return decorated;
}

public static List<PostAggregator> prepareAggregations(
List<AggregatorFactory> aggFactories,
List<PostAggregator> postAggs
)
{
Preconditions.checkNotNull(aggFactories, "aggregations cannot be null");

final Set<String> aggNames = Sets.newHashSet();
final Map<String, AggregatorFactory> aggsFactoryMap = Maps.newHashMap();
for (AggregatorFactory aggFactory : aggFactories) {
Preconditions.checkArgument(aggNames.add(aggFactory.getName()), "[%s] already defined", aggFactory.getName());
Preconditions.checkArgument(!aggsFactoryMap.containsKey(aggFactory.getName()),
"[%s] already defined", aggFactory.getName());
aggsFactoryMap.put(aggFactory.getName(), aggFactory);
}

if (postAggs != null && !postAggs.isEmpty()) {
final Set<String> combinedAggNames = Sets.newHashSet(aggNames);
final Set<String> combinedAggNames = Sets.newHashSet(aggsFactoryMap.keySet());

for (PostAggregator postAgg : postAggs) {
List<PostAggregator> decorated = Lists.newArrayListWithExpectedSize(postAggs.size());
for (final PostAggregator postAgg : postAggs) {
final Set<String> dependencies = postAgg.getDependentFields();
final Set<String> missing = Sets.difference(dependencies, combinedAggNames);

Preconditions.checkArgument(
missing.isEmpty(),
"Missing fields [%s] for postAggregator [%s]", missing, postAgg.getName()
);
Preconditions.checkArgument(combinedAggNames.add(postAgg.getName()), "[%s] already defined", postAgg.getName());
Preconditions.checkArgument(combinedAggNames.add(postAgg.getName()),
"[%s] already defined", postAgg.getName());

decorated.add(postAgg.decorate(aggsFactoryMap));
}
return decorated;
}

return postAggs;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,13 @@ public interface PostAggregator extends Cacheable
Object compute(Map<String, Object> combinedAggregators);

String getName();

/**
* Returns a richer post aggregator which are built from the given aggregators with their names and some accessible
* environmental variables such as ones in the object scope.
*
* @param aggregators A map of aggregator factories with their names.
*
*/
PostAggregator decorate(Map<String, AggregatorFactory> aggregators);
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.google.common.base.Preconditions;
import com.google.common.collect.Ordering;
import com.google.common.collect.Sets;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.PostAggregator;
import io.druid.query.aggregation.post.PostAggregatorIds;
import io.druid.query.cache.CacheKeyBuilder;
Expand Down Expand Up @@ -86,6 +87,12 @@ public String getName()
return name;
}

@Override
public HyperUniqueFinalizingPostAggregator decorate(Map<String, AggregatorFactory> aggregators)
{
return this;
}

@JsonProperty("fieldName")
public String getFieldName()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import io.druid.java.util.common.IAE;
import io.druid.query.Queries;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.PostAggregator;
import io.druid.query.cache.CacheKeyBuilder;

Expand Down Expand Up @@ -53,6 +55,7 @@ public int compare(Object o, Object o1)
private final Ops op;
private final Comparator comparator;
private final String ordering;
private Map<String, AggregatorFactory> aggFactoryMap;

public ArithmeticPostAggregator(
String name,
Expand Down Expand Up @@ -124,6 +127,12 @@ public String getName()
return name;
}

@Override
public ArithmeticPostAggregator decorate(Map<String, AggregatorFactory> aggregators)
{
return new ArithmeticPostAggregator(name, fnName, Queries.decoratePostAggregators(fields, aggregators), ordering);
}

@Override
public byte[] getCacheKey()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.collect.Sets;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.PostAggregator;
import io.druid.query.cache.CacheKeyBuilder;

Expand Down Expand Up @@ -80,6 +81,12 @@ public String getName()
return name;
}

@Override
public ConstantPostAggregator decorate(Map<String, AggregatorFactory> aggregators)
{
return this;
}

@JsonProperty("value")
public Number getConstantValue()
{
Expand Down
Loading