Skip to content

Commit

Permalink
add CursorHolder.isPreAggregated method to allow cursors on pre-aggre…
Browse files Browse the repository at this point in the history
…gated data

changes:
* CursorHolder.isPreAggregated method indicates that a cursor has pre-aggregated data for all AggregatorFactory specified in a CursorBuildSpec. If true, engines should rewrite the query to use AggregatorFactory.getCombiningAggreggator, and column selector factories will provide selectors with the aggregator interediate type for the aggregator factory name
* Added groupby, timeseries, and topN support for CursorHolder.isPreAggregated
* Added synthetic test since no CursorHolder implementations support isPreAggregated at this point in time
  • Loading branch information
clintropolis committed Sep 13, 2024
1 parent a8c06e9 commit fbf72e2
Show file tree
Hide file tree
Showing 7 changed files with 313 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -454,4 +454,13 @@ public static boolean shouldUseObjectColumnAggregatorWrapper(
}
return false;
}

public static List<AggregatorFactory> getCombiningAggregators(List<AggregatorFactory> aggs)
{
List<AggregatorFactory> combining = new ArrayList<>(aggs.size());
for (AggregatorFactory agg : aggs) {
combining.add(agg.getCombiningFactory());
}
return combining;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import org.apache.druid.query.ResourceLimitExceededException;
import org.apache.druid.query.ResultMergeQueryRunner;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.AggregatorUtil;
import org.apache.druid.query.aggregation.PostAggregator;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
Expand Down Expand Up @@ -508,6 +509,9 @@ public Sequence<ResultRow> process(
final CursorBuildSpec buildSpec = makeCursorBuildSpec(query, groupByQueryMetrics);
final CursorHolder cursorHolder = closer.register(cursorFactory.makeCursorHolder(buildSpec));

if (cursorHolder.isPreAggregated()) {
query = query.withAggregatorSpecs(AggregatorUtil.getCombiningAggregators(query.getAggregatorSpecs()));
}
final ColumnInspector inspector = query.getVirtualColumns().wrapInspector(cursorFactory);

// group by specific vectorization check
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,11 @@ public TimeseriesQuery withDimFilter(DimFilter dimFilter)
return Druids.TimeseriesQueryBuilder.copy(this).filters(dimFilter).build();
}

public TimeseriesQuery withAggregatorSpecs(List<AggregatorFactory> aggregatorSpecs)
{
return Druids.TimeseriesQueryBuilder.copy(this).aggregators(aggregatorSpecs).build();
}

public TimeseriesQuery withPostAggregatorSpecs(final List<PostAggregator> postAggregatorSpecs)
{
return Druids.TimeseriesQueryBuilder.copy(this).postAggregators(postAggregatorSpecs).build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.druid.query.aggregation.Aggregator;
import org.apache.druid.query.aggregation.AggregatorAdapters;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.AggregatorUtil;
import org.apache.druid.query.vector.VectorCursorGranularizer;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.Cursor;
Expand Down Expand Up @@ -86,7 +87,7 @@ public TimeseriesQueryEngine(
* scoped down to a single interval before calling this method.
*/
public Sequence<Result<TimeseriesResultValue>> process(
final TimeseriesQuery query,
TimeseriesQuery query,
final CursorFactory cursorFactory,
@Nullable TimeBoundaryInspector timeBoundaryInspector,
@Nullable final TimeseriesQueryMetrics timeseriesQueryMetrics
Expand All @@ -102,6 +103,9 @@ public Sequence<Result<TimeseriesResultValue>> process(
final Granularity gran = query.getGranularity();

final CursorHolder cursorHolder = cursorFactory.makeCursorHolder(makeCursorBuildSpec(query, timeseriesQueryMetrics));
if (cursorHolder.isPreAggregated()) {
query = query.withAggregatorSpecs(AggregatorUtil.getCombiningAggregators(query.getAggregatorSpecs()));
}
try {
final Sequence<Result<TimeseriesResultValue>> result;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.druid.query.QueryMetrics;
import org.apache.druid.query.Result;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.AggregatorUtil;
import org.apache.druid.query.extraction.ExtractionFn;
import org.apache.druid.query.topn.types.TopNColumnAggregatesProcessor;
import org.apache.druid.query.topn.types.TopNColumnAggregatesProcessorFactory;
Expand Down Expand Up @@ -73,7 +74,7 @@ public TopNQueryEngine(NonBlockingPool<ByteBuffer> bufferPool)
* update {@link TopNResultValue}
*/
public Sequence<Result<TopNResultValue>> query(
final TopNQuery query,
TopNQuery query,
final Segment segment,
@Nullable final TopNQueryMetrics queryMetrics
)
Expand All @@ -87,6 +88,9 @@ public Sequence<Result<TopNResultValue>> query(

final CursorBuildSpec buildSpec = makeCursorBuildSpec(query, queryMetrics);
final CursorHolder cursorHolder = cursorFactory.makeCursorHolder(buildSpec);
if (cursorHolder.isPreAggregated()) {
query = query.withAggregatorSpecs(AggregatorUtil.getCombiningAggregators(query.getAggregatorSpecs()));
}
final Cursor cursor = cursorHolder.asCursor();
if (cursor == null) {
return Sequences.withBaggage(Sequences.empty(), cursorHolder);
Expand Down Expand Up @@ -127,7 +131,6 @@ public Sequence<Result<TopNResultValue>> query(
return Sequences.withBaggage(Sequences.empty(), cursorHolder);
}


if (queryMetrics != null) {
queryMetrics.cursor(cursor);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.druid.java.util.common.UOE;
import org.apache.druid.query.Order;
import org.apache.druid.query.OrderBy;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.vector.VectorCursor;

Expand Down Expand Up @@ -58,6 +59,22 @@ default boolean canVectorize()
return false;
}

/**
* Returns true if the {@link Cursor} or {@link VectorCursor} contains preaggregated columns for all
* {@link AggregatorFactory} specified in {@link CursorBuildSpec#getAggregators()}.
* <p>
* If this method returns true, {@link ColumnSelectorFactory} and
* {@link org.apache.druid.segment.vector.VectorColumnSelectorFactory} created from {@link Cursor} and
* {@link VectorCursor} respectively will provide selectors for {@link AggregatorFactory#getName()}, and engines
* should rewrite the query using {@link AggregatorFactory#getCombiningFactory()}, since the values returned from
* these selectors will be of type {@link AggregatorFactory#getIntermediateType()}, so the cursor becomes a "fold"
* operation rather than a "build" operation.
*/
default boolean isPreAggregated()
{
return false;
}

/**
* Returns cursor ordering, which may or may not match {@link CursorBuildSpec#getPreferredOrdering()}. If returns
* an empty list then the cursor has no defined ordering.
Expand Down
Loading

0 comments on commit fbf72e2

Please sign in to comment.