Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dedup input data before buffering in distinct aggregates #16795

Merged
merged 1 commit into from
Oct 12, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ public Page getLoadedPage()

/**
 * Returns a one-channel page for the given channel, built from the result of
 * {@code getLoadedBlock()} on this page's block at that index.
 *
 * The result is produced by {@code wrapBlocksWithoutCopy} from this page's
 * {@code positionCount} and a fresh single-element {@code Block[]}.
 *
 * @param channel index into this page's {@code blocks} array
 */
public Page getLoadedPage(int channel)
{
// Diff view: the next two lines are the removed and the added form of the same statement.
// They differ only by the space between "Block[]" and "{".
return wrapBlocksWithoutCopy(positionCount, new Block[]{this.blocks[channel].getLoadedBlock()});
return wrapBlocksWithoutCopy(positionCount, new Block[] {this.blocks[channel].getLoadedBlock()});
}

public Page getLoadedPage(int... channels)
Expand Down Expand Up @@ -377,7 +377,7 @@ public Page copyPositions(int[] retainedPositions, int offset, int length)

/**
 * Returns a one-channel page that holds this page's block at the given channel.
 *
 * The result is produced by {@code wrapBlocksWithoutCopy} from this page's
 * {@code positionCount} and a fresh single-element {@code Block[]}; the block
 * reference itself is reused.
 *
 * @param channel index into this page's {@code blocks} array
 */
public Page extractChannel(int channel)
{
// Diff view: the next two lines are the removed and the added form of the same statement.
// They differ only by the space between "Block[]" and "{".
return wrapBlocksWithoutCopy(positionCount, new Block[]{this.blocks[channel]});
return wrapBlocksWithoutCopy(positionCount, new Block[] {this.blocks[channel]});
}

public Page extractChannels(int[] channels)
Expand All @@ -404,6 +404,18 @@ public Page prependColumn(Block column)
return wrapBlocksWithoutCopy(positionCount, result);
}

/**
 * Builds a page containing every channel of this page except {@code channelIndex}.
 * The remaining channels keep their relative order and their Block references are reused.
 *
 * @param channelIndex channel to leave out
 * @return a page with one channel fewer and the same {@code positionCount}
 * @throws IndexOutOfBoundsException if {@code channelIndex} is negative or not below {@code getChannelCount()}
 */
public Page dropColumn(int channelIndex)
{
    int channelCount = getChannelCount();
    boolean validChannel = channelIndex >= 0 && channelIndex < channelCount;
    if (!validChannel) {
        throw new IndexOutOfBoundsException(format("Invalid channel %d in page with %s channels", channelIndex, channelCount));
    }

    // Channels before the dropped one keep their index; channels after it shift left by one.
    int leadingCount = channelIndex;
    int trailingCount = channelCount - channelIndex - 1;
    Block[] remaining = new Block[channelCount - 1];
    System.arraycopy(blocks, 0, remaining, 0, leadingCount);
    System.arraycopy(blocks, channelIndex + 1, remaining, leadingCount, trailingCount);
    return wrapBlocksWithoutCopy(positionCount, remaining);
}

private long updateRetainedSize()
{
long retainedSizeInBytes = INSTANCE_SIZE + sizeOf(blocks);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,21 @@ public int getPositionCount()
return nullCheckBlock.getPositionCount();
}

public int getNonNullPositionCount()
{
if (!nullCheckBlock.mayHaveNull()) {
return getPositionCount();
}

int count = 0;
pgupta2 marked this conversation as resolved.
Show resolved Hide resolved
for (int i = 0; i < getPositionCount(); i++) {
if (!isNull(i)) {
count++;
}
}
return count;
}

