From ecd7a5c79691755fb5dad00a7115dbd4f518bc7b Mon Sep 17 00:00:00 2001
From: Clint Wylie
Date: Thu, 12 Dec 2024 18:23:45 -0800
Subject: [PATCH] topn with granularity regression fixes

changes:
* fix issue where topN with query granularity other than ALL would use the heap
  algorithm when it was actually able to use the pooled algorithm, and
  incorrectly used the pooled algorithm in cases where it must use the heap
  algorithm, a regression from #16533
* fix issue where topN with query granularity other than ALL could incorrectly
  process values in the wrong time bucket, another regression from #16533
---
 .../druid/query/CursorGranularizer.java       |  14 +-
 .../druid/query/topn/BaseTopNAlgorithm.java   |   8 +
 ...Generic1AggPooledTopNScannerPrototype.java |   2 +-
 ...Generic2AggPooledTopNScannerPrototype.java |   2 +-
 .../druid/query/topn/PooledTopNAlgorithm.java |   2 +-
 .../topn/TimeExtractionTopNAlgorithm.java     |   2 +-
 .../druid/query/topn/TopNQueryEngine.java     |   2 +-
 ...eNumericTopNColumnAggregatesProcessor.java |   2 +-
 .../StringTopNColumnAggregatesProcessor.java  |   4 +-
 .../druid/query/topn/TopNQueryRunnerTest.java | 219 ++++++++++++++++++
 10 files changed, 246 insertions(+), 11 deletions(-)

diff --git a/processing/src/main/java/org/apache/druid/query/CursorGranularizer.java b/processing/src/main/java/org/apache/druid/query/CursorGranularizer.java
index 6bc387183bae..e538ecca5025 100644
--- a/processing/src/main/java/org/apache/druid/query/CursorGranularizer.java
+++ b/processing/src/main/java/org/apache/druid/query/CursorGranularizer.java
@@ -94,7 +94,7 @@ public static CursorGranularizer create(
       timeSelector = cursor.getColumnSelectorFactory().makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME);
     }
 
-    return new CursorGranularizer(cursor, bucketIterable, timeSelector, timeOrder == Order.DESCENDING);
+    return new CursorGranularizer(cursor, granularity, bucketIterable, timeSelector, timeOrder == Order.DESCENDING);
   }
 
   private final Cursor cursor;
