Dedup input data before buffering in distinct aggregates #16795
Conversation
Force-pushed from c45ee65 to f033d0e.
Force-pushed from 6a9a5fe to c4ed525.
First quick pass. This doesn't add a test that checks that this deduplicates, round-trips the data correctly, and maybe something to show it accounts for memory correctly. Unless this relies on an existing unit test. Not sure how testable memory footprint is, though.
I need to better understand what `MarkDistinctHash` does (and whether it resolves collisions) and when `evaluateIntermediate` is called.
```
groupIdsBlock = new GroupByIdBlock(groupIdsBlock.getGroupCount(), dedupPage.getBlock(0));

// Remove the groupId block from distinct Page
page = dedupPage.extractChannels(IntStream.range(1, dedupPage.getChannelCount()).toArray());
```
Now my question is which step here actually removes the non-distinct rows from memory?
The `filter(withGroup, distinctMask);` step performs the removal of duplicate rows based on the `distinctMask` that we have computed. Basically, `MarkDistinctHash` computes a hash on (groupId + aggregate input channels) and uses this hash to figure out whether each row is distinct or not. This information is then used to filter out non-distinct rows from the input page.
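A minimal standalone sketch of that flow, using plain Java collections as stand-ins for the Presto `Page`/`Block` types and a `HashSet` in place of `MarkDistinctHash` (all names here are illustrative):

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class DedupFlowSketch
{
    public static void main(String[] args)
    {
        // two columns of the "page": the groupId of each row and the aggregate input value
        long[] groupIds = {0, 0, 0, 1};
        long[] values = {10, 10, 20, 10};

        // 1. Mark distinct rows on (groupId, value), as MarkDistinctHash does on
        //    (groupId + aggregate input channels).
        Set<List<Long>> seen = new HashSet<>();
        boolean[] distinctMask = new boolean[groupIds.length];
        for (int i = 0; i < groupIds.length; i++) {
            distinctMask[i] = seen.add(List.of(groupIds[i], values[i]));
        }

        // 2. Filter: keep only the positions marked distinct.
        List<Integer> keptPositions = new ArrayList<>();
        for (int i = 0; i < distinctMask.length; i++) {
            if (distinctMask[i]) {
                keptPositions.add(i);
            }
        }
        System.out.println(keptPositions); // [0, 2, 3] -- position 1 duplicates position 0
    }
}
```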
OK, I wasn't sure if `getPositions` on `Block` actually always creates a copy or a view of the block. Looking at the implementation in, say, `RunLengthEncodedBlock`, it makes no sense to me because it discards the positions and just returns another wrapper. `DictionaryBlock` also seems to play similar games.
The default implementation creates a `DictionaryBlock`, which I think is just a wrapper around the source block?
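For intuition, a standalone sketch of the view-vs-copy distinction (simplified stand-ins, not the actual `Block` implementations): a `getPositions`-style view keeps the whole source alive through indirection, while a `copyPositions`-style copy retains only the selected values.

```java
import java.util.Arrays;

public class ViewVsCopySketch
{
    // A DictionaryBlock-like view: it holds the full source plus the selected positions,
    // so the source array stays retained even if most positions were filtered out.
    record SelectedView(int[] source, int[] positions)
    {
        int get(int i)
        {
            return source[positions[i]];
        }
    }

    public static void main(String[] args)
    {
        int[] source = {10, 10, 20, 30};
        int[] positions = {0, 2, 3};

        SelectedView view = new SelectedView(source, positions); // nothing copied, source pinned
        int[] copy = Arrays.stream(positions).map(p -> source[p]).toArray(); // only 3 values retained

        System.out.println(view.get(1) + " " + copy[1]); // both print 20
    }
}
```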
I think the key difference is that the filtered stuff is normally fed to the underlying aggregation. So the reduction happens there and most of this is concerned with identifying what to feed to it. But here I think we just store the page away because we won't feed it to the aggregation until much later.
For distinct I guess we have to store all the values so we don't need to provide it to the underlying aggregation, but we do want to discard the non-distinct values from memory.
Depending on how `MarkDistinctHash` works, is it going to store these values twice?
If there are multiple distinct aggregations on the same column with no filter or the same filter, then we would also like to share the memory used to track distinct values.
> Depending on how `MarkDistinctHash` works, is it going to store these values twice?

`MarkDistinctHash` will store each distinct row only once. It internally uses `GroupByHash`, which has all the logic to detect duplicates. When a duplicate row comes in, it computes the hash for that row, and if this hash has been seen before, it performs an actual comparison of the values. If the equality check passes, the row really is a duplicate. If the equality check does not pass, there was a hash collision, and it is handled accordingly. So, basically, each distinct row is stored only once.
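A standalone sketch of that "hash first, then verify values, probe on collision" logic, using linear probing over `String` rows (GroupByHash does the equivalent at the Page/Block level; this class is purely illustrative):

```java
public class DistinctTableSketch
{
    private final String[] slots = new String[16]; // capacity kept tiny for illustration

    /** Returns true only if the row was not seen before. */
    public boolean add(String row)
    {
        int slot = Math.floorMod(row.hashCode(), slots.length);
        while (slots[slot] != null) {
            if (slots[slot].equals(row)) {
                return false; // same slot and equal values: a real duplicate
            }
            slot = (slot + 1) % slots.length; // hash collision: probe the next slot
        }
        slots[slot] = row; // first time this row is seen; store it once
        return true;
    }

    public static void main(String[] args)
    {
        DistinctTableSketch table = new DistinctTableSketch();
        System.out.println(table.add("a") + " " + table.add("a") + " " + table.add("b")); // true false true
    }
}
```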
> If there are multiple distinct aggregations on the same column with no filter or the same filter, then we would also like to share the memory used to track distinct values.

Yes, that would be good to have, but I am not targeting it in this PR. Currently, each aggregate has its own `MarkDistinctHash` that is used to perform deduplication. Maybe this can be done as an optimization in a follow-up PR.
I probably put out too much noise so the signal got lost here.
`filter` doesn't actually filter. It returns a `Page` that appears to be a view of the original `Page`'s blocks. This is retaining all non-distinct values for the distincted columns. Please correct me if I am wrong, but all the `getPositions` implementations I looked at create views of the original `Block`.
`MarkDistinctHash` copies the column values being distincted, so we are storing the values twice: once in `MarkDistinctHash` and once in the pages that we fed to mark distinct hash, but also retained. We don't need to do that. We could retain only `MarkDistinctHash` and then extract the values stored in it.
> filter doesn't actually filter. It returns a Page that appears to be a view of the original Page's blocks. This is retaining all non-distinct values for the distincted columns. Please correct me if I am wrong, but all the getPositions implementations I looked at create views of the original Block.

This is correct. `filter` calls `getPositions`, which creates a view over the existing block. But when we prepare pages for spill, we only spill the filtered values, and during unspill we create a page with only distinct rows. We could instead call `copyPositions`, which performs an actual copy and returns a new page that contains only the distinct values. This will incur some CPU cost due to the copy. How expensive is the copy operation? If we go with the copy approach, we will save on the memory buffer since only distinct values will be kept and buffered.

> MarkDistinctHash copies the column values being distincted so we are storing the values twice. Once in MarkDistinctHash and once in the pages that we fed to mark distinct hash, but also retained. We don't need to do that. We could only retain MarkDistinctHash and then extract the values stored in it.

This is a good observation. We can get the values directly from `MarkDistinctHash`, but `MarkDistinctHash` only stores the values for the hash channels. It does not store values for the `maskChannel`, which is also required in `DistinctingGroupedAccumulator` in some cases. It's possible to modify `MarkDistinctHash` to start storing `maskChannel` values as well. Let me give it a deeper thought tomorrow.
I think the issue here is that turning on spilling with this implementation causes queries to use more memory, thus forcing them to spill and possibly run out of spill space.
To roll out spilling for distinct aggregation for all queries, we need to be at least as memory efficient as the non-spilling version. There is no justification for a regression on the wider workload; it really will cause problems.
To target specific queries where it helps and doesn't hurt, then sure, we can use this approach, but it is pretty wrong to store non-distinct values in memory. That isn't the right behavior for aggregation.
Regarding the performance impact of copying: I think that is a non-issue. If there are few distinct values retained, then the performance advantage from copying and not spilling will be insurmountable. If there are many unique values, we first of all don't have to do the copy (it can be conditional), but we will run out of memory so quickly that it won't be the dominant performance factor, as spilling will dominate.
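A standalone sketch of the conditional copy: compact only when the mask dropped enough rows for the copy to pay off. The 0.5 threshold and the array types are illustrative, not tuned Presto values.

```java
import java.util.Arrays;

public class ConditionalCompactionSketch
{
    static int[] filterPositions(int[] source, int[] selectedPositions)
    {
        if (selectedPositions.length < source.length * 0.5) {
            // selective filter: copy the kept values so the full source can be garbage collected
            return Arrays.stream(selectedPositions).map(p -> source[p]).toArray();
        }
        // mostly distinct already: returning the source stands in for keeping a cheap view
        return source;
    }

    public static void main(String[] args)
    {
        int[] source = {10, 10, 10, 10, 10, 30};
        System.out.println(Arrays.toString(filterPositions(source, new int[] {0, 5}))); // [10, 30]
    }
}
```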
Thanks! I have to say on some level it is impressive how little code this ended up being.
```
Optional.empty(),
joinCompiler,
() -> {
    updateMemory.update();
```
Preserve the comments here? Or maybe don't copy-paste it and make it a shared lambda in all 3 instances in the file?
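One possible shape for sharing it (a sketch only; it assumes `UpdateMemory` is the functional interface the existing lambdas implement, and the helper name is made up):

```java
import com.facebook.presto.operator.UpdateMemory;

final class MemoryUpdateCallbacks
{
    private MemoryUpdateCallbacks() {}

    static UpdateMemory alwaysContinue(UpdateMemory updateMemory)
    {
        return () -> {
            // report the current memory usage to the operator; the comments from the
            // original lambda would live here, in exactly one place
            updateMemory.update();
            return true;
        };
    }
}
```

In `GenericAccumulatorFactory` this could equally be a private static helper next to the three call sites rather than a separate class.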
```
@@ -689,6 +716,27 @@ public void addInput(GroupByIdBlock groupIdsBlock, Page page)
    checkState(rawInputs != null && blockBuilders == null);
    rawInputs.ensureCapacity(rawInputsLength);

    // Perform dedup for distinct aggregates
    if (delegate instanceof DistinctingGroupedAccumulator) {
```
Haven't read the logic yet. A quick design nit:
- Instead of doing a type check, we can introduce a method on the `FinalOnlyGroupedAccumulator` interface called `Page addInputPreprocessing(GroupByIdBlock groupIdsBlock, Page page)`, with different implementations for `DistinctingGroupedAccumulator` and `OrderingGroupedAccumulator` (which is a no-op); a rough sketch of this shape follows the list.
- `markDistinctHash` is better not visible at `SpillableFinalOnlyGroupedAccumulator` but inside `DistinctingGroupedAccumulator`.
- Would be good to clarify the `markDistinctHash` already in `DistinctingGroupedAccumulator` and the new one you just introduced. Haven't read the code yet. Are they the same?
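A rough sketch of the suggested shape, with illustrative stand-in types so it is self-contained (the real signature would use Presto's `GroupByIdBlock` and `Page`):

```java
// Stand-in for the input: the groupId of each row plus the raw row values.
record RowBatch(long[] groupIds, Object[][] rows) {}

interface FinalOnlyAccumulatorShape
{
    /** Returns the (possibly reduced) batch that should actually be buffered or spilled. */
    RowBatch addInputPreprocessing(RowBatch batch);
}

class DistinctingShape implements FinalOnlyAccumulatorShape
{
    @Override
    public RowBatch addInputPreprocessing(RowBatch batch)
    {
        // dedup on (groupId, input channels) using a MarkDistinctHash owned by this class
        return batch; // dedup elided in this sketch
    }
}

class OrderingShape implements FinalOnlyAccumulatorShape
{
    @Override
    public RowBatch addInputPreprocessing(RowBatch batch)
    {
        return batch; // no-op: every row must reach the ordering step unchanged
    }
}
```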
Re 1: Sure. I initially thought about going in that direction since I expected too many code changes, but since the changes were not much, I went ahead with in-lining the code within the `if` check. Let me do what you are suggesting.
Re 2: `SpillableFinalOnlyGroupedAccumulator` buffers the input pages and spills them to disk, if needed, until all inputs have been received by the HashAggregationOperator. We need to perform deduplication before pages are buffered/spilled so that we can save on memory/spilled bytes. We cannot reuse the `markDistinctHash` from the delegate directly for performing this dedup since there is no way to reset it. Hence, we are creating a separate `markDistinctHash` object for `SpillableFinalOnlyGroupedAccumulator` for performing partial dedup.
If we wanted to use the hash from the delegate, then we would have to store all the info needed to create the `MarkDistinctHash` object in `DistinctingGroupedAccumulator` and add a `reset()` method in `FinalOnlyGroupedAccumulator` which would basically reset the hash object just before we start pushing pages to the delegate.
Re 3: They are NOT the same object. They are created with the same args and do the same thing, but the `markDistinctHash` inside `SpillableFinalOnlyGroupedAccumulator` performs partial dedup (between spill cycles) while the `markDistinctHash` inside the delegate performs the final dedup.
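A standalone sketch of the partial-dedup-with-reset idea (a `HashSet` of strings stands in for `MarkDistinctHash`, which really works on pages):

```java
import java.util.HashSet;
import java.util.Set;

public class PartialDedupSketch
{
    private Set<String> seenSinceLastSpill = new HashSet<>();

    /** Returns true only the first time a row is seen within the current spill cycle. */
    boolean markDistinct(String row)
    {
        return seenSinceLastSpill.add(row);
    }

    /** Drops the partial-dedup state once the buffered rows have been spilled to disk. */
    void reset()
    {
        seenSinceLastSpill = new HashSet<>();
    }

    public static void main(String[] args)
    {
        PartialDedupSketch dedup = new PartialDedupSketch();
        System.out.println(dedup.markDistinct("a") + " " + dedup.markDistinct("a")); // true false
        dedup.reset();
        System.out.println(dedup.markDistinct("a")); // true again: the delegate's hash performs the final dedup
    }
}
```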
TestSpilledAggregations has multiple test cases that execute the spill path for distinct aggregates. The changes are in a common code path and will be executed for all distinct aggregate test cases by default. I am not sure how we can add a unit test that checks partial states inside HashAggregationOperator. Any pointers will be very helpful. Same with memory footprint: how can I add a unit test for doing memory checks?
Force-pushed from c4ed525 to 99dee7c.
The structure looks great! Will take a look at the logic details tomorrow.
```
abstract GroupIdPage addInputPreprocessing(GroupIdPage groupIdPage);

abstract void reset();
```
maybe add javadocs to these two methods?
...o-main/src/main/java/com/facebook/presto/operator/aggregation/GenericAccumulatorFactory.java
```
@@ -757,6 +819,10 @@ public void evaluateIntermediate(int groupId, BlockBuilder output)
    if (blockBuilders == null) {
        checkState(rawInputs != null);

        // Release memory held by markDistinctHash in delegate.
```
Let's avoid using implementation-detail comments like "markDistinctHash" at the interface invocation site.
Force-pushed from 99dee7c to f15e890.
@highker: I discovered a bug during a verifier run; it seems that doing partial dedup before applying the filter mask can return incorrect results in some cases. This is because the dedup result depends on the order of the input rows, and `markDistinctHash` does not consider the `maskChannel` while computing the hash. For example, consider the following query:

```sql
SELECT custkey, count(DISTINCT orderpriority) FILTER(WHERE orderkey > 100) FROM orders GROUP BY custkey
```

In `addInputPreprocessing`, we first perform dedup using `markDistinctHash`, which computes hashes on groupId + input channels. Since `custkey` is the group-by key and `orderpriority` is the input channel, the hash will be computed on these two columns. When we perform dedup in `addInputPreprocessing()`, the second row of the input will be removed, and the first and third rows will be pushed to the delegate. The delegate (`DistinctingGroupedAccumulator`) will then apply the filter based on the `maskChannel` (`orderkey`) and thus will filter out both rows, thereby producing an incorrect result. There are two ways to fix this:
Option 2 is better since it eliminates, in preprocessing itself, rows that would eventually be filtered out inside the delegate, thereby reducing memory usage and spilled bytes.
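The concrete example rows are not reproduced here, but a hypothetical data set consistent with the description above (all values invented for illustration) shows the ordering problem:

```java
import java.util.List;
import java.util.stream.Collectors;

public class MaskBeforeDedupSketch
{
    // Hypothetical rows: custkey = 1 for all, mask is (orderkey > 100).
    record Row(long custkey, long orderkey, String orderpriority) {}

    public static void main(String[] args)
    {
        List<Row> rows = List.of(
                new Row(1, 50, "1-URGENT"),   // fails the mask
                new Row(1, 200, "1-URGENT"),  // passes the mask
                new Row(1, 60, "2-HIGH"));    // fails the mask

        // Dedup on (custkey, orderpriority) first: row 2 is dropped as a duplicate of row 1,
        // then the mask removes rows 1 and 3, so the count is 0.
        long dedupThenFilter = rows.stream()
                .collect(Collectors.toMap(r -> r.custkey() + "|" + r.orderpriority(), r -> r, (first, second) -> first))
                .values().stream()
                .filter(r -> r.orderkey() > 100)
                .map(Row::orderpriority)
                .distinct()
                .count();

        // Mask first, then dedup: only row 2 survives, giving the correct count of 1.
        long filterThenDedup = rows.stream()
                .filter(r -> r.orderkey() > 100)
                .map(Row::orderpriority)
                .distinct()
                .count();

        System.out.println(dedupThenFilter + " vs " + filterThenDedup); // 0 vs 1
    }
}
```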
@pgupta2, thanks for the fix. I haven't had a chance to look into the logic details yet. Will pay attention to it while reviewing. On another note, shall we introduce a config + session property to guard this new behavior? It would be useful to prevent production correctness issues.
presto-common/src/main/java/com/facebook/presto/common/block/ColumnarRow.java
```
GroupByIdBlock distinctGroupIdsBlock = new GroupByIdBlock(groupIdsBlock.getGroupCount(), dedupPage.getBlock(0));

// Remove the groupId block from distinct Page
Page distinctPage = dedupPage.extractChannels(IntStream.range(1, dedupPage.getChannelCount()).toArray());
```
Creating `IntStream.range(1, dedupPage.getChannelCount()).toArray()` for each input page seems unnecessarily expensive. I suggest storing that array as a field if the size is knowable in advance, or adding a method to `Page` like `Page#dropColumn(int)` to avoid the per-call overhead.
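A standalone sketch of the precompute-once idea (the names and shape are illustrative, not the PR's actual fields):

```java
import java.util.Arrays;
import java.util.stream.IntStream;

public class ChannelIndexCacheSketch
{
    private final int[] channelsWithoutGroupId;

    ChannelIndexCacheSketch(int dedupPageChannelCount)
    {
        // channel 0 carries the groupIds in the dedup page above, so skip it once here
        // instead of rebuilding the array for every input page
        this.channelsWithoutGroupId = IntStream.range(1, dedupPageChannelCount).toArray();
    }

    int[] channelsWithoutGroupId()
    {
        return channelsWithoutGroupId;
    }

    public static void main(String[] args)
    {
        System.out.println(Arrays.toString(new ChannelIndexCacheSketch(4).channelsWithoutGroupId())); // [1, 2, 3]
    }
}
```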
Force-pushed from 0967c72 to d559865.
```
@@ -147,6 +148,17 @@ public int getPositionCount()
    return nullCheckBlock.getPositionCount();
}

public int getNotNullPositionCount()
```
nit: s/NotNull/NonNull
```
@@ -16,6 +16,7 @@
import org.openjdk.jol.info.ClassLayout;

import static io.airlift.slice.SizeOf.sizeOf;
import static java.util.Objects.isNull;
```
not used
```
@@ -181,7 +182,7 @@ public GroupedAccumulator createGroupedAccumulator(UpdateMemory updateMemory)
     ImmutableList.Builder<Integer> aggregateInputChannels = ImmutableList.builder();
     aggregateInputChannels.addAll(inputChannels);
     maskChannel.ifPresent(aggregateInputChannels::add);
-    return new SpillableFinalOnlyGroupedAccumulator(sourceTypes, aggregateInputChannels.build(), (FinalOnlyGroupedAccumulator) accumulator);
+    return new SpillableFinalOnlyGroupedAccumulator(sourceTypes, aggregateInputChannels.build(), (FinalOnlyGroupedAccumulator) accumulator, isSpillableAccumulatorPreprocessingEnabled(session));
```
Let's checkArg that session is not null. It's interesting that, reading the context, session could be null when `distinct` is false.
```
@@ -129,6 +129,7 @@
private boolean orderByAggregationSpillEnabled = true;
private boolean windowSpillEnabled = true;
private boolean orderBySpillEnabled = true;
private boolean spillableAccumulatorPreprocessingEnabled = true;
```
Shall we make it false? It is safer to roll out in prod.
I wanted to keep this enabled in code so that it can go through our strict release verification process. That way, we would be able to detect any regressions/issues during release verification itself. If we disable it in code and then enable it via config in prod, we will miss the exhaustive testing coverage that we would have gotten otherwise.
I am also going to perform exhaustive verifier runs to test this PR, but any additional testing is a cherry on top. Let me know what you think and if you have a strong opinion for disabling it.
lol, don't trust our release process. We don't validate spilling during our release ....
Haha.. Ok. I will disable it then..
```
-int newPositionCount = columnarRow.getPositionCount(); // number of positions in expanded array (since columnarRow is already flattened)
+int newPositionCount = columnarRow.getNotNullPositionCount(); // number of positions in expanded array (since columnarRow is already flattened)
 long[] newGroupIds = new long[newPositionCount];
 boolean[] nulls = new boolean[newPositionCount];
 int currentRowBlockIndex = 0;
 for (int groupIdPosition = 0; groupIdPosition < groupIdsBlock.getPositionCount(); groupIdPosition++) {
     for (int unused = 0; unused < arrayBlock.getBlock(groupIdPosition).getPositionCount(); unused++) {
         // unused because we are expanding all the squashed values for the same group id
         if (arrayBlock.getBlock(groupIdPosition).isNull(unused)) {
             break;
         }
```
I don't quite get this newly changed logic. Could you briefly explain?
During pre-processing, we first apply the filter on the `maskChannel`. This filter can cause some groupIds to get filtered out completely, i.e., we might end up with no rows for a given groupId. In `evaluateIntermediate()`, we get a groupId as input and basically store all the rows for that groupId in its corresponding `output` block. If all rows for the given groupId have been filtered out, we store a null in the `output` BlockBuilder. This is being done here:

```java
if (blockBuilders.get(groupId) == null) {
    singleArrayBlockWriter.appendNull();
}
```

Since we are storing nulls for certain groupIds, inside `addIntermediate` we can expect the output block for a groupId to be null. This logic checks for that case and skips the groupIds for which there are no rows.
```
Block distinctMask = work.getResult();

// Filter out duplicate rows and return the page with distinct rows
Page dedupPage = filter(filtered, distinctMask);
```
Likely we could do a `compact` when the filtering ratio is high.
I think even if it compacted every time, it would still be a 2x memory regression, because it is storing the distinct values twice compared to the non-spill case, where the values are stored once in `MarkDistinctHash` and 0-1x inside the wrapped accumulator.
I am currently working on adding support for using `MarkDistinctHash` as the input source when pre-processing is enabled. I have the changes ready and unit tests are passing, but I want to do some verifier runs to ensure that the changes are working correctly. With these changes, we will no longer buffer pages in `rawInputs` when pre-processing is enabled. Instead, we will get them from `markDistinctHash` right before spilling to disk or pushing to the underlying delegate.
With these changes, I am going to restructure the code in the following way:
- `SpillableFinalOnlyGroupedAccumulator` will be used when pre-processing is disabled or when the delegate is `OrderingGroupedAccumulator`. Some changes are made in this class to support pre-processed data, but these changes are backward compatible with non-preprocessed inputs as well.
- We will add a new class called `SpillableDistinctPreprocessingFinalOnlyGroupedAccumulator`, which extends `SpillableFinalOnlyGroupedAccumulator` and is used only for distinct aggregates when pre-processing is enabled. This will allow us to write distinct-specific logic without adding too many methods to the `FinalOnlyGroupedAccumulator` interface.
- We will remove the `addInputPreprocessing()` and `reset()` APIs from `FinalOnlyGroupedAccumulator`, since these apply only to distinct aggregates and can be called directly inside `SpillableDistinctPreprocessingFinalOnlyGroupedAccumulator`.
- `SpillableDistinctPreprocessingFinalOnlyGroupedAccumulator` will override the necessary methods like `addInput`, `evaluateIntermediate`, and `prepareFinal` and will perform the logic that is specific to distinct aggregates with preprocessing enabled.

With these changes, we will be able to cleanly separate the logic for when pre-processing is enabled vs. disabled. I will update the PR once I have done some verifier testing, but I would be eager to get feedback on the new design.
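A compact standalone skeleton of the proposed split (class names follow the comment above; the method shapes are stand-ins, not the actual Presto signatures):

```java
class SpillableFinalOnlyGroupedAccumulatorSketch
{
    void addInput(long[] groupIds, Object[][] rows)
    {
        // base behavior: buffer the incoming rows in rawInputs and spill them on demand
    }
}

class SpillableDistinctPreprocessingFinalOnlyGroupedAccumulatorSketch
        extends SpillableFinalOnlyGroupedAccumulatorSketch
{
    @Override
    void addInput(long[] groupIds, Object[][] rows)
    {
        // distinct-only behavior: push rows into a MarkDistinctHash-style structure instead
        // of buffering them, and read the deduplicated pages back out right before spilling
        // or handing them to the delegate
    }
}
```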
```
@@ -488,6 +504,46 @@ public void evaluateFinal(int groupId, BlockBuilder output)
public void prepareFinal()
{
}

@Override
public GroupIdPage addInputPreprocessing(GroupIdPage groupIdPage)
```
This method has a lot of duplication with `addInput`. Can we improve on that?
Force-pushed from c892fe3 to d0c93a5.
@highker @aweisberg @pettyjamesm: Updated the PR to use `markDistinctHash` as the input source for dedup pages. This will allow us to skip buffering in `rawInputs`.
Force-pushed from d0c93a5 to 8354a8a.
Given it's a big structural change from the previous design, can we squash the two commits into one?
```
    return wrapBlocksWithoutCopy(positionCount, result);
}

public Page insertColumnAtIndex(int channelIndex, Block column)
```
Just call this `insertColumn`. Also, it would be good to refactor `prependColumn` to leverage this new interface.
I found a bug while testing and after fixing it, this method is no longer needed. This will be removed in the next update.
```
@Override
public List<Page> getBufferedPages()
{
    throw new UnsupportedOperationException("BigIntGroupByHash does not support getBufferedPages");
```
BigintGroupByHash is widely used. Do we wanna implement this? Also, would be good to have a test case to cover this path. This can be triggered when the grouping key is a single long type.
For DISTINCT aggregates, we always use `MultiChannelGroupByHash`, since the hash is computed on the groupId and the distinct column channel; so there will be a minimum of 2 channels present for DISTINCT aggregates. Since the preprocessing logic (and using `MarkDistinctHash` as the input source) is only done for distinct aggregates, I added the implementation only for `MultiChannelGroupByHash`.
For `BigintGroupByHash`, adding this implementation does not make sense since there is no logic that calls into it. That is the reason I explicitly throw the exception: if anyone tries to use it in the future, they will have to add an implementation for it first. I also feel that there is currently no use case for using `BigintGroupByHash` as a page input source.
I see. Maybe add a comment to this.
```
@Override
public List<Page> getBufferedPages()
{
    ImmutableList.Builder<Page> inputPageList = new ImmutableList.Builder<>();
```
- s/inputPageList/inputPages
- s/new ImmutableList.Builder<>()/ImmutableList.builder();
```
@Override
public List<Page> getBufferedPages()
{
    throw new UnsupportedOperationException("BigIntGroupByHash does not support getBufferedPages");
```
`return ImmutableList.of()`. This is also a legit case; so we wanna return something.
Same reasoning as above. There is no code that calls this method for this class, and that is why I have not implemented it.
same; add a comment; also there is a typo "BigIntGroupByHash"
```
import static java.util.Objects.requireNonNull;

public class GroupIdPage
```
Given we are not going to use this outside `SpillableFinalOnlyGroupedAccumulator`, shall we keep it as it was?
```
@@ -181,7 +184,14 @@ public GroupedAccumulator createGroupedAccumulator(UpdateMemory updateMemory)
    ImmutableList.Builder<Integer> aggregateInputChannels = ImmutableList.builder();
    aggregateInputChannels.addAll(inputChannels);
    maskChannel.ifPresent(aggregateInputChannels::add);
    return new SpillableFinalOnlyGroupedAccumulator(sourceTypes, aggregateInputChannels.build(), (FinalOnlyGroupedAccumulator) accumulator);

    checkArgument(session != null, "Session is null");
```
Use `checkState`. `checkArgument` is used for input parameter checks, while `checkState` asserts that there is nothing wrong with the current state of the class.
```
    return new SpillableFinalOnlyGroupedAccumulator(sourceTypes, aggregateInputChannels.build(), (FinalOnlyGroupedAccumulator) accumulator);

    checkArgument(session != null, "Session is null");
    if (isSpillableAccumulatorPreprocessingEnabled(session) && hasDistinct() && !hasOrderBy()) {
```
hmmmm..... what if you have order by and distinct? Isn't that going to be a corner case we can't handle well?
If you have order by and distinct, we will have a `SpillableFinalOnlyGroupedAccumulator` whose child/delegate will be `OrderingGroupedAccumulator`, and its child will be `DistinctingGroupedAccumulator`. Since `SpillableFinalOnlyGroupedAccumulator` is feeding data to `OrderingGroupedAccumulator`, we cannot perform any pre-processing, because all the data needs to flow to `OrderingGroupedAccumulator` for it to work correctly.
That's why we use `SpillableFinalOnlyGroupedAccumulator` in this case.
```
if (isSpillableAccumulatorPreprocessingEnabled(session) && hasDistinct() && !hasOrderBy()) {
    return new SpillableDistinctPreprocessingFinalOnlyGroupedAccumulator(sourceTypes, aggregateInputChannels.build(), (DistinctingGroupedAccumulator) accumulator, maskChannel);
}
else {
```
Remove the else, it's redundant; just return directly.
```
inputPages = inputPages.stream().map(
        page -> page.insertColumnAtIndex(maskChannel, RunLengthEncodedBlock.create(BOOLEAN, true, page.getPositionCount()))
).collect(toImmutableList());
```
nit:
```
inputPages = inputPages.stream()
        .map(page -> page.insertColumnAtIndex(maskChannel, RunLengthEncodedBlock.create(BOOLEAN, true, page.getPositionCount())))
        .collect(toImmutableList());
```
This will be removed in the next update of this diff.
Force-pushed from 8354a8a to c19a46a.
I am planning to squash the two commits together once the PR is approved, right before merging. Review-wise, multiple commits should not make any difference, right? Is there any advantage to squashing these commits earlier?
Can we squash all commits into one?
```
@@ -128,6 +128,7 @@
public static final String ORDER_BY_AGGREGATION_SPILL_ENABLED = "order_by_aggregation_spill_enabled";
public static final String WINDOW_SPILL_ENABLED = "window_spill_enabled";
public static final String ORDER_BY_SPILL_ENABLED = "order_by_spill_enabled";
public static final String SPILLABLE_ACCUMULATOR_PREPROCESSING_ENABLED = "spillable_accumulator_preprocessing_enabled";
```
This will no longer be preprocessing. It could be something like "dedup_based_distinct_aggregation_spill_enabled".
```
/**
 * SpillableAccumulator to perform preprocessing of input data for distinct aggregates only
 */
private static class SpillableDistinctPreprocessingFinalOnlyGroupedAccumulator
```
nit: DedupBasedSpillableDistinctGroupedAccumulator
Force-pushed from c19a46a to 99445c7.
Test failure is related.
SpillableFinalOnlyGroupedAccumulator buffers input pages in `rawInputs`, and when spill is triggered, these pages are spilled to disk. Currently, the entire page is buffered and spilled. For distinct aggregates, we can dedup rows within the same groupId before buffering. This reduces the memory used for buffering and also the amount of data to be spilled, if spill is triggered.
Force-pushed from 99445c7 to eb4b28e.
I noticed the failure but didn't update the PR, since there was a bug that I found and I wanted to include the fix as well in the update. The bug triggers when the aggregation mask filter filters out all the input rows. In that case, nothing is sent to the delegate's accumulator and it throws an NPE (since its internal state is not initialized). The fix is to send an empty page in that case with the correct groupCount so that the delegate's accumulator can initialize its internal state. The bug is now fixed and I have added a unit test for this.
LGTM. @aweisberg, do you have other comments on this PR?
SpillableFinalOnlyGroupedAccumulator buffers input pages in `rawInputs`, and when spill is triggered, these pages are spilled to disk. Currently, the entire page is buffered and spilled. For distinct aggregates, we can dedup rows within the same groupId before buffering. This reduces the memory used for buffering and also the amount of data to be spilled, if spill is triggered.
Test plan - Unit Tests, Verifier runs.