groupBy v1: Force all dimensions to strings. (apache#3685)
gianm authored and jon-wei committed Nov 14, 2016
1 parent 9186d54 · commit 1f53f62
Showing 2 changed files with 130 additions and 10 deletions.
Changed file 1 of 2: GroupByQueryHelper
@@ -26,22 +26,29 @@
import com.metamx.common.guava.Accumulator;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import com.google.common.collect.Sets;
import io.druid.collections.StupidPool;
import io.druid.data.input.MapBasedInputRow;
import io.druid.data.input.MapBasedRow;
import io.druid.data.input.Row;
import io.druid.data.input.impl.DimensionSchema;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.data.input.impl.StringDimensionSchema;
import io.druid.granularity.QueryGranularity;
import io.druid.query.ResourceLimitExceededException;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.PostAggregator;
import io.druid.query.dimension.DimensionSpec;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexSchema;
import io.druid.segment.incremental.IndexSizeExceededException;
import io.druid.segment.incremental.OffheapIncrementalIndex;
import io.druid.segment.incremental.OnheapIncrementalIndex;

import java.nio.ByteBuffer;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;

public class GroupByQueryHelper
@@ -88,13 +95,32 @@ public String apply(DimensionSpec input)

final boolean sortResults = query.getContextValue(CTX_KEY_SORT_RESULTS, true);

// All groupBy dimensions are strings, for now, as long as they don't conflict with any non-dimensions.
// This should get cleaned up if/when https://github.com/druid-io/druid/pull/3686 makes name conflicts impossible.
final Set<String> otherNames = Sets.newHashSet();
for (AggregatorFactory agg : aggs) {
otherNames.add(agg.getName());
}
for (PostAggregator postAggregator : query.getPostAggregatorSpecs()) {
otherNames.add(postAggregator.getName());
}
final List<DimensionSchema> dimensionSchemas = Lists.newArrayList();
for (DimensionSpec dimension : query.getDimensions()) {
if (!otherNames.contains(dimension.getOutputName())) {
dimensionSchemas.add(new StringDimensionSchema(dimension.getOutputName()));
}
}

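// Use the granularity-truncated min timestamp (granTimeStart), since incoming truncated timestamps may precede timeStart.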
final IncrementalIndexSchema indexSchema = new IncrementalIndexSchema.Builder()
.withDimensionsSpec(new DimensionsSpec(dimensionSchemas, null, null))
.withMetrics(aggs.toArray(new AggregatorFactory[aggs.size()]))
.withQueryGranularity(gran)
.withMinTimestamp(granTimeStart)
.build();

if (query.getContextValue("useOffheap", false)) {
index = new OffheapIncrementalIndex(
-// use granularity truncated min timestamp
-// since incoming truncated timestamps may precede timeStart
-granTimeStart,
-gran,
-aggs.toArray(new AggregatorFactory[aggs.size()]),
+indexSchema,
false,
true,
sortResults,
@@ -103,11 +129,7 @@ public String apply(DimensionSpec input)
);
} else {
index = new OnheapIncrementalIndex(
-// use granularity truncated min timestamp
-// since incoming truncated timestamps may precede timeStart
-granTimeStart,
-gran,
-aggs.toArray(new AggregatorFactory[aggs.size()]),
+indexSchema,
false,
true,
sortResults,
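For context on the otherNames check above: when a dimension's output name collides with an aggregator or post-aggregator name, that dimension is skipped, so only non-conflicting outputs are forced to string dimension schemas. A minimal, hypothetical sketch of such a colliding query, written against the same builder API the tests below use (the collision on the output name "rows" is invented for illustration):

// Hypothetical example: the second dimension's output name "rows" collides with the
// rowsCount aggregator's name, so GroupByQueryHelper would create a StringDimensionSchema
// only for "quality" and skip "rows".
GroupByQuery collidingQuery = GroupByQuery
    .builder()
    .setDataSource(QueryRunnerTestHelper.dataSource)
    .setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird)
    .setDimensions(
        Lists.<DimensionSpec>newArrayList(
            new DefaultDimensionSpec("quality", "quality"),
            new DefaultDimensionSpec("market", "rows")
        )
    )
    .setAggregatorSpecs(Arrays.<AggregatorFactory>asList(QueryRunnerTestHelper.rowsCount))
    .setGranularity(QueryRunnerTestHelper.allGran)
    .build();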
Changed file 2 of 2: groupBy query runner tests
@@ -85,6 +85,7 @@
import io.druid.query.filter.RegexDimFilter;
import io.druid.query.filter.SearchQueryDimFilter;
import io.druid.query.filter.SelectorDimFilter;
import io.druid.query.groupby.having.DimensionSelectorHavingSpec;
import io.druid.query.groupby.having.EqualToHavingSpec;
import io.druid.query.groupby.having.GreaterThanHavingSpec;
import io.druid.query.groupby.having.HavingSpec;
@@ -3948,6 +3949,103 @@ public void testDifferentIntervalSubquery()
TestHelper.assertExpectedObjects(expectedResults, results, "");
}

@Test
public void testGroupByTimeExtractionNamedUnderUnderTime()
{
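// Verifies that an extraction of __time can keep "__time" itself as the output name, alongside a regular dimension.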
GroupByQuery query = GroupByQuery
.builder()
.setDataSource(QueryRunnerTestHelper.dataSource)
.setQuerySegmentSpec(QueryRunnerTestHelper.fullOnInterval)
.setDimensions(
Lists.newArrayList(
new DefaultDimensionSpec("market", "market"),
new ExtractionDimensionSpec(
Column.TIME_COLUMN_NAME,
Column.TIME_COLUMN_NAME,
new TimeFormatExtractionFn("EEEE", null, null, null),
null
)
)
)
.setAggregatorSpecs(
Arrays.asList(
QueryRunnerTestHelper.rowsCount,
QueryRunnerTestHelper.indexDoubleSum
)
)
.setPostAggregatorSpecs(Arrays.<PostAggregator>asList(QueryRunnerTestHelper.addRowsIndexConstant))
.setGranularity(QueryRunnerTestHelper.allGran)
.setDimFilter(
new OrDimFilter(
Arrays.<DimFilter>asList(
new SelectorDimFilter("market", "spot", null),
new SelectorDimFilter("market", "upfront", null)
)
)
)
.setLimitSpec(new DefaultLimitSpec(ImmutableList.<OrderByColumnSpec>of(), 1))
.build();
List<Row> expectedResults = Arrays.asList(
GroupByQueryRunnerTestHelper.createExpectedRow(
"1970-01-01",
"__time",
"Friday",
"market",
"spot",
"index",
13219.574157714844,
"rows",
117L,
"addRowsIndexConstant",
13337.574157714844
)
);
Iterable<Row> results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
TestHelper.assertExpectedObjects(expectedResults, results, "");
}

@Test
public void testGroupByWithUnderUnderTimeAsDimensionNameWithHavingAndLimit()
{
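// Verifies that "__time" can serve as the output name of an ordinary dimension ("quality"), including in having and limit specs.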
GroupByQuery query = GroupByQuery
.builder()
.setDataSource(QueryRunnerTestHelper.dataSource)
.setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird)
.setDimensions(Lists.<DimensionSpec>newArrayList(new DefaultDimensionSpec("quality", "__time")))
.setAggregatorSpecs(
Arrays.asList(
QueryRunnerTestHelper.rowsCount,
new LongSumAggregatorFactory("idx", "index")
)
)
.setGranularity(QueryRunnerTestHelper.dayGran)
.setHavingSpec(
new OrHavingSpec(
ImmutableList.<HavingSpec>of(
new DimensionSelectorHavingSpec("__time", "automotive", null),
new DimensionSelectorHavingSpec("__time", "business", null)
)
)
)
.setLimitSpec(
new DefaultLimitSpec(
ImmutableList.of(new OrderByColumnSpec("__time", OrderByColumnSpec.Direction.DESCENDING)),
null
)
)
.build();

List<Row> expectedResults = Arrays.asList(
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "__time", "business", "rows", 1L, "idx", 118L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "__time", "automotive", "rows", 1L, "idx", 135L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "__time", "business", "rows", 1L, "idx", 112L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "__time", "automotive", "rows", 1L, "idx", 147L)
);

Iterable<Row> results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
TestHelper.assertExpectedObjects(expectedResults, results, "");
}

@Test
public void testEmptySubquery()
{