public boolean isNull(int position)
{
return nullCheckBlock.isNull(position);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ public final class SystemSessionProperties
public static final String JOIN_SPILL_ENABLED = "join_spill_enabled";
public static final String AGGREGATION_SPILL_ENABLED = "aggregation_spill_enabled";
public static final String DISTINCT_AGGREGATION_SPILL_ENABLED = "distinct_aggregation_spill_enabled";
public static final String DEDUP_BASED_DISTINCT_AGGREGATION_SPILL_ENABLED = "dedup_based_distinct_aggregation_spill_enabled";
public static final String ORDER_BY_AGGREGATION_SPILL_ENABLED = "order_by_aggregation_spill_enabled";
public static final String WINDOW_SPILL_ENABLED = "window_spill_enabled";
public static final String ORDER_BY_SPILL_ENABLED = "order_by_spill_enabled";
Expand Down Expand Up @@ -630,6 +631,11 @@ public SystemSessionProperties(
"Enable spill for distinct aggregations if spill_enabled and aggregation_spill_enabled",
featuresConfig.isDistinctAggregationSpillEnabled(),
false),
booleanProperty(
DEDUP_BASED_DISTINCT_AGGREGATION_SPILL_ENABLED,
"Perform deduplication of input data for distinct aggregates before spilling",
featuresConfig.isDedupBasedDistinctAggregationSpillEnabled(),
false),
booleanProperty(
ORDER_BY_AGGREGATION_SPILL_ENABLED,
"Enable spill for order-by aggregations if spill_enabled and aggregation_spill_enabled",
Expand Down Expand Up @@ -1506,6 +1512,11 @@ public static boolean isDistinctAggregationSpillEnabled(Session session)
return session.getSystemProperty(DISTINCT_AGGREGATION_SPILL_ENABLED, Boolean.class) && isAggregationSpillEnabled(session);
}

/**
 * Reads the {@code dedup_based_distinct_aggregation_spill_enabled} system property from the session.
 *
 * The value is returned as stored; this getter does not additionally check
 * {@code isAggregationSpillEnabled(session)}.
 *
 * @param session session to read the property from
 * @return the property value as a {@code boolean}
 */
public static boolean isDedupBasedDistinctAggregationSpillEnabled(Session session)
{
    Boolean dedupBeforeSpill = session.getSystemProperty(DEDUP_BASED_DISTINCT_AGGREGATION_SPILL_ENABLED, Boolean.class);
    return dedupBeforeSpill;
}

public static boolean isOrderByAggregationSpillEnabled(Session session)
{
return session.getSystemProperty(ORDER_BY_AGGREGATION_SPILL_ENABLED, Boolean.class) && isAggregationSpillEnabled(session);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,14 @@ public Work<?> addPage(Page page)
return new AddPageWork(page.getBlock(hashChannel));
}

/**
 * Not supported by this implementation; always throws.
 *
 * @throws UnsupportedOperationException on every call
 */
@Override
public List<Page> getBufferedPages()
{
    // Deliberately unimplemented: nothing in the code base calls this on a Bigint hash.
    // Per the review discussion below, DISTINCT aggregates hash on at least two channels
    // (the group id and the distinct column), so they always use MultiChannelGroupByHash,
    // which is the only implementation that needs to hand back buffered pages.
    // Throwing forces any future caller to add a real implementation first.
    throw new UnsupportedOperationException("BigintGroupByHash does not support getBufferedPages");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BigintGroupByHash is widely used. Do we wanna implement this? Also, would be good to have a test case to cover this path. This can be triggered when the grouping key is a single long type.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For DISTINCT aggregate, we always use MultiChannelGroupByHash since hash is computed on the groupId and the distinct column channel. So, there will be minimum 2 channels present for DISTINCT aggregates.
Since, preprocessing logic (and using MarkDistinctHash as input source) is only done for Distinct Aggregates, I added the implementation only for MultiChannelGroupByHash.

For BigintGroupByHash, adding this implementation does not make sense since there is no logic that calls into it. This is the reason I explicitly throw the exception so that if anyone tries to use it in future, they will have to add an implementation for it first. I also feel that there is no use-case to use BigintGroupByHash as page input source currently.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. Maybe add a comment to this.

}

@Override
public Work<GroupByIdBlock> getGroupIds(Page page)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ static GroupByHash createGroupByHash(

Work<?> addPage(Page page);

/**
 * Returns the pages this hash has buffered.
 *
 * Support is optional: in this change only the multi-channel implementation returns pages
 * (rebuilt from its channel builders, with empty pages left out); the Bigint and no-channel
 * implementations throw instead.
 *
 * @throws UnsupportedOperationException if the implementation does not support returning buffered pages
 */
List<Page> getBufferedPages();

Work<GroupByIdBlock> getGroupIds(Page page);

boolean contains(int position, Page page, int[] hashChannels);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ public int getCapacity()
return groupByHash.getCapacity();
}

/**
 * Exposes the pages buffered by the underlying {@code groupByHash}.
 *
 * @return whatever {@code groupByHash.getBufferedPages()} returns
 */
public List<Page> getDistinctPages()
{
    List<Page> bufferedPages = groupByHash.getBufferedPages();
    return bufferedPages;
}

private Block processNextGroupIds(GroupByIdBlock ids)
{
int positions = ids.getPositionCount();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,25 @@ public Work<?> addPage(Page page)
return new AddNonDictionaryPageWork(page);
}

/**
 * Rebuilds the buffered input as pages, one per entry in the channel builders.
 *
 * For each index, the builder of every channel in {@code channels} is built into a Block and
 * the blocks are combined into a Page. Pages with zero positions are left out of the result.
 *
 * @return an immutable list of the non-empty pages, in builder order
 */
@Override
public List<Page> getBufferedPages()
{
    ImmutableList.Builder<Page> bufferedPages = ImmutableList.builder();
    // The first channel's builder list determines how many pages to rebuild.
    int pageCount = channelBuilders.get(0).size();
    for (int pageIndex = 0; pageIndex < pageCount; pageIndex++) {
        Block[] pageBlocks = new Block[channels.length];
        for (int channel = 0; channel < pageBlocks.length; channel++) {
            BlockBuilder builder = (BlockBuilder) channelBuilders.get(channel).get(pageIndex);
            pageBlocks[channel] = builder.build();
        }

        Page bufferedPage = new Page(pageBlocks);
        if (bufferedPage.getPositionCount() <= 0) {
            // Skip pages that hold no rows.
            continue;
        }
        bufferedPages.add(bufferedPage);
    }
    return bufferedPages.build();
}

@Override
public Work<GroupByIdBlock> getGroupIds(Page page)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,14 @@ public Work<?> addPage(Page page)
return new CompletedWork<>(0);
}

/**
 * Not supported by this implementation; always throws.
 *
 * @throws UnsupportedOperationException on every call
 */
@Override
public List<Page> getBufferedPages()
{
    // Intentionally left without an implementation because nothing in the code invokes it.
    // Provide one here if a caller ever needs it.
    String message = "NoChannelGroupByHash does not support getBufferedPages";
    throw new UnsupportedOperationException(message);
}

@Override
public Work<GroupByIdBlock> getGroupIds(Page page)
{
Expand Down
Loading