@@ -109,20 +109,28 @@ public static CursorGranularizer create(
   private long currentBucketStart;
   private long currentBucketEnd;
+  private final Granularity granularity;
 
   private CursorGranularizer(
       Cursor cursor,
+      Granularity granularity,
       Iterable bucketIterable,
       @Nullable ColumnValueSelector timeSelector,
       boolean descending
   )
   {
     this.cursor = cursor;
+    this.granularity = granularity;
     this.bucketIterable = bucketIterable;
     this.timeSelector = timeSelector;
     this.descending = descending;
   }
 
+  public Granularity getGranularity()
+  {
+    return granularity;
+  }
+
   public Iterable getBucketIterable()
   {
     return bucketIterable;
@@ -135,11 +143,11 @@ public DateTime getBucketStart()
 
   public boolean advanceToBucket(final Interval bucketInterval)
   {
+    currentBucketStart = bucketInterval.getStartMillis();
+    currentBucketEnd = bucketInterval.getEndMillis();
     if (cursor.isDone()) {
       return false;
     }
-    currentBucketStart = bucketInterval.getStartMillis();
-    currentBucketEnd = bucketInterval.getEndMillis();
     if (timeSelector == null) {
       return true;
     }
diff --git a/processing/src/main/java/org/apache/druid/query/topn/BaseTopNAlgorithm.java b/processing/src/main/java/org/apache/druid/query/topn/BaseTopNAlgorithm.java
index 4c0bb066eecb..6f4bad3c2439 100644
--- a/processing/src/main/java/org/apache/druid/query/topn/BaseTopNAlgorithm.java
+++ b/processing/src/main/java/org/apache/druid/query/topn/BaseTopNAlgorithm.java
@@ -20,8 +20,10 @@
 package org.apache.druid.query.topn;
 
 import com.google.common.annotations.VisibleForTesting;
+import org.apache.druid.error.DruidException;
 import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.query.aggregation.Aggregator;
 import org.apache.druid.query.aggregation.AggregatorFactory;
 import org.apache.druid.query.aggregation.BufferAggregator;
@@ -103,6 +105,12 @@ private void runWithCardinalityKnown(
     while (numProcessed < cardinality) {
       final int numToProcess;
       int maxNumToProcess = Math.min(params.getNumValuesPerPass(), cardinality - numProcessed);
+      // sanity check to ensure that we only do multi-pass with ALL granularity
+      if (maxNumToProcess < cardinality && !Granularities.ALL.equals(params.getGranularizer().getGranularity())) {
+        throw DruidException.defensive(
+            "runWithCardinalityKnown can only be used for ALL granularity if multiple-passes are required"
+        );
+      }
 
       DimValSelector theDimValSelector;
       if (!hasDimValSelector) {
diff --git a/processing/src/main/java/org/apache/druid/query/topn/Generic1AggPooledTopNScannerPrototype.java b/processing/src/main/java/org/apache/druid/query/topn/Generic1AggPooledTopNScannerPrototype.java
index f4c2ba1863bd..52d065c95c0b 100644
--- a/processing/src/main/java/org/apache/druid/query/topn/Generic1AggPooledTopNScannerPrototype.java
+++ b/processing/src/main/java/org/apache/druid/query/topn/Generic1AggPooledTopNScannerPrototype.java
@@ -54,7 +54,7 @@ public long scanAndAggregate(
   {
     long processedRows = 0;
     int positionToAllocate = 0;
-    while (!cursor.isDoneOrInterrupted()) {
+    while (!cursor.isDoneOrInterrupted() && granularizer.currentOffsetWithinBucket()) {
       final IndexedInts dimValues = dimensionSelector.getRow();
       final int dimSize = dimValues.size();
       for (int i = 0; i < dimSize; i++) {
diff --git a/processing/src/main/java/org/apache/druid/query/topn/Generic2AggPooledTopNScannerPrototype.java b/processing/src/main/java/org/apache/druid/query/topn/Generic2AggPooledTopNScannerPrototype.java
index 4de281d8a0b8..406344e26ca1 100644
--- a/processing/src/main/java/org/apache/druid/query/topn/Generic2AggPooledTopNScannerPrototype.java
+++ b/processing/src/main/java/org/apache/druid/query/topn/Generic2AggPooledTopNScannerPrototype.java
@@ -57,7 +57,7 @@ public long scanAndAggregate(
     int totalAggregatorsSize = aggregator1Size + aggregator2Size;
     long processedRows = 0;
     int positionToAllocate = 0;
-    while (!cursor.isDoneOrInterrupted()) {
+    while (!cursor.isDoneOrInterrupted() && granularizer.currentOffsetWithinBucket()) {
       final IndexedInts dimValues = dimensionSelector.getRow();
       final int dimSize = dimValues.size();
       for (int i = 0; i < dimSize; i++) {
diff --git a/processing/src/main/java/org/apache/druid/query/topn/PooledTopNAlgorithm.java b/processing/src/main/java/org/apache/druid/query/topn/PooledTopNAlgorithm.java
index d2ba16746218..b3f2a3b9028d 100644
--- a/processing/src/main/java/org/apache/druid/query/topn/PooledTopNAlgorithm.java
+++ b/processing/src/main/java/org/apache/druid/query/topn/PooledTopNAlgorithm.java
@@ -477,7 +477,7 @@ private static long scanAndAggregateDefault(
     final int aggExtra = aggSize % AGG_UNROLL_COUNT;
     int currentPosition = 0;
     long processedRows = 0;
-    while (!cursor.isDoneOrInterrupted()) {
+    while (!cursor.isDoneOrInterrupted() && granularizer.currentOffsetWithinBucket()) {
       final IndexedInts dimValues = dimSelector.getRow();
 
       final int dimSize = dimValues.size();
diff --git a/processing/src/main/java/org/apache/druid/query/topn/TimeExtractionTopNAlgorithm.java b/processing/src/main/java/org/apache/druid/query/topn/TimeExtractionTopNAlgorithm.java
index 0c79e7c8d31b..72f2abc80bb0 100644
--- a/processing/src/main/java/org/apache/druid/query/topn/TimeExtractionTopNAlgorithm.java
+++ b/processing/src/main/java/org/apache/druid/query/topn/TimeExtractionTopNAlgorithm.java
@@ -93,7 +93,7 @@ protected long scanAndAggregate(
     final DimensionSelector dimSelector = params.getDimSelector();
 
     long processedRows = 0;
-    while (!cursor.isDone()) {
+    while (!cursor.isDone() && granularizer.currentOffsetWithinBucket()) {
       final Object key = dimensionValueConverter.apply(dimSelector.lookupName(dimSelector.getRow().get(0)));
 
       Aggregator[] theAggregators = aggregatesStore.computeIfAbsent(
diff --git a/processing/src/main/java/org/apache/druid/query/topn/TopNQueryEngine.java b/processing/src/main/java/org/apache/druid/query/topn/TopNQueryEngine.java
index 1382c9aaa4b7..414f5bad26cb 100644
--- a/processing/src/main/java/org/apache/druid/query/topn/TopNQueryEngine.java
+++ b/processing/src/main/java/org/apache/druid/query/topn/TopNQueryEngine.java
@@ -275,7 +275,7 @@ private static boolean canUsePooledAlgorithm(
     final int numBytesToWorkWith = resultsBuf.capacity();
     final int numValuesPerPass = numBytesPerRecord > 0 ? numBytesToWorkWith / numBytesPerRecord : cardinality;
 
-    return numValuesPerPass <= cardinality;
+    return numValuesPerPass >= cardinality;
   }
 }
diff --git a/processing/src/main/java/org/apache/druid/query/topn/types/NullableNumericTopNColumnAggregatesProcessor.java b/processing/src/main/java/org/apache/druid/query/topn/types/NullableNumericTopNColumnAggregatesProcessor.java
index 565ad036cea0..cf80074e63ee 100644
--- a/processing/src/main/java/org/apache/druid/query/topn/types/NullableNumericTopNColumnAggregatesProcessor.java
+++ b/processing/src/main/java/org/apache/druid/query/topn/types/NullableNumericTopNColumnAggregatesProcessor.java
@@ -94,7 +94,7 @@ public long scanAndAggregate(
   )
   {
     long processedRows = 0;
-    while (!cursor.isDone()) {
+    while (!cursor.isDone() && granularizer.currentOffsetWithinBucket()) {
       if (hasNulls && selector.isNull()) {
         if (nullValueAggregates == null) {
           nullValueAggregates = BaseTopNAlgorithm.makeAggregators(cursor, query.getAggregatorSpecs());
diff --git a/processing/src/main/java/org/apache/druid/query/topn/types/StringTopNColumnAggregatesProcessor.java b/processing/src/main/java/org/apache/druid/query/topn/types/StringTopNColumnAggregatesProcessor.java
index 9eca369fdc7e..47333d2ed321 100644
--- a/processing/src/main/java/org/apache/druid/query/topn/types/StringTopNColumnAggregatesProcessor.java
+++ b/processing/src/main/java/org/apache/druid/query/topn/types/StringTopNColumnAggregatesProcessor.java
@@ -150,7 +150,7 @@ private long scanAndAggregateWithCardinalityKnown(
   )
   {
     long processedRows = 0;
-    while (!cursor.isDone()) {
+    while (!cursor.isDone() && granularizer.currentOffsetWithinBucket()) {
       final IndexedInts dimValues = selector.getRow();
       for (int i = 0, size = dimValues.size(); i < size; ++i) {
         final int dimIndex = dimValues.get(i);
@@ -192,7 +192,7 @@ private long scanAndAggregateWithCardinalityUnknown(
   )
   {
     long processedRows = 0;
-    while (!cursor.isDone()) {
+    while (!cursor.isDone() && granularizer.currentOffsetWithinBucket()) {
       final IndexedInts dimValues = selector.getRow();
       for (int i = 0, size = dimValues.size(); i < size; ++i) {
         final int dimIndex = dimValues.get(i);
diff --git a/processing/src/test/java/org/apache/druid/query/topn/TopNQueryRunnerTest.java
b/processing/src/test/java/org/apache/druid/query/topn/TopNQueryRunnerTest.java index 285ccf31a60e..e3cf0fa51d28 100644 --- a/processing/src/test/java/org/apache/druid/query/topn/TopNQueryRunnerTest.java +++ b/processing/src/test/java/org/apache/druid/query/topn/TopNQueryRunnerTest.java @@ -60,6 +60,7 @@ import org.apache.druid.query.aggregation.FloatMaxAggregatorFactory; import org.apache.druid.query.aggregation.FloatMinAggregatorFactory; import org.apache.druid.query.aggregation.LongSumAggregatorFactory; +import org.apache.druid.query.aggregation.any.StringAnyAggregatorFactory; import org.apache.druid.query.aggregation.cardinality.CardinalityAggregatorFactory; import org.apache.druid.query.aggregation.firstlast.first.DoubleFirstAggregatorFactory; import org.apache.druid.query.aggregation.firstlast.first.FloatFirstAggregatorFactory; @@ -6378,6 +6379,224 @@ public void testTopNAggregateTopnMetricFirstWithGranularity() assertExpectedResults(expectedResults, query); } + + @Test + public void testTopN_time_granularity_empty_buckets() + { + assumeTimeOrdered(); + TopNQuery query = new TopNQueryBuilder() + .dataSource(QueryRunnerTestHelper.DATA_SOURCE) + .granularity(Granularities.HOUR) + .dimension(QueryRunnerTestHelper.MARKET_DIMENSION) + .metric(QueryRunnerTestHelper.INDEX_METRIC) + .threshold(10_000) + .intervals(QueryRunnerTestHelper.FIRST_TO_THIRD) + .aggregators(QueryRunnerTestHelper.INDEX_LONG_SUM) + .build(); + + List> expectedResults = Arrays.asList( + new Result<>( + DateTimes.of("2011-04-01T00:00:00.000Z"), + TopNResultValue.create( + Arrays.>asList( + ImmutableMap.of( + QueryRunnerTestHelper.MARKET_DIMENSION, "total_market", + "index", 2836L + ), + ImmutableMap.of( + QueryRunnerTestHelper.MARKET_DIMENSION, "upfront", + "index", 2681L + ), + ImmutableMap.of( + QueryRunnerTestHelper.MARKET_DIMENSION, "spot", + "index", 1102L + ) + ) + ) + ), + new Result<>(DateTimes.of("2011-04-01T01:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T02:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T03:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T04:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T05:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T06:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T07:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T08:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T09:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T10:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T11:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T12:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T13:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T14:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T15:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T16:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T17:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T18:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T19:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T20:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T21:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T22:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T23:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>( + DateTimes.of("2011-04-02T00:00:00.000Z"), + TopNResultValue.create( + Arrays.>asList( + ImmutableMap.of( + QueryRunnerTestHelper.MARKET_DIMENSION, "total_market", + "index", 2514L + ), + ImmutableMap.of( + QueryRunnerTestHelper.MARKET_DIMENSION, "upfront", + "index", 2193L + ), + ImmutableMap.of( + QueryRunnerTestHelper.MARKET_DIMENSION, "spot", + "index", 1120L + ) + ) + ) + ), + new Result<>(DateTimes.of("2011-04-02T01:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T02:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T03:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T04:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T05:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T06:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T07:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T08:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T09:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T10:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T11:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T12:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T13:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T14:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T15:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T16:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T17:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T18:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T19:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T20:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T21:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T22:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + 
new Result<>(DateTimes.of("2011-04-02T23:00:00.000Z"), TopNResultValue.create(Collections.emptyList())) + ); + + assertExpectedResults(expectedResults, query); + } + + @Test + public void testTopN_time_granularity_uses_heap_if_too_big() + { + assumeTimeOrdered(); + TopNQuery query = new TopNQueryBuilder() + .dataSource(QueryRunnerTestHelper.DATA_SOURCE) + .granularity(Granularities.HOUR) + .dimension(QueryRunnerTestHelper.MARKET_DIMENSION) + .metric(QueryRunnerTestHelper.INDEX_METRIC) + .threshold(10_000) + .intervals(QueryRunnerTestHelper.FIRST_TO_THIRD) + .aggregators( + QueryRunnerTestHelper.INDEX_LONG_SUM, + new StringAnyAggregatorFactory("big", QueryRunnerTestHelper.PLACEMENT_DIMENSION, 10000000, null) + ) + .build(); + + List> expectedResults = Arrays.asList( + new Result<>( + DateTimes.of("2011-04-01T00:00:00.000Z"), + TopNResultValue.create( + Arrays.>asList( + ImmutableMap.of( + QueryRunnerTestHelper.MARKET_DIMENSION, "total_market", + "big", "preferred", + "index", 2836L + ), + ImmutableMap.of( + QueryRunnerTestHelper.MARKET_DIMENSION, "upfront", + "big", "preferred", + "index", 2681L + ), + ImmutableMap.of( + QueryRunnerTestHelper.MARKET_DIMENSION, "spot", + "big", "preferred", + "index", 1102L + ) + ) + ) + ), + new Result<>(DateTimes.of("2011-04-01T01:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T02:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T03:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T04:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T05:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T06:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T07:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T08:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T09:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T10:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T11:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T12:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T13:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T14:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T15:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T16:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T17:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T18:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T19:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T20:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T21:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T22:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T23:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>( + DateTimes.of("2011-04-02T00:00:00.000Z"), + TopNResultValue.create( + Arrays.>asList( + ImmutableMap.of( + QueryRunnerTestHelper.MARKET_DIMENSION, "total_market", + "big", "preferred", + "index", 2514L + ), + ImmutableMap.of( + QueryRunnerTestHelper.MARKET_DIMENSION, "upfront", + "big", "preferred", + "index", 2193L + ), + ImmutableMap.of( + QueryRunnerTestHelper.MARKET_DIMENSION, "spot", + "big", "preferred", + "index", 1120L + ) + ) + ) + ), + new Result<>(DateTimes.of("2011-04-02T01:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T02:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T03:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T04:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T05:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T06:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T07:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T08:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T09:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T10:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T11:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T12:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T13:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T14:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T15:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T16:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T17:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T18:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T19:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T20:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T21:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T22:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T23:00:00.000Z"), TopNResultValue.create(Collections.emptyList())) + ); + + assertExpectedResults(expectedResults, query); + } + private void assumeTimeOrdered() { try (final CursorHolder cursorHolder =