[SPARK-31973][SQL] Skip partial aggregates if grouping keys have high cardinality #28804
Conversation
@@ -165,6 +166,26 @@ class WholeStageCodegenSuite extends QueryTest with SharedSparkSession
    }
  }

  test("SPARK-: Avoid spill in partial aggregation " +
    "when spark.sql.aggregate.spill.partialaggregate.disabled is set") {
    withSQLConf((SQLConf.SPILL_PARTIAL_AGGREGATE_DISABLED.key, "true"),
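For context, a sketch of how the truncated test above might continue (the body, data, and assertions here are assumptions for illustration, not the PR's actual test; it assumes the QueryTest helpers available in this suite):

test("SPARK-: Avoid spill in partial aggregation " +
  "when spark.sql.aggregate.spill.partialaggregate.disabled is set") {
  // Hypothetical body: with the flag on, the partial aggregate should still
  // produce correct results even though the sort/spill path is skipped.
  withSQLConf((SQLConf.SPILL_PARTIAL_AGGREGATE_DISABLED.key, "true")) {
    val df = spark.range(0, 100)
      .selectExpr("id % 10 AS k", "id AS v")
      .groupBy("k")
      .sum("v")
    // Each key k in [0, 9] sums v = k, k+10, ..., k+90, i.e. 10 * k + 450.
    val expected = (0 until 10).map(k => Row(k.toLong, 10L * k + 450))
    checkAnswer(df, expected)
  }
}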
This is not sufficient, since the rows still get added to UnsafeFixedWidthAggregationMap; we need to figure out a way to avoid adding elements to UnsafeFixedWidthAggregationMap. Please advise @cloud-fan @gatorsmile @maropu
Could you show us performance numbers?
@maropu I figured out a few more improvements that can be made to the generated code; I will test them and also add the benchmark numbers.
Adding a WIP tag to the title.
@maropu
I have added the benchmark to the description.
I have also figured out a way to add a UT for this.
@maropu @cloud-fan @gatorsmile Can you please help review the change?
ok to test
@@ -2173,6 +2173,13 @@ object SQLConf {
    .checkValue(bit => bit >= 10 && bit <= 30, "The bit value must be in [10, 30].")
    .createWithDefault(16)

  val SPILL_PARTIAL_AGGREGATE_DISABLED =
    buildConf("spark.sql.aggregate.spill.partialaggregate.disabled")
disabled -> enabled, to follow the other config naming.
I have renamed the config. Can you please check?
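For reference, a boolean entry of this shape is typically defined with the SQLConf builder along these lines (the renamed key and doc text below are assumptions based on the review suggestion, not necessarily the PR's final code):

val SKIP_PARTIAL_AGGREGATE_ENABLED =
  buildConf("spark.sql.aggregate.partialaggregate.skip.enabled")
    .internal()
    .doc("When true, skip the sort/spill of the partial aggregation map and " +
      "pass rows through to the final aggregate once the grouping keys are " +
      "observed to have high cardinality.")
    .booleanConf
    .createWithDefault(false)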
Test build #124189 has finished for PR 28804 at commit
@maropu @cloud-fan Can you please add me to the whitelist to trigger the tests?
Test build #124199 has finished for PR 28804 at commit
add to whitelist
Test build #124209 has finished for PR 28804 at commit
Test build #124294 has finished for PR 28804 at commit
Test build #124295 has finished for PR 28804 at commit
Test build #124296 has finished for PR 28804 at commit
This reverts commit 086ba42.
Test build #127214 has finished for PR 28804 at commit
Test build #127216 has finished for PR 28804 at commit
Test build #127218 has finished for PR 28804 at commit
    |// output the result
    |$outputFromFastHashMap
    |$outputFromRegularHashMap
  """.stripMargin
}

override def needStopCheck: Boolean = skipPartialAggregateEnabled |
If skipPartialAggregateEnabled is true but #rows/cardinality doesn't go over the threshold, partial aggregates are not skipped. Even in that case, do we set needStopCheck to true?
When the ratio of cardinality to numRows:
- does not go beyond the threshold, it implies the optimization has not kicked in yet. In that case org.apache.spark.sql.execution.CodegenSupport#shouldStopCheckCode returns false, and we continue iterating over the remaining items of the iterator.
- goes beyond the threshold, we add the item (since the addition to the map is skipped) to org.apache.spark.sql.execution.BufferedRowIterator#currentRows, which gets consumed by the parent.
Since this is an inexpensive operation that is already used in many places in HashAggregateExec, and I didn't see any performance penalties, this approach seemed ok to me.
Please let me know if you have any other suggestions.
Generated code:
private void agg_doAggregateWithKeys_0() throws java.io.IOException {
/* 318 */ while ( localtablescan_input_0.hasNext()) {
/* 319 */ InternalRow localtablescan_row_0 = (InternalRow) localtablescan_input_0.next();
/* 320 */ ((org.apache.spark.sql.execution.metric.SQLMetric) references[7] /* numOutputRows */).add(1);
/* 321 */ boolean localtablescan_isNull_0 = localtablescan_row_0.isNullAt(0);
/* 322 */ UTF8String localtablescan_value_0 = localtablescan_isNull_0 ?
/* 323 */ null : (localtablescan_row_0.getUTF8String(0));
/* 324 */ int localtablescan_value_1 = localtablescan_row_0.getInt(1);
/* 325 */
/* 326 */ agg_doConsume_0(localtablescan_value_0, localtablescan_isNull_0, localtablescan_value_1);
/* 327 */ if (shouldStop()) return; // code added as part of needStopCheck
/* 328 */ }
/* 329 */
/* 330 */ agg_childrenConsumed_0 = true;
/* 331 */
/* 332 */ agg_fastHashMapIter_0 = agg_fastHashMap_0.rowIterator();
/* 333 */ agg_mapIter_0 = ((org.apache.spark.sql.execution.aggregate.HashAggregateExec) references[0] /* plan */).finishAggregate(agg_hashMap_0, agg_sorter_0, ((org.apache.spark.sql.execution.metric.SQLMetric) references[3] /* peakMemory */), ((org.apache.spark.sql.execution.metric.SQLMetric) references[4] /* spillSize */), ((org.apache.spark.sql.execution.metric.SQLMetric) references[5] /* avgHashProbe */));
/* 334 */
/* 335 */ }
Can we have a short description of the approach in the PR description? It seems like we are making the hash aggregate adaptive: it does pass-through if it can't reduce the data size in the first n rows.
Test build #127340 has finished for PR 28804 at commit
btw, could this optimization be implemented on the adaptive execution framework (AQE)? Have you already considered this approach? What I'm worried about now is that the current implementation makes the code complicated, and it is limited to hash aggregates w/ codegen only.
I don't think AQE can help here. This is a partial aggregate, and usually there won't be a shuffle right before the partial agg.
Hm, I see. Even so,
@maropu The stats (specifically, the number of records from the aggregation map after a threshold) that we are looking for are available only at the operator level at runtime.
I was pointing out not the current approach, but the previous one: https://github.com/apache/spark/pull/28804/files#r446720097
AQE doesn't provide column stats, and column stats propagation can be incorrect if we have many operators. IIUC the current approach is: sample the first 100000 rows; if they can't reduce the data by half (which means one key has 2 values on average), then we skip the partial aggregate. This sounds reasonable, but it's hard to tell how to pick the config values. @karuppayya do you have any experience of using it in practice?
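A minimal sketch of that heuristic, for illustration (all names, and the exact decision point, are assumptions, not the PR's implementation):

// Decides whether to skip the partial aggregate after sampling the input.
class PartialAggSkipDecider(
    sampleSize: Long = 100000L,   // rows to observe before deciding
    minReduction: Double = 0.5) { // keys-per-row ratio above which we skip
  private var rowsSeen = 0L
  private var skip = false

  // Called once per input row with the current aggregation map size.
  def update(mapSize: Long): Unit = {
    rowsSeen += 1
    if (!skip && rowsSeen == sampleSize && mapSize.toDouble / rowsSeen > minReduction) {
      // The first N rows kept more than one distinct key per two rows, so the
      // partial aggregate is not halving the data; pass rows through instead.
      skip = true
    }
  }

  def shouldSkipPartialAggregate: Boolean = skip
}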
@cloud-fan
Test build #129988 has finished for PR 28804 at commit
Test build #132721 has finished for PR 28804 at commit
Any update on the PR? Thanks.
@karuppayya - we did a similar change internally and have already rolled it out to production at Facebook. We made some changes on top of this (e.g. only skipping the partial aggregate when the map needs to spill) and fixed several bugs. Do you mind if we submit a separate PR (listing you as co-author) to help move the feature forward? Thanks.
@c21 Please go ahead
We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
What changes were proposed in this pull request?
In the case of hash aggregation, a partial aggregation (update) is done, followed by a final aggregation (merge).
During partial aggregation, we sort and spill to disk every time the fast map (when enabled) and the UnsafeFixedWidthAggregationMap get exhausted.
When the cardinality of the grouping column is close to the total number of records being processed, sorting and spilling the data to disk is not required: it is effectively a no-op, and we can use the rows directly in the final aggregation.
Currently, even when the user is aware of the nature of the data, there is no way to disable this sort and spill.
This is similar to following issues in Hive:
https://issues.apache.org/jira/browse/HIVE-223
https://issues.apache.org/jira/browse/HIVE-291
In this PR, the ability to disable sort/spill during partial aggregation is added.
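As an illustration of the knob this adds (using the config key from the diff above; the query itself is a made-up example of a near-unique grouping key, not the benchmark code):

// Hypothetical usage: the grouping key is unique per row, so partial
// aggregation cannot reduce the data and its sort/spill is pure overhead.
spark.conf.set("spark.sql.aggregate.spill.partialaggregate.disabled", "true")

spark.range(0, 90000000L)
  .selectExpr("uuid() AS key", "id AS value")
  .groupBy("key")
  .sum("value")
  .count()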
Benchmark
spark.executor.memory = 12G
Init code
Generate data
Query
Benchmark code
Results
Percent improvement: 90000000 → 30.46%, 60000000 → 26.70%
Why are the changes needed?
This change can improve the performance of queries whose grouping keys have high cardinality, by avoiding unnecessary sort and spill during partial aggregation.
Does this PR introduce any user-facing change?
No
How was this patch tested?
This patch was tested manually.