Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport] Add CursorHolder.isPreAggregated method to allow cursors on pre-aggregated data (#17058) #17205

Merged
merged 1 commit into from
Oct 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -454,4 +454,13 @@ public static boolean shouldUseObjectColumnAggregatorWrapper(
}
return false;
}

/**
 * Builds a list of combining factories, one per entry of {@code aggs}, by calling
 * {@link AggregatorFactory#getCombiningFactory()} on each element in iteration order.
 *
 * @param aggs aggregator factories to convert
 * @return a new list holding the combining factory of every element of {@code aggs}
 */
public static List<AggregatorFactory> getCombiningAggregators(List<AggregatorFactory> aggs)
{
  // Presized to the input length: exactly one combining factory is appended per input factory.
  final List<AggregatorFactory> combiningFactories = new ArrayList<>(aggs.size());
  aggs.forEach(factory -> combiningFactories.add(factory.getCombiningFactory()));
  return combiningFactories;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import org.apache.druid.query.ResourceLimitExceededException;
import org.apache.druid.query.ResultMergeQueryRunner;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.AggregatorUtil;
import org.apache.druid.query.aggregation.PostAggregator;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
Expand Down Expand Up @@ -508,6 +509,9 @@ public Sequence<ResultRow> process(
final CursorBuildSpec buildSpec = makeCursorBuildSpec(query, groupByQueryMetrics);
final CursorHolder cursorHolder = closer.register(cursorFactory.makeCursorHolder(buildSpec));

if (cursorHolder.isPreAggregated()) {
query = query.withAggregatorSpecs(AggregatorUtil.getCombiningAggregators(query.getAggregatorSpecs()));
}
final ColumnInspector inspector = query.getVirtualColumns().wrapInspector(cursorFactory);

// group by specific vectorization check
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,11 @@ public TimeseriesQuery withDimFilter(DimFilter dimFilter)
return Druids.TimeseriesQueryBuilder.copy(this).filters(dimFilter).build();
}

/**
 * Passes this query to {@code Druids.TimeseriesQueryBuilder.copy}, sets the builder's aggregators to
 * {@code aggregatorSpecs}, and returns the query the builder produces.
 *
 * @param aggregatorSpecs aggregator factories handed to the builder's {@code aggregators} call
 * @return the query built from the copied builder with the given aggregators
 */
public TimeseriesQuery withAggregatorSpecs(List<AggregatorFactory> aggregatorSpecs)
{
  return Druids.TimeseriesQueryBuilder
      .copy(this)
      .aggregators(aggregatorSpecs)
      .build();
}

public TimeseriesQuery withPostAggregatorSpecs(final List<PostAggregator> postAggregatorSpecs)
{
return Druids.TimeseriesQueryBuilder.copy(this).postAggregators(postAggregatorSpecs).build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.druid.query.aggregation.Aggregator;
import org.apache.druid.query.aggregation.AggregatorAdapters;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.AggregatorUtil;
import org.apache.druid.query.vector.VectorCursorGranularizer;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.Cursor;
Expand Down Expand Up @@ -86,7 +87,7 @@ public TimeseriesQueryEngine(
* scoped down to a single interval before calling this method.
*/
public Sequence<Result<TimeseriesResultValue>> process(
final TimeseriesQuery query,
TimeseriesQuery query,
final CursorFactory cursorFactory,
@Nullable TimeBoundaryInspector timeBoundaryInspector,
@Nullable final TimeseriesQueryMetrics timeseriesQueryMetrics
Expand All @@ -102,6 +103,9 @@ public Sequence<Result<TimeseriesResultValue>> process(
final Granularity gran = query.getGranularity();

final CursorHolder cursorHolder = cursorFactory.makeCursorHolder(makeCursorBuildSpec(query, timeseriesQueryMetrics));
if (cursorHolder.isPreAggregated()) {
query = query.withAggregatorSpecs(AggregatorUtil.getCombiningAggregators(query.getAggregatorSpecs()));
}
try {
final Sequence<Result<TimeseriesResultValue>> result;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.druid.query.QueryMetrics;
import org.apache.druid.query.Result;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.AggregatorUtil;
import org.apache.druid.query.extraction.ExtractionFn;
import org.apache.druid.query.topn.types.TopNColumnAggregatesProcessor;
import org.apache.druid.query.topn.types.TopNColumnAggregatesProcessorFactory;
Expand Down Expand Up @@ -73,7 +74,7 @@ public TopNQueryEngine(NonBlockingPool<ByteBuffer> bufferPool)
* update {@link TopNResultValue}
*/
public Sequence<Result<TopNResultValue>> query(
final TopNQuery query,
TopNQuery query,
final Segment segment,
@Nullable final TopNQueryMetrics queryMetrics
)
Expand All @@ -87,6 +88,9 @@ public Sequence<Result<TopNResultValue>> query(

final CursorBuildSpec buildSpec = makeCursorBuildSpec(query, queryMetrics);
final CursorHolder cursorHolder = cursorFactory.makeCursorHolder(buildSpec);
if (cursorHolder.isPreAggregated()) {
query = query.withAggregatorSpecs(AggregatorUtil.getCombiningAggregators(query.getAggregatorSpecs()));
}
final Cursor cursor = cursorHolder.asCursor();
if (cursor == null) {
return Sequences.withBaggage(Sequences.empty(), cursorHolder);
Expand Down Expand Up @@ -127,7 +131,6 @@ public Sequence<Result<TopNResultValue>> query(
return Sequences.withBaggage(Sequences.empty(), cursorHolder);
}


if (queryMetrics != null) {
queryMetrics.cursor(cursor);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.druid.java.util.common.UOE;
import org.apache.druid.query.Order;
import org.apache.druid.query.OrderBy;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.vector.VectorCursor;

Expand Down Expand Up @@ -58,6 +59,22 @@ default boolean canVectorize()
return false;
}

/**
* Returns true if the {@link Cursor} or {@link VectorCursor} contains pre-aggregated columns for all
* {@link AggregatorFactory} specified in {@link CursorBuildSpec#getAggregators()}.
* <p>
* If this method returns true, {@link ColumnSelectorFactory} and
* {@link org.apache.druid.segment.vector.VectorColumnSelectorFactory} created from {@link Cursor} and
* {@link VectorCursor} respectively will provide selectors for {@link AggregatorFactory#getName()}. The values
* returned from these selectors will be of type {@link AggregatorFactory#getIntermediateType()}, so the cursor
* becomes a "fold" operation rather than a "build" operation, and engines should rewrite the query using
* {@link AggregatorFactory#getCombiningFactory()}.
* <p>
* The default implementation returns false.
*
* @return true if the cursor exposes pre-aggregated columns for every requested aggregator, false otherwise
*/
default boolean isPreAggregated()
{
return false;
}

/**
* Returns cursor ordering, which may or may not match {@link CursorBuildSpec#getPreferredOrdering()}. If returns
* an empty list then the cursor has no defined ordering.
Expand Down
Loading
Loading