Dedup input data before buffering in distinct aggregates #16795

Merged: 1 commit merged into prestodb:master from dedup_distinct_agg_inputs on Oct 12, 2021

Conversation

Contributor

@pgupta2 pgupta2 commented Sep 26, 2021

SpillableFinalOnlyGroupedAccumulator buffers input pages in rawInputs and, when
spill is triggered, these pages are spilled to disk. Currently, the entire page is
buffered and spilled. For distinct aggregates, we can dedup rows within the same
groupId before buffering. This reduces the memory used for buffering and also the
amount of data to be spilled, if spill is triggered.

Test plan - Unit Tests, Verifier runs.

== RELEASE NOTES ==

General Changes
* Add a new configuration property ``experimental.dedup-based-distinct-aggregation-spill-enabled`` to enable deduplication of input data before spilling for distinct aggregates. This can be overridden by the ``dedup_based_distinct_aggregation_spill_enabled`` session property.

@pgupta2 pgupta2 marked this pull request as draft September 26, 2021 00:41
@pgupta2 pgupta2 force-pushed the dedup_distinct_agg_inputs branch 3 times, most recently from c45ee65 to f033d0e Compare September 28, 2021 05:17
@pgupta2 pgupta2 changed the title Deduplicate input pages in SpillableFinalOnlyGroupedAccumulator for distinct aggregates Dedup input data before buffering in distinct aggregates Sep 28, 2021
@pgupta2 pgupta2 marked this pull request as ready for review September 28, 2021 06:30
@pgupta2 pgupta2 force-pushed the dedup_distinct_agg_inputs branch 2 times, most recently from 6a9a5fe to c4ed525 Compare September 28, 2021 19:21
Contributor

@aweisberg aweisberg left a comment

First quick pass. This doesn't add a test that checks that this both deduplicates and round-trips the data correctly, and maybe something to show it accounts for memory correctly. Unless this relies on an existing unit test. Not sure how testable memory footprint is, though.

I need to better understand what MarkDistinctHash does (and whether it resolves collisions) and when evaluateIntermediate is called

groupIdsBlock = new GroupByIdBlock(groupIdsBlock.getGroupCount(), dedupPage.getBlock(0));

// Remove the groupId block from distinct Page
page = dedupPage.extractChannels(IntStream.range(1, dedupPage.getChannelCount()).toArray());
Contributor

Now my question is which step here actually removes the non-distinct rows from memory?

Contributor Author

The filter(withGroup, distinctMask) step performs the removal of duplicate rows based on the distinctMask that we have computed. Basically, MarkDistinctHash computes a hash on (groupId + aggregate input channels) and uses this hash to figure out whether a row is distinct or not. This information is then used to filter out the non-distinct rows from the input page.
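
For illustration, a minimal self-contained sketch of that compute-mask-then-filter flow, using plain Java collections in place of Presto's Page/Block machinery (all names here are illustrative):

import java.util.*;

// Illustrative sketch only: models (groupId, value) rows with plain Java types
// instead of Presto's Page/Block classes.
public class DistinctMaskSketch
{
    public static void main(String[] args)
    {
        long[] groupIds = {0, 0, 0, 1};
        String[] values = {"HIGH", "HIGH", "LOW", "HIGH"};

        // Step 1: compute a distinct mask keyed on (groupId, value),
        // analogous to what MarkDistinctHash produces.
        Set<String> seen = new HashSet<>();
        boolean[] distinctMask = new boolean[groupIds.length];
        for (int i = 0; i < groupIds.length; i++) {
            distinctMask[i] = seen.add(groupIds[i] + "|" + values[i]);
        }

        // Step 2: keep only the positions marked distinct,
        // analogous to filter(withGroup, distinctMask).
        List<String> deduped = new ArrayList<>();
        for (int i = 0; i < groupIds.length; i++) {
            if (distinctMask[i]) {
                deduped.add("(" + groupIds[i] + ", " + values[i] + ")");
            }
        }
        System.out.println(deduped); // [(0, HIGH), (0, LOW), (1, HIGH)]
    }
}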

Contributor

@aweisberg aweisberg Sep 29, 2021

OK, I wasn't sure if getPositions on Block actually always creates a copy or a view of the block. Looking at the implementation in, say, RunLengthEncodedBlock, it makes no sense to me because it discards the positions and just returns another wrapper. DictionaryBlock also seems to play similar games.

The default implementation creates a DictionaryBlock, which I think is just a wrapper around the source block?
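
For intuition, a tiny self-contained sketch of the view-versus-copy distinction using plain arrays (not Presto's Block implementations; the analogy to getPositions/copyPositions is only noted in comments):

import java.util.Arrays;

// Illustrative only: a positions "view" keeps a reference to the whole source
// array plus the selected positions, so the unselected values stay reachable,
// much like a DictionaryBlock wrapping its source block. A copy materializes
// just the selected values and lets the source be garbage collected.
public class ViewVsCopySketch
{
    static final class PositionsView
    {
        final int[] source;     // full source retained by the view
        final int[] positions;  // selected positions into source

        PositionsView(int[] source, int[] positions)
        {
            this.source = source;
            this.positions = positions;
        }

        long retainedBytes()
        {
            return (long) (source.length + positions.length) * Integer.BYTES;
        }
    }

    public static void main(String[] args)
    {
        int[] source = new int[1_000_000];
        int[] selected = {0, 42, 99};

        PositionsView view = new PositionsView(source, selected);            // like a getPositions-style view
        int[] copy = Arrays.stream(selected).map(i -> source[i]).toArray();  // like a copyPositions-style copy

        System.out.println("view retains ~" + view.retainedBytes() + " bytes");
        System.out.println("copy retains ~" + (long) copy.length * Integer.BYTES + " bytes");
    }
}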

Contributor

@aweisberg aweisberg Sep 29, 2021

I think the key difference is that the filtered stuff is normally fed to the underlying aggregation. So the reduction happens there and most of this is concerned with identifying what to feed to it. But here I think we just store the page away because we won't feed it to the aggregation until much later.

For distinct I guess we have to store all the values so we don't need to provide it to the underlying aggregation, but we do want to discard the non-distinct values from memory.

Depending on how MarkDistinctHash works is it going to store these values twice?

If there are multiple distinct aggregations on the same column with no filter or the same filter then we would also like to share the memory used to track distinct values.

Contributor Author

Depending on how MarkDistinctHash works is it going to store these values twice?

MarkDistinctHash will store each distinct row only once. It internally uses GroupByHash, which has all the logic to detect duplicates. When a duplicate row comes in, it computes the hash for that row and, if the hash has been seen before, performs an actual check of whether the values are the same. If the equality check passes, it is actually a duplicate row. If the equality check does not pass, there was a hash collision, which is handled accordingly. So, basically, each distinct row is stored only once.
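
A minimal self-contained sketch of that hash-then-verify pattern (hand-rolled buckets over plain Java collections, not GroupByHash's columnar storage; names are illustrative):

import java.util.*;

// Illustrative sketch of duplicate detection: bucket rows by hash, then
// confirm with an equality check so hash collisions are not mistaken for
// duplicates. GroupByHash does something conceptually similar, with far
// more efficient storage.
public class HashThenVerifySketch
{
    private final Map<Integer, List<Object[]>> buckets = new HashMap<>();

    // Returns true if the row was not seen before (i.e. it is distinct).
    public boolean addIfDistinct(Object... row)
    {
        int hash = Arrays.hashCode(row);
        List<Object[]> candidates = buckets.computeIfAbsent(hash, h -> new ArrayList<>());
        for (Object[] candidate : candidates) {
            if (Arrays.equals(candidate, row)) {
                return false; // same hash AND equal values: a real duplicate
            }
            // same hash but different values: a collision, keep both rows
        }
        candidates.add(row);
        return true;
    }

    public static void main(String[] args)
    {
        HashThenVerifySketch hash = new HashThenVerifySketch();
        System.out.println(hash.addIfDistinct(2L, "HIGH")); // true, first time
        System.out.println(hash.addIfDistinct(2L, "HIGH")); // false, duplicate
        System.out.println(hash.addIfDistinct(2L, "LOW"));  // true, distinct
    }
}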

If there are multiple distinct aggregations on the same column with no filter or the same filter then we would also like to share the memory used to track distinct values.

Yes, that would be good to have, but I am not targeting it in this PR. Currently, each aggregate has its own MarkDistinctHash that is used to perform deduplication. Maybe this can be done as an optimization in a follow-up PR.

Contributor

I probably put out too much noise so the signal got lost here.

filter doesn't actually filter. It returns a Page that appears to be a view of the original Page's blocks. This is retaining all non-distinct values for the distincted columns. Please correct me if I am wrong, but all the getPositions implementations I looked at create views of the original Block.

MarkDistinctHash copies the column values being distincted so we are storing the values twice. Once in MarkDistinctHash and once in the pages that we fed to mark distinct hash, but also retained. We don't need to do that. We could only retain MarkDistinctHash and then extract the values stored in it.

Contributor Author

filter doesn't actually filter. It returns a Page that appears to be a view of the original Page's blocks. This is retaining all non-distinct values for the distincted columns. Please correct me if I am wrong, but all the getPositions implementations I looked at create views of the original Block.

This is correct. filter calls getPositions, which creates a view over the existing block. But when we prepare pages for spill, we only spill the filtered values and, during unspill, create a page with only the distinct rows. We could instead call copyPositions, which performs an actual copy and returns a new page containing only the distinct values. This will incur some CPU cost due to the copy. How expensive is the copy operation? If we go with the copy approach, we will save on buffer memory since only distinct values will be kept and buffered.

MarkDistinctHash copies the column values being distincted so we are storing the values twice. Once in MarkDistinctHash and once in the pages that we fed to mark distinct hash, but also retained. We don't need to do that. We could only retain MarkDistinctHash and then extract the values stored in it.

This is actually a good observation. We can get the values directly from MarkDistinctHash, but MarkDistinctHash only stores the values for the hash channels. It does not store values for the maskChannel, which is also required in DistinctingGroupedAccumulator in some cases. It's possible to modify MarkDistinctHash to start storing maskChannel values as well. Let me give it more thought tomorrow.

Contributor

@aweisberg aweisberg Oct 1, 2021

I think the issue here is that turning on spilling with this implementation causes queries to use more memory, thus forcing them to spill and possibly run out of spill space.

To roll out spilling for distinct aggregation for all queries, we need to be at least as memory efficient as the non-spilling version. There is no justification for a regression on the wider workload; it really will cause problems.

To target specific queries where it helps and doesn't hurt, sure, we can use this approach, but it is pretty wrong to store non-distinct values in memory. That isn't the right behavior for aggregation.

Regarding the performance impact of copying, I think that is a non-issue. If there are few distinct values retained, then the performance advantage from copying and not spilling will be insurmountable. If there are many unique values, we first of all don't have to do the copy (it can be conditional), but we will run out of memory so quickly that it won't be the dominant performance factor, as spilling will dominate.

Contributor

@aweisberg aweisberg left a comment

Thanks! I have to say on some level it is impressive how little code this ended up being.

Optional.empty(),
joinCompiler,
() -> {
updateMemory.update();
Contributor

Preserve the comments here? Or maybe don't copy-paste it and make it a shared lambda in all 3 instances in the file?

groupIdsBlock = new GroupByIdBlock(groupIdsBlock.getGroupCount(), dedupPage.getBlock(0));

// Remove the groupId block from distinct Page
page = dedupPage.extractChannels(IntStream.range(1, dedupPage.getChannelCount()).toArray());
Contributor

Now my question is which step here actually removes the non-distinct rows from memory?

@@ -689,6 +716,27 @@ public void addInput(GroupByIdBlock groupIdsBlock, Page page)
checkState(rawInputs != null && blockBuilders == null);
rawInputs.ensureCapacity(rawInputsLength);

// Perform dedup for distinct aggregates
if (delegate instanceof DistinctingGroupedAccumulator) {
Contributor

Haven't read the logic yet. A quick design nit:

  • Instead of doing a type check, we can introduce a method on the FinalOnlyGroupedAccumulator interface, Page addInputPreprocessing(GroupByIdBlock groupIdsBlock, Page page), with different implementations for DistinctingGroupedAccumulator and OrderingGroupedAccumulator (which is a no-op); see the sketch after this list.
  • markDistinctHash is better kept inside DistinctingGroupedAccumulator rather than being visible in SpillableFinalOnlyGroupedAccumulator.
  • It would be good to clarify the relationship between the markDistinctHash already in DistinctingGroupedAccumulator and the new one you just introduced. I haven't read the code yet. Are they the same?
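
For concreteness, a compile-only sketch of the suggested shape (Page and GroupByIdBlock below are empty stand-ins, not Presto's classes, and the bodies are intentionally stubbed):

// Sketch of the suggested design only; real signatures and state live in
// Presto's accumulator framework.
interface Page {}
interface GroupByIdBlock {}

interface FinalOnlyGroupedAccumulator
{
    // Hook applied to every input before it is buffered or spilled.
    Page addInputPreprocessing(GroupByIdBlock groupIds, Page page);
}

class DistinctingGroupedAccumulatorSketch implements FinalOnlyGroupedAccumulator
{
    @Override
    public Page addInputPreprocessing(GroupByIdBlock groupIds, Page page)
    {
        // A real implementation would dedup rows within each group and
        // return the smaller page; stubbed here.
        return page;
    }
}

class OrderingGroupedAccumulatorSketch implements FinalOnlyGroupedAccumulator
{
    @Override
    public Page addInputPreprocessing(GroupByIdBlock groupIds, Page page)
    {
        // Ordering needs every row, so preprocessing is a no-op.
        return page;
    }
}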

Contributor Author

@pgupta2 pgupta2 Sep 29, 2021

Re 1: Sure. I was initially thinking in that direction since I expected too many code changes, but since the changes were not that many, I went ahead with inlining the code within the if check. Let me do what you are suggesting.

Re 2: SpillableFinalOnlyGroupedAccumulator buffers the input and spills it to disk, if needed, until all inputs are received by the HashAggregationOperator. We need to perform deduplication before pages are buffered/spilled so that we can save on memory/spilled bytes. We cannot reuse the markDistinctHash from the delegate directly for this dedup since there is no way to reset it. Hence, we create a separate markDistinctHash object for SpillableFinalOnlyGroupedAccumulator to perform the partial dedup.

If we wanted to use the hash from the delegate, we would have to store all the info needed to create the MarkDistinctHash object in DistinctingGroupedAccumulator and add a reset() method to FinalOnlyGroupedAccumulator, which would reset the hash object just before we start pushing pages to the delegate.

Re 3: They are NOT the same object. They are created with the same args and do the same thing, but the markDistinctHash inside SpillableFinalOnlyGroupedAccumulator performs partial dedup (between spill cycles) while the markDistinctHash inside the delegate performs the final dedup.

Contributor Author

pgupta2 commented Sep 29, 2021

First quick pass. This doesn't add a test that checks that this both deduplicates and round-trips the data correctly, and maybe something to show it accounts for memory correctly. Unless this relies on an existing unit test. Not sure how testable memory footprint is, though.

TestSpilledAggregations has multiple test cases that execute the spill path for distinct aggregates. The changes are in a common code path and will be executed for all distinct aggregate test cases by default. I am not sure how we can add a unit test that checks partial states inside HashAggregationOperator; any pointers would be very helpful. Same with memory footprint: how can I add a unit test that does memory checks?

@pgupta2 pgupta2 force-pushed the dedup_distinct_agg_inputs branch from c4ed525 to 99dee7c Compare September 30, 2021 06:56
@highker highker self-requested a review September 30, 2021 07:15
Contributor

@highker highker left a comment

The structure looks great! Will take a look at the logic details tomorrow.

Comment on lines 46 to 57
abstract GroupIdPage addInputPreprocessing(GroupIdPage groupIdPage);

abstract void reset();
Contributor

maybe add javadocs to these two methods?

@@ -757,6 +819,10 @@ public void evaluateIntermediate(int groupId, BlockBuilder output)
if (blockBuilders == null) {
checkState(rawInputs != null);

// Release memory held by markDistinctHash in delegate.
Contributor

let's avoid using implementation-detail comments like "markDistinctHash" at the interface invocation

@highker highker self-requested a review September 30, 2021 07:44
@pgupta2 pgupta2 force-pushed the dedup_distinct_agg_inputs branch from 99dee7c to f15e890 Compare October 1, 2021 06:21
Contributor Author

pgupta2 commented Oct 1, 2021

@highker : I discovered a bug during a verifier run, and it seems that doing partial dedup before applying the filter mask can return incorrect results in some cases. This is because the dedup result depends on the order of input rows, and markDistinctHash does not consider the maskChannel while computing the hash.

For example, consider the following data

Custkey          OrderPriority     OrderKey
   2                  HIGH           90
   2                  HIGH           200
   2                  LOW            50

Query: SELECT custkey, count(DISTINCT orderpriority) FILTER(WHERE orderkey > 100) FROM orders GROUP BY custkey

In addInputPreprocessing, we first perform dedup using markDistinctHash, which computes hashes on groupId + input channels. Since custkey is the group-by key and orderpriority is the input channel, the hash will be computed on these two columns. When we perform dedup in addInputPreprocessing(), the second row from the input will be removed, and the first and third rows will be pushed to the delegate. The delegate (DistinctingGroupedAccumulator) will then apply the filter based on the maskChannel (orderkey) and will filter out both rows, thereby producing an incorrect result (0 instead of the expected 1).

There are two ways to fix this:

  1. We extend markDistinctHash to consider the maskChannel as well while computing the hash
  2. We apply the maskChannel filter first, before doing dedup.

Option 2 is better since it eliminates, during preprocessing itself, rows that would eventually be filtered out inside the delegate, thereby reducing memory usage and spilled bytes.
I am updating this PR with the fix. Now, in addInputPreprocessing, we first apply the filter based on the maskChannel and then perform dedup. I have also added a unit test that checks this specific code path.
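
For illustration, a minimal self-contained sketch of the ordering issue using the three sample rows above (plain Java collections rather than the operator code; class and field names are illustrative):

import java.util.*;

// Reproduces the ordering issue from the example above with plain collections.
// Query: count(DISTINCT orderpriority) FILTER (WHERE orderkey > 100) GROUP BY custkey,
// restricted to the three custkey = 2 rows shown above.
public class FilterBeforeDedupSketch
{
    static final long[]   CUSTKEY  = {2, 2, 2};
    static final String[] PRIORITY = {"HIGH", "HIGH", "LOW"};
    static final long[]   ORDERKEY = {90, 200, 50};

    // Wrong order: dedup on (custkey, orderpriority) first, then apply the filter mask.
    static long dedupThenFilter()
    {
        Set<String> seen = new HashSet<>();
        Set<String> distinct = new HashSet<>();
        for (int i = 0; i < CUSTKEY.length; i++) {
            if (!seen.add(CUSTKEY[i] + "|" + PRIORITY[i])) {
                continue; // drops row 2 (HIGH, orderkey=200), the only row that passes the filter
            }
            if (ORDERKEY[i] > 100) {
                distinct.add(PRIORITY[i]);
            }
        }
        return distinct.size();
    }

    // Correct order: apply the filter mask first, then dedup.
    static long filterThenDedup()
    {
        Set<String> distinct = new HashSet<>();
        for (int i = 0; i < CUSTKEY.length; i++) {
            if (ORDERKEY[i] > 100) {
                distinct.add(CUSTKEY[i] + "|" + PRIORITY[i]);
            }
        }
        return distinct.size();
    }

    public static void main(String[] args)
    {
        System.out.println(dedupThenFilter()); // prints 0 (incorrect)
        System.out.println(filterThenDedup()); // prints 1 (correct)
    }
}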

Contributor

highker commented Oct 1, 2021

@pgupta2, thanks for the fix. I haven't gotten a chance to look into the logic details yet; I will pay attention to it while reviewing. On another note, shall we introduce a config + session property to guard this new behavior? It would be useful to prevent production correctness issues.

GroupByIdBlock distinctGroupIdsBlock = new GroupByIdBlock(groupIdsBlock.getGroupCount(), dedupPage.getBlock(0));

// Remove the groupId block from distinct Page
Page distinctPage = dedupPage.extractChannels(IntStream.range(1, dedupPage.getChannelCount()).toArray());
Contributor

@pettyjamesm pettyjamesm Oct 1, 2021

Creating IntStream.range(1, dedupPage.getChannelCount()).toArray() for each input page seems unnecessarily expensive. I suggest storing that array as a field if the size is knowable in advance, or adding a method to Page like Page#dropColumn(int) to avoid the per-call overhead.
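
A standalone sketch of both alternatives, modelling a page as a plain Object[][] of channels rather than Presto's Page (the dropColumn helper is the hypothetical method suggested above, not an existing API):

import java.util.stream.IntStream;

// Illustrative only: channels[channel][position] stands in for a Page.
public class DropColumnSketch
{
    // Alternative 1: precompute the channel-index array once instead of
    // rebuilding IntStream.range(1, channelCount).toArray() per input page.
    static int[] channelsWithoutGroupId(int channelCount)
    {
        return IntStream.range(1, channelCount).toArray();
    }

    // Alternative 2: a dropColumn-style helper that rebuilds the page without
    // one channel, avoiding the index array entirely.
    static Object[][] dropColumn(Object[][] channels, int channelToDrop)
    {
        Object[][] remaining = new Object[channels.length - 1][];
        int index = 0;
        for (int channel = 0; channel < channels.length; channel++) {
            if (channel != channelToDrop) {
                remaining[index++] = channels[channel];
            }
        }
        return remaining;
    }

    public static void main(String[] args)
    {
        Object[][] page = {
                new Object[] {0L, 0L},         // channel 0: groupId
                new Object[] {"HIGH", "LOW"},  // channel 1: orderpriority
        };
        System.out.println(dropColumn(page, 0).length);        // 1
        System.out.println(channelsWithoutGroupId(3).length);  // 2
    }
}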

@pgupta2 pgupta2 force-pushed the dedup_distinct_agg_inputs branch 2 times, most recently from 0967c72 to d559865 Compare October 3, 2021 23:25
@@ -147,6 +148,17 @@ public int getPositionCount()
return nullCheckBlock.getPositionCount();
}

public int getNotNullPositionCount()
Contributor

nit: s/NotNull/NonNull

@@ -16,6 +16,7 @@
import org.openjdk.jol.info.ClassLayout;

import static io.airlift.slice.SizeOf.sizeOf;
import static java.util.Objects.isNull;
Contributor

not used

@@ -181,7 +182,7 @@ public GroupedAccumulator createGroupedAccumulator(UpdateMemory updateMemory)
ImmutableList.Builder<Integer> aggregateInputChannels = ImmutableList.builder();
aggregateInputChannels.addAll(inputChannels);
maskChannel.ifPresent(aggregateInputChannels::add);
return new SpillableFinalOnlyGroupedAccumulator(sourceTypes, aggregateInputChannels.build(), (FinalOnlyGroupedAccumulator) accumulator);
return new SpillableFinalOnlyGroupedAccumulator(sourceTypes, aggregateInputChannels.build(), (FinalOnlyGroupedAccumulator) accumulator, isSpillableAccumulatorPreprocessingEnabled(session));
Contributor

Let's checkArg that session is not null. Reading the context, it's interesting that session could be null when distinct is false.

@@ -129,6 +129,7 @@
private boolean orderByAggregationSpillEnabled = true;
private boolean windowSpillEnabled = true;
private boolean orderBySpillEnabled = true;
private boolean spillableAccumulatorPreprocessingEnabled = true;
Contributor

shall we make it false? It is safer to roll out in prod

Contributor Author

I wanted to keep this enabled in code so that it can go through our strict release verification process. That way, we would be able to detect any regressions/issues during release verification itself. If we disable it in code and then enable it via config in prod, we will miss the exhaustive testing coverage that we would have gotten otherwise.

I am also going to perform exhaustive verifier runs to test this PR, but any additional testing is still a cherry on top. Let me know what you think and if you have a strong opinion for disabling it.

Contributor

lol, don't trust our release process. We don't validate spilling during our release ....

Contributor Author

@pgupta2 pgupta2 Oct 6, 2021

Haha.. Ok. I will disable it then..

Comment on lines 726 to 780
int newPositionCount = columnarRow.getPositionCount(); // number of positions in expanded array (since columnarRow is already flattened)
int newPositionCount = columnarRow.getNotNullPositionCount(); // number of positions in expanded array (since columnarRow is already flattened)
long[] newGroupIds = new long[newPositionCount];
boolean[] nulls = new boolean[newPositionCount];
int currentRowBlockIndex = 0;
for (int groupIdPosition = 0; groupIdPosition < groupIdsBlock.getPositionCount(); groupIdPosition++) {
for (int unused = 0; unused < arrayBlock.getBlock(groupIdPosition).getPositionCount(); unused++) {
// unused because we are expanding all the squashed values for the same group id
if (arrayBlock.getBlock(groupIdPosition).isNull(unused)) {
break;
}
Contributor

I don't quite get this newly changed logic. Could we briefly explain it?

Contributor Author

While pre-processing, we first apply a filter on the maskChannel. This filter can cause some groupIds to get filtered out completely, i.e., we might end up with no rows for a given groupId. In evaluateIntermediate(), we get a groupId as input and basically store all the rows for that groupId in its corresponding output block. Since all rows for the given groupId have been filtered out, we store a null in the output BlockBuilder. This is being done here:

if (blockBuilders.get(groupId) == null) {
       singleArrayBlockWriter.appendNull();
}

Since we store null for certain groupIds, inside addIntermediate we can expect the output block for a groupId to be null. This logic checks for that case and skips the groupIds for which there are no rows.

Block distinctMask = work.getResult();

// Filter out duplicate rows and return the page with distinct rows
Page dedupPage = filter(filtered, distinctMask);
Contributor

Likely we could do a compaction when the filtering ratio is high.

Contributor

I think even if it compacted every time it would still be a 2x memory regression because it is storing the distinct values twice compared to the non-spill case where the values are stored once in MarkDistinctHash and 0-1x inside the wrapped accumulator.

Contributor Author

I am currently working on adding support for using MarkDistinctHash as the input source when pre-processing is enabled. I have the changes ready and unit tests are passing, but I want to do some verifier runs to ensure that the changes work correctly. With these changes, we will no longer buffer pages in rawInputs when pre-processing is enabled. Instead, we will get them from markDistinctHash right before spilling to disk or pushing to the underlying delegate.

With these changes, I am going to restructure the code in the following way:

  1. SpillableFinalOnlyGroupedAccumulator will be used when pre-processing is disabled or when the delegate is OrderingGroupedAccumulator. Some changes are made in this class to support pre-processed data, but these changes are backward compatible with non-processed inputs as well.
  2. We will add a new class called SpillableDistinctPreprocessingFinalOnlyGroupedAccumulator which extends SpillableFinalOnlyGroupedAccumulator and is used only for distinct aggregates when pre-processing is enabled. This will allow us to write distinct-specific logic without adding too many methods to the FinalOnlyGroupedAccumulator interface.
  3. We will remove the addInputPreprocessing() and reset() APIs from FinalOnlyGroupedAccumulator since they apply only to distinct aggregates and can be called directly inside SpillableDistinctPreprocessingFinalOnlyGroupedAccumulator.
  4. SpillableDistinctPreprocessingFinalOnlyGroupedAccumulator will override the necessary methods (addInput, evaluateIntermediate, prepareFinal) and perform the logic that is specific to distinct aggregates with preprocessing enabled.

With these changes, we will be able to cleanly separate out logic when pre-processing is enabled vs disabled. I will update the PR once I have done some verifier testing but would be eager to get feedback on the new design.
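
For readers following along, a rough compile-only sketch of the proposed split (stand-in signatures and stubbed bodies; the real classes live inside Presto's accumulator framework and carry more state):

// Sketch only: method signatures are simplified stand-ins.
class SpillableFinalOnlyGroupedAccumulator
{
    // Used when preprocessing is disabled, or when the delegate is an
    // OrderingGroupedAccumulator (which must see every row).
    void addInput(long[] groupIds, Object[][] page)
    {
        // buffer the raw page; spill it later if memory runs out
    }

    void evaluateIntermediate(int groupId)
    {
        // serialize the buffered rows for this group
    }

    void prepareFinal()
    {
        // replay buffered/spilled rows into the delegate
    }
}

class SpillableDistinctPreprocessingFinalOnlyGroupedAccumulator
        extends SpillableFinalOnlyGroupedAccumulator
{
    // Used only for distinct aggregates with preprocessing enabled.
    @Override
    void addInput(long[] groupIds, Object[][] page)
    {
        // apply the mask filter, dedup via the hash, and keep only distinct rows
    }

    @Override
    void evaluateIntermediate(int groupId)
    {
        // emit the deduped rows, or a null marker if the group was fully filtered out
    }

    @Override
    void prepareFinal()
    {
        // pull compacted distinct pages from the hash and feed the delegate
    }
}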

@@ -488,6 +504,46 @@ public void evaluateFinal(int groupId, BlockBuilder output)
public void prepareFinal()
{
}

@Override
public GroupIdPage addInputPreprocessing(GroupIdPage groupIdPage)
Contributor

This method has a lot of duplication with addInput. Can we improve on that?

@pgupta2 pgupta2 force-pushed the dedup_distinct_agg_inputs branch from c892fe3 to d0c93a5 Compare October 6, 2021 18:11
Contributor Author

pgupta2 commented Oct 6, 2021

@highker @aweisberg @pettyjamesm : Updated the PR to use markDistinctHash as the input source for dedup pages. This allows us to skip buffering in rawInputs when pre-processing is enabled. This should solve two problems for us:

  1. It removes the 2x memory usage during pre-processing (pages were buffered in rawInputs and distinct rows were then copied inside markDistinctHash). Now we don't buffer pages in rawInputs.
  2. MarkDistinctHash stores only distinct rows internally. When we get distinct pages from markDistinctHash, we get back distinct, compacted pages. Previously, the dedup logic would create a view (DictionaryBlock) over the existing page, which would not reduce the buffering memory usage. With the new approach, we actually buffer only the deduped rows, which should reduce memory usage (see the sketch below).
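
As a rough illustration of the new flow, a self-contained sketch using plain Java collections (types and the page layout are stand-ins; only the getBufferedPages name mirrors the PR):

import java.util.*;

// Standalone sketch: instead of buffering every input page AND tracking
// distinct rows in a hash (storing values twice), keep only the set of
// distinct rows and dump its contents as compact "pages" when it is time
// to spill or to feed the delegate.
public class DistinctBufferSketch
{
    private static final int PAGE_SIZE = 1024;
    private final LinkedHashSet<List<Object>> distinctRows = new LinkedHashSet<>();

    public void addInput(List<Object> row)
    {
        distinctRows.add(row); // duplicates are dropped here, nothing else is buffered
    }

    // Analogous in spirit to getBufferedPages(): distinct rows only, already compacted.
    public List<List<List<Object>>> getBufferedPages()
    {
        List<List<List<Object>>> pages = new ArrayList<>();
        List<List<Object>> current = new ArrayList<>();
        for (List<Object> row : distinctRows) {
            current.add(row);
            if (current.size() == PAGE_SIZE) {
                pages.add(current);
                current = new ArrayList<>();
            }
        }
        if (!current.isEmpty()) {
            pages.add(current);
        }
        return pages;
    }

    public static void main(String[] args)
    {
        DistinctBufferSketch buffer = new DistinctBufferSketch();
        buffer.addInput(List.of(0L, "HIGH"));
        buffer.addInput(List.of(0L, "HIGH")); // duplicate, not stored
        buffer.addInput(List.of(0L, "LOW"));
        System.out.println(buffer.getBufferedPages()); // [[[0, HIGH], [0, LOW]]]
    }
}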

@pgupta2 pgupta2 force-pushed the dedup_distinct_agg_inputs branch from d0c93a5 to 8354a8a Compare October 6, 2021 21:34
Contributor

@highker highker left a comment

Given it's a big structure change from the previous design, can we squash two commits into one?

return wrapBlocksWithoutCopy(positionCount, result);
}

public Page insertColumnAtIndex(int channelIndex, Block column)
Contributor

Just call this insertColumn. Also, it would be good to refactor prependColumn to leverage this new interface.

Contributor Author

I found a bug while testing and after fixing it, this method is no longer needed. This will be removed in the next update.

@Override
public List<Page> getBufferedPages()
{
throw new UnsupportedOperationException("BigIntGroupByHash does not support getBufferedPages");
Contributor

BigintGroupByHash is widely used. Do we wanna implement this? Also, would be good to have a test case to cover this path. This can be triggered when the grouping key is a single long type.

Contributor Author

For DISTINCT aggregates, we always use MultiChannelGroupByHash since the hash is computed on the groupId and the distinct column channel, so there will be a minimum of 2 channels present for DISTINCT aggregates.
Since the preprocessing logic (and using MarkDistinctHash as the input source) only applies to distinct aggregates, I added the implementation only for MultiChannelGroupByHash.

For BigintGroupByHash, adding this implementation does not make sense since there is no logic that calls into it. That is the reason I explicitly throw the exception: if anyone tries to use it in the future, they will have to add an implementation for it first. I also feel that there is currently no use case for BigintGroupByHash as a page input source.

Contributor

I see. Maybe add a comment to this.

@Override
public List<Page> getBufferedPages()
{
ImmutableList.Builder<Page> inputPageList = new ImmutableList.Builder<>();
Contributor

  • s/inputPageList/inputPages
  • s/new ImmutableList.Builder<>()/ImmutableList.builder();

@Override
public List<Page> getBufferedPages()
{
throw new UnsupportedOperationException("BigIntGroupByHash does not support getBufferedPages");
Contributor

return ImmutableList.of(). This is also a legit case; so we wanna return something

Contributor Author

Same reasoning as above. There is no code that calls this method for this class, and that is why I have not implemented it.

Contributor

same; add a comment; also there is a typo "BigIntGroupByHash"


import static java.util.Objects.requireNonNull;

public class GroupIdPage
Contributor

Given we are not going to use this outside SpillableFinalOnlyGroupedAccumulator, shall we keep it as it was?

@@ -181,7 +184,14 @@ public GroupedAccumulator createGroupedAccumulator(UpdateMemory updateMemory)
ImmutableList.Builder<Integer> aggregateInputChannels = ImmutableList.builder();
aggregateInputChannels.addAll(inputChannels);
maskChannel.ifPresent(aggregateInputChannels::add);
return new SpillableFinalOnlyGroupedAccumulator(sourceTypes, aggregateInputChannels.build(), (FinalOnlyGroupedAccumulator) accumulator);

checkArgument(session != null, "Session is null");
Contributor

use checkState. checkArgument is used for input parameter checks, while checkState asserts that there is nothing wrong with the current state of the class.

return new SpillableFinalOnlyGroupedAccumulator(sourceTypes, aggregateInputChannels.build(), (FinalOnlyGroupedAccumulator) accumulator);

checkArgument(session != null, "Session is null");
if (isSpillableAccumulatorPreprocessingEnabled(session) && hasDistinct() && !hasOrderBy()) {
Contributor

hmmmm..... what if you have order by and distinct? Isn't that going to be a corner case we can't handle well?

Contributor Author

If you have order by and distinct, we will have a SpillableFinalOnlyGroupedAccumulator whose child/delegate will be an OrderingGroupedAccumulator, whose child in turn will be a DistinctingGroupedAccumulator. Since SpillableFinalOnlyGroupedAccumulator feeds data to OrderingGroupedAccumulator, we cannot perform any pre-processing: all the data needs to flow to OrderingGroupedAccumulator for it to work correctly.

That's why we use SpillableFinalOnlyGroupedAccumulator in this case.

if (isSpillableAccumulatorPreprocessingEnabled(session) && hasDistinct() && !hasOrderBy()) {
return new SpillableDistinctPreprocessingFinalOnlyGroupedAccumulator(sourceTypes, aggregateInputChannels.build(), (DistinctingGroupedAccumulator) accumulator, maskChannel);
}
else {
Contributor

remove the else, it's redundant; just return directly

Comment on lines 948 to 950
inputPages = inputPages.stream().map(
page -> page.insertColumnAtIndex(maskChannel, RunLengthEncodedBlock.create(BOOLEAN, true, page.getPositionCount()))
).collect(toImmutableList());
Contributor

nit

inputPages = inputPages.stream()
        .map(page -> page.insertColumnAtIndex(maskChannel, RunLengthEncodedBlock.create(BOOLEAN, true, page.getPositionCount())))
        .collect(toImmutableList());

Contributor Author

This will be removed in the next update of this diff.

@pgupta2 pgupta2 force-pushed the dedup_distinct_agg_inputs branch from 8354a8a to c19a46a Compare October 9, 2021 07:19
Contributor Author

pgupta2 commented Oct 9, 2021

Given it's a big structure change from the previous design, can we squash two commits into one?

I am planning to squash the two commits together once the PR is approved, right before merging. Review-wise, multiple commits should not make any difference, right? Is there any advantage to merging these commits earlier?

@pgupta2 pgupta2 requested a review from highker October 9, 2021 07:24
Contributor

@highker highker left a comment

Can we squash all commits into one?

@Override
public List<Page> getBufferedPages()
{
throw new UnsupportedOperationException("BigIntGroupByHash does not support getBufferedPages");
Contributor

I see. Maybe add a comment to this.

@Override
public List<Page> getBufferedPages()
{
throw new UnsupportedOperationException("BigIntGroupByHash does not support getBufferedPages");
Contributor

same; add a comment; also there is a typo "BigIntGroupByHash"

@@ -128,6 +128,7 @@
public static final String ORDER_BY_AGGREGATION_SPILL_ENABLED = "order_by_aggregation_spill_enabled";
public static final String WINDOW_SPILL_ENABLED = "window_spill_enabled";
public static final String ORDER_BY_SPILL_ENABLED = "order_by_spill_enabled";
public static final String SPILLABLE_ACCUMULATOR_PREPROCESSING_ENABLED = "spillable_accumulator_preprocessing_enabled";
Contributor

This is no longer really "preprocessing". It could be something like "dedup_based_distinct_aggregation_spill_enabled".

/**
* SpillableAccumulator to perform preprocessing of input data for distinct aggregates only
*/
private static class SpillableDistinctPreprocessingFinalOnlyGroupedAccumulator
Contributor

nit: DedupBasedSpillableDistinctGroupedAccumulator

@highker highker self-assigned this Oct 9, 2021
@pgupta2 pgupta2 force-pushed the dedup_distinct_agg_inputs branch from c19a46a to 99445c7 Compare October 10, 2021 07:23
Contributor

highker commented Oct 11, 2021

Test failure is related

@pgupta2 pgupta2 force-pushed the dedup_distinct_agg_inputs branch from 99445c7 to eb4b28e Compare October 11, 2021 16:56
Contributor Author

pgupta2 commented Oct 11, 2021

Test failure is related

I noticed the failure but didn't update the PR since there was a bug that I found, and I wanted to include the fix in the update as well. The bug triggers when the aggregation maskFilter filters out all the input rows. In that case, nothing is sent to the delegate's accumulator and it throws an NPE (since its internal state is not initialized). The fix is to send an empty page in that case with the correct groupCount so that the delegate's accumulator can initialize its internal state. The bug is now fixed and I have added a unit test for it.

Contributor

highker commented Oct 11, 2021

LGTM. @aweisberg, do you have other comments on this PR?

@highker highker merged commit 343b039 into prestodb:master Oct 12, 2021
@prithvip prithvip mentioned this pull request Oct 19, 2021